Diffstat (limited to 'src/osd/PG.cc')
-rw-r--r-- | src/osd/PG.cc | 987
1 file changed, 118 insertions, 869 deletions
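The body of this commit is mechanical: PG.cc stops manipulating its log, missing set, and on-disk log directly and instead delegates to a pg_log member of a new PGLog class. That class is defined outside this file, so the declaration below is only a sketch reconstructed from the call sites visible in the diff; parameter and const qualifiers are guesses, not the authoritative header.

    // Sketch of the PGLog facade as inferred from this diff's call sites.
    // Reconstructed for orientation only; the real header may differ.
    class PGLog {
    public:
      // read-only views that replace direct member access
      const IndexedLog &get_log() const;
      const pg_missing_t &get_missing() const;
      const OndiskLog &get_ondisklog() const;
      eversion_t get_head() const;
      eversion_t get_tail() const;

      // mutators that absorb logic removed from PG.cc
      void add(const pg_log_entry_t &e);
      void missing_add(const hobject_t &oid, eversion_t need, eversion_t have);
      void set_last_requested(version_t v);
      void reset_recovery_pointers();
      void activate_not_complete(pg_info_t &info);
      void claim_log(const pg_log_t &o);
      void split_into(pg_t child_pgid, unsigned split_bits, PGLog *opg_log);
      void proc_replica_log(ObjectStore::Transaction &t, pg_info_t &oinfo,
                            pg_log_t &olog, pg_missing_t &omissing, int from);
      void merge_log(ObjectStore::Transaction &t, pg_info_t &oinfo,
                     pg_log_t &olog, int from, pg_info_t &info,
                     list<hobject_t> &to_remove, bool &dirty_log,
                     bool &dirty_info, bool &dirty_big_info);
      void rewind_divergent_log(ObjectStore::Transaction &t, eversion_t newhead,
                                pg_info_t &info, list<hobject_t> &to_remove,
                                bool &dirty_log, bool &dirty_info,
                                bool &dirty_big_info);
      void trim(ObjectStore::Transaction &t, eversion_t trim_to,
                pg_info_t &info, const hobject_t &log_oid);
      void write_log(ObjectStore::Transaction &t, const hobject_t &log_oid);
      bool read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
                    const pg_info_t &info, ostringstream &oss);
    };

Note the to_remove out-parameter on merge_log and rewind_divergent_log: PGLog stays free of store and snap-mapper side effects, and the thin PG wrappers below delete the returned objects via remove_snap_mapped_object.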
diff --git a/src/osd/PG.cc b/src/osd/PG.cc index da6a68ed387..94c10d0ab6e 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -211,33 +211,6 @@ std::string PG::gen_prefix() const return out.str(); } - - -void PG::IndexedLog::trim(ObjectStore::Transaction& t, hobject_t& log_oid, eversion_t s) -{ - if (complete_to != log.end() && - complete_to->version <= s) { - generic_dout(0) << " bad trim to " << s << " when complete_to is " << complete_to->version - << " on " << *this << dendl; - } - - set<string> keys_to_rm; - while (!log.empty()) { - pg_log_entry_t &e = *log.begin(); - if (e.version > s) - break; - generic_dout(20) << "trim " << e << dendl; - unindex(e); // remove from index, - keys_to_rm.insert(e.get_key_name()); - log.pop_front(); // from log - } - t.omap_rmkeys(coll_t::META_COLL, log_oid, keys_to_rm); - - // raise tail? - if (tail < s) - tail = s; -} - /********* PG **********/ void PG::proc_master_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, pg_missing_t& omissing, int from) @@ -264,106 +237,7 @@ void PG::proc_replica_log(ObjectStore::Transaction& t, dout(10) << "proc_replica_log for osd." << from << ": " << oinfo << " " << olog << " " << omissing << dendl; - /* - basically what we're doing here is rewinding the remote log, - dropping divergent entries, until we find something that matches - our master log. we then reset last_update to reflect the new - point up to which missing is accurate. - - later, in activate(), missing will get wound forward again and - we will send the peer enough log to arrive at the same state. - */ - - for (map<hobject_t, pg_missing_t::item>::iterator i = omissing.missing.begin(); - i != omissing.missing.end(); - ++i) { - dout(20) << " before missing " << i->first << " need " << i->second.need - << " have " << i->second.have << dendl; - } - - list<pg_log_entry_t>::const_reverse_iterator pp = olog.log.rbegin(); - eversion_t lu(oinfo.last_update); - while (true) { - if (pp == olog.log.rend()) { - if (pp != olog.log.rbegin()) // no last_update adjustment if we discard nothing! - lu = olog.tail; - break; - } - const pg_log_entry_t& oe = *pp; - - // don't continue past the tail of our log. - if (oe.version <= log.tail) - break; - - if (!log.objects.count(oe.soid)) { - dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl; - ++pp; - continue; - } - - pg_log_entry_t& ne = *log.objects[oe.soid]; - if (ne.version == oe.version) { - dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl; - lu = pp->version; - break; - } - - if (oe.soid > oinfo.last_backfill) { - // past backfill line, don't care - dout(10) << " had " << oe << " beyond last_backfill : skipping" << dendl; - ++pp; - continue; - } - - if (ne.version > oe.version) { - dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; - } else { - if (oe.is_delete()) { - if (ne.is_delete()) { - // old and new are delete - dout(10) << " had " << oe << " new " << ne << " : both deletes" << dendl; - } else { - // old delete, new update. 
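The loop being deleted here (and its near-twin in merge_old_entry further down) reduces to a small case matrix over the old (replica) and new (authoritative) entry for the same object. A standalone restatement of that matrix, using trivially simplified stand-in types rather than Ceph's pg_log_entry_t:

    // Standalone sketch of the divergence case matrix from the removed
    // proc_replica_log loop. Types are simplified stand-ins, not Ceph's.
    struct Entry { unsigned version; bool is_delete; };

    enum class Action { BothDeletes, AddMissing, RmMissing, ReviseNeed, NewerWins };

    // 'oe' is the replica's (old) entry, 'ne' the authoritative (new) one.
    Action classify(const Entry &oe, const Entry &ne) {
      if (ne.version > oe.version)
        return Action::NewerWins;                // "new will supersede"
      if (oe.is_delete)
        return ne.is_delete ? Action::BothDeletes  // nothing to do
                            : Action::AddMissing;  // old delete, new update
      return ne.is_delete ? Action::RmMissing      // old update, new delete
                          : Action::ReviseNeed;    // old update, new update
    }

In the original, BothDeletes and NewerWins only log; AddMissing corresponds to omissing.add(ne.soid, ne.version, eversion_t()), RmMissing to omissing.rm(oe.soid, oe.version), and ReviseNeed to omissing.revise_need(ne.soid, ne.version).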
- dout(10) << " had " << oe << " new " << ne << " : missing" << dendl; - omissing.add(ne.soid, ne.version, eversion_t()); - } - } else { - if (ne.is_delete()) { - // old update, new delete - dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; - omissing.rm(oe.soid, oe.version); - } else { - // old update, new update - dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; - omissing.revise_need(ne.soid, ne.version); - } - } - } - - ++pp; - } - - if (lu < oinfo.last_update) { - dout(10) << " peer osd." << from << " last_update now " << lu << dendl; - oinfo.last_update = lu; - } - - if (omissing.have_missing()) { - eversion_t first_missing = - omissing.missing[omissing.rmissing.begin()->second].need; - oinfo.last_complete = eversion_t(); - list<pg_log_entry_t>::const_iterator i = olog.log.begin(); - for (; - i != olog.log.end(); - ++i) { - if (i->version < first_missing) - oinfo.last_complete = i->version; - else - break; - } - } else { - oinfo.last_complete = oinfo.last_update; - } + pg_log.proc_replica_log(t, oinfo, olog, omissing, from); peer_info[from] = oinfo; dout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl; @@ -429,271 +303,24 @@ void PG::remove_snap_mapped_object( } } - -/* - * merge an old (possibly divergent) log entry into the new log. this - * happens _after_ new log items have been assimilated. thus, we assume - * the index already references newer entries (if present), and missing - * has been updated accordingly. - * - * return true if entry is not divergent. - */ -bool PG::merge_old_entry(ObjectStore::Transaction& t, pg_log_entry_t& oe) +void PG::merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from) { - if (oe.soid > info.last_backfill) { - dout(20) << "merge_old_entry had " << oe << " : beyond last_backfill" << dendl; - return false; - } - if (log.objects.count(oe.soid)) { - pg_log_entry_t &ne = *log.objects[oe.soid]; // new(er?) entry - - if (ne.version > oe.version) { - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : older, missing" << dendl; - assert(ne.is_delete() || missing.is_missing(ne.soid)); - return false; - } - if (ne.version == oe.version) { - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : same" << dendl; - return true; - } - if (oe.is_delete()) { - if (ne.is_delete()) { - // old and new are delete - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : both deletes" << dendl; - } else { - // old delete, new update. 
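Once the rewind settles, the deleted tail of proc_replica_log recomputes the replica's last_complete: it is the newest log version strictly below the first outstanding need. A minimal restatement, with an unsigned counter standing in for eversion_t:

    // Sketch of the last_complete recomputation at the end of the removed
    // proc_replica_log: walk the replica's log in order and advance
    // last_complete until the first version that is still missing.
    #include <list>
    struct Ver { unsigned v; bool operator<(Ver o) const { return v < o.v; } };

    Ver last_complete_before(const std::list<Ver> &log, Ver first_missing) {
      Ver lc{0};                        // eversion_t() in the original
      for (const Ver &e : log) {
        if (e < first_missing) lc = e;  // still fully applied
        else break;                     // first gap reached
      }
      return lc;
    }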
- dout(20) << "merge_old_entry had " << oe << " new " << ne << " : missing" << dendl; - missing.revise_need(ne.soid, ne.version); - } - } else { - if (ne.is_delete()) { - // old update, new delete - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new delete supercedes" << dendl; - missing.rm(oe.soid, oe.version); - } else { - // old update, new update - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new item supercedes" << dendl; - missing.revise_need(ne.soid, ne.version); - } - } - } else if (oe.op == pg_log_entry_t::CLONE) { - assert(oe.soid.snap != CEPH_NOSNAP); - dout(20) << "merge_old_entry had " << oe - << ", clone with no non-divergent log entries, " - << "deleting" << dendl; - remove_snap_mapped_object(t, oe.soid); - if (missing.is_missing(oe.soid)) - missing.rm(oe.soid, missing.missing[oe.soid].need); - } else if (oe.prior_version > info.log_tail) { - /** - * oe.prior_version is a previously divergent log entry - * oe.soid must have already been handled and the missing - * set updated appropriately - */ - dout(20) << "merge_old_entry had oe " << oe - << " with divergent prior_version " << oe.prior_version - << " oe.soid " << oe.soid - << " must already have been merged" << dendl; - } else { - if (!oe.is_delete()) { - dout(20) << "merge_old_entry had " << oe << " deleting" << dendl; - remove_snap_mapped_object(t, oe.soid); - } - dout(20) << "merge_old_entry had " << oe << " updating missing to " - << oe.prior_version << dendl; - if (oe.prior_version > eversion_t()) { - ondisklog.add_divergent_prior(oe.prior_version, oe.soid); - dirty_log = true; - missing.revise_need(oe.soid, oe.prior_version); - } else if (missing.is_missing(oe.soid)) { - missing.rm(oe.soid, missing.missing[oe.soid].need); - } - } - return false; + list<hobject_t> to_remove; + pg_log.merge_log(t, oinfo, olog, from, info, to_remove, dirty_log, dirty_info, dirty_big_info); + for(list<hobject_t>::iterator i = to_remove.begin(); + i != to_remove.end(); + i++) + remove_snap_mapped_object(t, *i); } -/** - * rewind divergent entries at the head of the log - * - * This rewinds entries off the head of our log that are divergent. - * This is used by replicas during activation. - * - * @param t transaction - * @param newhead new head to rewind to - */ void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead) { - dout(10) << "rewind_divergent_log truncate divergent future " << newhead << dendl; - assert(newhead > log.tail); - - list<pg_log_entry_t>::iterator p = log.log.end(); - list<pg_log_entry_t> divergent; - while (true) { - if (p == log.log.begin()) { - // yikes, the whole thing is divergent! - divergent.swap(log.log); - break; - } - --p; - if (p->version == newhead) { - ++p; - divergent.splice(divergent.begin(), log.log, p, log.log.end()); - break; - } - assert(p->version > newhead); - dout(10) << "rewind_divergent_log future divergent " << *p << dendl; - log.unindex(*p); - } - - log.head = newhead; - info.last_update = newhead; - if (info.last_complete > newhead) - info.last_complete = newhead; - - for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d) - merge_old_entry(t, *d); - - dirty_info = true; - dirty_big_info = true; - dirty_log = true; -} - -void PG::merge_log(ObjectStore::Transaction& t, - pg_info_t &oinfo, pg_log_t &olog, int fromosd) -{ - dout(10) << "merge_log " << olog << " from osd." 
<< fromosd - << " into " << log << dendl; - - // Check preconditions - - // If our log is empty, the incoming log needs to have not been trimmed. - assert(!log.null() || olog.tail == eversion_t()); - // The logs must overlap. - assert(log.head >= olog.tail && olog.head >= log.tail); - - for (map<hobject_t, pg_missing_t::item>::iterator i = missing.missing.begin(); - i != missing.missing.end(); - ++i) { - dout(20) << "pg_missing_t sobject: " << i->first << dendl; - } - - bool changed = false; - - // extend on tail? - // this is just filling in history. it does not affect our - // missing set, as that should already be consistent with our - // current log. - if (olog.tail < log.tail) { - dout(10) << "merge_log extending tail to " << olog.tail << dendl; - list<pg_log_entry_t>::iterator from = olog.log.begin(); - list<pg_log_entry_t>::iterator to; - for (to = from; - to != olog.log.end(); - ++to) { - if (to->version > log.tail) - break; - log.index(*to); - dout(15) << *to << dendl; - } - assert(to != olog.log.end() || - (olog.head == info.last_update)); - - // splice into our log. - log.log.splice(log.log.begin(), - olog.log, from, to); - - info.log_tail = log.tail = olog.tail; - changed = true; - } - - if (oinfo.stats.reported < info.stats.reported) // make sure reported always increases - oinfo.stats.reported = info.stats.reported; - if (info.last_backfill.is_max()) - info.stats = oinfo.stats; - - // do we have divergent entries to throw out? - if (olog.head < log.head) { - rewind_divergent_log(t, olog.head); - changed = true; - } - - // extend on head? - if (olog.head > log.head) { - dout(10) << "merge_log extending head to " << olog.head << dendl; - - // find start point in olog - list<pg_log_entry_t>::iterator to = olog.log.end(); - list<pg_log_entry_t>::iterator from = olog.log.end(); - eversion_t lower_bound = olog.tail; - while (1) { - if (from == olog.log.begin()) - break; - --from; - dout(20) << " ? " << *from << dendl; - if (from->version <= log.head) { - dout(20) << "merge_log cut point (usually last shared) is " << *from << dendl; - lower_bound = from->version; - ++from; - break; - } - } - - // index, update missing, delete deleted - for (list<pg_log_entry_t>::iterator p = from; p != to; ++p) { - pg_log_entry_t &ne = *p; - dout(20) << "merge_log " << ne << dendl; - log.index(ne); - if (ne.soid <= info.last_backfill) { - missing.add_next_event(ne); - if (ne.is_delete()) - remove_snap_mapped_object(t, ne.soid); - } - } - - // move aside divergent items - list<pg_log_entry_t> divergent; - while (!log.empty()) { - pg_log_entry_t &oe = *log.log.rbegin(); - /* - * look at eversion.version here. we want to avoid a situation like: - * our log: 100'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529 - * new log: 122'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529 - * lower_bound = 100'9 - * i.e, same request, different version. If the eversion.version is > the - * lower_bound, we it is divergent. 
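The comment preserved in this deleted hunk is worth restating cleanly: at the divergence cut in merge_log, only the op-counter half of the eversion is compared, i.e. if the eversion.version is greater than lower_bound's, the entry is divergent. lower_bound comes from the incoming log, so its epoch can be newer than ours, and a full (epoch, version) comparison could let a stale entry with an older epoch slide under the cut. A sketch of the test, with a simplified eversion struct:

    // Sketch of the divergence test at the removed merge_log cut point.
    // lower_bound comes from the incoming (authoritative) log, so its
    // epoch can be newer than ours; a full (epoch, version) comparison
    // could hide a stale entry such as 100'10 below a lower_bound of
    // 122'9. Comparing the op counter alone catches it.
    struct eversion { unsigned epoch, version; };

    bool divergent_past_cut(const eversion &ours, const eversion &lower_bound) {
      return ours.version > lower_bound.version;  // not a full-pair comparison
    }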
- */ - if (oe.version.version <= lower_bound.version) - break; - dout(10) << "merge_log divergent " << oe << dendl; - divergent.push_front(oe); - log.unindex(oe); - log.log.pop_back(); - } - - // splice - log.log.splice(log.log.end(), - olog.log, from, to); - log.index(); - - info.last_update = log.head = olog.head; - info.purged_snaps = oinfo.purged_snaps; - - // process divergent items - if (!divergent.empty()) { - for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d) - merge_old_entry(t, *d); - } - - changed = true; - } - - dout(10) << "merge_log result " << log << " " << missing << " changed=" << changed << dendl; - - if (changed) { - dirty_info = true; - dirty_big_info = true; - dirty_log = true; - } + list<hobject_t> to_remove; + pg_log.rewind_divergent_log(t, newhead, info, to_remove, dirty_log, dirty_info, dirty_big_info); + for(list<hobject_t>::iterator i = to_remove.begin(); + i != to_remove.end(); + i++) + remove_snap_mapped_object(t, *i); } /* @@ -714,8 +341,8 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing peer_missing[fromosd]; // found items? - for (map<hobject_t,pg_missing_t::item>::iterator p = missing.missing.begin(); - p != missing.missing.end(); + for (map<hobject_t,pg_missing_t::item>::const_iterator p = pg_log.get_missing().missing.begin(); + p != pg_log.get_missing().missing.end(); ++p) { const hobject_t &soid(p->first); eversion_t need = p->second.need; @@ -775,12 +402,13 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing publish_stats_to_osd(); } - dout(20) << "search_for_missing missing " << missing.missing << dendl; + dout(20) << "search_for_missing missing " << pg_log.get_missing().missing << dendl; return found_missing; } void PG::discover_all_missing(map< int, map<pg_t,pg_query_t> > &query_map) { + const pg_missing_t &missing = pg_log.get_missing(); assert(missing.have_missing()); dout(10) << __func__ << " " @@ -826,23 +454,6 @@ void PG::discover_all_missing(map< int, map<pg_t,pg_query_t> > &query_map) } } - -ostream& PG::IndexedLog::print(ostream& out) const -{ - out << *this << std::endl; - for (list<pg_log_entry_t>::const_iterator p = log.begin(); - p != log.end(); - ++p) { - out << *p << " " << (logged_object(p->soid) ? "indexed":"NOT INDEXED") << std::endl; - assert(!p->reqid_is_indexed() || logged_req(p->reqid)); - } - return out; -} - - - - - /******* PG ***********/ bool PG::needs_recovery() const { @@ -850,6 +461,8 @@ bool PG::needs_recovery() const bool ret = false; + const pg_missing_t &missing = pg_log.get_missing(); + if (missing.num_missing()) { dout(10) << __func__ << " primary has " << missing.num_missing() << dendl; ret = true; @@ -1120,7 +733,7 @@ void PG::clear_primary_state() missing_loc.clear(); missing_loc_sources.clear(); - log.reset_recovery_pointers(); + pg_log.reset_recovery_pointers(); scrubber.reserved_peers.clear(); scrub_after_recovery = false; @@ -1486,6 +1099,8 @@ void PG::activate(ObjectStore::Transaction& t, info.last_epoch_started = query_epoch; + const pg_missing_t &missing = pg_log.get_missing(); + if (is_primary()) { // If necessary, create might_have_unfound to help us find our unfound objects. 
// NOTE: It's important that we build might_have_unfound before trimming the @@ -1530,24 +1145,10 @@ void PG::activate(ObjectStore::Transaction& t, dout(10) << "activate - no missing, moving last_complete " << info.last_complete << " -> " << info.last_update << dendl; info.last_complete = info.last_update; - log.reset_recovery_pointers(); + pg_log.reset_recovery_pointers(); } else { dout(10) << "activate - not complete, " << missing << dendl; - log.complete_to = log.log.begin(); - while (log.complete_to->version < - missing.missing[missing.rmissing.begin()->second].need) - log.complete_to++; - assert(log.complete_to != log.log.end()); - if (log.complete_to == log.log.begin()) { - info.last_complete = eversion_t(); - } else { - log.complete_to--; - info.last_complete = log.complete_to->version; - log.complete_to++; - } - log.last_requested = 0; - dout(10) << "activate - complete_to = " << log.complete_to->version - << dendl; + pg_log.activate_not_complete(info); if (is_primary()) { dout(10) << "activate - starting recovery" << dendl; osd->queue_for_recovery(this); @@ -1592,7 +1193,7 @@ void PG::activate(ObjectStore::Transaction& t, dout(10) << "activate peer osd." << peer << " is up to date, but sending pg_log anyway" << dendl; m = new MOSDPGLog(get_osdmap()->get_epoch(), info); } - } else if (log.tail > pi.last_update || pi.last_backfill == hobject_t()) { + } else if (pg_log.get_tail() > pi.last_update || pi.last_backfill == hobject_t()) { // backfill osd->clog.info() << info.pgid << " restarting backfill on osd." << peer << " from (" << pi.log_tail << "," << pi.last_update << "] " << pi.last_backfill @@ -1607,17 +1208,17 @@ void PG::activate(ObjectStore::Transaction& t, m = new MOSDPGLog(get_osdmap()->get_epoch(), pi); // send some recent log, so that op dup detection works well. - m->log.copy_up_to(log, g_conf->osd_min_pg_log_entries); + m->log.copy_up_to(pg_log.get_log(), g_conf->osd_min_pg_log_entries); m->info.log_tail = m->log.tail; pi.log_tail = m->log.tail; // sigh... 
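The block deleted just above, which walked log.complete_to forward to the first missing need and derived last_complete from the entry immediately before it, is what pg_log.activate_not_complete(info) now encapsulates. A standalone sketch under simplified types (plain unsigned versions, a bare list for the log):

    // Standalone sketch of the complete_to positioning that moved into
    // PGLog::activate_not_complete. Simplified stand-in types.
    #include <list>
    #include <cassert>
    typedef unsigned ver_t;
    struct Info { ver_t last_complete; };

    void activate_not_complete(std::list<ver_t> &log, ver_t first_missing_need,
                               Info &info) {
      std::list<ver_t>::iterator complete_to = log.begin();
      while (complete_to != log.end() && *complete_to < first_missing_need)
        ++complete_to;                  // first entry we are NOT complete through
      assert(complete_to != log.end());
      if (complete_to == log.begin()) {
        info.last_complete = 0;         // eversion_t(): complete through nothing
      } else {
        --complete_to;
        info.last_complete = *complete_to;  // last fully applied entry
        ++complete_to;
      }
    }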
pm.clear(); } else { // catch up - assert(log.tail <= pi.last_update); + assert(pg_log.get_tail() <= pi.last_update); m = new MOSDPGLog(get_osdmap()->get_epoch(), info); // send new stuff to append to replicas log - m->log.copy_after(log, pi.last_update); + m->log.copy_after(pg_log.get_log(), pi.last_update); } // share past_intervals if we are creating the pg on the replica @@ -2047,47 +1648,6 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue) osd->osd->finish_recovery_op(this, soid, dequeue); } -void PG::IndexedLog::split_into( - pg_t child_pgid, - unsigned split_bits, - PG::IndexedLog *olog) -{ - list<pg_log_entry_t> oldlog; - oldlog.swap(log); - - eversion_t old_tail; - olog->head = head; - olog->tail = tail; - unsigned mask = ~((~0)<<split_bits); - for (list<pg_log_entry_t>::iterator i = oldlog.begin(); - i != oldlog.end(); - ) { - if ((i->soid.hash & mask) == child_pgid.m_seed) { - olog->log.push_back(*i); - if (log.empty()) - tail = i->version; - } else { - log.push_back(*i); - if (olog->empty()) - olog->tail = i->version; - } - oldlog.erase(i++); - } - - if (log.empty()) - tail = head; - else - head = log.rbegin()->version; - - if (olog->empty()) - olog->tail = olog->head; - else - olog->head = olog->log.rbegin()->version; - - olog->index(); - index(); -} - static void split_list( list<OpRequestRef> *from, list<OpRequestRef> *to, @@ -2149,22 +1709,19 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits) child->pool = pool; // Log - log.split_into(child_pgid, split_bits, &(child->log)); + pg_log.split_into(child_pgid, split_bits, &(child->pg_log)); child->info.last_complete = info.last_complete; - info.last_update = log.head; - child->info.last_update = child->log.head; - - info.log_tail = log.tail; - child->info.log_tail = child->log.tail; + info.last_update = pg_log.get_head(); + child->info.last_update = child->pg_log.get_head(); - if (info.last_complete < log.tail) - info.last_complete = log.tail; - if (child->info.last_complete < child->log.tail) - child->info.last_complete = child->log.tail; + info.log_tail = pg_log.get_tail(); + child->info.log_tail = child->pg_log.get_tail(); - // Missing - missing.split_into(child_pgid, split_bits, &(child->missing)); + if (info.last_complete < pg_log.get_tail()) + info.last_complete = pg_log.get_tail(); + if (child->info.last_complete < child->pg_log.get_tail()) + child->info.last_complete = child->pg_log.get_tail(); // Info child->info.history = info.history; @@ -2201,7 +1758,7 @@ void PG::clear_recovery_state() { dout(10) << "clear_recovery_state" << dendl; - log.reset_recovery_pointers(); + pg_log.reset_recovery_pointers(); finish_sync_event = 0; hobject_t soid; @@ -2339,10 +1896,10 @@ void PG::publish_stats_to_osd() info.stats.last_active = now; info.stats.last_unstale = now; - info.stats.log_size = ondisklog.length(); - info.stats.ondisk_log_size = ondisklog.length(); - info.stats.log_start = log.tail; - info.stats.ondisk_log_start = log.tail; + info.stats.log_size = pg_log.get_ondisklog().length(); + info.stats.ondisk_log_size = pg_log.get_ondisklog().length(); + info.stats.log_start = pg_log.get_tail(); + info.stats.ondisk_log_start = pg_log.get_tail(); pg_stats_publish_valid = true; pg_stats_publish = info.stats; @@ -2364,8 +1921,9 @@ void PG::publish_stats_to_osd() degraded += (target - acting.size()) * num_objects; // missing on primary - pg_stats_publish.stats.sum.num_objects_missing_on_primary = missing.num_missing(); - degraded += missing.num_missing(); + 
pg_stats_publish.stats.sum.num_objects_missing_on_primary = + pg_log.get_missing().num_missing(); + degraded += pg_log.get_missing().num_missing(); for (unsigned i=1; i<acting.size(); i++) { assert(peer_missing.count(acting[i])); @@ -2648,21 +2206,6 @@ void PG::write_info(ObjectStore::Transaction& t) dirty_big_info = false; } -void PG::clear_info_log( - pg_t pgid, - const hobject_t &infos_oid, - const hobject_t &log_oid, - ObjectStore::Transaction *t) { - - set<string> keys_to_remove; - keys_to_remove.insert(get_epoch_key(pgid)); - keys_to_remove.insert(get_biginfo_key(pgid)); - keys_to_remove.insert(get_info_key(pgid)); - - t->remove(coll_t::META_COLL, log_oid); - t->omap_rmkeys(coll_t::META_COLL, infos_oid, keys_to_remove); -} - epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, hobject_t &infos_oid, bufferlist *bl) { assert(bl); @@ -2695,30 +2238,9 @@ epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, hobject_t &infos_oid return cur_epoch; } -void PG::_write_log(ObjectStore::Transaction& t, pg_log_t &log, - const hobject_t &log_oid, map<eversion_t, hobject_t> &divergent_priors) -{ - //dout(10) << "write_log" << dendl; - t.remove(coll_t::META_COLL, log_oid); - t.touch(coll_t::META_COLL, log_oid); - map<string,bufferlist> keys; - for (list<pg_log_entry_t>::iterator p = log.log.begin(); - p != log.log.end(); - ++p) { - bufferlist bl(sizeof(*p) * 2); - p->encode_with_checksum(bl); - keys[p->get_key_name()].claim(bl); - } - //dout(10) << "write_log " << keys.size() << " keys" << dendl; - - ::encode(divergent_priors, keys["divergent_priors"]); - - t.omap_setkeys(coll_t::META_COLL, log_oid, keys); -} - void PG::write_log(ObjectStore::Transaction& t) { - _write_log(t, log, log_oid, ondisklog.divergent_priors); + pg_log.write_log(t, log_oid); dirty_log = false; } @@ -2730,23 +2252,6 @@ void PG::write_if_dirty(ObjectStore::Transaction& t) write_log(t); } -void PG::trim(ObjectStore::Transaction& t, eversion_t trim_to) -{ - // trim? 
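The deleted _write_log above documents the omap layout of the new-style log: the log object is cleared, then one omap key is written per entry (named by get_key_name(), holding the checksummed encoding) plus a single "divergent_priors" key. A sketch of the key-map construction, with strings standing in for bufferlists:

    // Sketch of the omap log format from the removed _write_log: one key
    // per entry plus a single "divergent_priors" side-table key.
    #include <list>
    #include <map>
    #include <string>

    struct LogEntry {
      std::string key_name;           // p->get_key_name() in the original
      std::string encoded;            // encode_with_checksum() output
    };

    std::map<std::string, std::string>
    build_log_keys(const std::list<LogEntry> &log, const std::string &priors_blob) {
      std::map<std::string, std::string> keys;
      for (const LogEntry &e : log)
        keys[e.key_name] = e.encoded;         // one omap key per log entry
      keys["divergent_priors"] = priors_blob; // single side-table key
      return keys;
    }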
- if (trim_to > log.tail) { - /* If we are trimming, we must be complete up to trim_to, time - * to throw out any divergent_priors - */ - ondisklog.divergent_priors.clear(); - // We shouldn't be trimming the log past last_complete - assert(trim_to <= info.last_complete); - - dout(10) << "trim " << log << " to " << trim_to << dendl; - log.trim(t, log_oid, trim_to); - info.log_tail = log.tail; - } -} - void PG::trim_peers() { calc_trim_to(); @@ -2771,7 +2276,7 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl) info.last_update = e.version; // log mutation - log.add(e); + pg_log.add(e); dout(10) << "add_log_entry " << e << dendl; e.encode_with_checksum(log_bl); @@ -2781,7 +2286,7 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl) void PG::append_log( vector<pg_log_entry_t>& logv, eversion_t trim_to, ObjectStore::Transaction &t) { - dout(10) << "append_log " << log << " " << logv << dendl; + dout(10) << "append_log " << pg_log.get_log() << " " << logv << dendl; map<string,bufferlist> keys; for (vector<pg_log_entry_t>::iterator p = logv.begin(); @@ -2794,7 +2299,7 @@ void PG::append_log( dout(10) << "append_log adding " << keys.size() << " keys" << dendl; t.omap_setkeys(coll_t::META_COLL, log_oid, keys); - trim(t, trim_to); + pg_log.trim(t, trim_to, info, log_oid); // update the local pg, pg log dirty_info = true; @@ -2803,7 +2308,7 @@ void PG::append_log( bool PG::check_log_for_corruption(ObjectStore *store) { - OndiskLog bounds; + PGLog::OndiskLog bounds; bufferlist blb; store->collection_getattr(coll, "ondisklog", blb); bufferlist::iterator p = blb.begin(); @@ -2958,14 +2463,14 @@ void PG::read_state(ObjectStore *store, bufferlist &bl) assert(r >= 0); ostringstream oss; - if (read_log( + if (pg_log.read_log( store, coll, log_oid, info, - ondisklog, log, missing, oss, this)) { + oss)) { /* We don't want to leave the old format around in case the next log * write happens to be an append_log() */ ObjectStore::Transaction t; - write_log(t); + pg_log.write_log(t, log_oid); int r = osd->store->apply_transaction(t); assert(!r); } @@ -2978,36 +2483,40 @@ void PG::read_state(ObjectStore *store, bufferlist &bl) void PG::log_weirdness() { - if (log.tail != info.log_tail) + if (pg_log.get_tail() != info.log_tail) osd->clog.error() << info.pgid - << " info mismatch, log.tail " << log.tail + << " info mismatch, log.tail " << pg_log.get_tail() << " != info.log_tail " << info.log_tail << "\n"; - if (log.head != info.last_update) + if (pg_log.get_head() != info.last_update) osd->clog.error() << info.pgid - << " info mismatch, log.head " << log.head + << " info mismatch, log.head " << pg_log.get_head() << " != info.last_update " << info.last_update << "\n"; - if (log.empty()) { + if (pg_log.get_log().empty()) { // shoudl it be? 
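The deleted PG::trim (now PGLog::trim, taking info and log_oid explicitly) enforces two invariants spelled out in its comments: trimming implies the PG is complete up to trim_to, so any recorded divergent_priors can be dropped, and the log must never be trimmed past last_complete. Restated with simplified types:

    // Sketch of the invariants from the removed PG::trim. Simplified types;
    // the actual entry removal is elided.
    #include <cassert>
    #include <map>
    typedef unsigned ver_t;

    void trim(ver_t trim_to, ver_t log_tail, ver_t last_complete,
              std::map<ver_t, int> &divergent_priors) {
      if (trim_to > log_tail) {
        // Trimming implies we are complete through trim_to, so any
        // recorded divergent priors are no longer needed.
        divergent_priors.clear();
        // Never trim past what has actually been applied.
        assert(trim_to <= last_complete);
        // ... drop entries <= trim_to and raise the tail (elided)
      }
    }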
- if (log.head != log.tail) + if (pg_log.get_head() != pg_log.get_tail()) osd->clog.error() << info.pgid - << " log bound mismatch, empty but (" << log.tail << "," << log.head << "]\n"; + << " log bound mismatch, empty but (" << pg_log.get_tail() << "," + << pg_log.get_head() << "]\n"; } else { - if ((log.log.begin()->version <= log.tail) || // sloppy check - (log.log.rbegin()->version != log.head && !(log.head == log.tail))) + if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail()) || // sloppy check + (pg_log.get_log().log.rbegin()->version != pg_log.get_head() && + !(pg_log.get_head() == pg_log.get_tail()))) osd->clog.error() << info.pgid - << " log bound mismatch, info (" << log.tail << "," << log.head << "]" + << " log bound mismatch, info (" << pg_log.get_tail() << "," + << pg_log.get_head() << "]" << " actual [" - << log.log.begin()->version << "," << log.log.rbegin()->version << "]" + << pg_log.get_log().log.begin()->version << "," + << pg_log.get_log().log.rbegin()->version << "]" << "\n"; } - if (log.caller_ops.size() > log.log.size()) { + if (pg_log.get_log().caller_ops.size() > pg_log.get_log().log.size()) { osd->clog.error() << info.pgid - << " caller_ops.size " << log.caller_ops.size() - << " > log size " << log.log.size() + << " caller_ops.size " << pg_log.get_log().caller_ops.size() + << " > log size " << pg_log.get_log().log.size() << "\n"; } } @@ -3691,17 +3200,17 @@ void PG::build_inc_scrub_map( map.valid_through = last_update_applied; map.incr_since = v; vector<hobject_t> ls; - list<pg_log_entry_t>::iterator p; - if (v == log.tail) { - p = log.log.begin(); - } else if (v > log.tail) { - p = log.find_entry(v); + list<pg_log_entry_t>::const_iterator p; + if (v == pg_log.get_tail()) { + p = pg_log.get_log().log.begin(); + } else if (v > pg_log.get_tail()) { + p = pg_log.get_log().find_entry(v); ++p; } else { assert(0); } - for (; p != log.log.end(); ++p) { + for (; p != pg_log.get_log().log.end(); ++p) { if (p->is_update()) { ls.push_back(p->soid); map.objects[p->soid].negative = false; @@ -3731,11 +3240,11 @@ void PG::repair_object(const hobject_t& soid, ScrubMap::object *po, int bad_peer // We should only be scrubbing if the PG is clean. 
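build_inc_scrub_map, updated above to use the read-only accessors, derives an incremental scrub map purely from the log: every entry after version v marks its object as present (update) or negative (delete). A simplified restatement; the stand-in types collapse hobject_t to a string and ignore op kinds other than update and delete:

    // Sketch of the log walk in build_inc_scrub_map: everything the log
    // touched after version v is either present or a negative entry.
    #include <list>
    #include <map>
    #include <string>

    struct LogEnt { unsigned version; std::string soid; bool is_update; };
    struct ScrubObj { bool negative; };

    void build_incremental(const std::list<LogEnt> &log, unsigned v,
                           std::map<std::string, ScrubObj> &objects) {
      for (const LogEnt &e : log) {
        if (e.version <= v) continue;     // find_entry(v) then ++p originally
        objects[e.soid].negative = !e.is_update;
      }
    }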
assert(waiting_for_missing_object.empty()); - missing.add(soid, oi.version, eversion_t()); + pg_log.missing_add(soid, oi.version, eversion_t()); missing_loc[soid].insert(ok_peer); missing_loc_sources.insert(ok_peer); - log.last_requested = 0; + pg_log.set_last_requested(0); } } @@ -4010,7 +3519,7 @@ void PG::classic_scrub(ThreadPool::TPHandle &handle) return; } - if (scrubber.primary_scrubmap.valid_through != log.head) { + if (scrubber.primary_scrubmap.valid_through != pg_log.get_head()) { ScrubMap incr; build_inc_scrub_map(incr, scrubber.primary_scrubmap.valid_through, handle); scrubber.primary_scrubmap.merge_incr(incr); @@ -4179,9 +3688,9 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle) scrubber.block_writes = true; // walk the log to find the latest update that affects our chunk - scrubber.subset_last_update = log.tail; - for (list<pg_log_entry_t>::iterator p = log.log.begin(); - p != log.log.end(); + scrubber.subset_last_update = pg_log.get_tail(); + for (list<pg_log_entry_t>::const_iterator p = pg_log.get_log().log.begin(); + p != pg_log.get_log().log.end(); ++p) { if (p->soid >= scrubber.start && p->soid < scrubber.end) scrubber.subset_last_update = p->version; @@ -4324,7 +3833,7 @@ bool PG::scrub_gather_replica_maps() p != scrubber.received_maps.end(); ++p) { - if (scrubber.received_maps[p->first].valid_through != log.head) { + if (scrubber.received_maps[p->first].valid_through != pg_log.get_head()) { scrubber.waiting_on++; scrubber.waiting_on_whom.insert(p->first); // Need to request another incremental map @@ -4831,7 +4340,7 @@ void PG::share_pg_log() pg_info_t& pinfo(peer_info[peer]); MOSDPGLog *m = new MOSDPGLog(info.last_update.epoch, info); - m->log.copy_after(log, pinfo.last_update); + m->log.copy_after(pg_log.get_log(), pinfo.last_update); for (list<pg_log_entry_t>::const_iterator i = m->log.log.begin(); i != m->log.log.end(); @@ -4871,23 +4380,23 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch) MOSDPGLog *mlog = new MOSDPGLog(get_osdmap()->get_epoch(), info, query_epoch); - mlog->missing = missing; + mlog->missing = pg_log.get_missing(); // primary -> other, when building master log if (query.type == pg_query_t::LOG) { dout(10) << " sending info+missing+log since " << query.since << dendl; - if (query.since != eversion_t() && query.since < log.tail) { + if (query.since != eversion_t() && query.since < pg_log.get_tail()) { osd->clog.error() << info.pgid << " got broken pg_query_t::LOG since " << query.since - << " when my log.tail is " << log.tail + << " when my log.tail is " << pg_log.get_tail() << ", sending full log instead\n"; - mlog->log = log; // primary should not have requested this!! + mlog->log = pg_log.get_log(); // primary should not have requested this!! 
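The chunky-scrub hunk above scans the whole log to find subset_last_update, the newest entry touching the chunk [scrubber.start, scrubber.end); writes are then blocked until the PG has applied up to that version. A sketch, with strings standing in for hobject_t and its ordering:

    // Sketch of the chunky-scrub scan: the newest log entry that falls in
    // the chunk [start, end) bounds what the scrub must wait for.
    #include <list>
    #include <string>

    struct Ent { unsigned version; std::string soid; };

    unsigned subset_last_update(const std::list<Ent> &log, unsigned tail,
                                const std::string &start, const std::string &end) {
      unsigned last = tail;                 // default: nothing newer in chunk
      for (const Ent &e : log)
        if (e.soid >= start && e.soid < end)
          last = e.version;                 // log is ordered, keep the latest
      return last;
    }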
} else - mlog->log.copy_after(log, query.since); + mlog->log.copy_after(pg_log.get_log(), query.since); } else if (query.type == pg_query_t::FULLLOG) { dout(10) << " sending info+missing+full log" << dendl; - mlog->log = log; + mlog->log = pg_log.get_log(); } dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl; @@ -5259,21 +4768,22 @@ ostream& operator<<(ostream& out, const PG& pg) if (pg.recovery_ops_active) out << " rops=" << pg.recovery_ops_active; - if (pg.log.tail != pg.info.log_tail || - pg.log.head != pg.info.last_update) - out << " (info mismatch, " << pg.log << ")"; + if (pg.pg_log.get_tail() != pg.info.log_tail || + pg.pg_log.get_head() != pg.info.last_update) + out << " (info mismatch, " << pg.pg_log.get_log() << ")"; - if (pg.log.empty()) { + if (pg.pg_log.get_log().empty()) { // shoudl it be? - if (pg.log.head.version - pg.log.tail.version != 0) { + if (pg.pg_log.get_head().version - pg.pg_log.get_tail().version != 0) { out << " (log bound mismatch, empty)"; } } else { - if ((pg.log.log.begin()->version <= pg.log.tail) || // sloppy check - (pg.log.log.rbegin()->version != pg.log.head && !(pg.log.head == pg.log.tail))) { + if ((pg.pg_log.get_log().log.begin()->version <= pg.pg_log.get_tail()) || // sloppy check + (pg.pg_log.get_log().log.rbegin()->version != pg.pg_log.get_head() && + !(pg.pg_log.get_head() == pg.pg_log.get_tail()))) { out << " (log bound mismatch, actual=[" - << pg.log.log.begin()->version << "," - << pg.log.log.rbegin()->version << "]"; + << pg.pg_log.get_log().log.begin()->version << "," + << pg.pg_log.get_log().log.rbegin()->version << "]"; //out << "len=" << pg.log.log.size(); out << ")"; } @@ -5300,9 +4810,9 @@ ostream& operator<<(ostream& out, const PG& pg) if (pg.scrubber.must_scrub) out << " MUST_SCRUB"; - //out << " (" << pg.log.tail << "," << pg.log.head << "]"; - if (pg.missing.num_missing()) { - out << " m=" << pg.missing.num_missing(); + //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]"; + if (pg.pg_log.get_missing().num_missing()) { + out << " m=" << pg.pg_log.get_missing().num_missing(); if (pg.is_primary()) { int unfound = pg.get_num_unfound(); if (unfound) @@ -5601,265 +5111,6 @@ std::ostream& operator<<(std::ostream& oss, return oss; } -/*---------------------------------------------------*/ -// Handle staitc function so it can use dout() -#undef dout_prefix -#define dout_prefix if (passedpg) _prefix(_dout, passedpg) - -bool PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, - const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, - pg_missing_t &missing, ostringstream &oss, const PG *passedpg) -{ - dout(10) << "read_log" << dendl; - bool rewrite_log = false; - - // legacy? 
- struct stat st; - int r = store->stat(coll_t::META_COLL, log_oid, &st); - assert(r == 0); - if (st.st_size > 0) { - read_log_old(store, coll, log_oid, info, ondisklog, log, missing, oss, passedpg); - rewrite_log = true; - } else { - log.tail = info.log_tail; - ObjectMap::ObjectMapIterator p = store->get_omap_iterator(coll_t::META_COLL, log_oid); - if (p) for (p->seek_to_first(); p->valid() ; p->next()) { - bufferlist bl = p->value();//Copy bufferlist before creating iterator - bufferlist::iterator bp = bl.begin(); - if (p->key() == "divergent_priors") { - ::decode(ondisklog.divergent_priors, bp); - dout(20) << "read_log " << ondisklog.divergent_priors.size() << " divergent_priors" << dendl; - } else { - pg_log_entry_t e; - e.decode_with_checksum(bp); - dout(20) << "read_log " << e << dendl; - if (!log.log.empty()) { - pg_log_entry_t last_e(log.log.back()); - assert(last_e.version.version < e.version.version); - assert(last_e.version.epoch <= e.version.epoch); - } - log.log.push_back(e); - log.head = e.version; - } - } - } - log.head = info.last_update; - log.index(); - - // build missing - if (info.last_complete < info.last_update) { - dout(10) << "read_log checking for missing items over interval (" << info.last_complete - << "," << info.last_update << "]" << dendl; - - set<hobject_t> did; - for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin(); - i != log.log.rend(); - ++i) { - if (i->version <= info.last_complete) break; - if (did.count(i->soid)) continue; - did.insert(i->soid); - - if (i->is_delete()) continue; - - bufferlist bv; - int r = store->getattr(coll, i->soid, OI_ATTR, bv); - if (r >= 0) { - object_info_t oi(bv); - if (oi.version < i->version) { - dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl; - missing.add(i->soid, i->version, oi.version); - } - } else { - dout(15) << "read_log missing " << *i << dendl; - missing.add(i->soid, i->version, eversion_t()); - } - } - for (map<eversion_t, hobject_t>::reverse_iterator i = - ondisklog.divergent_priors.rbegin(); - i != ondisklog.divergent_priors.rend(); - ++i) { - if (i->first <= info.last_complete) break; - if (did.count(i->second)) continue; - did.insert(i->second); - bufferlist bv; - int r = store->getattr(coll, i->second, OI_ATTR, bv); - if (r >= 0) { - object_info_t oi(bv); - /** - * 1) we see this entry in the divergent priors mapping - * 2) we didn't see an entry for this object in the log - * - * From 1 & 2 we know that either the object does not exist - * or it is at the version specified in the divergent_priors - * map since the object would have been deleted atomically - * with the addition of the divergent_priors entry, an older - * version would not have been recovered, and a newer version - * would show up in the log above. - */ - assert(oi.version == i->first); - } else { - dout(15) << "read_log missing " << *i << dendl; - missing.add(i->second, i->first, eversion_t()); - } - } - } - dout(10) << "read_log done" << dendl; - return rewrite_log; -} - -void PG::read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid, - const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, - pg_missing_t &missing, ostringstream &oss, const PG *passedpg) -{ - // load bounds, based on old OndiskLog encoding. 
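The missing-set reconstruction deleted above is the core of read_log: walk the log newest-first over (last_complete, last_update], take only the newest entry per object, skip deletes, and compare each surviving entry against the on-disk object_info version. A standalone sketch where a map from soid to version stands in for getattr(OI_ATTR):

    // Sketch of the missing-set reconstruction in the removed read_log.
    #include <list>
    #include <map>
    #include <set>
    #include <string>

    struct E { unsigned version; std::string soid; bool is_delete; };
    // stand-in for getattr(OI_ATTR): present objects and their versions
    typedef std::map<std::string, unsigned> Store;

    void build_missing(const std::list<E> &log, unsigned last_complete,
                       const Store &store, std::map<std::string, unsigned> &missing) {
      std::set<std::string> did;
      for (auto i = log.rbegin(); i != log.rend(); ++i) {
        if (i->version <= last_complete) break;    // interval exhausted
        if (!did.insert(i->soid).second) continue; // newest entry wins
        if (i->is_delete) continue;                // deletes are never "missing"
        Store::const_iterator oi = store.find(i->soid);
        if (oi == store.end() || oi->second < i->version)
          missing[i->soid] = i->version;           // need this version
      }
    }

The divergent_priors pass that follows in the original applies the same "newest-first, skip already-seen" discipline to the side table.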
- uint64_t ondisklog_tail = 0; - uint64_t ondisklog_head = 0; - uint64_t ondisklog_zero_to; - bool ondisklog_has_checksums; - - bufferlist blb; - store->collection_getattr(coll, "ondisklog", blb); - { - bufferlist::iterator bl = blb.begin(); - DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl); - ondisklog_has_checksums = (struct_v >= 2); - ::decode(ondisklog_tail, bl); - ::decode(ondisklog_head, bl); - if (struct_v >= 4) - ::decode(ondisklog_zero_to, bl); - else - ondisklog_zero_to = 0; - if (struct_v >= 5) - ::decode(ondisklog.divergent_priors, bl); - DECODE_FINISH(bl); - } - uint64_t ondisklog_length = ondisklog_head - ondisklog_tail; - dout(10) << "read_log " << ondisklog_tail << "~" << ondisklog_length << dendl; - - log.tail = info.log_tail; - - // In case of sobject_t based encoding, may need to list objects in the store - // to find hashes - vector<hobject_t> ls; - - if (ondisklog_head > 0) { - // read - bufferlist bl; - store->read(coll_t::META_COLL, log_oid, ondisklog_tail, ondisklog_length, bl); - if (bl.length() < ondisklog_length) { - std::ostringstream oss; - oss << "read_log got " << bl.length() << " bytes, expected " - << ondisklog_head << "-" << ondisklog_tail << "=" - << ondisklog_length; - throw read_log_error(oss.str().c_str()); - } - - pg_log_entry_t e; - bufferlist::iterator p = bl.begin(); - assert(log.empty()); - eversion_t last; - bool reorder = false; - bool listed_collection = false; - - while (!p.end()) { - uint64_t pos = ondisklog_tail + p.get_off(); - if (ondisklog_has_checksums) { - bufferlist ebl; - ::decode(ebl, p); - __u32 crc; - ::decode(crc, p); - - __u32 got = ebl.crc32c(0); - if (crc == got) { - bufferlist::iterator q = ebl.begin(); - ::decode(e, q); - } else { - std::ostringstream oss; - oss << "read_log " << pos << " bad crc got " << got << " expected" << crc; - throw read_log_error(oss.str().c_str()); - } - } else { - ::decode(e, p); - } - dout(20) << "read_log " << pos << " " << e << dendl; - - // [repair] in order? - if (e.version < last) { - dout(0) << "read_log " << pos << " out of order entry " << e << " follows " << last << dendl; - oss << info.pgid << " log has out of order entry " - << e << " following " << last << "\n"; - reorder = true; - } - - if (e.version <= log.tail) { - dout(20) << "read_log ignoring entry at " << pos << " below log.tail" << dendl; - continue; - } - if (last.version == e.version.version) { - dout(0) << "read_log got dup " << e.version << " (last was " << last << ", dropping that one)" << dendl; - log.log.pop_back(); - oss << info.pgid << " read_log got dup " - << e.version << " after " << last << "\n"; - } - - if (e.invalid_hash) { - // We need to find the object in the store to get the hash - if (!listed_collection) { - store->collection_list(coll, ls); - listed_collection = true; - } - bool found = false; - for (vector<hobject_t>::iterator i = ls.begin(); - i != ls.end(); - ++i) { - if (i->oid == e.soid.oid && i->snap == e.soid.snap) { - e.soid = *i; - found = true; - break; - } - } - if (!found) { - // Didn't find the correct hash - std::ostringstream oss; - oss << "Could not find hash for hoid " << e.soid << std::endl; - throw read_log_error(oss.str().c_str()); - } - } - - if (e.invalid_pool) { - e.soid.pool = info.pgid.pool(); - } - - e.offset = pos; - uint64_t endpos = ondisklog_tail + p.get_off(); - log.log.push_back(e); - last = e.version; - - // [repair] at end of log? 
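In the legacy reader deleted below, checksummed logs frame each record as an encoded blob followed by its crc32c, and a mismatch throws read_log_error rather than silently decoding garbage. A sketch of that framing check, with a toy checksum standing in for Ceph's crc32c:

    // Sketch of the per-entry checksum framing in the removed read_log_old.
    #include <stdexcept>
    #include <string>

    static unsigned toy_checksum(const std::string &b) {  // crc32c in Ceph
      unsigned c = 0;
      for (unsigned char ch : b) c = c * 131 + ch;
      return c;
    }

    std::string check_record(const std::string &blob, unsigned stored_crc) {
      unsigned got = toy_checksum(blob);
      if (got != stored_crc)
        throw std::runtime_error("read_log: bad crc"); // read_log_error originally
      return blob;                                     // safe to decode
    }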
- if (!p.end() && e.version == info.last_update) { - oss << info.pgid << " log has extra data at " - << endpos << "~" << (ondisklog_head-endpos) << " after " - << info.last_update << "\n"; - - dout(0) << "read_log " << endpos << " *** extra gunk at end of log, " - << "adjusting ondisklog_head" << dendl; - ondisklog_head = endpos; - break; - } - } - - if (reorder) { - dout(0) << "read_log reordering log" << dendl; - map<eversion_t, pg_log_entry_t> m; - for (list<pg_log_entry_t>::iterator p = log.log.begin(); p != log.log.end(); ++p) - m[p->version] = *p; - log.log.clear(); - for (map<eversion_t, pg_log_entry_t>::iterator p = m.begin(); p != m.end(); ++p) - log.log.push_back(p->second); - } - } -} - /*------------ Recovery State Machine----------------*/ #undef dout_prefix #define dout_prefix (*_dout << context< RecoveryMachine >().pg->gen_prefix() \ @@ -6539,7 +5790,7 @@ PG::RecoveryState::Recovering::Recovering(my_context ctx) void PG::RecoveryState::Recovering::release_reservations() { PG *pg = context< RecoveryMachine >().pg; - assert(!pg->missing.have_missing()); + assert(!pg->pg_log.get_missing().have_missing()); pg->state_clear(PG_STATE_RECOVERING); // release remote reservations @@ -6718,7 +5969,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&) if (g_conf->osd_check_for_log_corruption) pg->check_log_for_corruption(pg->osd->store); - int unfound = pg->missing.num_missing() - pg->missing_loc.size(); + int unfound = pg->pg_log.get_missing().num_missing() - pg->missing_loc.size(); if (unfound > 0 && pg->all_unfound_are_queried_or_lost(pg->get_osdmap())) { if (g_conf->osd_auto_mark_unfound_lost) { @@ -6917,10 +6168,9 @@ boost::statechart::result PG::RecoveryState::ReplicaActive::react(const MLogRec& { PG *pg = context< RecoveryMachine >().pg; dout(10) << "received log from " << logevt.from << dendl; - pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(), - logevt.msg->info, logevt.msg->log, logevt.from); - - assert(pg->log.head == pg->info.last_update); + ObjectStore::Transaction* t = context<RecoveryMachine>().get_cur_transaction(); + pg->merge_log(*t,logevt.msg->info, logevt.msg->log, logevt.from); + assert(pg->pg_log.get_head() == pg->info.last_update); return discard_event(); } @@ -6994,14 +6244,13 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MLogRec& logevt) pg->dirty_info = true; pg->dirty_big_info = true; // maybe. 
pg->dirty_log = true; - pg->log.claim_log(msg->log); - pg->missing.clear(); + pg->pg_log.claim_log(msg->log); } else { - pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(), - msg->info, msg->log, logevt.from); + ObjectStore::Transaction* t = context<RecoveryMachine>().get_cur_transaction(); + pg->merge_log(*t, msg->info, msg->log, logevt.from); } - assert(pg->log.head == pg->info.last_update); + assert(pg->pg_log.get_head() == pg->info.last_update); post_event(Activate(logevt.msg->get_epoch())); return transit<ReplicaActive>(); @@ -7014,13 +6263,13 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MInfoRec& infoev if (pg->info.last_update > infoevt.info.last_update) { // rewind divergent log entries - pg->rewind_divergent_log(*context< RecoveryMachine >().get_cur_transaction(), - infoevt.info.last_update); + ObjectStore::Transaction* t = context<RecoveryMachine>().get_cur_transaction(); + pg->rewind_divergent_log(*t, infoevt.info.last_update); pg->info.stats = infoevt.info.stats; } assert(infoevt.info.last_update == pg->info.last_update); - assert(pg->log.head == pg->info.last_update); + assert(pg->pg_log.get_head() == pg->info.last_update); post_event(Activate(infoevt.msg_epoch)); return transit<ReplicaActive>(); @@ -7451,7 +6700,7 @@ PG::RecoveryState::GetMissing::GetMissing(my_context ctx) if (pi.is_empty()) continue; // no pg data, nothing divergent - if (pi.last_update < pg->log.tail) { + if (pi.last_update < pg->pg_log.get_tail()) { dout(10) << " osd." << *i << " is not contiguous, will restart backfill" << dendl; pg->peer_missing[*i]; continue; |
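The GetMissing hunk at the end keeps the same policy under the new accessor: a peer whose last_update predates our log tail is "not contiguous" because the entries it needs were already trimmed, so log-based recovery is impossible and backfill restarts. The test in isolation:

    // Sketch of the contiguity test at the end of this hunk: a peer whose
    // last_update is older than our log tail has a gap that log replay
    // cannot fill, so it must be backfilled instead.
    typedef unsigned ver_t;

    bool needs_backfill(ver_t peer_last_update, ver_t our_log_tail) {
      // Entries older than the tail were trimmed; anything the peer is
      // missing from before that point is unrecoverable from the log.
      return peer_last_update < our_log_tail;
    }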