summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-05-23 10:17:14 -0700
committerSage Weil <sage@inktank.com>2013-05-23 10:17:14 -0700
commit8b2fbf21b8d51e2eb6e017ecef7384b5afb4ce55 (patch)
tree37fdb81f1c2eb6e894cdd6970fc451db14aaab04 /src
parente8d0dc762f6659afe6d2a903e76850deddbf7844 (diff)
parent27381c0c6259ac89f5f9c592b4bfb585937a1cfc (diff)
downloadceph-8b2fbf21b8d51e2eb6e017ecef7384b5afb4ce55.tar.gz
Merge pull request #312 from ceph/wip-osd-hb
Reviewed-by: Samuel Just <sam.just@inktank.com>
Diffstat (limited to 'src')
-rw-r--r--src/ceph_osd.cc50
-rw-r--r--src/messages/MOSDBoot.h19
-rw-r--r--src/mon/Monitor.cc2
-rw-r--r--src/mon/OSDMonitor.cc14
-rw-r--r--src/msg/Accepter.cc15
-rw-r--r--src/msg/Accepter.h4
-rw-r--r--src/msg/Message.h9
-rw-r--r--src/msg/Messenger.h2
-rw-r--r--src/msg/Pipe.cc2
-rw-r--r--src/msg/SimpleMessenger.cc11
-rw-r--r--src/msg/SimpleMessenger.h2
-rw-r--r--src/osd/OSD.cc260
-rw-r--r--src/osd/OSD.h25
-rw-r--r--src/osd/OSDMap.cc65
-rw-r--r--src/osd/OSDMap.h24
15 files changed, 336 insertions, 168 deletions
diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc
index b778161dcdb..b485133514e 100644
--- a/src/ceph_osd.cc
+++ b/src/ceph_osd.cc
@@ -325,12 +325,16 @@ int main(int argc, const char **argv)
Messenger *messenger_hbclient = Messenger::create(g_ceph_context,
entity_name_t::OSD(whoami), "hbclient",
getpid());
- Messenger *messenger_hbserver = Messenger::create(g_ceph_context,
- entity_name_t::OSD(whoami), "hbserver",
+ Messenger *messenger_hb_back_server = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "hb_back_server",
+ getpid());
+ Messenger *messenger_hb_front_server = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "hb_front_server",
getpid());
cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL);
messenger_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL);
- messenger_hbserver->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ messenger_hb_back_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ messenger_hb_front_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
cout << "starting osd." << whoami
<< " at " << client_messenger->get_myaddr()
@@ -376,9 +380,11 @@ int main(int argc, const char **argv)
Messenger::Policy::stateless_server(0, 0));
messenger_hbclient->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::lossy_client(0, 0));
- messenger_hbserver->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::stateless_server(0, 0));
+ Messenger::Policy::lossy_client(0, 0));
+ messenger_hb_back_server->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::stateless_server(0, 0));
+ messenger_hb_front_server->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::stateless_server(0, 0));
r = client_messenger->bind(g_conf->public_addr);
if (r < 0)
@@ -387,17 +393,24 @@ int main(int argc, const char **argv)
if (r < 0)
exit(1);
- // hb should bind to same ip as cluster_addr (if specified)
- entity_addr_t hb_addr = g_conf->osd_heartbeat_addr;
- if (hb_addr.is_blank_ip()) {
- hb_addr = g_conf->cluster_addr;
- if (hb_addr.is_ip())
- hb_addr.set_port(0);
+ // hb back should bind to same ip as cluster_addr (if specified)
+ entity_addr_t hb_back_addr = g_conf->osd_heartbeat_addr;
+ if (hb_back_addr.is_blank_ip()) {
+ hb_back_addr = g_conf->cluster_addr;
+ if (hb_back_addr.is_ip())
+ hb_back_addr.set_port(0);
}
- r = messenger_hbserver->bind(hb_addr);
+ r = messenger_hb_back_server->bind(hb_back_addr);
if (r < 0)
exit(1);
+ // hb front should bind to same ip as public_addr
+ entity_addr_t hb_front_addr = g_conf->public_addr;
+ if (hb_front_addr.is_ip())
+ hb_front_addr.set_port(0);
+ r = messenger_hb_front_server->bind(hb_front_addr);
+ if (r < 0)
+ exit(1);
// Set up crypto, daemonize, etc.
global_init_daemonize(g_ceph_context, 0);
@@ -418,7 +431,7 @@ int main(int argc, const char **argv)
global_init_chdir(g_ceph_context);
osd = new OSD(whoami, cluster_messenger, client_messenger,
- messenger_hbclient, messenger_hbserver,
+ messenger_hbclient, messenger_hb_front_server, messenger_hb_back_server,
&mc,
g_conf->osd_data, g_conf->osd_journal);
@@ -434,7 +447,8 @@ int main(int argc, const char **argv)
client_messenger->start();
messenger_hbclient->start();
- messenger_hbserver->start();
+ messenger_hb_front_server->start();
+ messenger_hb_back_server->start();
cluster_messenger->start();
// install signal handlers
@@ -453,7 +467,8 @@ int main(int argc, const char **argv)
client_messenger->wait();
messenger_hbclient->wait();
- messenger_hbserver->wait();
+ messenger_hb_front_server->wait();
+ messenger_hb_back_server->wait();
cluster_messenger->wait();
unregister_async_signal_handler(SIGHUP, sighup_handler);
@@ -465,7 +480,8 @@ int main(int argc, const char **argv)
delete osd;
delete client_messenger;
delete messenger_hbclient;
- delete messenger_hbserver;
+ delete messenger_hb_front_server;
+ delete messenger_hb_back_server;
delete cluster_messenger;
client_byte_throttler.reset();
client_msg_throttler.reset();
diff --git a/src/messages/MOSDBoot.h b/src/messages/MOSDBoot.h
index 354ea6b0430..d18d56c66f0 100644
--- a/src/messages/MOSDBoot.h
+++ b/src/messages/MOSDBoot.h
@@ -22,12 +22,12 @@
class MOSDBoot : public PaxosServiceMessage {
- static const int HEAD_VERSION = 3;
+ static const int HEAD_VERSION = 4;
static const int COMPAT_VERSION = 2;
public:
OSDSuperblock sb;
- entity_addr_t hb_addr;
+ entity_addr_t hb_back_addr, hb_front_addr;
entity_addr_t cluster_addr;
epoch_t boot_epoch; // last epoch this daemon was added to the map (if any)
@@ -35,11 +35,15 @@ class MOSDBoot : public PaxosServiceMessage {
: PaxosServiceMessage(MSG_OSD_BOOT, 0, HEAD_VERSION, COMPAT_VERSION),
boot_epoch(0)
{ }
- MOSDBoot(OSDSuperblock& s, epoch_t be, const entity_addr_t& hb_addr_ref,
+ MOSDBoot(OSDSuperblock& s, epoch_t be,
+ const entity_addr_t& hb_back_addr_ref,
+ const entity_addr_t& hb_front_addr_ref,
const entity_addr_t& cluster_addr_ref)
: PaxosServiceMessage(MSG_OSD_BOOT, s.current_epoch, HEAD_VERSION, COMPAT_VERSION),
sb(s),
- hb_addr(hb_addr_ref), cluster_addr(cluster_addr_ref),
+ hb_back_addr(hb_back_addr_ref),
+ hb_front_addr(hb_front_addr_ref),
+ cluster_addr(cluster_addr_ref),
boot_epoch(be)
{ }
@@ -55,19 +59,22 @@ public:
void encode_payload(uint64_t features) {
paxos_encode();
::encode(sb, payload);
- ::encode(hb_addr, payload);
+ ::encode(hb_back_addr, payload);
::encode(cluster_addr, payload);
::encode(boot_epoch, payload);
+ ::encode(hb_front_addr, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
paxos_decode(p);
::decode(sb, p);
- ::decode(hb_addr, p);
+ ::decode(hb_back_addr, p);
if (header.version >= 2)
::decode(cluster_addr, p);
if (header.version >= 3)
::decode(boot_epoch, p);
+ if (header.version >= 4)
+ ::decode(hb_front_addr, p);
}
};
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index f1d16aa69e8..acfeb65da67 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -2887,7 +2887,7 @@ void Monitor::handle_forward(MForward *m)
dout(0) << "forward from entity with insufficient caps! "
<< session->caps << dendl;
} else {
- Connection *c = new Connection;
+ Connection *c = new Connection(NULL);
MonSession *s = new MonSession(m->msg->get_source_inst(), c);
c->set_priv(s);
c->set_peer_addr(m->client.addr);
diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index 5326ca450e3..897ca7f2d47 100644
--- a/src/mon/OSDMonitor.cc
+++ b/src/mon/OSDMonitor.cc
@@ -248,8 +248,8 @@ bool OSDMonitor::thrash()
dout(5) << "thrash_map osd." << o << " up" << dendl;
pending_inc.new_state[o] = CEPH_OSD_UP;
pending_inc.new_up_client[o] = entity_addr_t();
- pending_inc.new_up_internal[o] = entity_addr_t();
- pending_inc.new_hb_up[o] = entity_addr_t();
+ pending_inc.new_up_cluster[o] = entity_addr_t();
+ pending_inc.new_hb_back_up[o] = entity_addr_t();
pending_inc.new_weight[o] = CEPH_OSD_IN;
thrash_last_up_osd = o;
}
@@ -1090,7 +1090,9 @@ bool OSDMonitor::preprocess_boot(MOSDBoot *m)
bool OSDMonitor::prepare_boot(MOSDBoot *m)
{
dout(7) << "prepare_boot from " << m->get_orig_source_inst() << " sb " << m->sb
- << " cluster_addr " << m->cluster_addr << " hb_addr " << m->hb_addr
+ << " cluster_addr " << m->cluster_addr
+ << " hb_back_addr " << m->hb_back_addr
+ << " hb_front_addr " << m->hb_front_addr
<< dendl;
assert(m->get_orig_source().is_osd());
@@ -1126,8 +1128,10 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m)
// mark new guy up.
pending_inc.new_up_client[from] = m->get_orig_source_addr();
if (!m->cluster_addr.is_blank_ip())
- pending_inc.new_up_internal[from] = m->cluster_addr;
- pending_inc.new_hb_up[from] = m->hb_addr;
+ pending_inc.new_up_cluster[from] = m->cluster_addr;
+ pending_inc.new_hb_back_up[from] = m->hb_back_addr;
+ if (!m->hb_front_addr.is_blank_ip())
+ pending_inc.new_hb_front_up[from] = m->hb_front_addr;
// mark in?
if ((g_conf->mon_osd_auto_mark_auto_out_in && (oldstate & CEPH_OSD_AUTOOUT)) ||
diff --git a/src/msg/Accepter.cc b/src/msg/Accepter.cc
index 90c68df6cf3..4d13be8fdca 100644
--- a/src/msg/Accepter.cc
+++ b/src/msg/Accepter.cc
@@ -37,7 +37,7 @@
* Accepter
*/
-int Accepter::bind(const entity_addr_t &bind_addr, int avoid_port1, int avoid_port2)
+int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
{
const md_config_t *conf = msgr->cct->_conf;
// bind to a socket
@@ -92,7 +92,7 @@ int Accepter::bind(const entity_addr_t &bind_addr, int avoid_port1, int avoid_po
} else {
// try a range of ports
for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) {
- if (port == avoid_port1 || port == avoid_port2)
+ if (avoid_ports.count(port))
continue;
listen_addr.set_port(port);
rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), listen_addr.addr_size());
@@ -151,9 +151,9 @@ int Accepter::bind(const entity_addr_t &bind_addr, int avoid_port1, int avoid_po
return 0;
}
-int Accepter::rebind(int avoid_port)
+int Accepter::rebind(const set<int>& avoid_ports)
{
- ldout(msgr->cct,1) << "accepter.rebind avoid " << avoid_port << dendl;
+ ldout(msgr->cct,1) << "accepter.rebind avoid " << avoid_ports << dendl;
stop();
@@ -161,11 +161,12 @@ int Accepter::rebind(int avoid_port)
msgr->unlearn_addr();
entity_addr_t addr = msgr->get_myaddr();
- int old_port = addr.get_port();
+ set<int> new_avoid = avoid_ports;
+ new_avoid.insert(addr.get_port());
addr.set_port(0);
- ldout(msgr->cct,10) << " will try " << addr << dendl;
- int r = bind(addr, old_port, avoid_port);
+ ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl;
+ int r = bind(addr, new_avoid);
if (r == 0)
start();
return r;
diff --git a/src/msg/Accepter.h b/src/msg/Accepter.h
index 07d766b32cd..4b1421f9e11 100644
--- a/src/msg/Accepter.h
+++ b/src/msg/Accepter.h
@@ -35,8 +35,8 @@ public:
void *entry();
void stop();
- int bind(const entity_addr_t &bind_addr, int avoid_port1=0, int avoid_port2=0);
- int rebind(int avoid_port);
+ int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
+ int rebind(const set<int>& avoid_port);
int start();
};
diff --git a/src/msg/Message.h b/src/msg/Message.h
index 33d26b2e7da..18a64c1d02e 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -157,9 +157,11 @@
// abstract Connection, for keeping per-connection state
+class Messenger;
struct Connection : public RefCountedObject {
Mutex lock;
+ Messenger *msgr;
RefCountedObject *priv;
int peer_type;
entity_addr_t peer_addr;
@@ -171,8 +173,9 @@ struct Connection : public RefCountedObject {
map<tid_t,pair<bufferlist,int> > rx_buffers;
public:
- Connection()
+ Connection(Messenger *m)
: lock("Connection::lock"),
+ msgr(m),
priv(NULL),
peer_type(-1),
features(0),
@@ -244,6 +247,10 @@ public:
return pipe != NULL;
}
+ Messenger *get_messenger() {
+ return msgr;
+ }
+
int get_peer_type() { return peer_type; }
void set_peer_type(int t) { peer_type = t; }
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index ca80dd1c5be..13d34611e19 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -341,7 +341,7 @@ public:
*
* @param avoid_port An additional port to avoid binding to.
*/
- virtual int rebind(int avoid_port) { return -EOPNOTSUPP; }
+ virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; }
/**
* @} // Configuration
*/
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index f4100bc483b..42d461ac2f8 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -75,7 +75,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
connection_state = con->get();
connection_state->reset_pipe(this);
} else {
- connection_state = new Connection();
+ connection_state = new Connection(msgr);
connection_state->pipe = get();
}
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index 46e51dcf9f2..c9764fac324 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -51,7 +51,7 @@ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
dispatch_throttler(cct, string("msgr_dispatch_throttler-") + mname, cct->_conf->ms_dispatch_throttle_bytes),
reaper_started(false), reaper_stop(false),
timeout(0),
- local_connection(new Connection)
+ local_connection(new Connection(this))
{
pthread_spin_init(&global_seq_lock, PTHREAD_PROCESS_PRIVATE);
init_local_connection();
@@ -262,18 +262,19 @@ int SimpleMessenger::bind(const entity_addr_t &bind_addr)
lock.Unlock();
// bind to a socket
- int r = accepter.bind(bind_addr);
+ set<int> avoid_ports;
+ int r = accepter.bind(bind_addr, avoid_ports);
if (r >= 0)
did_bind = true;
return r;
}
-int SimpleMessenger::rebind(int avoid_port)
+int SimpleMessenger::rebind(const set<int>& avoid_ports)
{
- ldout(cct,1) << "rebind avoid " << avoid_port << dendl;
+ ldout(cct,1) << "rebind avoid " << avoid_ports << dendl;
mark_down_all();
assert(did_bind);
- return accepter.rebind(avoid_port);
+ return accepter.rebind(avoid_ports);
}
int SimpleMessenger::start()
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index 6be1a0a9539..0d54d174965 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -197,7 +197,7 @@ public:
*
* @param avoid_port An additional port to avoid binding to.
*/
- int rebind(int avoid_port);
+ int rebind(const set<int>& avoid_ports);
/** @} Configuration functions */
/**
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 192f5d7a60c..e725e97e822 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -868,7 +868,10 @@ int OSD::peek_journal_fsid(string path, uuid_d& fsid)
// cons/des
OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
- Messenger *hbclientm, Messenger *hbserverm, MonClient *mc,
+ Messenger *hb_clientm,
+ Messenger *hb_front_serverm,
+ Messenger *hb_back_serverm,
+ MonClient *mc,
const std::string &dev, const std::string &jdev) :
Dispatcher(external_messenger->cct),
osd_lock("OSD::osd_lock"),
@@ -900,8 +903,9 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
paused_recovery(false),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_need_update(true), heartbeat_epoch(0),
- hbclient_messenger(hbclientm),
- hbserver_messenger(hbserverm),
+ hbclient_messenger(hb_clientm),
+ hb_front_server_messenger(hb_front_serverm),
+ hb_back_server_messenger(hb_back_serverm),
heartbeat_thread(this),
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
@@ -1120,7 +1124,8 @@ int OSD::init()
cluster_messenger->add_dispatcher_head(this);
hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher);
- hbserver_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+ hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+ hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
r = monc->init();
@@ -1449,7 +1454,8 @@ int OSD::shutdown()
client_messenger->shutdown();
cluster_messenger->shutdown();
hbclient_messenger->shutdown();
- hbserver_messenger->shutdown();
+ hb_front_server_messenger->shutdown();
+ hb_back_server_messenger->shutdown();
peering_wq.clear();
return r;
}
@@ -2244,16 +2250,24 @@ void OSD::_add_heartbeat_peer(int p)
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
if (i == heartbeat_peers.end()) {
- ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch());
- if (!con)
+ pair<ConnectionRef,ConnectionRef> cons = service.get_con_osd_hb(p, osdmap->get_epoch());
+ if (!cons.first)
return;
hi = &heartbeat_peers[p];
- hi->con = con.get();
- hi->con->get();
hi->peer = p;
- hi->con->set_priv(new HeartbeatSession(p));
+ HeartbeatSession *s = new HeartbeatSession(p);
+ hi->con_back = cons.first.get();
+ hi->con_back->get();
+ hi->con_back->set_priv(s);
+ if (cons.second) {
+ hi->con_front = cons.second.get();
+ hi->con_front->get();
+ hi->con_front->set_priv(s->get());
+ }
dout(10) << "_add_heartbeat_peer: new peer osd." << p
- << " " << hi->con->get_peer_addr() << dendl;
+ << " " << hi->con_back->get_peer_addr()
+ << " " << (hi->con_front ? hi->con_front->get_peer_addr() : entity_addr_t())
+ << dendl;
} else {
hi = &i->second;
}
@@ -2304,10 +2318,15 @@ void OSD::maybe_update_heartbeat_peers()
while (p != heartbeat_peers.end()) {
if (p->second.epoch < osdmap->get_epoch()) {
dout(20) << " removing heartbeat peer osd." << p->first
- << " " << p->second.con->get_peer_addr()
+ << " " << p->second.con_back->get_peer_addr()
+ << " " << (p->second.con_front ? p->second.con_front->get_peer_addr() : entity_addr_t())
<< dendl;
- hbclient_messenger->mark_down(p->second.con);
- p->second.con->put();
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_back->put();
+ if (p->second.con_front) {
+ hbclient_messenger->mark_down(p->second.con_front);
+ p->second.con_front->put();
+ }
heartbeat_peers.erase(p++);
} else {
++p;
@@ -2322,8 +2341,13 @@ void OSD::reset_heartbeat_peers()
dout(10) << "reset_heartbeat_peers" << dendl;
Mutex::Locker l(heartbeat_lock);
while (!heartbeat_peers.empty()) {
- hbclient_messenger->mark_down(heartbeat_peers.begin()->second.con);
- heartbeat_peers.begin()->second.con->put();
+ HeartbeatInfo& hi = heartbeat_peers.begin()->second;
+ hbclient_messenger->mark_down(hi.con_back);
+ hi.con_back->put();
+ if (hi.con_front) {
+ hbclient_messenger->mark_down(hi.con_front);
+ hi.con_front->put();
+ }
heartbeat_peers.erase(heartbeat_peers.begin());
}
failure_queue.clear();
@@ -2383,7 +2407,7 @@ void OSD::handle_osd_ping(MOSDPing *m)
curmap->get_epoch(),
MOSDPing::PING_REPLY,
m->stamp);
- hbserver_messenger->send_message(r, m->get_connection());
+ m->get_connection()->get_messenger()->send_message(r, m->get_connection());
if (curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
@@ -2401,12 +2425,26 @@ void OSD::handle_osd_ping(MOSDPing *m)
{
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
if (i != heartbeat_peers.end()) {
- dout(25) << "handle_osd_ping got reply from osd." << from
- << " first_rx " << i->second.first_tx
- << " last_tx " << i->second.last_tx
- << " last_rx " << i->second.last_rx << " -> " << m->stamp
- << dendl;
- i->second.last_rx = m->stamp;
+ if (m->get_connection() == i->second.con_back) {
+ dout(25) << "handle_osd_ping got reply from osd." << from
+ << " first_rx " << i->second.first_tx
+ << " last_tx " << i->second.last_tx
+ << " last_rx_back " << i->second.last_rx_back << " -> " << m->stamp
+ << " last_rx_front " << i->second.last_rx_front
+ << dendl;
+ i->second.last_rx_back = m->stamp;
+ // if there is no front con, set both stamps.
+ if (i->second.con_front == NULL)
+ i->second.last_rx_front = m->stamp;
+ } else if (m->get_connection() == i->second.con_front) {
+ dout(25) << "handle_osd_ping got reply from osd." << from
+ << " first_rx " << i->second.first_tx
+ << " last_tx " << i->second.last_tx
+ << " last_rx_back " << i->second.last_rx_back
+ << " last_rx_front " << i->second.last_rx_front << " -> " << m->stamp
+ << dendl;
+ i->second.last_rx_front = m->stamp;
+ }
}
if (m->map_epoch &&
@@ -2420,12 +2458,19 @@ void OSD::handle_osd_ping(MOSDPing *m)
}
}
- // Cancel false reports
- if (failure_queue.count(from))
- failure_queue.erase(from);
- if (failure_pending.count(from)) {
- send_still_alive(curmap->get_epoch(), failure_pending[from]);
- failure_pending.erase(from);
+ utime_t cutoff = ceph_clock_now(g_ceph_context);
+ cutoff -= g_conf->osd_heartbeat_grace;
+ if (i->second.is_healthy(cutoff)) {
+ // Cancel false reports
+ if (failure_queue.count(from)) {
+ dout(10) << "handle_osd_ping canceling queued failure report for osd." << from<< dendl;
+ failure_queue.erase(from);
+ }
+ if (failure_pending.count(from)) {
+ dout(10) << "handle_osd_ping canceling in-flight failure report for osd." << from<< dendl;
+ send_still_alive(curmap->get_epoch(), failure_pending[from]);
+ failure_pending.erase(from);
+ }
}
}
break;
@@ -2480,27 +2525,25 @@ void OSD::heartbeat_check()
dout(25) << "heartbeat_check osd." << p->first
<< " first_tx " << p->second.first_tx
<< " last_tx " << p->second.last_tx
- << " last_rx " << p->second.last_rx
+ << " last_rx_back " << p->second.last_rx_back
+ << " last_rx_front " << p->second.last_rx_front
<< dendl;
- if (p->second.last_rx == utime_t()) {
- if (p->second.last_tx == utime_t() ||
- p->second.first_tx > cutoff)
- continue; // just started sending recently
- derr << "heartbeat_check: no reply from osd." << p->first
- << " ever, first ping sent " << p->second.first_tx
- << " (cutoff " << cutoff << ")" << dendl;
-
- // fail
- failure_queue[p->first] = p->second.last_tx;
- } else {
- if (p->second.last_rx > cutoff)
- continue; // got recent reply
- derr << "heartbeat_check: no reply from osd." << p->first
- << " since " << p->second.last_rx
- << " (cutoff " << cutoff << ")" << dendl;
-
- // fail
- failure_queue[p->first] = p->second.last_rx;
+ if (!p->second.is_healthy(cutoff)) {
+ if (p->second.last_rx_back == utime_t() ||
+ p->second.last_rx_front == utime_t()) {
+ derr << "heartbeat_check: no reply from osd." << p->first
+ << " ever on either front or back, first ping sent " << p->second.first_tx
+ << " (cutoff " << cutoff << ")" << dendl;
+ // fail
+ failure_queue[p->first] = p->second.last_tx;
+ } else {
+ derr << "heartbeat_check: no reply from osd." << p->first
+ << " since back " << p->second.last_rx_back
+ << " front " << p->second.last_rx_front
+ << " (cutoff " << cutoff << ")" << dendl;
+ // fail
+ failure_queue[p->first] = MIN(p->second.last_rx_back, p->second.last_rx_front);
+ }
}
}
}
@@ -2531,16 +2574,21 @@ void OSD::heartbeat()
i != heartbeat_peers.end();
++i) {
int peer = i->first;
- dout(30) << "heartbeat allocating ping for osd." << peer << dendl;
- Message *m = new MOSDPing(monc->get_fsid(),
- service.get_osdmap()->get_epoch(),
- MOSDPing::PING,
- now);
i->second.last_tx = now;
if (i->second.first_tx == utime_t())
i->second.first_tx = now;
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
- hbclient_messenger->send_message(m, i->second.con);
+ hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
+ service.get_osdmap()->get_epoch(),
+ MOSDPing::PING,
+ now),
+ i->second.con_back);
+ if (i->second.con_front)
+ hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
+ service.get_osdmap()->get_epoch(),
+ MOSDPing::PING,
+ now),
+ i->second.con_front);
}
dout(30) << "heartbeat check" << dendl;
@@ -2574,20 +2622,30 @@ bool OSD::heartbeat_reset(Connection *con)
}
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
if (p != heartbeat_peers.end() &&
- p->second.con == con) {
- ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
- if (!newcon) {
+ p->second.con_back == con) {
+ pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
+ if (!newcon.first) {
dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl;
} else {
dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl;
- p->second.con = newcon.get();
- p->second.con->get();
- p->second.con->set_priv(s);
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_back = newcon.first.get();
+ p->second.con_back->get();
+ p->second.con_back->set_priv(s);
+ if (p->second.con_front)
+ hbclient_messenger->mark_down(p->second.con_front);
+ if (newcon.second) {
+ p->second.con_front = newcon.second.get();
+ p->second.con_front->get();
+ p->second.con_front->set_priv(s->get());
+ } else {
+ p->second.con_front = NULL;
+ }
}
} else {
dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
+ hbclient_messenger->mark_down(con);
}
- hbclient_messenger->mark_down(con);
heartbeat_lock.Unlock();
s->put();
}
@@ -3023,18 +3081,28 @@ void OSD::_send_boot()
cluster_messenger->set_addr_unknowns(cluster_addr);
dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
}
- entity_addr_t hb_addr = hbserver_messenger->get_myaddr();
- if (hb_addr.is_blank_ip()) {
- int port = hb_addr.get_port();
- hb_addr = cluster_addr;
- hb_addr.set_port(port);
- hbserver_messenger->set_addr_unknowns(hb_addr);
- dout(10) << " assuming hb_addr ip matches cluster_addr" << dendl;
+ entity_addr_t hb_back_addr = hb_back_server_messenger->get_myaddr();
+ if (hb_back_addr.is_blank_ip()) {
+ int port = hb_back_addr.get_port();
+ hb_back_addr = cluster_addr;
+ hb_back_addr.set_port(port);
+ hb_back_server_messenger->set_addr_unknowns(hb_back_addr);
+ dout(10) << " assuming hb_back_addr ip matches cluster_addr" << dendl;
}
- MOSDBoot *mboot = new MOSDBoot(superblock, boot_epoch, hb_addr, cluster_addr);
+ entity_addr_t hb_front_addr = hb_front_server_messenger->get_myaddr();
+ if (hb_front_addr.is_blank_ip()) {
+ int port = hb_front_addr.get_port();
+ hb_front_addr = client_messenger->get_myaddr();
+ hb_front_addr.set_port(port);
+ hb_front_server_messenger->set_addr_unknowns(hb_front_addr);
+ dout(10) << " assuming hb_front_addr ip matches client_addr" << dendl;
+ }
+
+ MOSDBoot *mboot = new MOSDBoot(superblock, boot_epoch, hb_back_addr, hb_front_addr, cluster_addr);
dout(10) << " client_addr " << client_messenger->get_myaddr()
<< ", cluster_addr " << cluster_addr
- << ", hb addr " << hb_addr
+ << ", hb_back_addr " << hb_back_addr
+ << ", hb_front_addr " << hb_front_addr
<< dendl;
monc->send_mon_message(mboot);
}
@@ -3105,20 +3173,23 @@ ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
return ret;
}
-ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
{
Mutex::Locker l(pre_publish_lock);
// service map is always newer/newest
assert(from_epoch <= next_osdmap->get_epoch());
+ pair<ConnectionRef,ConnectionRef> ret;
if (next_osdmap->is_down(peer) ||
next_osdmap->get_info(peer).up_from > from_epoch) {
- return NULL;
+ return ret;
}
- ConnectionRef ret(
- osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer)));
- ret->put(); // Ref from get_connection
+ ret.first = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer));
+ ret.first->put(); // Ref from get_connection
+ ret.second = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_front_inst(peer));
+ if (ret.second)
+ ret.second->put(); // Ref from get_connection
return ret;
}
@@ -3601,7 +3672,7 @@ bool OSD::_share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch
if (name.is_osd() &&
osdmap->is_up(name.num()) &&
(osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
- osdmap->get_hb_addr(name.num()) == con->get_peer_addr())) {
+ osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
// remember
epoch_t has = note_peer_epoch(name.num(), epoch);
@@ -4199,8 +4270,12 @@ void OSD::note_down_osd(int peer)
failure_pending.erase(peer);
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer);
if (p != heartbeat_peers.end()) {
- hbclient_messenger->mark_down(p->second.con);
- p->second.con->put();
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_back->put();
+ if (p->second.con_front) {
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_front->put();
+ }
heartbeat_peers.erase(p);
}
heartbeat_lock.Unlock();
@@ -4414,7 +4489,8 @@ void OSD::handle_osd_map(MOSDMap *m)
} else if (!osdmap->is_up(whoami) ||
!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()) ||
!osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()) ||
- !osdmap->get_hb_addr(whoami).probably_equals(hbserver_messenger->get_myaddr())) {
+ !osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr()) ||
+ !osdmap->get_hb_front_addr(whoami).probably_equals(hb_front_server_messenger->get_myaddr())) {
if (!osdmap->is_up(whoami)) {
if (service.is_preparing_to_stop()) {
service.got_stop_ack();
@@ -4431,10 +4507,14 @@ void OSD::handle_osd_map(MOSDMap *m)
clog.error() << "map e" << osdmap->get_epoch()
<< " had wrong cluster addr (" << osdmap->get_cluster_addr(whoami)
<< " != my " << cluster_messenger->get_myaddr() << ")";
- else if (!osdmap->get_hb_addr(whoami).probably_equals(hbserver_messenger->get_myaddr()))
+ else if (!osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr()))
+ clog.error() << "map e" << osdmap->get_epoch()
+ << " had wrong hb back addr (" << osdmap->get_hb_back_addr(whoami)
+ << " != my " << hb_back_server_messenger->get_myaddr() << ")";
+ else if (!osdmap->get_hb_front_addr(whoami).probably_equals(hb_front_server_messenger->get_myaddr()))
clog.error() << "map e" << osdmap->get_epoch()
- << " had wrong hb addr (" << osdmap->get_hb_addr(whoami)
- << " != my " << hbserver_messenger->get_myaddr() << ")";
+ << " had wrong hb front addr (" << osdmap->get_hb_front_addr(whoami)
+ << " != my " << hb_front_server_messenger->get_myaddr() << ")";
if (!service.is_stopping()) {
state = STATE_BOOTING;
@@ -4442,14 +4522,20 @@ void OSD::handle_osd_map(MOSDMap *m)
do_restart = true;
bind_epoch = osdmap->get_epoch();
- int cport = cluster_messenger->get_myaddr().get_port();
- int hbport = hbserver_messenger->get_myaddr().get_port();
+ set<int> avoid_ports;
+ avoid_ports.insert(cluster_messenger->get_myaddr().get_port());
+ avoid_ports.insert(hb_back_server_messenger->get_myaddr().get_port());
+ avoid_ports.insert(hb_front_server_messenger->get_myaddr().get_port());
+
+ int r = cluster_messenger->rebind(avoid_ports);
+ if (r != 0)
+ do_shutdown = true; // FIXME: do_restart?
- int r = cluster_messenger->rebind(hbport);
+ r = hb_back_server_messenger->rebind(avoid_ports);
if (r != 0)
do_shutdown = true; // FIXME: do_restart?
- r = hbserver_messenger->rebind(cport);
+ r = hb_front_server_messenger->rebind(avoid_ports);
if (r != 0)
do_shutdown = true; // FIXME: do_restart?
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index bc6ae94f15e..428284c85ab 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -295,7 +295,7 @@ public:
next_osdmap = map;
}
ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
- ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch);
+ pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
void send_message_osd_cluster(Message *m, Connection *con) {
cluster_messenger->send_message(m, con);
@@ -696,11 +696,23 @@ private:
/// information about a heartbeat peer
struct HeartbeatInfo {
int peer; ///< peer
- Connection *con; ///< peer connection
+ Connection *con_front; ///< peer connection (front)
+ Connection *con_back; ///< peer connection (back)
utime_t first_tx; ///< time we sent our first ping request
utime_t last_tx; ///< last time we sent a ping request
- utime_t last_rx; ///< last time we got a ping reply
+ utime_t last_rx_front; ///< last time we got a ping reply on the front side
+ utime_t last_rx_back; ///< last time we got a ping reply on the back side
epoch_t epoch; ///< most recent epoch we wanted this peer
+
+ bool is_healthy(utime_t cutoff) {
+ return
+ (last_rx_front > cutoff ||
+ (last_rx_front == utime_t() && (last_tx == utime_t() ||
+ first_tx > cutoff))) &&
+ (last_rx_back > cutoff ||
+ (last_rx_back == utime_t() && (last_tx == utime_t() ||
+ first_tx > cutoff)));
+ }
};
/// state attached to outgoing heartbeat connections
struct HeartbeatSession : public RefCountedObject {
@@ -715,7 +727,9 @@ private:
epoch_t heartbeat_epoch; ///< last epoch we updated our heartbeat peers
map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
utime_t last_mon_heartbeat;
- Messenger *hbclient_messenger, *hbserver_messenger;
+ Messenger *hbclient_messenger;
+ Messenger *hb_front_server_messenger;
+ Messenger *hb_back_server_messenger;
void _add_heartbeat_peer(int p);
bool heartbeat_reset(Connection *con);
@@ -1568,7 +1582,8 @@ protected:
public:
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/
- OSD(int id, Messenger *internal, Messenger *external, Messenger *hbmin, Messenger *hbmout,
+ OSD(int id, Messenger *internal, Messenger *external,
+ Messenger *hb_client, Messenger *hb_front_server, Messenger *hb_back_server,
MonClient *mc, const std::string &dev, const std::string &jdev);
~OSD();
diff --git a/src/osd/OSDMap.cc b/src/osd/OSDMap.cc
index 8e0474eb781..e768c3a30da 100644
--- a/src/osd/OSDMap.cc
+++ b/src/osd/OSDMap.cc
@@ -315,18 +315,19 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
::encode(new_pg_temp, bl);
// extended
- __u16 ev = 9;
+ __u16 ev = 10;
::encode(ev, bl);
- ::encode(new_hb_up, bl);
+ ::encode(new_hb_back_up, bl);
::encode(new_up_thru, bl);
::encode(new_last_clean_interval, bl);
::encode(new_lost, bl);
::encode(new_blacklist, bl);
::encode(old_blacklist, bl);
- ::encode(new_up_internal, bl);
+ ::encode(new_up_cluster, bl);
::encode(cluster_snapshot, bl);
::encode(new_uuid, bl);
::encode(new_xinfo, bl);
+ ::encode(new_hb_front_up, bl);
}
void OSDMap::Incremental::decode(bufferlist::iterator &p)
@@ -402,7 +403,7 @@ void OSDMap::Incremental::decode(bufferlist::iterator &p)
__u16 ev = 0;
if (v >= 5)
::decode(ev, p);
- ::decode(new_hb_up, p);
+ ::decode(new_hb_back_up, p);
if (v < 5)
::decode(new_pool_names, p);
::decode(new_up_thru, p);
@@ -411,13 +412,15 @@ void OSDMap::Incremental::decode(bufferlist::iterator &p)
::decode(new_blacklist, p);
::decode(old_blacklist, p);
if (ev >= 6)
- ::decode(new_up_internal, p);
+ ::decode(new_up_cluster, p);
if (ev >= 7)
::decode(cluster_snapshot, p);
if (ev >= 8)
::decode(new_uuid, p);
if (ev >= 9)
::decode(new_xinfo, p);
+ if (ev >= 10)
+ ::decode(new_hb_front_up, p);
}
void OSDMap::Incremental::dump(Formatter *f) const
@@ -468,8 +471,9 @@ void OSDMap::Incremental::dump(Formatter *f) const
f->open_object_section("osd");
f->dump_int("osd", p->first);
f->dump_stream("public_addr") << p->second;
- f->dump_stream("cluster_addr") << new_up_internal.find(p->first)->second;
- f->dump_stream("heartbeat_addr") << new_hb_up.find(p->first)->second;
+ f->dump_stream("cluster_addr") << new_up_cluster.find(p->first)->second;
+ f->dump_stream("heartbeat_back_addr") << new_hb_back_up.find(p->first)->second;
+ f->dump_stream("heartbeat_front_addr") << new_hb_front_up.find(p->first)->second;
f->close_section();
}
f->close_section();
@@ -623,7 +627,8 @@ void OSDMap::set_max_osd(int m)
osd_xinfo.resize(m);
osd_addrs->client_addr.resize(m);
osd_addrs->cluster_addr.resize(m);
- osd_addrs->hb_addr.resize(m);
+ osd_addrs->hb_back_addr.resize(m);
+ osd_addrs->hb_front_addr.resize(m);
osd_uuid->resize(m);
calc_num_osds();
@@ -758,9 +763,14 @@ void OSDMap::dedup(const OSDMap *o, OSDMap *n)
n->osd_addrs->cluster_addr[i] = o->osd_addrs->cluster_addr[i];
else
diff++;
- if ( n->osd_addrs->hb_addr[i] && o->osd_addrs->hb_addr[i] &&
- *n->osd_addrs->hb_addr[i] == *o->osd_addrs->hb_addr[i])
- n->osd_addrs->hb_addr[i] = o->osd_addrs->hb_addr[i];
+ if ( n->osd_addrs->hb_back_addr[i] && o->osd_addrs->hb_back_addr[i] &&
+ *n->osd_addrs->hb_back_addr[i] == *o->osd_addrs->hb_back_addr[i])
+ n->osd_addrs->hb_back_addr[i] = o->osd_addrs->hb_back_addr[i];
+ else
+ diff++;
+ if ( n->osd_addrs->hb_front_addr[i] && o->osd_addrs->hb_front_addr[i] &&
+ *n->osd_addrs->hb_front_addr[i] == *o->osd_addrs->hb_front_addr[i])
+ n->osd_addrs->hb_front_addr[i] = o->osd_addrs->hb_front_addr[i];
else
diff++;
}
@@ -869,15 +879,18 @@ int OSDMap::apply_incremental(const Incremental &inc)
++i) {
osd_state[i->first] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
osd_addrs->client_addr[i->first].reset(new entity_addr_t(i->second));
- if (inc.new_hb_up.empty())
- osd_addrs->hb_addr[i->first].reset(new entity_addr_t(i->second)); //this is a backward-compatibility hack
+ if (inc.new_hb_back_up.empty())
+ osd_addrs->hb_back_addr[i->first].reset(new entity_addr_t(i->second)); //this is a backward-compatibility hack
else
- osd_addrs->hb_addr[i->first].reset(
- new entity_addr_t(inc.new_hb_up.find(i->first)->second));
+ osd_addrs->hb_back_addr[i->first].reset(
+ new entity_addr_t(inc.new_hb_back_up.find(i->first)->second));
+ if (!inc.new_hb_front_up.empty())
+ osd_addrs->hb_front_addr[i->first].reset(
+ new entity_addr_t(inc.new_hb_front_up.find(i->first)->second));
osd_info[i->first].up_from = epoch;
}
- for (map<int32_t,entity_addr_t>::const_iterator i = inc.new_up_internal.begin();
- i != inc.new_up_internal.end();
+ for (map<int32_t,entity_addr_t>::const_iterator i = inc.new_up_cluster.begin();
+ i != inc.new_up_cluster.end();
++i)
osd_addrs->cluster_addr[i->first].reset(new entity_addr_t(i->second));
@@ -1184,9 +1197,9 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
::encode(cbl, bl);
// extended
- __u16 ev = 9;
+ __u16 ev = 10;
::encode(ev, bl);
- ::encode(osd_addrs->hb_addr, bl);
+ ::encode(osd_addrs->hb_back_addr, bl);
::encode(osd_info, bl);
::encode(blacklist, bl);
::encode(osd_addrs->cluster_addr, bl);
@@ -1194,6 +1207,7 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
::encode(cluster_snapshot, bl);
::encode(*osd_uuid, bl);
::encode(osd_xinfo, bl);
+ ::encode(osd_addrs->hb_front_addr, bl);
}
void OSDMap::decode(bufferlist& bl)
@@ -1277,7 +1291,7 @@ void OSDMap::decode(bufferlist::iterator& p)
__u16 ev = 0;
if (v >= 5)
::decode(ev, p);
- ::decode(osd_addrs->hb_addr, p);
+ ::decode(osd_addrs->hb_back_addr, p);
::decode(osd_info, p);
if (v < 5)
::decode(pool_name, p);
@@ -1303,6 +1317,11 @@ void OSDMap::decode(bufferlist::iterator& p)
else
osd_xinfo.resize(max_osd);
+ if (ev >= 10)
+ ::decode(osd_addrs->hb_front_addr, p);
+ else
+ osd_addrs->hb_front_addr.resize(osd_addrs->hb_back_addr.size());
+
// index pool names
name_pool.clear();
for (map<int64_t,string>::iterator i = pool_name.begin(); i != pool_name.end(); ++i)
@@ -1358,7 +1377,8 @@ void OSDMap::dump(Formatter *f) const
get_info(i).dump(f);
f->dump_stream("public_addr") << get_addr(i);
f->dump_stream("cluster_addr") << get_cluster_addr(i);
- f->dump_stream("heartbeat_addr") << get_hb_addr(i);
+ f->dump_stream("heartbeat_back_addr") << get_hb_back_addr(i);
+ f->dump_stream("heartbeat_front_addr") << get_hb_front_addr(i);
set<string> st;
get_state(i, st);
@@ -1504,7 +1524,8 @@ void OSDMap::print(ostream& out) const
out << " weight " << get_weightf(i);
const osd_info_t& info(get_info(i));
out << " " << info;
- out << " " << get_addr(i) << " " << get_cluster_addr(i) << " " << get_hb_addr(i);
+ out << " " << get_addr(i) << " " << get_cluster_addr(i) << " " << get_hb_back_addr(i)
+ << " " << get_hb_front_addr(i);
set<string> st;
get_state(i, st);
out << " " << st;
diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h
index 6588382971f..deebc376a91 100644
--- a/src/osd/OSDMap.h
+++ b/src/osd/OSDMap.h
@@ -127,7 +127,7 @@ public:
map<int64_t,string> new_pool_names;
set<int64_t> old_pools;
map<int32_t,entity_addr_t> new_up_client;
- map<int32_t,entity_addr_t> new_up_internal;
+ map<int32_t,entity_addr_t> new_up_cluster;
map<int32_t,uint8_t> new_state; // XORed onto previous state.
map<int32_t,uint32_t> new_weight;
map<pg_t,vector<int32_t> > new_pg_temp; // [] to remove
@@ -139,7 +139,8 @@ public:
map<entity_addr_t,utime_t> new_blacklist;
vector<entity_addr_t> old_blacklist;
- map<int32_t, entity_addr_t> new_hb_up;
+ map<int32_t, entity_addr_t> new_hb_back_up;
+ map<int32_t, entity_addr_t> new_hb_front_up;
string cluster_snapshot;
@@ -181,7 +182,8 @@ private:
struct addrs_s {
vector<std::tr1::shared_ptr<entity_addr_t> > client_addr;
vector<std::tr1::shared_ptr<entity_addr_t> > cluster_addr;
- vector<std::tr1::shared_ptr<entity_addr_t> > hb_addr;
+ vector<std::tr1::shared_ptr<entity_addr_t> > hb_back_addr;
+ vector<std::tr1::shared_ptr<entity_addr_t> > hb_front_addr;
entity_addr_t blank;
};
std::tr1::shared_ptr<addrs_s> osd_addrs;
@@ -343,9 +345,13 @@ private:
return get_addr(osd);
return *osd_addrs->cluster_addr[osd];
}
- const entity_addr_t &get_hb_addr(int osd) const {
+ const entity_addr_t &get_hb_back_addr(int osd) const {
assert(exists(osd));
- return osd_addrs->hb_addr[osd] ? *osd_addrs->hb_addr[osd] : osd_addrs->blank;
+ return osd_addrs->hb_back_addr[osd] ? *osd_addrs->hb_back_addr[osd] : osd_addrs->blank;
+ }
+ const entity_addr_t &get_hb_front_addr(int osd) const {
+ assert(exists(osd));
+ return osd_addrs->hb_front_addr[osd] ? *osd_addrs->hb_front_addr[osd] : osd_addrs->blank;
}
entity_inst_t get_inst(int osd) const {
assert(is_up(osd));
@@ -355,9 +361,13 @@ private:
assert(is_up(osd));
return entity_inst_t(entity_name_t::OSD(osd), get_cluster_addr(osd));
}
- entity_inst_t get_hb_inst(int osd) const {
+ entity_inst_t get_hb_back_inst(int osd) const {
+ assert(is_up(osd));
+ return entity_inst_t(entity_name_t::OSD(osd), get_hb_back_addr(osd));
+ }
+ entity_inst_t get_hb_front_inst(int osd) const {
assert(is_up(osd));
- return entity_inst_t(entity_name_t::OSD(osd), get_hb_addr(osd));
+ return entity_inst_t(entity_name_t::OSD(osd), get_hb_front_addr(osd));
}
const uuid_d& get_uuid(int osd) const {