summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2012-11-29 09:21:49 -0800
committerSage Weil <sage@inktank.com>2012-11-29 12:39:44 -0800
commit5bc10ecce305e5e7b1ab53de69b8419521e1aa43 (patch)
tree5dd6cc28b6fbb6a2d32fd6680306f84f7b07d633
parentbd03234c31dab0860683f94fbc7f50f5daf82a0d (diff)
downloadceph-5bc10ecce305e5e7b1ab53de69b8419521e1aa43.tar.gz
osd: use OSDService send_message helper from PG context
Use the OSDService helper to send messages to peers. This ensures that if we are on an older OSDMap the messages don't actually get sent to down OSDs that handle_osd_map has done mark_down() on. Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/osd/OSD.cc15
-rw-r--r--src/osd/PG.cc61
-rw-r--r--src/osd/ReplicatedPG.cc28
3 files changed, 52 insertions, 52 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index eef6d15f7db..da4b18ebeb7 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -4654,7 +4654,7 @@ void OSD::do_notifies(
<< " on " << it->second.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
it->second);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first));
+ service.send_message_osd_cluster(it->first, m, curmap->get_epoch());
} else {
dout(7) << "do_notify osd." << it->first
<< " sending seperate messages" << dendl;
@@ -4666,7 +4666,7 @@ void OSD::do_notifies(
list[0] = *i;
MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent,
list);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first));
+ service.send_message_osd_cluster(it->first, m, curmap->get_epoch());
}
}
}
@@ -4692,7 +4692,7 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
dout(7) << "do_queries querying osd." << who
<< " on " << pit->second.size() << " PGs" << dendl;
MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(who));
+ service.send_message_osd_cluster(who, m, curmap->get_epoch());
} else {
dout(7) << "do_queries querying osd." << who
<< " sending seperate messages "
@@ -4703,7 +4703,7 @@ void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
map<pg_t, pg_query_t> to_send;
to_send.insert(*i);
MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(who));
+ service.send_message_osd_cluster(who, m, curmap->get_epoch());
}
}
}
@@ -4729,7 +4729,7 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
m->pg_list = p->second;
- cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first));
+ service.send_message_osd_cluster(p->first, m, curmap->get_epoch());
} else {
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i =
p->second.begin();
@@ -4739,7 +4739,7 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
to_send[0] = *i;
MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent);
m->pg_list = to_send;
- cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first));
+ service.send_message_osd_cluster(p->first, m, curmap->get_epoch());
}
}
}
@@ -5113,8 +5113,7 @@ void OSD::handle_pg_query(OpRequestRef op)
MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
it->second.epoch_sent);
_share_map_outgoing(osdmap->get_cluster_inst(from));
- cluster_messenger->send_message(mlog,
- osdmap->get_cluster_inst(from));
+ service.send_message_osd_cluster(from, mlog, osdmap->get_epoch());
} else {
notify_list[from].push_back(make_pair(pg_notify_t(it->second.epoch_sent,
osdmap->get_epoch(),
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 77935ea2d7b..21a6f514dbc 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1538,7 +1538,7 @@ void PG::activate(ObjectStore::Transaction& t,
if (m) {
dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
//m->log.print(cout);
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
}
// peer now has
@@ -1747,7 +1747,7 @@ void PG::_activate_committed(epoch_t e)
info);
i.info.history.last_epoch_started = e;
m->pg_list.push_back(make_pair(i, pg_interval_map_t()));
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(acting[0]));
+ osd->send_message_osd_cluster(acting[0], m, get_osdmap()->get_epoch());
}
if (dirty_info) {
@@ -1954,8 +1954,7 @@ void PG::purge_strays()
MOSDPGRemove *m = new MOSDPGRemove(
get_osdmap()->get_epoch(),
to_remove);
- osd->cluster_messenger->send_message(
- m, get_osdmap()->get_cluster_inst(*p));
+ osd->send_message_osd_cluster(*p, m, get_osdmap()->get_epoch());
stray_purged.insert(*p);
} else {
dout(10) << "not sending PGRemove to down osd." << *p << dendl;
@@ -2268,9 +2267,10 @@ void PG::trim_peers()
dout(10) << "trim_peers " << pg_trim_to << dendl;
if (pg_trim_to != eversion_t()) {
for (unsigned i=1; i<acting.size(); i++)
- osd->cluster_messenger->send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
- pg_trim_to),
- get_osdmap()->get_cluster_inst(acting[i]));
+ osd->send_message_osd_cluster(acting[i],
+ new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
+ pg_trim_to),
+ get_osdmap()->get_epoch());
}
}
@@ -2996,8 +2996,7 @@ void PG::_request_scrub_map_classic(int replica, eversion_t version)
MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
last_update_applied,
get_osdmap()->get_epoch());
- osd->cluster_messenger->send_message(repscrubop,
- get_osdmap()->get_cluster_inst(replica));
+ osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch());
}
// send scrub v3 messages (chunky scrub)
@@ -3010,8 +3009,7 @@ void PG::_request_scrub_map(int replica, eversion_t version,
MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
get_osdmap()->get_epoch(),
start, end, deep);
- osd->cluster_messenger->send_message(repscrubop,
- get_osdmap()->get_cluster_inst(replica));
+ osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch());
}
void PG::sub_op_scrub_reserve(OpRequestRef op)
@@ -3116,7 +3114,7 @@ void PG::scrub_reserve_replicas()
MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
get_osdmap()->get_epoch(), osd->get_tid(), v);
subop->ops = scrub;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i]));
+ osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch());
}
}
@@ -3132,7 +3130,7 @@ void PG::scrub_unreserve_replicas()
MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
get_osdmap()->get_epoch(), osd->get_tid(), v);
subop->ops = scrub;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i]));
+ osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch());
}
}
@@ -4140,7 +4138,7 @@ void PG::share_pg_info()
get_osdmap()->get_epoch(),
info),
pg_interval_map_t()));
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
}
}
@@ -4172,7 +4170,7 @@ void PG::share_pg_log()
}
pinfo.last_update = m->log.head;
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
}
}
@@ -4226,8 +4224,7 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch)
osd->osd->_share_map_outgoing(get_osdmap()->get_cluster_inst(from),
get_osdmap());
- osd->cluster_messenger->send_message(mlog,
- get_osdmap()->get_cluster_inst(from));
+ osd->send_message_osd_cluster(from, mlog, get_osdmap()->get_epoch());
}
@@ -5246,12 +5243,13 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con
pg->osd->cluster_messenger->get_connection(
pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->backfill_target,
new MBackfillReserve(
MBackfillReserve::REQUEST,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
+ pg->get_osdmap()->get_epoch());
} else {
post_event(RemoteBackfillReserved());
}
@@ -5353,12 +5351,13 @@ boost::statechart::result
PG::RecoveryState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt)
{
PG *pg = context< RecoveryMachine >().pg;
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->acting[0],
new MRecoveryReserve(
MRecoveryReserve::GRANT,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+ pg->get_osdmap()->get_epoch());
return transit<RepRecovering>();
}
@@ -5400,12 +5399,13 @@ boost::statechart::result
PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
{
PG *pg = context< RecoveryMachine >().pg;
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->acting[0],
new MBackfillReserve(
MBackfillReserve::GRANT,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+ pg->get_osdmap()->get_epoch());
return transit<RepRecovering>();
}
@@ -5413,12 +5413,13 @@ boost::statechart::result
PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteReservationRejected &evt)
{
PG *pg = context< RecoveryMachine >().pg;
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->acting[0],
new MBackfillReserve(
MBackfillReserve::REJECT,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+ pg->get_osdmap()->get_epoch());
return transit<RepNotRecovering>();
}
@@ -5489,12 +5490,13 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_con
pg->osd->cluster_messenger->get_connection(
pg->get_osdmap()->get_cluster_inst(*acting_osd_it));
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ *acting_osd_it,
new MRecoveryReserve(
MRecoveryReserve::REQUEST,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(*acting_osd_it));
+ pg->get_osdmap()->get_epoch());
} else {
post_event(RemoteRecoveryReserved());
}
@@ -5537,12 +5539,13 @@ void PG::RecoveryState::Recovering::release_reservations()
pg->osd->cluster_messenger->get_connection(
pg->get_osdmap()->get_cluster_inst(*i));
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ *i,
new MRecoveryReserve(
MRecoveryReserve::RELEASE,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(*i));
+ pg->get_osdmap()->get_epoch());
}
}
}
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 2513d9d6fe4..de259eb1673 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -3975,7 +3975,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
}
wr->pg_trim_to = pg_trim_to;
- osd->cluster_messenger->send_message(wr, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
// keep peer_info up to date
if (pinfo.last_complete == pinfo.last_update)
@@ -4624,7 +4624,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
// send ack to acker only if we haven't sent a commit already
MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
- osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
+ osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
}
rm->applied = true;
@@ -4670,7 +4670,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
commit->set_last_complete_ondisk(rm->last_complete);
commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
- osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
+ osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
}
rm->committed = true;
@@ -4975,7 +4975,7 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
subop->ops = vector<OSDOp>(1);
subop->ops[0].op.op = CEPH_OSD_OP_DELETE;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
}
/*
@@ -5112,8 +5112,7 @@ int ReplicatedPG::send_pull(int prio, int peer,
subop->recovery_info = recovery_info;
subop->recovery_progress = progress;
- osd->cluster_messenger->send_message(subop,
- get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
osd->logger->inc(l_osd_pull);
return 0;
@@ -5507,8 +5506,7 @@ int ReplicatedPG::send_push(int prio, int peer,
subop->recovery_info = recovery_info;
subop->recovery_progress = new_progress;
subop->current_progress = progress;
- osd->cluster_messenger->
- send_message(subop, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
if (out_progress)
*out_progress = new_progress;
return 0;
@@ -5525,7 +5523,7 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer)
subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
subop->first = false;
subop->complete = false;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
}
void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
@@ -5664,10 +5662,10 @@ void ReplicatedPG::_committed_pushed_object(OpRequestRef op, epoch_t same_since,
if (last_complete_ondisk == info.last_update) {
if (is_replica()) {
// we are fully up to date. tell the primary!
- osd->cluster_messenger->
- send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
- last_complete_ondisk),
- get_osdmap()->get_cluster_inst(get_primary()));
+ osd->send_message_osd_cluster(get_primary(),
+ new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
+ last_complete_ondisk),
+ get_osdmap()->get_epoch());
// adjust local snaps!
adjust_local_snaps();
@@ -6713,7 +6711,7 @@ int ReplicatedPG::recover_backfill(int max)
epoch_t e = get_osdmap()->get_epoch();
MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid,
pbi.end, hobject_t());
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
+ osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch());
waiting_on_backfill = true;
start_recovery_op(pbi.end);
ops++;
@@ -6828,7 +6826,7 @@ int ReplicatedPG::recover_backfill(int max)
}
m->last_backfill = bound;
m->stats = pinfo.stats.stats;
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
+ osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch());
}
dout(10) << " peer num_objects now " << pinfo.stats.stats.sum.num_objects