diff options
author | Sage Weil <sage@inktank.com> | 2013-02-12 13:39:52 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-02-12 13:39:52 -0800 |
commit | 102a519632f1b7a0fede9a3fbd4a5c1df0e732a5 (patch) | |
tree | fb3b1f984299de76d46fdbfc894e8f94245b3a24 | |
parent | 2c6afa058e8b1738c1400392320482945834de86 (diff) | |
parent | 2ebf4d065af3dc2e581a25b921071af3efb57f8a (diff) | |
download | ceph-102a519632f1b7a0fede9a3fbd4a5c1df0e732a5.tar.gz |
Merge remote-tracking branch 'gh/wip-bobtail-osd-msgr' into bobtail
-rw-r--r-- | src/msg/Messenger.h | 4 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.cc | 4 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 1 | ||||
-rw-r--r-- | src/osd/OSD.cc | 93 | ||||
-rw-r--r-- | src/osd/OSD.h | 4 |
5 files changed, 38 insertions, 68 deletions
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 6ece6536ad7..b75e4420f66 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -440,6 +440,10 @@ public: */ virtual Connection *get_connection(const entity_inst_t& dest) = 0; /** + * Get the Connection object associated with ourselves. + */ + virtual Connection *get_loopback_connection() = 0; + /** * Send a "keepalive" ping to the given dest, if it has a working Connection. * If the Messenger doesn't already have a Connection, or if the underlying * connection has broken, this function does nothing. diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 0e8ed27c7f4..dd496947151 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -380,6 +380,10 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest) } } +Connection *SimpleMessenger::get_loopback_connection() +{ + return (Connection*)local_connection->get(); +} void SimpleMessenger::submit_message(Message *m, Connection *con, const entity_addr_t& dest_addr, int dest_type, bool lazy) diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index fb392e8f741..cc946e3d25a 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -294,6 +294,7 @@ public: * @return The requested Connection, as a pointer whose reference you own. */ virtual Connection *get_connection(const entity_inst_t& dest); + virtual Connection *get_loopback_connection(); /** * Send a "keepalive" ping to the given dest, if it has a working Connection. * If the Messenger doesn't already have a Connection, or if the underlying diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 4a0b4ef6a9b..661947b4fdf 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3141,47 +3141,45 @@ void OSD::forget_peer_epoch(int peer, epoch_t as_of) } -bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch, - Session* session) +bool OSD::_share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch, Session* session) { bool shared = false; - dout(20) << "_share_map_incoming " << inst << " " << epoch << dendl; + dout(20) << "_share_map_incoming " << name << " " << con->get_peer_addr() << " " << epoch << dendl; //assert(osd_lock.is_locked()); assert(is_active()); // does client have old map? - if (inst.name.is_client()) { + if (name.is_client()) { bool sendmap = epoch < osdmap->get_epoch(); if (sendmap && session) { - if ( session->last_sent_epoch < osdmap->get_epoch() ) { + if (session->last_sent_epoch < osdmap->get_epoch()) { session->last_sent_epoch = osdmap->get_epoch(); - } - else { + } else { sendmap = false; //we don't need to send it out again - dout(15) << inst.name << " already sent incremental to update from epoch "<< epoch << dendl; + dout(15) << name << " already sent incremental to update from epoch "<< epoch << dendl; } } if (sendmap) { - dout(10) << inst.name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl; - send_incremental_map(epoch, inst); + dout(10) << name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl; + send_incremental_map(epoch, con); shared = true; } } // does peer have old map? - if (inst.name.is_osd() && - osdmap->is_up(inst.name.num()) && - (osdmap->get_cluster_inst(inst.name.num()) == inst || - osdmap->get_hb_inst(inst.name.num()) == inst)) { + if (name.is_osd() && + osdmap->is_up(name.num()) && + (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() || + osdmap->get_hb_addr(name.num()) == con->get_peer_addr())) { // remember - epoch_t has = note_peer_epoch(inst.name.num(), epoch); + epoch_t has = note_peer_epoch(name.num(), epoch); // share? if (has < osdmap->get_epoch()) { - dout(10) << inst.name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl; - note_peer_epoch(inst.name.num(), osdmap->get_epoch()); - send_incremental_map(epoch, osdmap->get_cluster_inst(inst.name.num())); + dout(10) << name << " " << con->get_peer_addr() << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl; + note_peer_epoch(name.num(), osdmap->get_epoch()); + send_incremental_map(epoch, con); shared = true; } } @@ -3229,6 +3227,14 @@ bool OSD::heartbeat_dispatch(Message *m) handle_osd_ping((MOSDPing*)m); break; + case CEPH_MSG_OSD_MAP: + { + Connection *self = cluster_messenger->get_loopback_connection(); + cluster_messenger->send_message(m, self); + self->put(); + } + break; + default: return false; } @@ -4327,17 +4333,6 @@ MOSDMap *OSD::build_incremental_map_msg(epoch_t since, epoch_t to) return m; } -void OSD::send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy) -{ - Messenger *msgr = client_messenger; - if (entity_name_t::TYPE_OSD == inst.name._type) - msgr = cluster_messenger; - if (lazy) - msgr->lazy_send_message(m, inst); // only if we already have an open connection - else - msgr->send_message(m, inst); -} - void OSD::send_map(MOSDMap *m, Connection *con) { Messenger *msgr = client_messenger; @@ -4346,32 +4341,6 @@ void OSD::send_map(MOSDMap *m, Connection *con) msgr->send_message(m, con); } -void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy) -{ - dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch() - << " to " << inst << dendl; - - if (since < superblock.oldest_map) { - // just send latest full map - MOSDMap *m = new MOSDMap(monc->get_fsid()); - m->oldest_map = superblock.oldest_map; - m->newest_map = superblock.newest_map; - epoch_t e = osdmap->get_epoch(); - get_map_bl(e, m->maps[e]); - send_map(m, inst, lazy); - return; - } - - while (since < osdmap->get_epoch()) { - epoch_t to = osdmap->get_epoch(); - if (to - since > (epoch_t)g_conf->osd_map_message_max) - to = since + g_conf->osd_map_message_max; - MOSDMap *m = build_incremental_map_msg(since, to); - send_map(m, inst, lazy); - since = to; - } -} - void OSD::send_incremental_map(epoch_t since, Connection *con) { dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch() @@ -4538,13 +4507,7 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch) if (!osdmap->have_inst(from) || osdmap->get_cluster_addr(from) != m->get_source_inst().addr) { dout(0) << "from dead osd." << from << ", dropping, sharing map" << dendl; - send_incremental_map(epoch, m->get_source_inst(), true); - - // close after we send the map; don't reconnect - Connection *con = m->get_connection(); - cluster_messenger->mark_down_on_empty(con); - cluster_messenger->mark_disposable(con); - + send_incremental_map(epoch, m->get_connection()); return false; } } @@ -5892,7 +5855,7 @@ void OSD::handle_op(OpRequestRef op) return; } // share our map with sender, if they're old - _share_map_incoming(m->get_source_inst(), m->get_map_epoch(), + _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(), (Session *)m->get_connection()->get_priv()); int r = init_op_flags(m); if (r) { @@ -6007,7 +5970,7 @@ void OSD::handle_sub_op(OpRequestRef op) return; // share our map with sender, if they're old - _share_map_incoming(m->get_source_inst(), m->map_epoch, + _share_map_incoming(m->get_source(), m->get_connection(), m->map_epoch, (Session*)m->get_connection()->get_priv()); if (service.splitting(pgid)) { @@ -6044,7 +6007,7 @@ void OSD::handle_sub_op_reply(OpRequestRef op) if (!require_same_or_newer_map(op, m->get_map_epoch())) return; // share our map with sender, if they're old - _share_map_incoming(m->get_source_inst(), m->get_map_epoch(), + _share_map_incoming(m->get_source(), m->get_connection(), m->get_map_epoch(), (Session*)m->get_connection()->get_priv()); PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index b36566d90e7..c3bc0b96839 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -786,7 +786,7 @@ private: epoch_t note_peer_epoch(int p, epoch_t e); void forget_peer_epoch(int p, epoch_t e); - bool _share_map_incoming(const entity_inst_t& inst, epoch_t epoch, + bool _share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch, Session *session = 0); void _share_map_outgoing(int peer, Connection *con, OSDMapRef map = OSDMapRef()); @@ -833,9 +833,7 @@ private: } MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to); - void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false); void send_incremental_map(epoch_t since, Connection *con); - void send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy); void send_map(MOSDMap *m, Connection *con); protected: |