author    Sage Weil <sage@inktank.com>  2012-11-30 12:12:23 -0800
committer Sage Weil <sage@inktank.com>  2012-11-30 12:12:23 -0800
commit    246eb7b2af4f4dd071bc1dbe48b7cd02897b55cd (patch)
tree      cd0f74dc59ddca9a57b72d2456c5b7602cafeffe
parent    7412bd3675e3e941eeaefeb7d48b32007751357a (diff)
parent    a928b6dbf630b63108aa7805adf9601253d40397 (diff)
download  ceph-246eb7b2af4f4dd071bc1dbe48b7cd02897b55cd.tar.gz
Merge remote-tracking branch 'gh/wip-osd-msgr'
-rw-r--r--  src/msg/DispatchQueue.h    3
-rw-r--r--  src/msg/Message.h          6
-rw-r--r--  src/osd/OSD.cc           184
-rw-r--r--  src/osd/OSD.h             45
-rw-r--r--  src/osd/PG.cc            143
-rw-r--r--  src/osd/PG.h               2
-rw-r--r--  src/osd/ReplicatedPG.cc   62
7 files changed, 298 insertions(+), 147 deletions(-)
diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h
index ad4584829d1..ea44c165d56 100644
--- a/src/msg/DispatchQueue.h
+++ b/src/msg/DispatchQueue.h
@@ -33,9 +33,6 @@ class SimpleMessenger;
class Message;
class Connection;
-typedef boost::intrusive_ptr<Connection> ConnectionRef;
-typedef boost::intrusive_ptr<Message> MessageRef;
-
/**
* The DispatchQueue contains all the Pipes which have Messages
* they want to be dispatched, carefully organized by Message priority
diff --git a/src/msg/Message.h b/src/msg/Message.h
index b6a113f771f..fc434ed9b85 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -18,6 +18,10 @@
#include <stdlib.h>
#include <ostream>
+#include <boost/intrusive_ptr.hpp>
+// Because intrusive_ptr clobbers our assert...
+#include "include/assert.h"
+
#include "include/types.h"
#include "include/buffer.h"
#include "common/Throttle.h"
@@ -261,6 +265,7 @@ public:
rx_buffers.erase(tid);
}
};
+typedef boost::intrusive_ptr<Connection> ConnectionRef;
@@ -466,6 +471,7 @@ public:
void encode(uint64_t features, bool datacrc);
};
+typedef boost::intrusive_ptr<Message> MessageRef;
extern Message *decode_message(CephContext *cct, ceph_msg_header &header,
ceph_msg_footer& footer, bufferlist& front,
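
The hunks above move the ConnectionRef/MessageRef typedefs out of DispatchQueue.h and next to the classes they wrap. boost::intrusive_ptr does no counting of its own; it delegates to free functions found by argument-dependent lookup, which Ceph forwards to the existing get()/put() pair (and the re-include of include/assert.h restores Ceph's assert macro after the Boost header drags in <assert.h>). A minimal sketch of that hookup, with a hypothetical Counted class standing in for Message/Connection rather than the real headers:

#include <boost/intrusive_ptr.hpp>
#include <atomic>

struct Counted {
  std::atomic<int> nref{1};                      // objects start with one ref
  Counted *get() { ++nref; return this; }
  void put() { if (--nref == 0) delete this; }
  virtual ~Counted() {}
};

// boost::intrusive_ptr finds these overloads by ADL:
inline void intrusive_ptr_add_ref(Counted *c) { c->get(); }  // ptr copy/ctor
inline void intrusive_ptr_release(Counted *c) { c->put(); }  // ptr dtor

typedef boost::intrusive_ptr<Counted> CountedRef;

int main() {
  Counted *raw = new Counted;   // nref == 1
  CountedRef ref(raw);          // nref == 2: intrusive_ptr_add_ref() ran
  raw->put();                   // hand our ref to the smart pointer; nref == 1
  return 0;                     // ~CountedRef put()s the last ref and frees it
}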
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 913157a8508..7297548ebe7 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -158,6 +158,7 @@ OSDService::OSDService(OSD *osd) :
rep_scrub_wq(osd->rep_scrub_wq),
class_handler(osd->class_handler),
publish_lock("OSDService::publish_lock"),
+ pre_publish_lock("OSDService::pre_publish_lock"),
sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
scrubs_active(0),
watch_lock("OSD::watch_lock"),
@@ -1773,9 +1774,13 @@ void OSD::_add_heartbeat_peer(int p)
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
if (i == heartbeat_peers.end()) {
+ ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch());
+ if (!con)
+ return;
hi = &heartbeat_peers[p];
- hi->inst = osdmap->get_hb_inst(p);
- hi->con = hbclient_messenger->get_connection(hi->inst);
+ hi->con = con.get();
+ hi->con->get();
+ hi->peer = p;
hi->con->set_priv(new HeartbeatSession(p));
dout(10) << "_add_heartbeat_peer: new peer osd." << p
<< " " << hi->con->get_peer_addr() << dendl;
@@ -1901,8 +1906,12 @@ void OSD::handle_osd_ping(MOSDPing *m)
if (curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
- if (is_active())
- _share_map_outgoing(curmap->get_cluster_inst(from));
+ if (is_active()) {
+ ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
+ if (con) {
+ _share_map_outgoing(from, con.get());
+ }
+ }
}
}
break;
@@ -1922,8 +1931,12 @@ void OSD::handle_osd_ping(MOSDPing *m)
if (m->map_epoch &&
curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
- if (is_active())
- _share_map_outgoing(curmap->get_cluster_inst(from));
+ if (is_active()) {
+ ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
+ if (con) {
+ _share_map_outgoing(from, con.get());
+ }
+ }
}
// Cancel false reports
@@ -2066,8 +2079,15 @@ bool OSD::heartbeat_reset(Connection *con)
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
if (p != heartbeat_peers.end() &&
p->second.con == con) {
+ ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
+ if (!newcon) {
+ dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl;
+ s->put();
+ return true;
+ }
dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl;
- p->second.con = hbclient_messenger->get_connection(p->second.inst);
+ p->second.con = newcon.get();
+ p->second.con->get();
p->second.con->set_priv(s);
} else {
dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
@@ -2468,6 +2488,55 @@ void OSD::send_alive()
}
}
+void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
+{
+ Mutex::Locker l(pre_publish_lock);
+
+ // service map is always newer/newest
+ assert(from_epoch <= next_osdmap->get_epoch());
+
+ if (next_osdmap->is_down(peer) ||
+ next_osdmap->get_info(peer).up_from > from_epoch) {
+ m->put();
+ return;
+ }
+ osd->cluster_messenger->send_message(m, next_osdmap->get_cluster_inst(peer));
+}
+
+ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+{
+ Mutex::Locker l(pre_publish_lock);
+
+ // service map is always newer/newest
+ assert(from_epoch <= next_osdmap->get_epoch());
+
+ if (next_osdmap->is_down(peer) ||
+ next_osdmap->get_info(peer).up_from > from_epoch) {
+ return NULL;
+ }
+ ConnectionRef ret(
+ osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer)));
+ ret->put(); // Ref from get_connection
+ return ret;
+}
+
+ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+{
+ Mutex::Locker l(pre_publish_lock);
+
+ // service map is always newer/newest
+ assert(from_epoch <= next_osdmap->get_epoch());
+
+ if (next_osdmap->is_down(peer) ||
+ next_osdmap->get_info(peer).up_from > from_epoch) {
+ return NULL;
+ }
+ ConnectionRef ret(
+ osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer)));
+ ret->put(); // Ref from get_connection
+ return ret;
+}
+
void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
{
Mutex::Locker l(pg_temp_lock);
@@ -2960,25 +3029,21 @@ bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
}
-void OSD::_share_map_outgoing(const entity_inst_t& inst,
- OSDMapRef map)
+void OSD::_share_map_outgoing(int peer, Connection *con, OSDMapRef map)
{
if (!map)
map = service.get_osdmap();
- assert(inst.name.is_osd());
-
- int peer = inst.name.num();
// send map?
epoch_t pe = get_peer_epoch(peer);
if (pe) {
if (pe < map->get_epoch()) {
- send_incremental_map(pe, inst);
+ send_incremental_map(pe, con);
note_peer_epoch(peer, map->get_epoch());
} else
- dout(20) << "_share_map_outgoing " << inst << " already has epoch " << pe << dendl;
+ dout(20) << "_share_map_outgoing " << con << " already has epoch " << pe << dendl;
} else {
- dout(20) << "_share_map_outgoing " << inst << " don't know epoch, doing nothing" << dendl;
+ dout(20) << "_share_map_outgoing " << con << " don't know epoch, doing nothing" << dendl;
// no idea about peer's epoch.
// ??? send recent ???
// do nothing.
@@ -3625,14 +3690,19 @@ void OSD::handle_osd_map(MOSDMap *m)
OSDMapRef newmap = get_map(cur);
assert(newmap); // we just cached it above!
+ // start blacklisting messages sent to peers that go down.
+ service.pre_publish_map(newmap);
+
// kill connections to newly down osds
set<int> old;
osdmap->get_all_osds(old);
- for (set<int>::iterator p = old.begin(); p != old.end(); p++)
+ for (set<int>::iterator p = old.begin(); p != old.end(); p++) {
if (*p != whoami &&
osdmap->have_inst(*p) && // in old map
- (!newmap->exists(*p) || !newmap->is_up(*p))) // but not the new one
+ (!newmap->exists(*p) || !newmap->is_up(*p))) { // but not the new one
note_down_osd(*p);
+ }
+ }
osdmap = newmap;
@@ -3923,6 +3993,7 @@ void OSD::activate_map()
}
to_remove.clear();
+ service.pre_publish_map(osdmap);
service.publish_map(osdmap);
// scan pg's
@@ -4004,6 +4075,14 @@ void OSD::send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy)
msgr->send_message(m, inst);
}
+void OSD::send_map(MOSDMap *m, Connection *con)
+{
+ Messenger *msgr = client_messenger;
+ if (entity_name_t::TYPE_OSD == con->get_peer_type())
+ msgr = cluster_messenger;
+ msgr->send_message(m, con);
+}
+
void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy)
{
dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch()
@@ -4030,6 +4109,32 @@ void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool la
}
}
+void OSD::send_incremental_map(epoch_t since, Connection *con)
+{
+ dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch()
+ << " to " << con << " " << con->get_peer_addr() << dendl;
+
+ if (since < superblock.oldest_map) {
+ // just send latest full map
+ MOSDMap *m = new MOSDMap(monc->get_fsid());
+ m->oldest_map = superblock.oldest_map;
+ m->newest_map = superblock.newest_map;
+ epoch_t e = osdmap->get_epoch();
+ get_map_bl(e, m->maps[e]);
+ send_map(m, con);
+ return;
+ }
+
+ while (since < osdmap->get_epoch()) {
+ epoch_t to = osdmap->get_epoch();
+ if (to - since > (epoch_t)g_conf->osd_map_message_max)
+ to = since + g_conf->osd_map_message_max;
+ MOSDMap *m = build_incremental_map_msg(since, to);
+ send_map(m, con);
+ since = to;
+ }
+}
+
bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
{
bool found = map_bl_cache.lookup(e, &bl);
@@ -4583,15 +4688,16 @@ void OSD::do_notifies(
}
if (!curmap->is_up(it->first))
continue;
- Connection *con =
- cluster_messenger->get_connection(curmap->get_cluster_inst(it->first));
- _share_map_outgoing(curmap->get_cluster_inst(it->first), curmap);
+ ConnectionRef con = service.get_con_osd_cluster(it->first, curmap->get_epoch());
+ if (!con)
+ continue;
+ _share_map_outgoing(it->first, con.get(), curmap);
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
dout(7) << "do_notify osd." << it->first
<< " on " << it->second.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
it->second);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first));
+ cluster_messenger->send_message(m, con.get());
} else {
dout(7) << "do_notify osd." << it->first
<< " sending seperate messages" << dendl;
@@ -4603,7 +4709,7 @@ void OSD::do_notifies(
list[0] = *i;
MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent,
list);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first));
+ cluster_messenger->send_message(m, con.get());
}
}
}
@@ -4622,14 +4728,15 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
if (!curmap->is_up(pit->first))
continue;
int who = pit->first;
- Connection *con =
- cluster_messenger->get_connection(curmap->get_cluster_inst(pit->first));
- _share_map_outgoing(curmap->get_cluster_inst(who), curmap);
+ ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch());
+ if (!con)
+ continue;
+ _share_map_outgoing(who, con.get(), curmap);
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
dout(7) << "do_queries querying osd." << who
<< " on " << pit->second.size() << " PGs" << dendl;
MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(who));
+ cluster_messenger->send_message(m, con.get());
} else {
dout(7) << "do_queries querying osd." << who
<< " sending seperate messages "
@@ -4640,7 +4747,7 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
map<pg_t, pg_query_t> to_send;
to_send.insert(*i);
MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(who));
+ cluster_messenger->send_message(m, con.get());
}
}
}
@@ -4660,13 +4767,14 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
++i) {
dout(20) << "Sending info " << i->first.info << " to osd." << p->first << dendl;
}
- Connection *con =
- cluster_messenger->get_connection(curmap->get_cluster_inst(p->first));
- _share_map_outgoing(curmap->get_cluster_inst(p->first), curmap);
+ ConnectionRef con = service.get_con_osd_cluster(p->first, curmap->get_epoch());
+ if (!con)
+ continue;
+ _share_map_outgoing(p->first, con.get(), curmap);
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
m->pg_list = p->second;
- cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first));
+ cluster_messenger->send_message(m, con.get());
} else {
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i =
p->second.begin();
@@ -4676,7 +4784,7 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
to_send[0] = *i;
MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent);
m->pg_list = to_send;
- cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first));
+ cluster_messenger->send_message(m, con.get());
}
}
}
@@ -5047,11 +5155,13 @@ void OSD::handle_pg_query(OpRequestRef op)
pg_info_t empty(pgid);
if (it->second.type == pg_query_t::LOG ||
it->second.type == pg_query_t::FULLLOG) {
- MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
- it->second.epoch_sent);
- _share_map_outgoing(osdmap->get_cluster_inst(from));
- cluster_messenger->send_message(mlog,
- osdmap->get_cluster_inst(from));
+ ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch());
+ if (con) {
+ MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
+ it->second.epoch_sent);
+ _share_map_outgoing(from, con.get(), osdmap);
+ cluster_messenger->send_message(mlog, con.get());
+ }
} else {
notify_list[from].push_back(make_pair(pg_notify_t(it->second.epoch_sent,
osdmap->get_epoch(),
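
A note on the "ret->put(); // Ref from get_connection" lines in the new get_con_osd_cluster()/get_con_osd_hb() helpers above: get_connection() hands back a raw Connection* that already carries one reference for the caller, and constructing a ConnectionRef takes a second one, so the surplus must be dropped by hand. A compact sketch of the idiom, using hypothetical Conn/ConnRef stand-ins rather than Ceph's real types:

#include <boost/intrusive_ptr.hpp>

struct Conn {
  int nref = 1;                                   // born with one ref
  Conn *get() { ++nref; return this; }
  void put() { if (--nref == 0) delete this; }
};
inline void intrusive_ptr_add_ref(Conn *c) { c->get(); }
inline void intrusive_ptr_release(Conn *c) { c->put(); }
typedef boost::intrusive_ptr<Conn> ConnRef;

Conn *get_connection() { return new Conn; }      // caller owns the initial ref

ConnRef get_con() {
  ConnRef ret(get_connection());                 // nref == 2: factory + smart ptr
  ret->put();                                    // drop the factory's ref
  return ret;                                    // exactly one ref, held by ret
}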
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index ab6e3ac004b..12d50c6f9c4 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -168,8 +168,10 @@ public:
ObjectStore *&store;
LogClient &clog;
PGRecoveryStats &pg_recovery_stats;
+private:
Messenger *&cluster_messenger;
Messenger *&client_messenger;
+public:
PerfCounters *&logger;
MonClient *&monc;
ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef> &op_wq;
@@ -182,7 +184,7 @@ public:
ClassHandler *&class_handler;
// -- superblock --
- Mutex publish_lock;
+ Mutex publish_lock, pre_publish_lock;
OSDSuperblock superblock;
OSDSuperblock get_superblock() {
Mutex::Locker l(publish_lock);
@@ -192,6 +194,9 @@ public:
Mutex::Locker l(publish_lock);
superblock = block;
}
+
+ int get_nodeid() const { return whoami; }
+
OSDMapRef osdmap;
OSDMapRef get_osdmap() {
Mutex::Locker l(publish_lock);
@@ -202,7 +207,37 @@ public:
osdmap = map;
}
- int get_nodeid() const { return whoami; }
+ /*
+ * osdmap - current published map
+ * next_osdmap - pre_published map that is about to be published.
+ *
+ * We use the next_osdmap to send messages and initiate connections,
+ * but only if the target is the same instance as the one in the map
+ * epoch the current user is working from (i.e., the result is
+ * equivalent to what is in next_osdmap).
+ *
+ * This allows the helpers to start ignoring osds that are about to
+ * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
+ * down, without worrying about reopening connections from threads
+ * working from old maps.
+ */
+ OSDMapRef next_osdmap;
+ void pre_publish_map(OSDMapRef map) {
+ Mutex::Locker l(pre_publish_lock);
+ next_osdmap = map;
+ }
+ ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
+ ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch);
+ void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
+ void send_message_osd_cluster(Message *m, Connection *con) {
+ cluster_messenger->send_message(m, con);
+ }
+ void send_message_osd_client(Message *m, Connection *con) {
+ client_messenger->send_message(m, con);
+ }
+ entity_name_t get_cluster_msgr_name() {
+ return cluster_messenger->get_myname();
+ }
// -- scrub scheduling --
Mutex sched_scrub_lock;
@@ -467,7 +502,7 @@ private:
// -- heartbeat --
/// information about a heartbeat peer
struct HeartbeatInfo {
- entity_inst_t inst; ///< peer
+ int peer; ///< peer
Connection *con; ///< peer connection
utime_t first_tx; ///< time we sent our first ping request
utime_t last_tx; ///< last time we sent a ping request
@@ -699,7 +734,7 @@ private:
bool _share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
Session *session = 0);
- void _share_map_outgoing(const entity_inst_t& inst,
+ void _share_map_outgoing(int peer, Connection *con,
OSDMapRef map = OSDMapRef());
void wait_for_new_map(OpRequestRef op);
@@ -742,7 +777,9 @@ private:
MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to);
void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false);
+ void send_incremental_map(epoch_t since, Connection *con);
void send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy);
+ void send_map(MOSDMap *m, Connection *con);
protected:
// -- placement groups --
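
The comment block above is the heart of this merge, so here is a minimal, self-contained model of the publish/pre-publish pattern it describes. All names (Map, can_send_to, and so on) are hypothetical stand-ins; the point is that senders are validated against the *next* map under pre_publish_lock:

#include <cassert>
#include <map>
#include <memory>
#include <mutex>

typedef unsigned epoch_t;

struct Map {
  epoch_t epoch;
  std::map<int, bool> up;           // peer id -> currently up?
  std::map<int, epoch_t> up_from;   // epoch each peer's instance started in
};
typedef std::shared_ptr<const Map> MapRef;

struct Service {
  std::mutex publish_lock, pre_publish_lock;
  MapRef osdmap;       // what get_osdmap() readers see
  MapRef next_osdmap;  // set just before osdmap; gates all sends

  void pre_publish_map(MapRef m) {
    std::lock_guard<std::mutex> l(pre_publish_lock);
    next_osdmap = m;
  }
  void publish_map(MapRef m) {
    std::lock_guard<std::mutex> l(publish_lock);
    osdmap = m;
  }

  // Refuse the send unless the peer is still up *and* is the same
  // instance the sender's from_epoch was talking to.
  bool can_send_to(int peer, epoch_t from_epoch) {
    std::lock_guard<std::mutex> l(pre_publish_lock);
    assert(from_epoch <= next_osdmap->epoch);   // service map is always newest
    auto u = next_osdmap->up.find(peer);
    if (u == next_osdmap->up.end() || !u->second)
      return false;                             // down, or about to be
    return next_osdmap->up_from.at(peer) <= from_epoch;
  }
};

Because every send helper checks next_osdmap, a thread still working from an older published map can no longer reopen a connection to an OSD that the about-to-be-published map marks down; handle_osd_map()/note_down_osd() can then tear the old connection down without racing.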
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index b81bfed36f4..617ba9e250f 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1343,11 +1343,10 @@ void PG::build_might_have_unfound()
struct C_PG_ActivateCommitted : public Context {
PG *pg;
epoch_t epoch;
- entity_inst_t primary;
- C_PG_ActivateCommitted(PG *p, epoch_t e, const entity_inst_t &pi)
- : pg(p), epoch(e), primary(pi) {}
+ C_PG_ActivateCommitted(PG *p, epoch_t e)
+ : pg(p), epoch(e) {}
void finish(int r) {
- pg->_activate_committed(epoch, primary);
+ pg->_activate_committed(epoch);
}
};
@@ -1411,8 +1410,7 @@ void PG::activate(ObjectStore::Transaction& t,
// find out when we commit
get(); // for callback
- tfin.push_back(new C_PG_ActivateCommitted(this, query_epoch,
- get_osdmap()->get_cluster_inst(acting[0])));
+ tfin.push_back(new C_PG_ActivateCommitted(this, query_epoch));
// initialize snap_trimq
if (is_primary()) {
@@ -1542,7 +1540,7 @@ void PG::activate(ObjectStore::Transaction& t,
if (m) {
dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
//m->log.print(cout);
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
}
// peer now has
@@ -1731,7 +1729,7 @@ void PG::replay_queued_ops()
update_stats();
}
-void PG::_activate_committed(epoch_t e, entity_inst_t& primary)
+void PG::_activate_committed(epoch_t e)
{
lock();
if (e < last_peering_reset) {
@@ -1751,7 +1749,7 @@ void PG::_activate_committed(epoch_t e, entity_inst_t& primary)
info);
i.info.history.last_epoch_started = e;
m->pg_list.push_back(make_pair(i, pg_interval_map_t()));
- osd->cluster_messenger->send_message(m, primary);
+ osd->send_message_osd_cluster(acting[0], m, get_osdmap()->get_epoch());
}
if (dirty_info) {
@@ -1959,8 +1957,7 @@ void PG::purge_strays()
MOSDPGRemove *m = new MOSDPGRemove(
get_osdmap()->get_epoch(),
to_remove);
- osd->cluster_messenger->send_message(
- m, get_osdmap()->get_cluster_inst(*p));
+ osd->send_message_osd_cluster(*p, m, get_osdmap()->get_epoch());
stray_purged.insert(*p);
} else {
dout(10) << "not sending PGRemove to down osd." << *p << dendl;
@@ -2273,9 +2270,10 @@ void PG::trim_peers()
dout(10) << "trim_peers " << pg_trim_to << dendl;
if (pg_trim_to != eversion_t()) {
for (unsigned i=1; i<acting.size(); i++)
- osd->cluster_messenger->send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
- pg_trim_to),
- get_osdmap()->get_cluster_inst(acting[i]));
+ osd->send_message_osd_cluster(acting[i],
+ new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
+ pg_trim_to),
+ get_osdmap()->get_epoch());
}
}
@@ -3001,8 +2999,7 @@ void PG::_request_scrub_map_classic(int replica, eversion_t version)
MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
last_update_applied,
get_osdmap()->get_epoch());
- osd->cluster_messenger->send_message(repscrubop,
- get_osdmap()->get_cluster_inst(replica));
+ osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch());
}
// send scrub v3 messages (chunky scrub)
@@ -3015,8 +3012,7 @@ void PG::_request_scrub_map(int replica, eversion_t version,
MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
get_osdmap()->get_epoch(),
start, end, deep);
- osd->cluster_messenger->send_message(repscrubop,
- get_osdmap()->get_cluster_inst(replica));
+ osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch());
}
void PG::sub_op_scrub_reserve(OpRequestRef op)
@@ -3036,7 +3032,7 @@ void PG::sub_op_scrub_reserve(OpRequestRef op)
MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
::encode(scrubber.reserved, reply->get_data());
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
}
void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
@@ -3094,7 +3090,7 @@ void PG::sub_op_scrub_stop(OpRequestRef op)
scrubber.reserved = false;
MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
}
void PG::clear_scrub_reserved()
@@ -3121,7 +3117,7 @@ void PG::scrub_reserve_replicas()
MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
get_osdmap()->get_epoch(), osd->get_tid(), v);
subop->ops = scrub;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i]));
+ osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch());
}
}
@@ -3137,7 +3133,7 @@ void PG::scrub_unreserve_replicas()
MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
get_osdmap()->get_epoch(), osd->get_tid(), v);
subop->ops = scrub;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i]));
+ osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch());
}
}
@@ -3361,7 +3357,7 @@ void PG::replica_scrub(MOSDRepScrub *msg)
::encode(map, subop->get_data());
subop->ops = scrub;
- osd->cluster_messenger->send_message(subop, msg->get_connection());
+ osd->send_message_osd_cluster(subop, msg->get_connection());
}
/* Scrub:
@@ -3390,7 +3386,9 @@ void PG::scrub()
OSDMapRef curmap = osd->get_osdmap();
scrubber.is_chunky = true;
for (unsigned i=1; i<acting.size(); i++) {
- Connection *con = osd->cluster_messenger->get_connection(curmap->get_cluster_inst(acting[i]));
+ ConnectionRef con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch());
+ if (!con)
+ continue;
if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) {
dout(20) << "OSD " << acting[i]
<< " does not support chunky scrubs, falling back to classic"
@@ -4149,7 +4147,7 @@ void PG::share_pg_info()
get_osdmap()->get_epoch(),
info),
pg_interval_map_t()));
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
}
}
@@ -4181,7 +4179,7 @@ void PG::share_pg_log()
}
pinfo.last_update = m->log.head;
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
}
}
@@ -4233,10 +4231,13 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch)
dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
- osd->osd->_share_map_outgoing(get_osdmap()->get_cluster_inst(from),
- get_osdmap());
- osd->cluster_messenger->send_message(mlog,
- get_osdmap()->get_cluster_inst(from));
+ ConnectionRef con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch());
+ if (con) {
+ osd->osd->_share_map_outgoing(from, con.get(), get_osdmap());
+ osd->send_message_osd_cluster(mlog, con.get());
+ } else {
+ mlog->put();
+ }
}
@@ -5255,18 +5256,19 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con
context< RecoveryMachine >().log_enter(state_name);
PG *pg = context< RecoveryMachine >().pg;
pg->state_set(PG_STATE_BACKFILL_WAIT);
- Connection *con =
- pg->osd->cluster_messenger->get_connection(
- pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
- if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
- new MBackfillReserve(
- MBackfillReserve::REQUEST,
- pg->info.pgid,
- pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
- } else {
- post_event(RemoteBackfillReserved());
+ ConnectionRef con = pg->osd->get_con_osd_cluster(
+ pg->backfill_target, pg->get_osdmap()->get_epoch());
+ if (con) {
+ if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) {
+ pg->osd->send_message_osd_cluster(
+ new MBackfillReserve(
+ MBackfillReserve::REQUEST,
+ pg->info.pgid,
+ pg->get_osdmap()->get_epoch()),
+ con.get());
+ } else {
+ post_event(RemoteBackfillReserved());
+ }
}
}
@@ -5366,12 +5368,13 @@ boost::statechart::result
PG::RecoveryState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt)
{
PG *pg = context< RecoveryMachine >().pg;
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->acting[0],
new MRecoveryReserve(
MRecoveryReserve::GRANT,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+ pg->get_osdmap()->get_epoch());
return transit<RepRecovering>();
}
@@ -5413,12 +5416,13 @@ boost::statechart::result
PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
{
PG *pg = context< RecoveryMachine >().pg;
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->acting[0],
new MBackfillReserve(
MBackfillReserve::GRANT,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+ pg->get_osdmap()->get_epoch());
return transit<RepRecovering>();
}
@@ -5426,12 +5430,13 @@ boost::statechart::result
PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteReservationRejected &evt)
{
PG *pg = context< RecoveryMachine >().pg;
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->acting[0],
new MBackfillReserve(
MBackfillReserve::REJECT,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+ pg->get_osdmap()->get_epoch());
return transit<RepNotRecovering>();
}
@@ -5498,18 +5503,17 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_con
}
if (acting_osd_it != context< Active >().sorted_acting_set.end()) {
- Connection *con =
- pg->osd->cluster_messenger->get_connection(
- pg->get_osdmap()->get_cluster_inst(*acting_osd_it));
- if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
- new MRecoveryReserve(
- MRecoveryReserve::REQUEST,
- pg->info.pgid,
- pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(*acting_osd_it));
- } else {
- post_event(RemoteRecoveryReserved());
+ ConnectionRef con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch());
+ if (con) {
+ if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
+ pg->osd->send_message_osd_cluster(
+ new MRecoveryReserve(MRecoveryReserve::REQUEST,
+ pg->info.pgid,
+ pg->get_osdmap()->get_epoch()),
+ con.get());
+ } else {
+ post_event(RemoteRecoveryReserved());
+ }
}
++acting_osd_it;
} else {
@@ -5546,16 +5550,15 @@ void PG::RecoveryState::Recovering::release_reservations()
++i) {
if (*i == pg->osd->whoami) // skip myself
continue;
- Connection *con =
- pg->osd->cluster_messenger->get_connection(
- pg->get_osdmap()->get_cluster_inst(*i));
- if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
- new MRecoveryReserve(
- MRecoveryReserve::RELEASE,
- pg->info.pgid,
- pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(*i));
+ ConnectionRef con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch());
+ if (con) {
+ if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
+ pg->osd->send_message_osd_cluster(
+ new MRecoveryReserve(MRecoveryReserve::RELEASE,
+ pg->info.pgid,
+ pg->get_osdmap()->get_epoch()),
+ con.get());
+ }
}
}
}
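
Several of the PG.cc hunks above share one shape: gate a new reservation message on a feature bit in con->features, and fall back to synthesizing the success event locally (post_event(...)) so pre-feature peers keep working. A stripped-down sketch of that pattern, with hypothetical names throughout:

#include <cstdint>
#include <functional>

const uint64_t FEATURE_RESERVATION = 1ull << 7;  // hypothetical feature bit

struct Conn { uint64_t features; };

void request_remote_reservation(Conn *con,
                                const std::function<void()> &send_request,
                                const std::function<void()> &post_granted) {
  if (con->features & FEATURE_RESERVATION)
    send_request();    // new peer: it will answer with a grant or reject
  else
    post_granted();    // old peer: treat the reservation as granted now
}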
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 292d8c02d9b..b9693fb072a 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -757,7 +757,7 @@ public:
list<Context*>& tfin,
map< int, map<pg_t,pg_query_t> >& query_map,
map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *activator_map=0);
- void _activate_committed(epoch_t e, entity_inst_t& primary);
+ void _activate_committed(epoch_t e);
void all_activated_and_committed();
void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info);
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 388751e8e8b..3ad597531d1 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -557,7 +557,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
reply->set_data(outdata);
reply->set_result(result);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
delete filter;
}
@@ -872,7 +872,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (already_ack(oldv)) {
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->add_flags(CEPH_OSD_FLAG_ACK);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
} else {
dout(10) << " waiting for " << oldv << " to ack" << dendl;
waiting_for_ack[oldv].push_back(op);
@@ -981,7 +981,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
MOSDOpReply *reply = ctx->reply;
ctx->reply = NULL;
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
delete ctx;
put_object_context(obc);
put_object_contexts(src_obc);
@@ -1179,7 +1179,7 @@ void ReplicatedPG::do_scan(OpRequestRef op)
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid, bi.begin, bi.end);
::encode(bi.objects, reply->get_data());
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
}
break;
@@ -1238,7 +1238,7 @@ void ReplicatedPG::do_backfill(OpRequestRef op)
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid);
reply->set_priority(g_conf->osd_recovery_op_priority);
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
queue_peering_event(
CephPeeringEvtRef(
new CephPeeringEvt(
@@ -1340,7 +1340,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
- osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, ssc, this);
ctx->mtime = ceph_clock_now(g_ceph_context);
@@ -3367,7 +3367,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
/* there is a pending notification for this watcher, we should resend it anyway
even if we already sent it as it might not have received it */
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
- osd->client_messenger->send_message(notify_msg, session->con);
+ osd->send_message_osd_client(notify_msg, session->con);
}
}
}
@@ -3423,7 +3423,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
s->add_notif(notif, name);
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
- osd->client_messenger->send_message(notify_msg, s->con);
+ osd->send_message_osd_client(notify_msg, s->con);
} else {
// unconnected
entity_name_t name = i->first;
@@ -3841,7 +3841,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending commit on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
repop->sent_disk = true;
}
}
@@ -3858,7 +3858,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
MOSDOp *m = (MOSDOp*)(*i)->request;
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->add_flags(CEPH_OSD_FLAG_ACK);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
}
waiting_for_ack.erase(repop->v);
}
@@ -3873,7 +3873,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
reply->add_flags(CEPH_OSD_FLAG_ACK);
dout(10) << " sending ack on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
repop->sent_ack = true;
}
@@ -3975,7 +3975,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
}
wr->pg_trim_to = pg_trim_to;
- osd->cluster_messenger->send_message(wr, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
// keep peer_info up to date
if (pinfo.last_complete == pinfo.last_update)
@@ -4142,7 +4142,7 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
- osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
&obc->obs, obc->ssc, this);
ctx->mtime = ceph_clock_now(g_ceph_context);
@@ -4624,7 +4624,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
// send ack to acker only if we haven't sent a commit already
MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
- osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
+ osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
}
rm->applied = true;
@@ -4670,7 +4670,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
commit->set_last_complete_ondisk(rm->last_complete);
commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
- osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
+ osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
}
rm->committed = true;
@@ -4965,7 +4965,7 @@ int ReplicatedPG::pull(
void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
{
tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+ osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
dout(10) << "send_remove_op " << oid << " from osd." << peer
<< " tid " << tid << dendl;
@@ -4975,7 +4975,7 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
subop->ops = vector<OSDOp>(1);
subop->ops[0].op.op = CEPH_OSD_OP_DELETE;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
}
/*
@@ -5093,7 +5093,7 @@ int ReplicatedPG::send_pull(int prio, int peer,
{
// send op
tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+ osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
dout(10) << "send_pull_op " << recovery_info.soid << " "
<< recovery_info.version
@@ -5112,8 +5112,7 @@ int ReplicatedPG::send_pull(int prio, int peer,
subop->recovery_info = recovery_info;
subop->recovery_progress = progress;
- osd->cluster_messenger->send_message(subop,
- get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
osd->logger->inc(l_osd_pull);
return 0;
@@ -5406,7 +5405,7 @@ void ReplicatedPG::handle_push(OpRequestRef op)
MOSDSubOpReply *reply = new MOSDSubOpReply(
m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
}
int ReplicatedPG::send_push(int prio, int peer,
@@ -5417,7 +5416,7 @@ int ReplicatedPG::send_push(int prio, int peer,
ObjectRecoveryProgress new_progress = progress;
tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+ osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid,
false, 0, get_osdmap()->get_epoch(),
tid, recovery_info.version);
@@ -5507,8 +5506,7 @@ int ReplicatedPG::send_push(int prio, int peer,
subop->recovery_info = recovery_info;
subop->recovery_progress = new_progress;
subop->current_progress = progress;
- osd->cluster_messenger->
- send_message(subop, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
if (out_progress)
*out_progress = new_progress;
return 0;
@@ -5518,14 +5516,14 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer)
{
// send a blank push back to the primary
tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+ osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
get_osdmap()->get_epoch(), tid, eversion_t());
subop->ops = vector<OSDOp>(1);
subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
subop->first = false;
subop->complete = false;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
}
void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
@@ -5664,10 +5662,10 @@ void ReplicatedPG::_committed_pushed_object(OpRequestRef op, epoch_t same_since,
if (last_complete_ondisk == info.last_update) {
if (is_replica()) {
// we are fully up to date. tell the primary!
- osd->cluster_messenger->
- send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
- last_complete_ondisk),
- get_osdmap()->get_cluster_inst(get_primary()));
+ osd->send_message_osd_cluster(get_primary(),
+ new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
+ last_complete_ondisk),
+ get_osdmap()->get_epoch());
// adjust local snaps!
adjust_local_snaps();
@@ -6713,7 +6711,7 @@ int ReplicatedPG::recover_backfill(int max)
epoch_t e = get_osdmap()->get_epoch();
MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid,
pbi.end, hobject_t());
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
+ osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch());
waiting_on_backfill = true;
start_recovery_op(pbi.end);
ops++;
@@ -6828,7 +6826,7 @@ int ReplicatedPG::recover_backfill(int max)
}
m->last_backfill = bound;
m->stats = pinfo.stats.stats;
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
+ osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch());
}
dout(10) << " peer num_objects now " << pinfo.stats.stats.sum.num_objects