diff options
author | Sage Weil <sage.weil@dreamhost.com> | 2012-04-27 22:08:03 -0700 |
---|---|---|
committer | Sage Weil <sage.weil@dreamhost.com> | 2012-04-28 07:46:42 -0700 |
commit | 7e8ab0f29b580fe81588e0763d7ff6d4d3e0ddf9 (patch) | |
tree | 90c1821074879d45044ad28d435b88ce4f9d0ddc | |
parent | 0c6914039cf818d9336b7db9f313bf71b1096c46 (diff) | |
download | ceph-7e8ab0f29b580fe81588e0763d7ff6d4d3e0ddf9.tar.gz |
osd: share past intervals with notifies
Send past_intervals along with pg_info_t on every notify. The reasoning
here is as follows:
- we already have the state in memory
- if we don't send it, and the primary doesn't have it, it will
recalculate it by reading/decoding many previous maps from disk
- for a highly-tortured cluster, i see past_intervals on the order of
~6 KB, times 600 pgs means ~2.5 MB sent for every activate_map(). for
comparison, the same cluster would need to read and decode ~1 GB of
maps to recalculate the same info.
- for healthy clusters, the data is small, and costs little.
- for unhealthy clusters, the data is large, but most useful.
In theory we could set a threshold so that we don't send it if it is
large, but allow the primary to query it explicitly. I doubt it's worth
the complexity.
Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
-rw-r--r-- | src/messages/MOSDPGNotify.h | 44 | ||||
-rw-r--r-- | src/osd/OSD.cc | 37 | ||||
-rw-r--r-- | src/osd/OSD.h | 4 | ||||
-rw-r--r-- | src/osd/PG.cc | 20 | ||||
-rw-r--r-- | src/osd/PG.h | 12 |
5 files changed, 77 insertions, 40 deletions
diff --git a/src/messages/MOSDPGNotify.h b/src/messages/MOSDPGNotify.h index 55aa1dc5ecd..a9ccaa610f9 100644 --- a/src/messages/MOSDPGNotify.h +++ b/src/messages/MOSDPGNotify.h @@ -25,7 +25,7 @@ class MOSDPGNotify : public Message { - static const int HEAD_VERSION = 2; + static const int HEAD_VERSION = 3; static const int COMPAT_VERSION = 1; epoch_t epoch; @@ -34,16 +34,16 @@ class MOSDPGNotify : public Message { /// query. This allows the recipient to disregard responses to old /// queries. epoch_t query_epoch; - vector<pg_info_t> pg_list; // pgid -> version + vector<pair<pg_info_t,pg_interval_map_t> > pg_list; // pgid -> version public: version_t get_epoch() { return epoch; } - vector<pg_info_t>& get_pg_list() { return pg_list; } + vector<pair<pg_info_t,pg_interval_map_t> >& get_pg_list() { return pg_list; } epoch_t get_query_epoch() { return query_epoch; } MOSDPGNotify() : Message(MSG_OSD_PG_NOTIFY, HEAD_VERSION, COMPAT_VERSION) { } - MOSDPGNotify(epoch_t e, vector<pg_info_t>& l, epoch_t query_epoch) + MOSDPGNotify(epoch_t e, vector<pair<pg_info_t,pg_interval_map_t> >& l, epoch_t query_epoch) : Message(MSG_OSD_PG_NOTIFY, HEAD_VERSION, COMPAT_VERSION), epoch(e), query_epoch(query_epoch) { pg_list.swap(l); @@ -56,25 +56,53 @@ public: void encode_payload(uint64_t features) { ::encode(epoch, payload); - ::encode(pg_list, payload); + + // v2 was vector<pg_info_t> + __u32 n = pg_list.size(); + ::encode(n, payload); + for (vector<pair<pg_info_t,pg_interval_map_t> >::iterator p = pg_list.begin(); + p != pg_list.end(); + p++) + ::encode(p->first, payload); + ::encode(query_epoch, payload); + + // v3 needs the pg_interval_map_t for each record + for (vector<pair<pg_info_t,pg_interval_map_t> >::iterator p = pg_list.begin(); + p != pg_list.end(); + p++) + ::encode(p->second, payload); } void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(epoch, p); - ::decode(pg_list, p); + + // decode pg_info_t portion of the vector + __u32 n; + ::decode(n, p); + pg_list.resize(n); + for (unsigned i=0; i<n; i++) + ::decode(pg_list[i].first, p); + if (header.version >= 2) { ::decode(query_epoch, p); } + if (header.version >= 3) { + // get the pg_interval_map_t portion + for (unsigned i=0; i<n; i++) + ::decode(pg_list[i].second, p); + } } void print(ostream& out) const { out << "pg_notify("; - for (vector<pg_info_t>::const_iterator i = pg_list.begin(); + for (vector<pair<pg_info_t,pg_interval_map_t> >::const_iterator i = pg_list.begin(); i != pg_list.end(); ++i) { if (i != pg_list.begin()) out << ","; - out << i->pgid; + out << i->first.pgid; + if (i->second.size()) + out << "(" << i->second.size() << ")"; } out << " epoch " << epoch << " query_epoch " << query_epoch diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index fe31d0182ae..d6fc3a7ee61 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1113,6 +1113,7 @@ PG *OSD::_open_lock_pg(pg_t pgid, bool no_lockdep_check, bool hold_map_lock) PG *OSD::_create_lock_pg(pg_t pgid, bool newly_created, bool hold_map_lock, int role, vector<int>& up, vector<int>& acting, pg_history_t history, + pg_interval_map_t *pim, ObjectStore::Transaction& t) { assert(osd_lock.is_locked()); @@ -1132,7 +1133,7 @@ PG *OSD::_create_lock_pg(pg_t pgid, bool newly_created, bool hold_map_lock, history.last_epoch_started = history.epoch_created - 1; } - pg->init(role, up, acting, history, &t); + pg->init(role, up, acting, history, pim, &t); dout(7) << "_create_lock_pg " << *pg << dendl; return pg; @@ -1239,7 +1240,7 @@ void OSD::load_pgs() * hasn't changed since the given epoch and we are the primary. */ PG *OSD::get_or_create_pg(const pg_info_t& info, epoch_t epoch, int from, int& created, - bool primary, + bool primary, pg_interval_map_t *ppi, ObjectStore::Transaction **pt, C_Contexts **pfin) { @@ -1288,11 +1289,11 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, epoch_t epoch, int from, int& c // ok, create PG locally using provided Info and History *pt = new ObjectStore::Transaction; *pfin = new C_Contexts(g_ceph_context); - pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, **pt); + pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, ppi, **pt); created++; dout(10) << *pg << " is new" << dendl; - + // kick any waiters wake_pg_waiters(pg->info.pgid); @@ -3571,7 +3572,7 @@ void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin) dout(7) << "activate_map version " << osdmap->get_epoch() << dendl; - map< int, vector<pg_info_t> > notify_list; // primary -> list + map< int, vector<pair<pg_info_t,pg_interval_map_t> > > notify_list; // primary -> list map< int, map<pg_t,pg_query_t> > query_map; // peer -> PG -> get_summary_since map<int,MOSDPGInfo*> info_map; // peer -> message @@ -3896,7 +3897,7 @@ void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction& history.same_interval_since = history.same_primary_since = osdmap->get_epoch(); PG *pg = _create_lock_pg(*q, true, true, - parent->get_role(), parent->up, parent->acting, history, t); + parent->get_role(), parent->up, parent->acting, history, NULL, t); children[*q] = pg; dout(10) << " child " << *pg << dendl; } @@ -3904,7 +3905,7 @@ void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction& split_pg(parent, children, t); // reset pg - map< int, vector<pg_info_t> > notify_list; // primary -> list + map< int, vector<pair<pg_info_t,pg_interval_map_t> > > notify_list; // primary -> list map< int, map<pg_t,pg_query_t> > query_map; // peer -> PG -> get_summary_since map<int,MOSDPGInfo*> info_map; // peer -> message PG::RecoveryCtx rctx(&query_map, &info_map, ¬ify_list, &tfin->contexts, &t); @@ -4130,7 +4131,7 @@ void OSD::handle_pg_create(OpRequestRef op) C_Contexts *fin = new C_Contexts(g_ceph_context); PG *pg = _create_lock_pg(pgid, true, false, - 0, creating_pgs[pgid].acting, creating_pgs[pgid].acting, history, + 0, creating_pgs[pgid].acting, creating_pgs[pgid].acting, history, NULL, *t); creating_pgs.erase(pgid); @@ -4162,10 +4163,10 @@ void OSD::handle_pg_create(OpRequestRef op) * content for, and they are primary for. */ -void OSD::do_notifies(map< int, vector<pg_info_t> >& notify_list, +void OSD::do_notifies(map< int, vector<pair<pg_info_t,pg_interval_map_t> > >& notify_list, epoch_t query_epoch) { - for (map< int, vector<pg_info_t> >::iterator it = notify_list.begin(); + for (map< int, vector<pair<pg_info_t,pg_interval_map_t> > >::iterator it = notify_list.begin(); it != notify_list.end(); it++) { if (it->first == whoami) { @@ -4242,14 +4243,14 @@ void OSD::handle_pg_notify(OpRequestRef op) map<int, MOSDPGInfo*> info_map; int created = 0; - for (vector<pg_info_t>::iterator it = m->get_pg_list().begin(); + for (vector<pair<pg_info_t,pg_interval_map_t> >::iterator it = m->get_pg_list().begin(); it != m->get_pg_list().end(); it++) { PG *pg = 0; ObjectStore::Transaction *t; C_Contexts *fin; - pg = get_or_create_pg(*it, m->get_epoch(), from, created, true, &t, &fin); + pg = get_or_create_pg(it->first, m->get_epoch(), from, created, true, &it->second, &t, &fin); if (!pg) continue; @@ -4262,7 +4263,7 @@ void OSD::handle_pg_notify(OpRequestRef op) } PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); - pg->handle_notify(from, *it, &rctx); + pg->handle_notify(from, it->first, &rctx); int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); @@ -4291,7 +4292,7 @@ void OSD::handle_pg_log(OpRequestRef op) ObjectStore::Transaction *t; C_Contexts *fin; PG *pg = get_or_create_pg(m->info, m->get_epoch(), - from, created, false, &t, &fin); + from, created, false, NULL, &t, &fin); if (!pg) { return; } @@ -4344,7 +4345,7 @@ void OSD::handle_pg_info(OpRequestRef op) ObjectStore::Transaction *t = 0; C_Contexts *fin = 0; PG *pg = get_or_create_pg(*p, m->get_epoch(), - from, created, false, &t, &fin); + from, created, false, NULL, &t, &fin); if (!pg) continue; @@ -4548,7 +4549,7 @@ void OSD::handle_pg_query(OpRequestRef op) op->mark_started(); - map< int, vector<pg_info_t> > notify_list; + map< int, vector<pair<pg_info_t,pg_interval_map_t> > > notify_list; for (map<pg_t,pg_query_t>::iterator it = m->pg_list.begin(); it != m->pg_list.end(); @@ -4583,7 +4584,7 @@ void OSD::handle_pg_query(OpRequestRef op) cluster_messenger->send_message(mlog, osdmap->get_cluster_inst(from)); } else { - notify_list[from].push_back(empty); + notify_list[from].push_back(make_pair(empty, pg_interval_map_t())); } continue; } @@ -4915,7 +4916,7 @@ void OSD::do_recovery(PG *pg) ObjectStore::Transaction *t = new ObjectStore::Transaction; C_Contexts *fin = new C_Contexts(g_ceph_context); - map< int, vector<pg_info_t> > notify_list; // primary -> list + map< int, vector<pair<pg_info_t,pg_interval_map_t> > > notify_list; // primary -> list map< int, map<pg_t,pg_query_t> > query_map; // peer -> PG -> get_summary_since map<int,MOSDPGInfo*> info_map; // peer -> message PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 6b345f2a769..8843a9ac714 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -471,11 +471,13 @@ protected: PG *_open_lock_pg(pg_t pg, bool no_lockdep_check=false, bool hold_map_lock=false); PG *_create_lock_pg(pg_t pgid, bool newly_created, bool hold_map_lock, int role, vector<int>& up, vector<int>& acting, pg_history_t history, + pg_interval_map_t *pim, ObjectStore::Transaction& t); PG *lookup_lock_raw_pg(pg_t pgid); PG *get_or_create_pg(const pg_info_t& info, epoch_t epoch, int from, int& pcreated, bool primary, + pg_interval_map_t *ppi, ObjectStore::Transaction **pt, C_Contexts **pfin); @@ -617,7 +619,7 @@ protected: // -- generic pg peering -- - void do_notifies(map< int, vector<pg_info_t> >& notify_list, + void do_notifies(map< int, vector<pair<pg_info_t,pg_interval_map_t> > >& notify_list, epoch_t query_epoch); void do_queries(map< int, map<pg_t,pg_query_t> >& query_map); void do_infos(map<int, MOSDPGInfo*>& info_map); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 7c758453f69..70818db7c82 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1815,6 +1815,7 @@ void PG::clear_stats() * @param t transaction to write out our new state in */ void PG::init(int role, vector<int>& newup, vector<int>& newacting, pg_history_t& history, + pg_interval_map_t *pim, ObjectStore::Transaction *t) { dout(10) << "init role " << role << " up " << newup << " acting " << newacting @@ -1832,6 +1833,11 @@ void PG::init(int role, vector<int>& newup, vector<int>& newacting, pg_history_t osd->reg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp); + if (pim) { + past_intervals.swap(*pim); + dout(10) << "init got " << past_intervals.size() << " past intervals" << dendl; + } + write_info(*t); write_log(*t); } @@ -3305,7 +3311,7 @@ void PG::share_pg_log() } void PG::fulfill_info(int from, const pg_query_t &query, - pair<int, pg_info_t> ¬ify_info) + pair<pg_info_t, pg_interval_map_t> ¬ify_info) { assert(!acting.empty()); assert(from == acting[0]); @@ -3313,7 +3319,7 @@ void PG::fulfill_info(int from, const pg_query_t &query, // info dout(10) << "sending info" << dendl; - notify_info = make_pair(from, info); + notify_info = make_pair(info, past_intervals); } void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch) @@ -3855,7 +3861,7 @@ boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&) PG *pg = context< RecoveryMachine >().pg; if (pg->is_stray() && pg->get_primary() >= 0) { context< RecoveryMachine >().send_notify(pg->get_primary(), - pg->info); + pg->info, pg->past_intervals); } pg->update_heartbeat_peers(); @@ -4243,7 +4249,7 @@ boost::statechart::result PG::RecoveryState::ReplicaActive::react(const ActMap&) PG *pg = context< RecoveryMachine >().pg; if (pg->is_stray() && pg->get_primary() >= 0) { context< RecoveryMachine >().send_notify(pg->get_primary(), - pg->info); + pg->info, pg->past_intervals); } return discard_event(); } @@ -4328,9 +4334,9 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MQuery& query) { PG *pg = context< RecoveryMachine >().pg; if (query.query.type == pg_query_t::INFO) { - pair<int, pg_info_t> notify_info; + pair<pg_info_t,pg_interval_map_t> notify_info; pg->fulfill_info(query.from, query.query, notify_info); - context< RecoveryMachine >().send_notify(notify_info.first, notify_info.second); + context< RecoveryMachine >().send_notify(query.from, notify_info.first, notify_info.second); } else { pg->fulfill_log(query.from, query.query, query.query_epoch); } @@ -4342,7 +4348,7 @@ boost::statechart::result PG::RecoveryState::Stray::react(const ActMap&) PG *pg = context< RecoveryMachine >().pg; if (pg->is_stray() && pg->get_primary() >= 0) { context< RecoveryMachine >().send_notify(pg->get_primary(), - pg->info); + pg->info, pg->past_intervals); } return discard_event(); } diff --git a/src/osd/PG.h b/src/osd/PG.h index 1fee39ecf72..07c92925c6e 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -475,14 +475,14 @@ public: utime_t start_time; map< int, map<pg_t, pg_query_t> > *query_map; map< int, MOSDPGInfo* > *info_map; - map< int, vector<pg_info_t> > *notify_list; + map< int, vector<pair<pg_info_t,pg_interval_map_t> > > *notify_list; list< Context* > *context_list; ObjectStore::Transaction *transaction; RecoveryCtx() : query_map(0), info_map(0), notify_list(0), context_list(0), transaction(0) {} RecoveryCtx(map< int, map<pg_t, pg_query_t> > *query_map, map< int, MOSDPGInfo* > *info_map, - map< int, vector<pg_info_t> > *notify_list, + map< int, vector<pair<pg_info_t,pg_interval_map_t> > > *notify_list, list< Context* > *context_list, ObjectStore::Transaction *transaction) : query_map(query_map), info_map(info_map), @@ -903,9 +903,9 @@ public: return state->rctx->context_list; } - void send_notify(int to, const pg_info_t &info) { + void send_notify(int to, const pg_info_t& info, const pg_interval_map_t& pi) { assert(state->rctx->notify_list); - (*state->rctx->notify_list)[to].push_back(info); + (*state->rctx->notify_list)[to].push_back(make_pair(info, pi)); } }; friend class RecoveryMachine; @@ -1291,7 +1291,7 @@ public: bool is_empty() const { return info.last_update == eversion_t(0,0); } void init(int role, vector<int>& up, vector<int>& acting, pg_history_t& history, - ObjectStore::Transaction *t); + pg_interval_map_t *pim, ObjectStore::Transaction *t); // pg on-disk state void do_pending_flush(); @@ -1332,7 +1332,7 @@ public: void set_last_peering_reset(); void fulfill_info(int from, const pg_query_t &query, - pair<int, pg_info_t> ¬ify_info); + pair<pg_info_t, pg_interval_map_t> ¬ify_info); void fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch); bool acting_up_affected(const vector<int>& newup, const vector<int>& newacting); bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch); |