summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Sage Weil <sage@inktank.com> 2012-11-05 06:01:53 -0800
committer Sage Weil <sage@inktank.com> 2012-11-30 13:17:26 -0800
commit 2494499295424b032ac7f79d72aca1128cd92678 (patch)
tree 8aa56ab2bae70a44614bfd10dd92c4e38c47dded
parent 246eb7b2af4f4dd071bc1dbe48b7cd02897b55cd (diff)
download ceph-2494499295424b032ac7f79d72aca1128cd92678.tar.gz
hb msgr trainwreck
-rw-r--r-- src/ceph_osd.cc 10
-rw-r--r-- src/osd/OSD.cc 36
-rw-r--r-- src/osd/OSD.h 7
3 files changed, 38 insertions, 15 deletions
diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc
index 8fbeb753b89..a630c7594b5 100644
--- a/src/ceph_osd.cc
+++ b/src/ceph_osd.cc
@@ -323,12 +323,8 @@ int main(int argc, const char **argv)
Messenger *messenger_hbclient = Messenger::create(g_ceph_context,
entity_name_t::OSD(whoami), "hbclient",
getpid());
- Messenger *messenger_hbserver = Messenger::create(g_ceph_context,
- entity_name_t::OSD(whoami), "hbserver",
- getpid());
cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL);
messenger_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL);
- messenger_hbserver->set_cluster_protocol(CEPH_OSD_PROTOCOL);
cout << "starting osd." << whoami
<< " at " << client_messenger->get_myaddr()
@@ -369,8 +365,6 @@ int main(int argc, const char **argv)
messenger_hbclient->set_policy(entity_name_t::TYPE_OSD,
Messenger::Policy::lossy_client(0, 0));
- messenger_hbserver->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::stateless_server(0, 0));
r = client_messenger->bind(g_conf->public_addr);
if (r < 0)
@@ -379,6 +373,8 @@ int main(int argc, const char **argv)
if (r < 0)
exit(1);
+ Messenger *messenger_hbserver = OSD::create_hbserver_messenger(whoami, getpid());
+
// hb should bind to same ip as cluster_addr (if specified)
entity_addr_t hb_addr = g_conf->osd_heartbeat_addr;
if (hb_addr.is_blank_ip()) {
@@ -409,7 +405,7 @@ int main(int argc, const char **argv)
return -1;
global_init_chdir(g_ceph_context);
- osd = new OSD(whoami, cluster_messenger, client_messenger,
+ osd = new OSD(whoami, getpid(), cluster_messenger, client_messenger,
messenger_hbclient, messenger_hbserver,
&mc,
g_conf->osd_data, g_conf->osd_journal);
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 7297548ebe7..aa7f9c4e447 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -689,7 +689,8 @@ int OSD::peek_journal_fsid(string path, uuid_d& fsid)
// cons/des
-OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
+OSD::OSD(int id, uint64_t nonce,
+ Messenger *internal_messenger, Messenger *external_messenger,
Messenger *hbclientm, Messenger *hbserverm, MonClient *mc,
const std::string &dev, const std::string &jdev) :
Dispatcher(external_messenger->cct),
@@ -710,6 +711,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
store(NULL),
clog(external_messenger->cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
whoami(id),
+ nonce(nonce),
dev_path(dev), journal_path(jdev),
dispatch_running(false),
osd_compat(get_osd_compat_set()),
@@ -722,7 +724,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_need_update(true), heartbeat_epoch(0),
hbclient_messenger(hbclientm),
- hbserver_messenger(hbserverm),
+ hbserver_messenger(hbserverm), hbserver_messenger_previous(NULL),
heartbeat_thread(this),
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
@@ -776,6 +778,18 @@ void OSD::handle_signal(int signum)
suicide(0);
}
+Messenger *OSD::create_hbserver_messenger(int whoami, uint64_t nonce)
+{
+ Messenger *msgr = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "hbserver",
+ nonce);
+ msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ msgr->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::stateless_server(0, 0));
+ // Returned with protocol and policy set but not bound; callers bind() it themselves.
+ return msgr;
+}
+
int OSD::pre_init()
{
Mutex::Locker lock(osd_lock);
@@ -3760,12 +3774,22 @@ void OSD::handle_osd_map(MOSDMap *m)
if (r != 0)
do_shutdown = true; // FIXME: do_restart?
- r = hbserver_messenger->rebind(cport);
- if (r != 0)
- do_shutdown = true; // FIXME: do_restart?
-
hbclient_messenger->mark_down_all();
+ if (hbserver_messenger_previous) { // retire the messenger displaced by the last rebind
+ hbserver_messenger_previous->mark_down_all();
+ hbserver_messenger_previous->shutdown();
+ // FIXME: don't leak! (the retired messenger is shut down here but not deleted)
+ }
+ hbserver_messenger_previous = hbserver_messenger;
+ hbserver_messenger = create_hbserver_messenger(whoami, nonce);
+ entity_addr_t hb_addr = hbserver_messenger_previous->get_addr();
+ hb_addr.set_port(0);
+ r = hbserver_messenger->bind(hb_addr);
+ if (r != 0)
+ do_shutdown = true;
+ hbserver_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+
reset_heartbeat_peers();
}
}
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 12d50c6f9c4..662b023c8fc 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -385,6 +385,7 @@ protected:
LogClient clog;
int whoami;
+ uint64_t nonce;
std::string dev_path, journal_path;
class C_Tick : public Context {
@@ -522,7 +523,7 @@ private:
epoch_t heartbeat_epoch; ///< last epoch we updated our heartbeat peers
map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
utime_t last_mon_heartbeat;
- Messenger *hbclient_messenger, *hbserver_messenger;
+ Messenger *hbclient_messenger, *hbserver_messenger, *hbserver_messenger_previous;
void _add_heartbeat_peer(int p);
bool heartbeat_reset(Connection *con);
@@ -1330,7 +1331,7 @@ protected:
public:
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/
- OSD(int id, Messenger *internal, Messenger *external, Messenger *hbmin, Messenger *hbmout,
+ OSD(int id, uint64_t nonce, Messenger *internal, Messenger *external, Messenger *hbmin, Messenger *hbmout,
MonClient *mc, const std::string &dev, const std::string &jdev);
~OSD();
@@ -1376,6 +1377,8 @@ public:
void suicide(int exitcode);
int shutdown();
+ static Messenger *create_hbserver_messenger(int whoami, uint64_t nonce); ///< create a configured (unbound) "hbserver" messenger
+
void handle_signal(int signum);
void handle_rep_scrub(MOSDRepScrub *m);