diff options
author | Sage Weil <sage@inktank.com> | 2012-11-05 06:01:53 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2012-11-30 13:17:26 -0800 |
commit | 2494499295424b032ac7f79d72aca1128cd92678 (patch) | |
tree | 8aa56ab2bae70a44614bfd10dd92c4e38c47dded | |
parent | 246eb7b2af4f4dd071bc1dbe48b7cd02897b55cd (diff) | |
download | ceph-2494499295424b032ac7f79d72aca1128cd92678.tar.gz |
hb msgr trainwreck
-rw-r--r-- | src/ceph_osd.cc | 10 | ||||
-rw-r--r-- | src/osd/OSD.cc | 36 | ||||
-rw-r--r-- | src/osd/OSD.h | 7 |
3 files changed, 38 insertions, 15 deletions
diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 8fbeb753b89..a630c7594b5 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -323,12 +323,8 @@ int main(int argc, const char **argv) Messenger *messenger_hbclient = Messenger::create(g_ceph_context, entity_name_t::OSD(whoami), "hbclient", getpid()); - Messenger *messenger_hbserver = Messenger::create(g_ceph_context, - entity_name_t::OSD(whoami), "hbserver", - getpid()); cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL); messenger_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL); - messenger_hbserver->set_cluster_protocol(CEPH_OSD_PROTOCOL); cout << "starting osd." << whoami << " at " << client_messenger->get_myaddr() @@ -369,8 +365,6 @@ int main(int argc, const char **argv) messenger_hbclient->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0, 0)); - messenger_hbserver->set_policy(entity_name_t::TYPE_OSD, - Messenger::Policy::stateless_server(0, 0)); r = client_messenger->bind(g_conf->public_addr); if (r < 0) @@ -379,6 +373,8 @@ int main(int argc, const char **argv) if (r < 0) exit(1); + Messenger *messenger_hbserver = OSD::create_hbserver_messenger(whoami, getpid()); + // hb should bind to same ip as cluster_addr (if specified) entity_addr_t hb_addr = g_conf->osd_heartbeat_addr; if (hb_addr.is_blank_ip()) { @@ -409,7 +405,7 @@ int main(int argc, const char **argv) return -1; global_init_chdir(g_ceph_context); - osd = new OSD(whoami, cluster_messenger, client_messenger, + osd = new OSD(whoami, getpid(), cluster_messenger, client_messenger, messenger_hbclient, messenger_hbserver, &mc, g_conf->osd_data, g_conf->osd_journal); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 7297548ebe7..aa7f9c4e447 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -689,7 +689,8 @@ int OSD::peek_journal_fsid(string path, uuid_d& fsid) // cons/des -OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, +OSD::OSD(int id, uint64_t nonce, + Messenger *internal_messenger, Messenger *external_messenger, Messenger *hbclientm, Messenger *hbserverm, MonClient *mc, const std::string &dev, const std::string &jdev) : Dispatcher(external_messenger->cct), @@ -710,6 +711,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, store(NULL), clog(external_messenger->cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS), whoami(id), + nonce(nonce), dev_path(dev), journal_path(jdev), dispatch_running(false), osd_compat(get_osd_compat_set()), @@ -722,7 +724,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, heartbeat_lock("OSD::heartbeat_lock"), heartbeat_stop(false), heartbeat_need_update(true), heartbeat_epoch(0), hbclient_messenger(hbclientm), - hbserver_messenger(hbserverm), + hbserver_messenger(hbserverm), hbserver_messenger_previous(NULL), heartbeat_thread(this), heartbeat_dispatcher(this), stat_lock("OSD::stat_lock"), @@ -776,6 +778,18 @@ void OSD::handle_signal(int signum) suicide(0); } +Messenger *OSD::create_hbserver_messenger(int whoami, uint64_t nonce) +{ + Messenger *msgr = Messenger::create(g_ceph_context, + entity_name_t::OSD(whoami), "hbserver", + nonce); + msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); + msgr->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::stateless_server(0, 0)); + + return msgr; +} + int OSD::pre_init() { Mutex::Locker lock(osd_lock); @@ -3760,12 +3774,22 @@ void OSD::handle_osd_map(MOSDMap *m) if (r != 0) do_shutdown = true; // FIXME: do_restart? - r = hbserver_messenger->rebind(cport); - if (r != 0) - do_shutdown = true; // FIXME: do_restart? - hbclient_messenger->mark_down_all(); + if (hbserver_messenger_previous) { + hbserver_messenger_prevoius->mark_down_all(); + hbserver_messenger_previous->shutdown(); + // FIXME: don't leak! + } + hbserver_messenger_previous = hbserver_messenger; + hbserver_messenger = create_hbserver_messenger(whoami, nonce); + entity_addr_t hb_addr = hbserver_messenger_previous->get_addr(); + hb_addr.set_port(0); + r = hbserver_messenger->bind(hb_addr); + if (r != 0) + do_shutdown = true; + hbserver_messenger->add_dispatcher_head(&heartbeat_dispatcher); + reset_heartbeat_peers(); } } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 12d50c6f9c4..662b023c8fc 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -385,6 +385,7 @@ protected: LogClient clog; int whoami; + uint64_t nonce; std::string dev_path, journal_path; class C_Tick : public Context { @@ -522,7 +523,7 @@ private: epoch_t heartbeat_epoch; ///< last epoch we updated our heartbeat peers map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo utime_t last_mon_heartbeat; - Messenger *hbclient_messenger, *hbserver_messenger; + Messenger *hbclient_messenger, *hbserver_messenger, *hbserver_messenger_previous; void _add_heartbeat_peer(int p); bool heartbeat_reset(Connection *con); @@ -1330,7 +1331,7 @@ protected: public: /* internal and external can point to the same messenger, they will still * be cleaned up properly*/ - OSD(int id, Messenger *internal, Messenger *external, Messenger *hbmin, Messenger *hbmout, + OSD(int id, uint64_t nonce, Messenger *internal, Messenger *external, Messenger *hbmin, Messenger *hbmout, MonClient *mc, const std::string &dev, const std::string &jdev); ~OSD(); @@ -1376,6 +1377,8 @@ public: void suicide(int exitcode); int shutdown(); + static Messenger *create_hbserver_messenger(); + void handle_signal(int signum); void handle_rep_scrub(MOSDRepScrub *m); |