summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage.weil@dreamhost.com>2012-04-27 22:08:03 -0700
committerSage Weil <sage.weil@dreamhost.com>2012-04-28 07:46:42 -0700
commit7e8ab0f29b580fe81588e0763d7ff6d4d3e0ddf9 (patch)
tree90c1821074879d45044ad28d435b88ce4f9d0ddc
parent0c6914039cf818d9336b7db9f313bf71b1096c46 (diff)
downloadceph-7e8ab0f29b580fe81588e0763d7ff6d4d3e0ddf9.tar.gz
osd: share past intervals with notifies
Send past_intervals along with pg_info_t on every notify. The reasoning here is as follows:

 - we already have the state in memory
 - if we don't send it, and the primary doesn't have it, it will recalculate it by reading/decoding many previous maps from disk
 - for a highly-tortured cluster, I see past_intervals on the order of ~6 KB, times 600 pgs means ~2.5 MB sent for every activate_map(). For comparison, the same cluster would need to read and decode ~1 GB of maps to recalculate the same info.
 - for healthy clusters, the data is small, and costs little.
 - for unhealthy clusters, the data is large, but most useful.

In theory we could set a threshold so that we don't send it if it is large, but allow the primary to query it explicitly. I doubt it's worth the complexity.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
-rw-r--r--src/messages/MOSDPGNotify.h44
-rw-r--r--src/osd/OSD.cc37
-rw-r--r--src/osd/OSD.h4
-rw-r--r--src/osd/PG.cc20
-rw-r--r--src/osd/PG.h12
5 files changed, 77 insertions, 40 deletions
diff --git a/src/messages/MOSDPGNotify.h b/src/messages/MOSDPGNotify.h
index 55aa1dc5ecd..a9ccaa610f9 100644
--- a/src/messages/MOSDPGNotify.h
+++ b/src/messages/MOSDPGNotify.h
@@ -25,7 +25,7 @@
class MOSDPGNotify : public Message {
- static const int HEAD_VERSION = 2;
+ static const int HEAD_VERSION = 3;
static const int COMPAT_VERSION = 1;
epoch_t epoch;
@@ -34,16 +34,16 @@ class MOSDPGNotify : public Message {
/// query. This allows the recipient to disregard responses to old
/// queries.
epoch_t query_epoch;
- vector<pg_info_t> pg_list; // pgid -> version
+ vector<pair<pg_info_t,pg_interval_map_t> > pg_list; // pgid -> version
public:
version_t get_epoch() { return epoch; }
- vector<pg_info_t>& get_pg_list() { return pg_list; }
+ vector<pair<pg_info_t,pg_interval_map_t> >& get_pg_list() { return pg_list; }
epoch_t get_query_epoch() { return query_epoch; }
MOSDPGNotify()
: Message(MSG_OSD_PG_NOTIFY, HEAD_VERSION, COMPAT_VERSION) { }
- MOSDPGNotify(epoch_t e, vector<pg_info_t>& l, epoch_t query_epoch)
+ MOSDPGNotify(epoch_t e, vector<pair<pg_info_t,pg_interval_map_t> >& l, epoch_t query_epoch)
: Message(MSG_OSD_PG_NOTIFY, HEAD_VERSION, COMPAT_VERSION),
epoch(e), query_epoch(query_epoch) {
pg_list.swap(l);
@@ -56,25 +56,53 @@ public:
void encode_payload(uint64_t features) {
::encode(epoch, payload);
- ::encode(pg_list, payload);
+
+ // v2 was vector<pg_info_t>
+ __u32 n = pg_list.size();
+ ::encode(n, payload);
+ for (vector<pair<pg_info_t,pg_interval_map_t> >::iterator p = pg_list.begin();
+ p != pg_list.end();
+ p++)
+ ::encode(p->first, payload);
+
::encode(query_epoch, payload);
+
+ // v3 needs the pg_interval_map_t for each record
+ for (vector<pair<pg_info_t,pg_interval_map_t> >::iterator p = pg_list.begin();
+ p != pg_list.end();
+ p++)
+ ::encode(p->second, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(epoch, p);
- ::decode(pg_list, p);
+
+ // decode pg_info_t portion of the vector
+ __u32 n;
+ ::decode(n, p);
+ pg_list.resize(n);
+ for (unsigned i=0; i<n; i++)
+ ::decode(pg_list[i].first, p);
+
if (header.version >= 2) {
::decode(query_epoch, p);
}
+ if (header.version >= 3) {
+ // get the pg_interval_map_t portion
+ for (unsigned i=0; i<n; i++)
+ ::decode(pg_list[i].second, p);
+ }
}
void print(ostream& out) const {
out << "pg_notify(";
- for (vector<pg_info_t>::const_iterator i = pg_list.begin();
+ for (vector<pair<pg_info_t,pg_interval_map_t> >::const_iterator i = pg_list.begin();
i != pg_list.end();
++i) {
if (i != pg_list.begin())
out << ",";
- out << i->pgid;
+ out << i->first.pgid;
+ if (i->second.size())
+ out << "(" << i->second.size() << ")";
}
out << " epoch " << epoch
<< " query_epoch " << query_epoch
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index fe31d0182ae..d6fc3a7ee61 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1113,6 +1113,7 @@ PG *OSD::_open_lock_pg(pg_t pgid, bool no_lockdep_check, bool hold_map_lock)
PG *OSD::_create_lock_pg(pg_t pgid, bool newly_created, bool hold_map_lock,
int role, vector<int>& up, vector<int>& acting, pg_history_t history,
+ pg_interval_map_t *pim,
ObjectStore::Transaction& t)
{
assert(osd_lock.is_locked());
@@ -1132,7 +1133,7 @@ PG *OSD::_create_lock_pg(pg_t pgid, bool newly_created, bool hold_map_lock,
history.last_epoch_started = history.epoch_created - 1;
}
- pg->init(role, up, acting, history, &t);
+ pg->init(role, up, acting, history, pim, &t);
dout(7) << "_create_lock_pg " << *pg << dendl;
return pg;
@@ -1239,7 +1240,7 @@ void OSD::load_pgs()
* hasn't changed since the given epoch and we are the primary.
*/
PG *OSD::get_or_create_pg(const pg_info_t& info, epoch_t epoch, int from, int& created,
- bool primary,
+ bool primary, pg_interval_map_t *ppi,
ObjectStore::Transaction **pt,
C_Contexts **pfin)
{
@@ -1288,11 +1289,11 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, epoch_t epoch, int from, int& c
// ok, create PG locally using provided Info and History
*pt = new ObjectStore::Transaction;
*pfin = new C_Contexts(g_ceph_context);
- pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, **pt);
+ pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, ppi, **pt);
created++;
dout(10) << *pg << " is new" << dendl;
-
+
// kick any waiters
wake_pg_waiters(pg->info.pgid);
@@ -3571,7 +3572,7 @@ void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin)
dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
- map< int, vector<pg_info_t> > notify_list; // primary -> list
+ map< int, vector<pair<pg_info_t,pg_interval_map_t> > > notify_list; // primary -> list
map< int, map<pg_t,pg_query_t> > query_map; // peer -> PG -> get_summary_since
map<int,MOSDPGInfo*> info_map; // peer -> message
@@ -3896,7 +3897,7 @@ void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction&
history.same_interval_since = history.same_primary_since =
osdmap->get_epoch();
PG *pg = _create_lock_pg(*q, true, true,
- parent->get_role(), parent->up, parent->acting, history, t);
+ parent->get_role(), parent->up, parent->acting, history, NULL, t);
children[*q] = pg;
dout(10) << " child " << *pg << dendl;
}
@@ -3904,7 +3905,7 @@ void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction&
split_pg(parent, children, t);
// reset pg
- map< int, vector<pg_info_t> > notify_list; // primary -> list
+ map< int, vector<pair<pg_info_t,pg_interval_map_t> > > notify_list; // primary -> list
map< int, map<pg_t,pg_query_t> > query_map; // peer -> PG -> get_summary_since
map<int,MOSDPGInfo*> info_map; // peer -> message
PG::RecoveryCtx rctx(&query_map, &info_map, &notify_list, &tfin->contexts, &t);
@@ -4130,7 +4131,7 @@ void OSD::handle_pg_create(OpRequestRef op)
C_Contexts *fin = new C_Contexts(g_ceph_context);
PG *pg = _create_lock_pg(pgid, true, false,
- 0, creating_pgs[pgid].acting, creating_pgs[pgid].acting, history,
+ 0, creating_pgs[pgid].acting, creating_pgs[pgid].acting, history, NULL,
*t);
creating_pgs.erase(pgid);
@@ -4162,10 +4163,10 @@ void OSD::handle_pg_create(OpRequestRef op)
* content for, and they are primary for.
*/
-void OSD::do_notifies(map< int, vector<pg_info_t> >& notify_list,
+void OSD::do_notifies(map< int, vector<pair<pg_info_t,pg_interval_map_t> > >& notify_list,
epoch_t query_epoch)
{
- for (map< int, vector<pg_info_t> >::iterator it = notify_list.begin();
+ for (map< int, vector<pair<pg_info_t,pg_interval_map_t> > >::iterator it = notify_list.begin();
it != notify_list.end();
it++) {
if (it->first == whoami) {
@@ -4242,14 +4243,14 @@ void OSD::handle_pg_notify(OpRequestRef op)
map<int, MOSDPGInfo*> info_map;
int created = 0;
- for (vector<pg_info_t>::iterator it = m->get_pg_list().begin();
+ for (vector<pair<pg_info_t,pg_interval_map_t> >::iterator it = m->get_pg_list().begin();
it != m->get_pg_list().end();
it++) {
PG *pg = 0;
ObjectStore::Transaction *t;
C_Contexts *fin;
- pg = get_or_create_pg(*it, m->get_epoch(), from, created, true, &t, &fin);
+ pg = get_or_create_pg(it->first, m->get_epoch(), from, created, true, &it->second, &t, &fin);
if (!pg)
continue;
@@ -4262,7 +4263,7 @@ void OSD::handle_pg_notify(OpRequestRef op)
}
PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
- pg->handle_notify(from, *it, &rctx);
+ pg->handle_notify(from, it->first, &rctx);
int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
@@ -4291,7 +4292,7 @@ void OSD::handle_pg_log(OpRequestRef op)
ObjectStore::Transaction *t;
C_Contexts *fin;
PG *pg = get_or_create_pg(m->info, m->get_epoch(),
- from, created, false, &t, &fin);
+ from, created, false, NULL, &t, &fin);
if (!pg) {
return;
}
@@ -4344,7 +4345,7 @@ void OSD::handle_pg_info(OpRequestRef op)
ObjectStore::Transaction *t = 0;
C_Contexts *fin = 0;
PG *pg = get_or_create_pg(*p, m->get_epoch(),
- from, created, false, &t, &fin);
+ from, created, false, NULL, &t, &fin);
if (!pg)
continue;
@@ -4548,7 +4549,7 @@ void OSD::handle_pg_query(OpRequestRef op)
op->mark_started();
- map< int, vector<pg_info_t> > notify_list;
+ map< int, vector<pair<pg_info_t,pg_interval_map_t> > > notify_list;
for (map<pg_t,pg_query_t>::iterator it = m->pg_list.begin();
it != m->pg_list.end();
@@ -4583,7 +4584,7 @@ void OSD::handle_pg_query(OpRequestRef op)
cluster_messenger->send_message(mlog,
osdmap->get_cluster_inst(from));
} else {
- notify_list[from].push_back(empty);
+ notify_list[from].push_back(make_pair(empty, pg_interval_map_t()));
}
continue;
}
@@ -4915,7 +4916,7 @@ void OSD::do_recovery(PG *pg)
ObjectStore::Transaction *t = new ObjectStore::Transaction;
C_Contexts *fin = new C_Contexts(g_ceph_context);
- map< int, vector<pg_info_t> > notify_list; // primary -> list
+ map< int, vector<pair<pg_info_t,pg_interval_map_t> > > notify_list; // primary -> list
map< int, map<pg_t,pg_query_t> > query_map; // peer -> PG -> get_summary_since
map<int,MOSDPGInfo*> info_map; // peer -> message
PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 6b345f2a769..8843a9ac714 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -471,11 +471,13 @@ protected:
PG *_open_lock_pg(pg_t pg, bool no_lockdep_check=false, bool hold_map_lock=false);
PG *_create_lock_pg(pg_t pgid, bool newly_created, bool hold_map_lock,
int role, vector<int>& up, vector<int>& acting, pg_history_t history,
+ pg_interval_map_t *pim,
ObjectStore::Transaction& t);
PG *lookup_lock_raw_pg(pg_t pgid);
PG *get_or_create_pg(const pg_info_t& info, epoch_t epoch, int from, int& pcreated, bool primary,
+ pg_interval_map_t *ppi,
ObjectStore::Transaction **pt,
C_Contexts **pfin);
@@ -617,7 +619,7 @@ protected:
// -- generic pg peering --
- void do_notifies(map< int, vector<pg_info_t> >& notify_list,
+ void do_notifies(map< int, vector<pair<pg_info_t,pg_interval_map_t> > >& notify_list,
epoch_t query_epoch);
void do_queries(map< int, map<pg_t,pg_query_t> >& query_map);
void do_infos(map<int, MOSDPGInfo*>& info_map);
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 7c758453f69..70818db7c82 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1815,6 +1815,7 @@ void PG::clear_stats()
* @param t transaction to write out our new state in
*/
void PG::init(int role, vector<int>& newup, vector<int>& newacting, pg_history_t& history,
+ pg_interval_map_t *pim,
ObjectStore::Transaction *t)
{
dout(10) << "init role " << role << " up " << newup << " acting " << newacting
@@ -1832,6 +1833,11 @@ void PG::init(int role, vector<int>& newup, vector<int>& newacting, pg_history_t
osd->reg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
+ if (pim) {
+ past_intervals.swap(*pim);
+ dout(10) << "init got " << past_intervals.size() << " past intervals" << dendl;
+ }
+
write_info(*t);
write_log(*t);
}
@@ -3305,7 +3311,7 @@ void PG::share_pg_log()
}
void PG::fulfill_info(int from, const pg_query_t &query,
- pair<int, pg_info_t> &notify_info)
+ pair<pg_info_t, pg_interval_map_t> &notify_info)
{
assert(!acting.empty());
assert(from == acting[0]);
@@ -3313,7 +3319,7 @@ void PG::fulfill_info(int from, const pg_query_t &query,
// info
dout(10) << "sending info" << dendl;
- notify_info = make_pair(from, info);
+ notify_info = make_pair(info, past_intervals);
}
void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch)
@@ -3855,7 +3861,7 @@ boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
PG *pg = context< RecoveryMachine >().pg;
if (pg->is_stray() && pg->get_primary() >= 0) {
context< RecoveryMachine >().send_notify(pg->get_primary(),
- pg->info);
+ pg->info, pg->past_intervals);
}
pg->update_heartbeat_peers();
@@ -4243,7 +4249,7 @@ boost::statechart::result PG::RecoveryState::ReplicaActive::react(const ActMap&)
PG *pg = context< RecoveryMachine >().pg;
if (pg->is_stray() && pg->get_primary() >= 0) {
context< RecoveryMachine >().send_notify(pg->get_primary(),
- pg->info);
+ pg->info, pg->past_intervals);
}
return discard_event();
}
@@ -4328,9 +4334,9 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MQuery& query)
{
PG *pg = context< RecoveryMachine >().pg;
if (query.query.type == pg_query_t::INFO) {
- pair<int, pg_info_t> notify_info;
+ pair<pg_info_t,pg_interval_map_t> notify_info;
pg->fulfill_info(query.from, query.query, notify_info);
- context< RecoveryMachine >().send_notify(notify_info.first, notify_info.second);
+ context< RecoveryMachine >().send_notify(query.from, notify_info.first, notify_info.second);
} else {
pg->fulfill_log(query.from, query.query, query.query_epoch);
}
@@ -4342,7 +4348,7 @@ boost::statechart::result PG::RecoveryState::Stray::react(const ActMap&)
PG *pg = context< RecoveryMachine >().pg;
if (pg->is_stray() && pg->get_primary() >= 0) {
context< RecoveryMachine >().send_notify(pg->get_primary(),
- pg->info);
+ pg->info, pg->past_intervals);
}
return discard_event();
}
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 1fee39ecf72..07c92925c6e 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -475,14 +475,14 @@ public:
utime_t start_time;
map< int, map<pg_t, pg_query_t> > *query_map;
map< int, MOSDPGInfo* > *info_map;
- map< int, vector<pg_info_t> > *notify_list;
+ map< int, vector<pair<pg_info_t,pg_interval_map_t> > > *notify_list;
list< Context* > *context_list;
ObjectStore::Transaction *transaction;
RecoveryCtx() : query_map(0), info_map(0), notify_list(0),
context_list(0), transaction(0) {}
RecoveryCtx(map< int, map<pg_t, pg_query_t> > *query_map,
map< int, MOSDPGInfo* > *info_map,
- map< int, vector<pg_info_t> > *notify_list,
+ map< int, vector<pair<pg_info_t,pg_interval_map_t> > > *notify_list,
list< Context* > *context_list,
ObjectStore::Transaction *transaction)
: query_map(query_map), info_map(info_map),
@@ -903,9 +903,9 @@ public:
return state->rctx->context_list;
}
- void send_notify(int to, const pg_info_t &info) {
+ void send_notify(int to, const pg_info_t& info, const pg_interval_map_t& pi) {
assert(state->rctx->notify_list);
- (*state->rctx->notify_list)[to].push_back(info);
+ (*state->rctx->notify_list)[to].push_back(make_pair(info, pi));
}
};
friend class RecoveryMachine;
@@ -1291,7 +1291,7 @@ public:
bool is_empty() const { return info.last_update == eversion_t(0,0); }
void init(int role, vector<int>& up, vector<int>& acting, pg_history_t& history,
- ObjectStore::Transaction *t);
+ pg_interval_map_t *pim, ObjectStore::Transaction *t);
// pg on-disk state
void do_pending_flush();
@@ -1332,7 +1332,7 @@ public:
void set_last_peering_reset();
void fulfill_info(int from, const pg_query_t &query,
- pair<int, pg_info_t> &notify_info);
+ pair<pg_info_t, pg_interval_map_t> &notify_info);
void fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch);
bool acting_up_affected(const vector<int>& newup, const vector<int>& newacting);
bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);