diff options
author | Sage Weil <sage@inktank.com> | 2012-11-30 12:12:23 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2012-11-30 12:12:23 -0800 |
commit | 246eb7b2af4f4dd071bc1dbe48b7cd02897b55cd (patch) | |
tree | cd0f74dc59ddca9a57b72d2456c5b7602cafeffe | |
parent | 7412bd3675e3e941eeaefeb7d48b32007751357a (diff) | |
parent | a928b6dbf630b63108aa7805adf9601253d40397 (diff) | |
download | ceph-246eb7b2af4f4dd071bc1dbe48b7cd02897b55cd.tar.gz |
Merge remote-tracking branch 'gh/wip-osd-msgr'
-rw-r--r-- | src/msg/DispatchQueue.h | 3 | ||||
-rw-r--r-- | src/msg/Message.h | 6 | ||||
-rw-r--r-- | src/osd/OSD.cc | 184 | ||||
-rw-r--r-- | src/osd/OSD.h | 45 | ||||
-rw-r--r-- | src/osd/PG.cc | 143 | ||||
-rw-r--r-- | src/osd/PG.h | 2 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 62 |
7 files changed, 298 insertions, 147 deletions
diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index ad4584829d1..ea44c165d56 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -33,9 +33,6 @@ class SimpleMessenger; class Message; class Connection; -typedef boost::intrusive_ptr<Connection> ConnectionRef; -typedef boost::intrusive_ptr<Message> MessageRef; - /** * The DispatchQueue contains all the Pipes which have Messages * they want to be dispatched, carefully organized by Message priority diff --git a/src/msg/Message.h b/src/msg/Message.h index b6a113f771f..fc434ed9b85 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -18,6 +18,10 @@ #include <stdlib.h> #include <ostream> +#include <boost/intrusive_ptr.hpp> +// Because intusive_ptr clobbers our assert... +#include "include/assert.h" + #include "include/types.h" #include "include/buffer.h" #include "common/Throttle.h" @@ -261,6 +265,7 @@ public: rx_buffers.erase(tid); } }; +typedef boost::intrusive_ptr<Connection> ConnectionRef; @@ -466,6 +471,7 @@ public: void encode(uint64_t features, bool datacrc); }; +typedef boost::intrusive_ptr<Message> MessageRef; extern Message *decode_message(CephContext *cct, ceph_msg_header &header, ceph_msg_footer& footer, bufferlist& front, diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 913157a8508..7297548ebe7 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -158,6 +158,7 @@ OSDService::OSDService(OSD *osd) : rep_scrub_wq(osd->rep_scrub_wq), class_handler(osd->class_handler), publish_lock("OSDService::publish_lock"), + pre_publish_lock("OSDService::pre_publish_lock"), sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0), scrubs_active(0), watch_lock("OSD::watch_lock"), @@ -1773,9 +1774,13 @@ void OSD::_add_heartbeat_peer(int p) map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p); if (i == heartbeat_peers.end()) { + ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch()); + if (!con) + return; hi = &heartbeat_peers[p]; - hi->inst = osdmap->get_hb_inst(p); - hi->con = hbclient_messenger->get_connection(hi->inst); + hi->con = con.get(); + hi->con->get(); + hi->peer = p; hi->con->set_priv(new HeartbeatSession(p)); dout(10) << "_add_heartbeat_peer: new peer osd." << p << " " << hi->con->get_peer_addr() << dendl; @@ -1901,8 +1906,12 @@ void OSD::handle_osd_ping(MOSDPing *m) if (curmap->is_up(from)) { note_peer_epoch(from, m->map_epoch); - if (is_active()) - _share_map_outgoing(curmap->get_cluster_inst(from)); + if (is_active()) { + ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch()); + if (con) { + _share_map_outgoing(from, con.get()); + } + } } } break; @@ -1922,8 +1931,12 @@ void OSD::handle_osd_ping(MOSDPing *m) if (m->map_epoch && curmap->is_up(from)) { note_peer_epoch(from, m->map_epoch); - if (is_active()) - _share_map_outgoing(curmap->get_cluster_inst(from)); + if (is_active()) { + ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch()); + if (con) { + _share_map_outgoing(from, con.get()); + } + } } // Cancel false reports @@ -2066,8 +2079,15 @@ bool OSD::heartbeat_reset(Connection *con) map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer); if (p != heartbeat_peers.end() && p->second.con == con) { + ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch); + if (!newcon) { + dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl; + s->put(); + return true; + } dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl; - p->second.con = hbclient_messenger->get_connection(p->second.inst); + p->second.con = newcon.get(); + p->second.con->get(); p->second.con->set_priv(s); } else { dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl; @@ -2468,6 +2488,55 @@ void OSD::send_alive() } } +void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch) +{ + Mutex::Locker l(pre_publish_lock); + + // service map is always newer/newest + assert(from_epoch <= next_osdmap->get_epoch()); + + if (next_osdmap->is_down(peer) || + next_osdmap->get_info(peer).up_from > from_epoch) { + m->put(); + return; + } + osd->cluster_messenger->send_message(m, next_osdmap->get_cluster_inst(peer)); +} + +ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch) +{ + Mutex::Locker l(pre_publish_lock); + + // service map is always newer/newest + assert(from_epoch <= next_osdmap->get_epoch()); + + if (next_osdmap->is_down(peer) || + next_osdmap->get_info(peer).up_from > from_epoch) { + return NULL; + } + ConnectionRef ret( + osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer))); + ret->put(); // Ref from get_connection + return ret; +} + +ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch) +{ + Mutex::Locker l(pre_publish_lock); + + // service map is always newer/newest + assert(from_epoch <= next_osdmap->get_epoch()); + + if (next_osdmap->is_down(peer) || + next_osdmap->get_info(peer).up_from > from_epoch) { + return NULL; + } + ConnectionRef ret( + osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer))); + ret->put(); // Ref from get_connection + return ret; +} + void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want) { Mutex::Locker l(pg_temp_lock); @@ -2960,25 +3029,21 @@ bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch, } -void OSD::_share_map_outgoing(const entity_inst_t& inst, - OSDMapRef map) +void OSD::_share_map_outgoing(int peer, Connection *con, OSDMapRef map) { if (!map) map = service.get_osdmap(); - assert(inst.name.is_osd()); - - int peer = inst.name.num(); // send map? epoch_t pe = get_peer_epoch(peer); if (pe) { if (pe < map->get_epoch()) { - send_incremental_map(pe, inst); + send_incremental_map(pe, con); note_peer_epoch(peer, map->get_epoch()); } else - dout(20) << "_share_map_outgoing " << inst << " already has epoch " << pe << dendl; + dout(20) << "_share_map_outgoing " << con << " already has epoch " << pe << dendl; } else { - dout(20) << "_share_map_outgoing " << inst << " don't know epoch, doing nothing" << dendl; + dout(20) << "_share_map_outgoing " << con << " don't know epoch, doing nothing" << dendl; // no idea about peer's epoch. // ??? send recent ??? // do nothing. @@ -3625,14 +3690,19 @@ void OSD::handle_osd_map(MOSDMap *m) OSDMapRef newmap = get_map(cur); assert(newmap); // we just cached it above! + // start blacklisting messages sent to peers that go down. + service.pre_publish_map(newmap); + // kill connections to newly down osds set<int> old; osdmap->get_all_osds(old); - for (set<int>::iterator p = old.begin(); p != old.end(); p++) + for (set<int>::iterator p = old.begin(); p != old.end(); p++) { if (*p != whoami && osdmap->have_inst(*p) && // in old map - (!newmap->exists(*p) || !newmap->is_up(*p))) // but not the new one + (!newmap->exists(*p) || !newmap->is_up(*p))) { // but not the new one note_down_osd(*p); + } + } osdmap = newmap; @@ -3923,6 +3993,7 @@ void OSD::activate_map() } to_remove.clear(); + service.pre_publish_map(osdmap); service.publish_map(osdmap); // scan pg's @@ -4004,6 +4075,14 @@ void OSD::send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy) msgr->send_message(m, inst); } +void OSD::send_map(MOSDMap *m, Connection *con) +{ + Messenger *msgr = client_messenger; + if (entity_name_t::TYPE_OSD == con->get_peer_type()) + msgr = cluster_messenger; + msgr->send_message(m, con); +} + void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy) { dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch() @@ -4030,6 +4109,32 @@ void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool la } } +void OSD::send_incremental_map(epoch_t since, Connection *con) +{ + dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch() + << " to " << con << " " << con->get_peer_addr() << dendl; + + if (since < superblock.oldest_map) { + // just send latest full map + MOSDMap *m = new MOSDMap(monc->get_fsid()); + m->oldest_map = superblock.oldest_map; + m->newest_map = superblock.newest_map; + epoch_t e = osdmap->get_epoch(); + get_map_bl(e, m->maps[e]); + send_map(m, con); + return; + } + + while (since < osdmap->get_epoch()) { + epoch_t to = osdmap->get_epoch(); + if (to - since > (epoch_t)g_conf->osd_map_message_max) + to = since + g_conf->osd_map_message_max; + MOSDMap *m = build_incremental_map_msg(since, to); + send_map(m, con); + since = to; + } +} + bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl) { bool found = map_bl_cache.lookup(e, &bl); @@ -4583,15 +4688,16 @@ void OSD::do_notifies( } if (!curmap->is_up(it->first)) continue; - Connection *con = - cluster_messenger->get_connection(curmap->get_cluster_inst(it->first)); - _share_map_outgoing(curmap->get_cluster_inst(it->first), curmap); + ConnectionRef con = service.get_con_osd_cluster(it->first, curmap->get_epoch()); + if (!con) + continue; + _share_map_outgoing(it->first, con.get(), curmap); if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) { dout(7) << "do_notify osd." << it->first << " on " << it->second.size() << " PGs" << dendl; MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(), it->second); - cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first)); + cluster_messenger->send_message(m, con.get()); } else { dout(7) << "do_notify osd." << it->first << " sending seperate messages" << dendl; @@ -4603,7 +4709,7 @@ void OSD::do_notifies( list[0] = *i; MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent, list); - cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first)); + cluster_messenger->send_message(m, con.get()); } } } @@ -4622,14 +4728,15 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map, if (!curmap->is_up(pit->first)) continue; int who = pit->first; - Connection *con = - cluster_messenger->get_connection(curmap->get_cluster_inst(pit->first)); - _share_map_outgoing(curmap->get_cluster_inst(who), curmap); + ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch()); + if (!con) + continue; + _share_map_outgoing(who, con.get(), curmap); if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) { dout(7) << "do_queries querying osd." << who << " on " << pit->second.size() << " PGs" << dendl; MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second); - cluster_messenger->send_message(m, curmap->get_cluster_inst(who)); + cluster_messenger->send_message(m, con.get()); } else { dout(7) << "do_queries querying osd." << who << " sending seperate messages " @@ -4640,7 +4747,7 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map, map<pg_t, pg_query_t> to_send; to_send.insert(*i); MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send); - cluster_messenger->send_message(m, curmap->get_cluster_inst(who)); + cluster_messenger->send_message(m, con.get()); } } } @@ -4660,13 +4767,14 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info ++i) { dout(20) << "Sending info " << i->first.info << " to osd." << p->first << dendl; } - Connection *con = - cluster_messenger->get_connection(curmap->get_cluster_inst(p->first)); - _share_map_outgoing(curmap->get_cluster_inst(p->first), curmap); + ConnectionRef con = service.get_con_osd_cluster(p->first, curmap->get_epoch()); + if (!con) + continue; + _share_map_outgoing(p->first, con.get(), curmap); if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) { MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch()); m->pg_list = p->second; - cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first)); + cluster_messenger->send_message(m, con.get()); } else { for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i = p->second.begin(); @@ -4676,7 +4784,7 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info to_send[0] = *i; MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent); m->pg_list = to_send; - cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first)); + cluster_messenger->send_message(m, con.get()); } } } @@ -5047,11 +5155,13 @@ void OSD::handle_pg_query(OpRequestRef op) pg_info_t empty(pgid); if (it->second.type == pg_query_t::LOG || it->second.type == pg_query_t::FULLLOG) { - MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty, - it->second.epoch_sent); - _share_map_outgoing(osdmap->get_cluster_inst(from)); - cluster_messenger->send_message(mlog, - osdmap->get_cluster_inst(from)); + ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch()); + if (con) { + MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty, + it->second.epoch_sent); + _share_map_outgoing(from, con.get(), osdmap); + cluster_messenger->send_message(mlog, con.get()); + } } else { notify_list[from].push_back(make_pair(pg_notify_t(it->second.epoch_sent, osdmap->get_epoch(), diff --git a/src/osd/OSD.h b/src/osd/OSD.h index ab6e3ac004b..12d50c6f9c4 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -168,8 +168,10 @@ public: ObjectStore *&store; LogClient &clog; PGRecoveryStats &pg_recovery_stats; +private: Messenger *&cluster_messenger; Messenger *&client_messenger; +public: PerfCounters *&logger; MonClient *&monc; ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef> &op_wq; @@ -182,7 +184,7 @@ public: ClassHandler *&class_handler; // -- superblock -- - Mutex publish_lock; + Mutex publish_lock, pre_publish_lock; OSDSuperblock superblock; OSDSuperblock get_superblock() { Mutex::Locker l(publish_lock); @@ -192,6 +194,9 @@ public: Mutex::Locker l(publish_lock); superblock = block; } + + int get_nodeid() const { return whoami; } + OSDMapRef osdmap; OSDMapRef get_osdmap() { Mutex::Locker l(publish_lock); @@ -202,7 +207,37 @@ public: osdmap = map; } - int get_nodeid() const { return whoami; } + /* + * osdmap - current published amp + * next_osdmap - pre_published map that is about to be published. + * + * We use the next_osdmap to send messages and initiate connections, + * but only if the target is the same instance as the one in the map + * epoch the current user is working from (i.e., the result is + * equivalent to what is in next_osdmap). + * + * This allows the helpers to start ignoring osds that are about to + * go down, and let OSD::handle_osd_map()/note_down_osd() mark them + * down, without worrying about reopening connections from threads + * working from old maps. + */ + OSDMapRef next_osdmap; + void pre_publish_map(OSDMapRef map) { + Mutex::Locker l(pre_publish_lock); + next_osdmap = map; + } + ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch); + ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch); + void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch); + void send_message_osd_cluster(Message *m, Connection *con) { + cluster_messenger->send_message(m, con); + } + void send_message_osd_client(Message *m, Connection *con) { + client_messenger->send_message(m, con); + } + entity_name_t get_cluster_msgr_name() { + return cluster_messenger->get_myname(); + } // -- scrub scheduling -- Mutex sched_scrub_lock; @@ -467,7 +502,7 @@ private: // -- heartbeat -- /// information about a heartbeat peer struct HeartbeatInfo { - entity_inst_t inst; ///< peer + int peer; ///< peer Connection *con; ///< peer connection utime_t first_tx; ///< time we sent our first ping request utime_t last_tx; ///< last time we sent a ping request @@ -699,7 +734,7 @@ private: bool _share_map_incoming(const entity_inst_t& inst, epoch_t epoch, Session *session = 0); - void _share_map_outgoing(const entity_inst_t& inst, + void _share_map_outgoing(int peer, Connection *con, OSDMapRef map = OSDMapRef()); void wait_for_new_map(OpRequestRef op); @@ -742,7 +777,9 @@ private: MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to); void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false); + void send_incremental_map(epoch_t since, Connection *con); void send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy); + void send_map(MOSDMap *m, Connection *con); protected: // -- placement groups -- diff --git a/src/osd/PG.cc b/src/osd/PG.cc index b81bfed36f4..617ba9e250f 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1343,11 +1343,10 @@ void PG::build_might_have_unfound() struct C_PG_ActivateCommitted : public Context { PG *pg; epoch_t epoch; - entity_inst_t primary; - C_PG_ActivateCommitted(PG *p, epoch_t e, const entity_inst_t &pi) - : pg(p), epoch(e), primary(pi) {} + C_PG_ActivateCommitted(PG *p, epoch_t e) + : pg(p), epoch(e) {} void finish(int r) { - pg->_activate_committed(epoch, primary); + pg->_activate_committed(epoch); } }; @@ -1411,8 +1410,7 @@ void PG::activate(ObjectStore::Transaction& t, // find out when we commit get(); // for callback - tfin.push_back(new C_PG_ActivateCommitted(this, query_epoch, - get_osdmap()->get_cluster_inst(acting[0]))); + tfin.push_back(new C_PG_ActivateCommitted(this, query_epoch)); // initialize snap_trimq if (is_primary()) { @@ -1542,7 +1540,7 @@ void PG::activate(ObjectStore::Transaction& t, if (m) { dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl; //m->log.print(cout); - osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer)); + osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch()); } // peer now has @@ -1731,7 +1729,7 @@ void PG::replay_queued_ops() update_stats(); } -void PG::_activate_committed(epoch_t e, entity_inst_t& primary) +void PG::_activate_committed(epoch_t e) { lock(); if (e < last_peering_reset) { @@ -1751,7 +1749,7 @@ void PG::_activate_committed(epoch_t e, entity_inst_t& primary) info); i.info.history.last_epoch_started = e; m->pg_list.push_back(make_pair(i, pg_interval_map_t())); - osd->cluster_messenger->send_message(m, primary); + osd->send_message_osd_cluster(acting[0], m, get_osdmap()->get_epoch()); } if (dirty_info) { @@ -1959,8 +1957,7 @@ void PG::purge_strays() MOSDPGRemove *m = new MOSDPGRemove( get_osdmap()->get_epoch(), to_remove); - osd->cluster_messenger->send_message( - m, get_osdmap()->get_cluster_inst(*p)); + osd->send_message_osd_cluster(*p, m, get_osdmap()->get_epoch()); stray_purged.insert(*p); } else { dout(10) << "not sending PGRemove to down osd." << *p << dendl; @@ -2273,9 +2270,10 @@ void PG::trim_peers() dout(10) << "trim_peers " << pg_trim_to << dendl; if (pg_trim_to != eversion_t()) { for (unsigned i=1; i<acting.size(); i++) - osd->cluster_messenger->send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid, - pg_trim_to), - get_osdmap()->get_cluster_inst(acting[i])); + osd->send_message_osd_cluster(acting[i], + new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid, + pg_trim_to), + get_osdmap()->get_epoch()); } } @@ -3001,8 +2999,7 @@ void PG::_request_scrub_map_classic(int replica, eversion_t version) MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version, last_update_applied, get_osdmap()->get_epoch()); - osd->cluster_messenger->send_message(repscrubop, - get_osdmap()->get_cluster_inst(replica)); + osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch()); } // send scrub v3 messages (chunky scrub) @@ -3015,8 +3012,7 @@ void PG::_request_scrub_map(int replica, eversion_t version, MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version, get_osdmap()->get_epoch(), start, end, deep); - osd->cluster_messenger->send_message(repscrubop, - get_osdmap()->get_cluster_inst(replica)); + osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch()); } void PG::sub_op_scrub_reserve(OpRequestRef op) @@ -3036,7 +3032,7 @@ void PG::sub_op_scrub_reserve(OpRequestRef op) MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); ::encode(scrubber.reserved, reply->get_data()); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); } void PG::sub_op_scrub_reserve_reply(OpRequestRef op) @@ -3094,7 +3090,7 @@ void PG::sub_op_scrub_stop(OpRequestRef op) scrubber.reserved = false; MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); } void PG::clear_scrub_reserved() @@ -3121,7 +3117,7 @@ void PG::scrub_reserve_replicas() MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0, get_osdmap()->get_epoch(), osd->get_tid(), v); subop->ops = scrub; - osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i])); + osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch()); } } @@ -3137,7 +3133,7 @@ void PG::scrub_unreserve_replicas() MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0, get_osdmap()->get_epoch(), osd->get_tid(), v); subop->ops = scrub; - osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i])); + osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch()); } } @@ -3361,7 +3357,7 @@ void PG::replica_scrub(MOSDRepScrub *msg) ::encode(map, subop->get_data()); subop->ops = scrub; - osd->cluster_messenger->send_message(subop, msg->get_connection()); + osd->send_message_osd_cluster(subop, msg->get_connection()); } /* Scrub: @@ -3390,7 +3386,9 @@ void PG::scrub() OSDMapRef curmap = osd->get_osdmap(); scrubber.is_chunky = true; for (unsigned i=1; i<acting.size(); i++) { - Connection *con = osd->cluster_messenger->get_connection(curmap->get_cluster_inst(acting[i])); + ConnectionRef con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch()); + if (!con) + continue; if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) { dout(20) << "OSD " << acting[i] << " does not support chunky scrubs, falling back to classic" @@ -4149,7 +4147,7 @@ void PG::share_pg_info() get_osdmap()->get_epoch(), info), pg_interval_map_t())); - osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer)); + osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch()); } } @@ -4181,7 +4179,7 @@ void PG::share_pg_log() } pinfo.last_update = m->log.head; - osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer)); + osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch()); } } @@ -4233,10 +4231,13 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch) dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl; - osd->osd->_share_map_outgoing(get_osdmap()->get_cluster_inst(from), - get_osdmap()); - osd->cluster_messenger->send_message(mlog, - get_osdmap()->get_cluster_inst(from)); + ConnectionRef con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch()); + if (con) { + osd->osd->_share_map_outgoing(from, con.get(), get_osdmap()); + osd->send_message_osd_cluster(mlog, con.get()); + } else { + mlog->put(); + } } @@ -5255,18 +5256,19 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con context< RecoveryMachine >().log_enter(state_name); PG *pg = context< RecoveryMachine >().pg; pg->state_set(PG_STATE_BACKFILL_WAIT); - Connection *con = - pg->osd->cluster_messenger->get_connection( - pg->get_osdmap()->get_cluster_inst(pg->backfill_target)); - if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) { - pg->osd->cluster_messenger->send_message( - new MBackfillReserve( - MBackfillReserve::REQUEST, - pg->info.pgid, - pg->get_osdmap()->get_epoch()), - pg->get_osdmap()->get_cluster_inst(pg->backfill_target)); - } else { - post_event(RemoteBackfillReserved()); + ConnectionRef con = pg->osd->get_con_osd_cluster( + pg->backfill_target, pg->get_osdmap()->get_epoch()); + if (con) { + if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) { + pg->osd->send_message_osd_cluster( + new MBackfillReserve( + MBackfillReserve::REQUEST, + pg->info.pgid, + pg->get_osdmap()->get_epoch()), + con.get()); + } else { + post_event(RemoteBackfillReserved()); + } } } @@ -5366,12 +5368,13 @@ boost::statechart::result PG::RecoveryState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt) { PG *pg = context< RecoveryMachine >().pg; - pg->osd->cluster_messenger->send_message( + pg->osd->send_message_osd_cluster( + pg->acting[0], new MRecoveryReserve( MRecoveryReserve::GRANT, pg->info.pgid, pg->get_osdmap()->get_epoch()), - pg->get_osdmap()->get_cluster_inst(pg->acting[0])); + pg->get_osdmap()->get_epoch()); return transit<RepRecovering>(); } @@ -5413,12 +5416,13 @@ boost::statechart::result PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt) { PG *pg = context< RecoveryMachine >().pg; - pg->osd->cluster_messenger->send_message( + pg->osd->send_message_osd_cluster( + pg->acting[0], new MBackfillReserve( MBackfillReserve::GRANT, pg->info.pgid, pg->get_osdmap()->get_epoch()), - pg->get_osdmap()->get_cluster_inst(pg->acting[0])); + pg->get_osdmap()->get_epoch()); return transit<RepRecovering>(); } @@ -5426,12 +5430,13 @@ boost::statechart::result PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteReservationRejected &evt) { PG *pg = context< RecoveryMachine >().pg; - pg->osd->cluster_messenger->send_message( + pg->osd->send_message_osd_cluster( + pg->acting[0], new MBackfillReserve( MBackfillReserve::REJECT, pg->info.pgid, pg->get_osdmap()->get_epoch()), - pg->get_osdmap()->get_cluster_inst(pg->acting[0])); + pg->get_osdmap()->get_epoch()); return transit<RepNotRecovering>(); } @@ -5498,18 +5503,17 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_con } if (acting_osd_it != context< Active >().sorted_acting_set.end()) { - Connection *con = - pg->osd->cluster_messenger->get_connection( - pg->get_osdmap()->get_cluster_inst(*acting_osd_it)); - if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { - pg->osd->cluster_messenger->send_message( - new MRecoveryReserve( - MRecoveryReserve::REQUEST, - pg->info.pgid, - pg->get_osdmap()->get_epoch()), - pg->get_osdmap()->get_cluster_inst(*acting_osd_it)); - } else { - post_event(RemoteRecoveryReserved()); + ConnectionRef con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch()); + if (con) { + if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { + pg->osd->send_message_osd_cluster( + new MRecoveryReserve(MRecoveryReserve::REQUEST, + pg->info.pgid, + pg->get_osdmap()->get_epoch()), + con.get()); + } else { + post_event(RemoteRecoveryReserved()); + } } ++acting_osd_it; } else { @@ -5546,16 +5550,15 @@ void PG::RecoveryState::Recovering::release_reservations() ++i) { if (*i == pg->osd->whoami) // skip myself continue; - Connection *con = - pg->osd->cluster_messenger->get_connection( - pg->get_osdmap()->get_cluster_inst(*i)); - if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { - pg->osd->cluster_messenger->send_message( - new MRecoveryReserve( - MRecoveryReserve::RELEASE, - pg->info.pgid, - pg->get_osdmap()->get_epoch()), - pg->get_osdmap()->get_cluster_inst(*i)); + ConnectionRef con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch()); + if (con) { + if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { + pg->osd->send_message_osd_cluster( + new MRecoveryReserve(MRecoveryReserve::RELEASE, + pg->info.pgid, + pg->get_osdmap()->get_epoch()), + con.get()); + } } } } diff --git a/src/osd/PG.h b/src/osd/PG.h index 292d8c02d9b..b9693fb072a 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -757,7 +757,7 @@ public: list<Context*>& tfin, map< int, map<pg_t,pg_query_t> >& query_map, map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *activator_map=0); - void _activate_committed(epoch_t e, entity_inst_t& primary); + void _activate_committed(epoch_t e); void all_activated_and_committed(); void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 388751e8e8b..3ad597531d1 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -557,7 +557,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op) CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); reply->set_data(outdata); reply->set_result(result); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); delete filter; } @@ -872,7 +872,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (already_ack(oldv)) { MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); reply->add_flags(CEPH_OSD_FLAG_ACK); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); } else { dout(10) << " waiting for " << oldv << " to ack" << dendl; waiting_for_ack[oldv].push_back(op); @@ -981,7 +981,7 @@ void ReplicatedPG::do_op(OpRequestRef op) MOSDOpReply *reply = ctx->reply; ctx->reply = NULL; reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); delete ctx; put_object_context(obc); put_object_contexts(src_obc); @@ -1179,7 +1179,7 @@ void ReplicatedPG::do_scan(OpRequestRef op) get_osdmap()->get_epoch(), m->query_epoch, info.pgid, bi.begin, bi.end); ::encode(bi.objects, reply->get_data()); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); } break; @@ -1238,7 +1238,7 @@ void ReplicatedPG::do_backfill(OpRequestRef op) get_osdmap()->get_epoch(), m->query_epoch, info.pgid); reply->set_priority(g_conf->osd_recovery_op_priority); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); queue_peering_event( CephPeeringEvtRef( new CephPeeringEvt( @@ -1340,7 +1340,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid, vector<OSDOp> ops; tid_t rep_tid = osd->get_tid(); - osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid); + osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, ssc, this); ctx->mtime = ceph_clock_now(g_ceph_context); @@ -3367,7 +3367,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) /* there is a pending notification for this watcher, we should resend it anyway even if we already sent it as it might not have received it */ MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); - osd->client_messenger->send_message(notify_msg, session->con); + osd->send_message_osd_client(notify_msg, session->con); } } } @@ -3423,7 +3423,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) s->add_notif(notif, name); MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); - osd->client_messenger->send_message(notify_msg, s->con); + osd->send_message_osd_client(notify_msg, s->con); } else { // unconnected entity_name_t name = i->first; @@ -3841,7 +3841,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); dout(10) << " sending commit on " << *repop << " " << reply << dendl; assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); repop->sent_disk = true; } } @@ -3858,7 +3858,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) MOSDOp *m = (MOSDOp*)(*i)->request; MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); reply->add_flags(CEPH_OSD_FLAG_ACK); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); } waiting_for_ack.erase(repop->v); } @@ -3873,7 +3873,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) reply->add_flags(CEPH_OSD_FLAG_ACK); dout(10) << " sending ack on " << *repop << " " << reply << dendl; assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); repop->sent_ack = true; } @@ -3975,7 +3975,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now, } wr->pg_trim_to = pg_trim_to; - osd->cluster_messenger->send_message(wr, get_osdmap()->get_cluster_inst(peer)); + osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch()); // keep peer_info up to date if (pinfo.last_complete == pinfo.last_update) @@ -4142,7 +4142,7 @@ void ReplicatedPG::handle_watch_timeout(void *_obc, vector<OSDOp> ops; tid_t rep_tid = osd->get_tid(); - osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid); + osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this); ctx->mtime = ceph_clock_now(g_ceph_context); @@ -4624,7 +4624,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) // send ack to acker only if we haven't sent a commit already MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! - osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd)); + osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch()); } rm->applied = true; @@ -4670,7 +4670,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); commit->set_last_complete_ondisk(rm->last_complete); commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! - osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd)); + osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch()); } rm->committed = true; @@ -4965,7 +4965,7 @@ int ReplicatedPG::pull( void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer) { tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); dout(10) << "send_remove_op " << oid << " from osd." << peer << " tid " << tid << dendl; @@ -4975,7 +4975,7 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer) subop->ops = vector<OSDOp>(1); subop->ops[0].op.op = CEPH_OSD_OP_DELETE; - osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer)); + osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch()); } /* @@ -5093,7 +5093,7 @@ int ReplicatedPG::send_pull(int prio, int peer, { // send op tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); dout(10) << "send_pull_op " << recovery_info.soid << " " << recovery_info.version @@ -5112,8 +5112,7 @@ int ReplicatedPG::send_pull(int prio, int peer, subop->recovery_info = recovery_info; subop->recovery_progress = progress; - osd->cluster_messenger->send_message(subop, - get_osdmap()->get_cluster_inst(peer)); + osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch()); osd->logger->inc(l_osd_pull); return 0; @@ -5406,7 +5405,7 @@ void ReplicatedPG::handle_push(OpRequestRef op) MOSDSubOpReply *reply = new MOSDSubOpReply( m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); } int ReplicatedPG::send_push(int prio, int peer, @@ -5417,7 +5416,7 @@ int ReplicatedPG::send_push(int prio, int peer, ObjectRecoveryProgress new_progress = progress; tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid, false, 0, get_osdmap()->get_epoch(), tid, recovery_info.version); @@ -5507,8 +5506,7 @@ int ReplicatedPG::send_push(int prio, int peer, subop->recovery_info = recovery_info; subop->recovery_progress = new_progress; subop->current_progress = progress; - osd->cluster_messenger-> - send_message(subop, get_osdmap()->get_cluster_inst(peer)); + osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch()); if (out_progress) *out_progress = new_progress; return 0; @@ -5518,14 +5516,14 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer) { // send a blank push back to the primary tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0, get_osdmap()->get_epoch(), tid, eversion_t()); subop->ops = vector<OSDOp>(1); subop->ops[0].op.op = CEPH_OSD_OP_PUSH; subop->first = false; subop->complete = false; - osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer)); + osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch()); } void ReplicatedPG::sub_op_push_reply(OpRequestRef op) @@ -5664,10 +5662,10 @@ void ReplicatedPG::_committed_pushed_object(OpRequestRef op, epoch_t same_since, if (last_complete_ondisk == info.last_update) { if (is_replica()) { // we are fully up to date. tell the primary! - osd->cluster_messenger-> - send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid, - last_complete_ondisk), - get_osdmap()->get_cluster_inst(get_primary())); + osd->send_message_osd_cluster(get_primary(), + new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid, + last_complete_ondisk), + get_osdmap()->get_epoch()); // adjust local snaps! adjust_local_snaps(); @@ -6713,7 +6711,7 @@ int ReplicatedPG::recover_backfill(int max) epoch_t e = get_osdmap()->get_epoch(); MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid, pbi.end, hobject_t()); - osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target)); + osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch()); waiting_on_backfill = true; start_recovery_op(pbi.end); ops++; @@ -6828,7 +6826,7 @@ int ReplicatedPG::recover_backfill(int max) } m->last_backfill = bound; m->stats = pinfo.stats.stats; - osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target)); + osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch()); } dout(10) << " peer num_objects now " << pinfo.stats.stats.sum.num_objects |