From ec7731f737bcd061d4d1189c391ccff2661ca7ee Mon Sep 17 00:00:00 2001
From: Loic Dachary
Date: Wed, 22 May 2013 14:14:26 +0200
Subject: move log, ondisklog, missing from PG to PGLog

PG::log, PG::ondisklog and PG::missing are moved from PG to a new PGLog
class and become protected data members. This is a preliminary step before
writing unit tests to cover the methods that have side effects on these
data members and before defining a clean PGLog API. It improves
encapsulation and does not change any of the logic already in place.

Possible issues:

* an additional indirection (PG->PGLog->IndexedLog instead of
  PG->IndexedLog, for instance) is introduced: is it optimized?
* rewriting log.log into pg_log.get_log().log hurts readability, but
  should be optimized away and have no impact on performance

The guidelines followed for this patch are:

* const access to the data members is preserved; no attempt is made to
  define accessors
* all non-const methods are in PGLog; no access to the non-const methods
  of PGLog::log, PGLog::ondisklog and PGLog::missing is provided
* when methods are moved from PG to PGLog, the changes to their
  implementation are kept to a minimum
* the PG::OndiskLog and PG::IndexedLog sub-classes are moved to PGLog
  sub-classes unmodified and remain public

A const version of the pg_log_t::find_entry method was added.

A const accessor is provided for each of PGLog::get_log,
PGLog::get_missing and PGLog::get_ondisklog, but no non-const accessor.
Arguments are added to most of the methods moved from PG to PGLog so that
they can access PG data members such as info or log_oid.

The PGLog methods are sorted according to the data members they modify.

//////////////////// missing ////////////////////

* The pg_missing_t::{got,have,need,add,rm} methods are wrapped as
  PGLog::missing_{got,have,need,add,rm}

//////////////////// log ////////////////////

* PGLog::get_tail and PGLog::get_head getters are created
* PGLog::set_tail, PGLog::set_head and PGLog::set_last_requested setters
  are created
* PGLog::index, PGLog::unindex and PGLog::add wrappers, plus
  PGLog::reset_recovery_pointers, are created
* PGLog::clear_info_log replaces PG::clear_info_log
* PGLog::trim replaces PG::trim

//////////////////// log & missing ////////////////////

* PGLog::claim_log is created with code extracted from
  PG::RecoveryState::Stray::react
* PGLog::split_into is created with code extracted from PG::split_into
* PGLog::recover_got is created with code extracted from
  ReplicatedPG::recover_got
* PGLog::activate_not_complete is created with code extracted from
  PG::activate
* PGLog::proc_replica_log is created with code extracted from
  PG::proc_replica_log
* PGLog::merge_old_entry replaces PG::merge_old_entry. The remove_snap
  argument is used to collect the hobject_t to be removed
* PGLog::rewind_divergent_log replaces PG::rewind_divergent_log. The
  remove_snap argument is used to collect the hobject_t to be removed.
  A new PG::rewind_divergent_log method is added to call
  remove_snap_mapped_object on each of the remove_snap elements
* PGLog::merge_log replaces PG::merge_log. The remove_snap argument is
  used to collect the hobject_t to be removed. A new PG::merge_log
  method is added to call remove_snap_mapped_object on each of the
  remove_snap elements
* PGLog::write_log is created with code extracted from PG::write_log.
  A non-static version is created for convenience but is a simple wrapper
* PGLog::read_log replaces PG::read_log. A non-static version is created
  for convenience but is a simple wrapper. A toy sketch illustrating the
  resulting API is included below.
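To make the guidelines above concrete, here is a minimal stand-alone
sketch of the pattern. It is an illustration only, not Ceph code: Entry,
ToyPGLog, merge and the simplified member types are invented stand-ins
for pg_log_entry_t, PGLog, IndexedLog and pg_missing_t. It shows the
three ideas at work: const accessors expose the data members read-only,
every mutation goes through a wrapper method, and a merge-style method
collects the objects to remove instead of removing them itself.

  // toy_pglog.cc -- illustration only; compile with -std=c++11
  #include <cassert>
  #include <list>
  #include <map>
  #include <string>

  struct Entry {            // stands in for pg_log_entry_t
    unsigned version;
    std::string oid;
    bool is_delete;
  };

  class ToyPGLog {
  protected:
    // protected, as in the patch: callers cannot touch these directly
    std::list<Entry> log;                     // stands in for IndexedLog
    std::map<std::string, unsigned> missing;  // stands in for pg_missing_t

  public:
    // const accessors only, mirroring get_log()/get_missing()
    const std::list<Entry> &get_log() const { return log; }
    const std::map<std::string, unsigned> &get_missing() const {
      return missing;
    }

    // mutators wrap the data members, mirroring add()/missing_add()
    void add(const Entry &e) {
      assert(log.empty() || e.version > log.back().version);
      log.push_back(e);
    }
    void missing_add(const std::string &oid, unsigned need) {
      missing[oid] = need;
    }

    // mirrors the merge_log/rewind_divergent_log signature change:
    // deletions are collected into remove_snap for the caller to act on,
    // instead of being applied from inside the log code
    void merge(const std::list<Entry> &olog,
               std::list<std::string> &remove_snap) {
      for (const Entry &e : olog) {
        add(e);
        if (e.is_delete)
          remove_snap.push_back(e.oid);
      }
    }
  };

  int main() {
    ToyPGLog pg_log;
    pg_log.add(Entry{1, "obj_a", false});
    pg_log.missing_add("obj_a", 1);

    std::list<std::string> remove_snap;
    pg_log.merge(std::list<Entry>{Entry{2, "obj_b", true}}, remove_snap);

    // the owning class applies the side effects itself, the way
    // PG::merge_log calls remove_snap_mapped_object on each collected
    // hobject_t
    for (const std::string &oid : remove_snap)
      (void)oid; // stand-in for remove_snap_mapped_object(t, oid)

    assert(pg_log.get_log().size() == 2);        // read-only access
    assert(pg_log.get_missing().count("obj_a")); // read-only access
    return 0;
  }

Collecting removals in a list rather than removing inside PGLog keeps
PGLog free of PG-level side effects such as snap mapping, which is what
makes the class testable in isolation, as intended above.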
* PGLog:read_log_old replaces PG::read_log_old. http://tracker.ceph.com/issues/5046 refs #5046 Signed-off-by: Loic Dachary --- src/Makefile.am | 2 + src/osd/OSD.cc | 12 +- src/osd/PG.cc | 987 +++++---------------------------------- src/osd/PG.h | 242 +--------- src/osd/PGLog.cc | 789 +++++++++++++++++++++++++++++++ src/osd/PGLog.h | 391 ++++++++++++++++ src/osd/ReplicatedPG.cc | 202 ++++---- src/osd/ReplicatedPG.h | 2 +- src/osd/osd_types.h | 18 + src/test/encoding/types.h | 4 +- src/test/test_osd_types.cc | 32 ++ src/tools/ceph-filestore-dump.cc | 16 +- 12 files changed, 1472 insertions(+), 1225 deletions(-) create mode 100644 src/osd/PGLog.cc create mode 100644 src/osd/PGLog.h diff --git a/src/Makefile.am b/src/Makefile.am index 5e176874b11..ad80eac4a74 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1497,6 +1497,7 @@ noinst_LIBRARIES += libos.a libosd_a_SOURCES = \ osd/PG.cc \ + osd/PGLog.cc \ osd/ReplicatedPG.cc \ osd/Ager.cc \ osd/OSD.cc \ @@ -1993,6 +1994,7 @@ noinst_HEADERS = \ osd/OpRequest.h\ osd/SnapMapper.h\ osd/PG.h\ + osd/PGLog.h\ osd/ReplicatedPG.h\ osd/Watch.h\ osd/osd_types.h\ diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 0ca3092372f..b6bdf2de409 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1829,7 +1829,7 @@ void OSD::load_pgs() PG::RecoveryCtx rctx(0, 0, 0, 0, 0, 0); pg->handle_loaded(&rctx); - dout(10) << "load_pgs loaded " << *pg << " " << pg->log << dendl; + dout(10) << "load_pgs loaded " << *pg << " " << pg->pg_log.get_log() << dendl; pg->unlock(); } dout(10) << "load_pgs done" << dendl; @@ -3034,7 +3034,7 @@ void OSD::RemoveWQ::_process(pair item) return; ObjectStore::Transaction *t = new ObjectStore::Transaction; - PG::clear_info_log( + PGLog::clear_info_log( pg->info.pgid, OSD::make_infos_oid(), pg->log_oid, @@ -3645,8 +3645,10 @@ void OSD::do_command(Connection *con, tid_t tid, vector& cmd, bufferlist pg->lock(); fout << *pg << std::endl; - std::map::iterator mend = pg->missing.missing.end(); - std::map::iterator mi = pg->missing.missing.begin(); + std::map::const_iterator mend = + pg->pg_log.get_missing().missing.end(); + std::map::const_iterator mi = + pg->pg_log.get_missing().missing.begin(); for (; mi != mend; ++mi) { fout << mi->first << " -> " << mi->second << std::endl; map >::const_iterator mli = @@ -5756,7 +5758,7 @@ void OSD::handle_pg_trim(OpRequestRef op) } else { // primary is instructing us to trim ObjectStore::Transaction *t = new ObjectStore::Transaction; - pg->trim(*t, m->trim_to); + pg->pg_log.trim(*t, m->trim_to, pg->info, pg->log_oid); pg->dirty_info = true; pg->write_if_dirty(*t); int tr = store->queue_transaction(pg->osr.get(), t, diff --git a/src/osd/PG.cc b/src/osd/PG.cc index da6a68ed387..94c10d0ab6e 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -211,33 +211,6 @@ std::string PG::gen_prefix() const return out.str(); } - - -void PG::IndexedLog::trim(ObjectStore::Transaction& t, hobject_t& log_oid, eversion_t s) -{ - if (complete_to != log.end() && - complete_to->version <= s) { - generic_dout(0) << " bad trim to " << s << " when complete_to is " << complete_to->version - << " on " << *this << dendl; - } - - set keys_to_rm; - while (!log.empty()) { - pg_log_entry_t &e = *log.begin(); - if (e.version > s) - break; - generic_dout(20) << "trim " << e << dendl; - unindex(e); // remove from index, - keys_to_rm.insert(e.get_key_name()); - log.pop_front(); // from log - } - t.omap_rmkeys(coll_t::META_COLL, log_oid, keys_to_rm); - - // raise tail? 
- if (tail < s) - tail = s; -} - /********* PG **********/ void PG::proc_master_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, pg_missing_t& omissing, int from) @@ -264,106 +237,7 @@ void PG::proc_replica_log(ObjectStore::Transaction& t, dout(10) << "proc_replica_log for osd." << from << ": " << oinfo << " " << olog << " " << omissing << dendl; - /* - basically what we're doing here is rewinding the remote log, - dropping divergent entries, until we find something that matches - our master log. we then reset last_update to reflect the new - point up to which missing is accurate. - - later, in activate(), missing will get wound forward again and - we will send the peer enough log to arrive at the same state. - */ - - for (map::iterator i = omissing.missing.begin(); - i != omissing.missing.end(); - ++i) { - dout(20) << " before missing " << i->first << " need " << i->second.need - << " have " << i->second.have << dendl; - } - - list::const_reverse_iterator pp = olog.log.rbegin(); - eversion_t lu(oinfo.last_update); - while (true) { - if (pp == olog.log.rend()) { - if (pp != olog.log.rbegin()) // no last_update adjustment if we discard nothing! - lu = olog.tail; - break; - } - const pg_log_entry_t& oe = *pp; - - // don't continue past the tail of our log. - if (oe.version <= log.tail) - break; - - if (!log.objects.count(oe.soid)) { - dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl; - ++pp; - continue; - } - - pg_log_entry_t& ne = *log.objects[oe.soid]; - if (ne.version == oe.version) { - dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl; - lu = pp->version; - break; - } - - if (oe.soid > oinfo.last_backfill) { - // past backfill line, don't care - dout(10) << " had " << oe << " beyond last_backfill : skipping" << dendl; - ++pp; - continue; - } - - if (ne.version > oe.version) { - dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; - } else { - if (oe.is_delete()) { - if (ne.is_delete()) { - // old and new are delete - dout(10) << " had " << oe << " new " << ne << " : both deletes" << dendl; - } else { - // old delete, new update. - dout(10) << " had " << oe << " new " << ne << " : missing" << dendl; - omissing.add(ne.soid, ne.version, eversion_t()); - } - } else { - if (ne.is_delete()) { - // old update, new delete - dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; - omissing.rm(oe.soid, oe.version); - } else { - // old update, new update - dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; - omissing.revise_need(ne.soid, ne.version); - } - } - } - - ++pp; - } - - if (lu < oinfo.last_update) { - dout(10) << " peer osd." << from << " last_update now " << lu << dendl; - oinfo.last_update = lu; - } - - if (omissing.have_missing()) { - eversion_t first_missing = - omissing.missing[omissing.rmissing.begin()->second].need; - oinfo.last_complete = eversion_t(); - list::const_iterator i = olog.log.begin(); - for (; - i != olog.log.end(); - ++i) { - if (i->version < first_missing) - oinfo.last_complete = i->version; - else - break; - } - } else { - oinfo.last_complete = oinfo.last_update; - } + pg_log.proc_replica_log(t, oinfo, olog, omissing, from); peer_info[from] = oinfo; dout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl; @@ -429,271 +303,24 @@ void PG::remove_snap_mapped_object( } } - -/* - * merge an old (possibly divergent) log entry into the new log. 
this - * happens _after_ new log items have been assimilated. thus, we assume - * the index already references newer entries (if present), and missing - * has been updated accordingly. - * - * return true if entry is not divergent. - */ -bool PG::merge_old_entry(ObjectStore::Transaction& t, pg_log_entry_t& oe) +void PG::merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from) { - if (oe.soid > info.last_backfill) { - dout(20) << "merge_old_entry had " << oe << " : beyond last_backfill" << dendl; - return false; - } - if (log.objects.count(oe.soid)) { - pg_log_entry_t &ne = *log.objects[oe.soid]; // new(er?) entry - - if (ne.version > oe.version) { - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : older, missing" << dendl; - assert(ne.is_delete() || missing.is_missing(ne.soid)); - return false; - } - if (ne.version == oe.version) { - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : same" << dendl; - return true; - } - if (oe.is_delete()) { - if (ne.is_delete()) { - // old and new are delete - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : both deletes" << dendl; - } else { - // old delete, new update. - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : missing" << dendl; - missing.revise_need(ne.soid, ne.version); - } - } else { - if (ne.is_delete()) { - // old update, new delete - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new delete supercedes" << dendl; - missing.rm(oe.soid, oe.version); - } else { - // old update, new update - dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new item supercedes" << dendl; - missing.revise_need(ne.soid, ne.version); - } - } - } else if (oe.op == pg_log_entry_t::CLONE) { - assert(oe.soid.snap != CEPH_NOSNAP); - dout(20) << "merge_old_entry had " << oe - << ", clone with no non-divergent log entries, " - << "deleting" << dendl; - remove_snap_mapped_object(t, oe.soid); - if (missing.is_missing(oe.soid)) - missing.rm(oe.soid, missing.missing[oe.soid].need); - } else if (oe.prior_version > info.log_tail) { - /** - * oe.prior_version is a previously divergent log entry - * oe.soid must have already been handled and the missing - * set updated appropriately - */ - dout(20) << "merge_old_entry had oe " << oe - << " with divergent prior_version " << oe.prior_version - << " oe.soid " << oe.soid - << " must already have been merged" << dendl; - } else { - if (!oe.is_delete()) { - dout(20) << "merge_old_entry had " << oe << " deleting" << dendl; - remove_snap_mapped_object(t, oe.soid); - } - dout(20) << "merge_old_entry had " << oe << " updating missing to " - << oe.prior_version << dendl; - if (oe.prior_version > eversion_t()) { - ondisklog.add_divergent_prior(oe.prior_version, oe.soid); - dirty_log = true; - missing.revise_need(oe.soid, oe.prior_version); - } else if (missing.is_missing(oe.soid)) { - missing.rm(oe.soid, missing.missing[oe.soid].need); - } - } - return false; + list to_remove; + pg_log.merge_log(t, oinfo, olog, from, info, to_remove, dirty_log, dirty_info, dirty_big_info); + for(list::iterator i = to_remove.begin(); + i != to_remove.end(); + i++) + remove_snap_mapped_object(t, *i); } -/** - * rewind divergent entries at the head of the log - * - * This rewinds entries off the head of our log that are divergent. - * This is used by replicas during activation. 
- * - * @param t transaction - * @param newhead new head to rewind to - */ void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead) { - dout(10) << "rewind_divergent_log truncate divergent future " << newhead << dendl; - assert(newhead > log.tail); - - list::iterator p = log.log.end(); - list divergent; - while (true) { - if (p == log.log.begin()) { - // yikes, the whole thing is divergent! - divergent.swap(log.log); - break; - } - --p; - if (p->version == newhead) { - ++p; - divergent.splice(divergent.begin(), log.log, p, log.log.end()); - break; - } - assert(p->version > newhead); - dout(10) << "rewind_divergent_log future divergent " << *p << dendl; - log.unindex(*p); - } - - log.head = newhead; - info.last_update = newhead; - if (info.last_complete > newhead) - info.last_complete = newhead; - - for (list::iterator d = divergent.begin(); d != divergent.end(); ++d) - merge_old_entry(t, *d); - - dirty_info = true; - dirty_big_info = true; - dirty_log = true; -} - -void PG::merge_log(ObjectStore::Transaction& t, - pg_info_t &oinfo, pg_log_t &olog, int fromosd) -{ - dout(10) << "merge_log " << olog << " from osd." << fromosd - << " into " << log << dendl; - - // Check preconditions - - // If our log is empty, the incoming log needs to have not been trimmed. - assert(!log.null() || olog.tail == eversion_t()); - // The logs must overlap. - assert(log.head >= olog.tail && olog.head >= log.tail); - - for (map::iterator i = missing.missing.begin(); - i != missing.missing.end(); - ++i) { - dout(20) << "pg_missing_t sobject: " << i->first << dendl; - } - - bool changed = false; - - // extend on tail? - // this is just filling in history. it does not affect our - // missing set, as that should already be consistent with our - // current log. - if (olog.tail < log.tail) { - dout(10) << "merge_log extending tail to " << olog.tail << dendl; - list::iterator from = olog.log.begin(); - list::iterator to; - for (to = from; - to != olog.log.end(); - ++to) { - if (to->version > log.tail) - break; - log.index(*to); - dout(15) << *to << dendl; - } - assert(to != olog.log.end() || - (olog.head == info.last_update)); - - // splice into our log. - log.log.splice(log.log.begin(), - olog.log, from, to); - - info.log_tail = log.tail = olog.tail; - changed = true; - } - - if (oinfo.stats.reported < info.stats.reported) // make sure reported always increases - oinfo.stats.reported = info.stats.reported; - if (info.last_backfill.is_max()) - info.stats = oinfo.stats; - - // do we have divergent entries to throw out? - if (olog.head < log.head) { - rewind_divergent_log(t, olog.head); - changed = true; - } - - // extend on head? - if (olog.head > log.head) { - dout(10) << "merge_log extending head to " << olog.head << dendl; - - // find start point in olog - list::iterator to = olog.log.end(); - list::iterator from = olog.log.end(); - eversion_t lower_bound = olog.tail; - while (1) { - if (from == olog.log.begin()) - break; - --from; - dout(20) << " ? 
" << *from << dendl; - if (from->version <= log.head) { - dout(20) << "merge_log cut point (usually last shared) is " << *from << dendl; - lower_bound = from->version; - ++from; - break; - } - } - - // index, update missing, delete deleted - for (list::iterator p = from; p != to; ++p) { - pg_log_entry_t &ne = *p; - dout(20) << "merge_log " << ne << dendl; - log.index(ne); - if (ne.soid <= info.last_backfill) { - missing.add_next_event(ne); - if (ne.is_delete()) - remove_snap_mapped_object(t, ne.soid); - } - } - - // move aside divergent items - list divergent; - while (!log.empty()) { - pg_log_entry_t &oe = *log.log.rbegin(); - /* - * look at eversion.version here. we want to avoid a situation like: - * our log: 100'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529 - * new log: 122'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529 - * lower_bound = 100'9 - * i.e, same request, different version. If the eversion.version is > the - * lower_bound, we it is divergent. - */ - if (oe.version.version <= lower_bound.version) - break; - dout(10) << "merge_log divergent " << oe << dendl; - divergent.push_front(oe); - log.unindex(oe); - log.log.pop_back(); - } - - // splice - log.log.splice(log.log.end(), - olog.log, from, to); - log.index(); - - info.last_update = log.head = olog.head; - info.purged_snaps = oinfo.purged_snaps; - - // process divergent items - if (!divergent.empty()) { - for (list::iterator d = divergent.begin(); d != divergent.end(); ++d) - merge_old_entry(t, *d); - } - - changed = true; - } - - dout(10) << "merge_log result " << log << " " << missing << " changed=" << changed << dendl; - - if (changed) { - dirty_info = true; - dirty_big_info = true; - dirty_log = true; - } + list to_remove; + pg_log.rewind_divergent_log(t, newhead, info, to_remove, dirty_log, dirty_info, dirty_big_info); + for(list::iterator i = to_remove.begin(); + i != to_remove.end(); + i++) + remove_snap_mapped_object(t, *i); } /* @@ -714,8 +341,8 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing peer_missing[fromosd]; // found items? - for (map::iterator p = missing.missing.begin(); - p != missing.missing.end(); + for (map::const_iterator p = pg_log.get_missing().missing.begin(); + p != pg_log.get_missing().missing.end(); ++p) { const hobject_t &soid(p->first); eversion_t need = p->second.need; @@ -775,12 +402,13 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing publish_stats_to_osd(); } - dout(20) << "search_for_missing missing " << missing.missing << dendl; + dout(20) << "search_for_missing missing " << pg_log.get_missing().missing << dendl; return found_missing; } void PG::discover_all_missing(map< int, map > &query_map) { + const pg_missing_t &missing = pg_log.get_missing(); assert(missing.have_missing()); dout(10) << __func__ << " " @@ -826,23 +454,6 @@ void PG::discover_all_missing(map< int, map > &query_map) } } - -ostream& PG::IndexedLog::print(ostream& out) const -{ - out << *this << std::endl; - for (list::const_iterator p = log.begin(); - p != log.end(); - ++p) { - out << *p << " " << (logged_object(p->soid) ? 
"indexed":"NOT INDEXED") << std::endl; - assert(!p->reqid_is_indexed() || logged_req(p->reqid)); - } - return out; -} - - - - - /******* PG ***********/ bool PG::needs_recovery() const { @@ -850,6 +461,8 @@ bool PG::needs_recovery() const bool ret = false; + const pg_missing_t &missing = pg_log.get_missing(); + if (missing.num_missing()) { dout(10) << __func__ << " primary has " << missing.num_missing() << dendl; ret = true; @@ -1120,7 +733,7 @@ void PG::clear_primary_state() missing_loc.clear(); missing_loc_sources.clear(); - log.reset_recovery_pointers(); + pg_log.reset_recovery_pointers(); scrubber.reserved_peers.clear(); scrub_after_recovery = false; @@ -1486,6 +1099,8 @@ void PG::activate(ObjectStore::Transaction& t, info.last_epoch_started = query_epoch; + const pg_missing_t &missing = pg_log.get_missing(); + if (is_primary()) { // If necessary, create might_have_unfound to help us find our unfound objects. // NOTE: It's important that we build might_have_unfound before trimming the @@ -1530,24 +1145,10 @@ void PG::activate(ObjectStore::Transaction& t, dout(10) << "activate - no missing, moving last_complete " << info.last_complete << " -> " << info.last_update << dendl; info.last_complete = info.last_update; - log.reset_recovery_pointers(); + pg_log.reset_recovery_pointers(); } else { dout(10) << "activate - not complete, " << missing << dendl; - log.complete_to = log.log.begin(); - while (log.complete_to->version < - missing.missing[missing.rmissing.begin()->second].need) - log.complete_to++; - assert(log.complete_to != log.log.end()); - if (log.complete_to == log.log.begin()) { - info.last_complete = eversion_t(); - } else { - log.complete_to--; - info.last_complete = log.complete_to->version; - log.complete_to++; - } - log.last_requested = 0; - dout(10) << "activate - complete_to = " << log.complete_to->version - << dendl; + pg_log.activate_not_complete(info); if (is_primary()) { dout(10) << "activate - starting recovery" << dendl; osd->queue_for_recovery(this); @@ -1592,7 +1193,7 @@ void PG::activate(ObjectStore::Transaction& t, dout(10) << "activate peer osd." << peer << " is up to date, but sending pg_log anyway" << dendl; m = new MOSDPGLog(get_osdmap()->get_epoch(), info); } - } else if (log.tail > pi.last_update || pi.last_backfill == hobject_t()) { + } else if (pg_log.get_tail() > pi.last_update || pi.last_backfill == hobject_t()) { // backfill osd->clog.info() << info.pgid << " restarting backfill on osd." << peer << " from (" << pi.log_tail << "," << pi.last_update << "] " << pi.last_backfill @@ -1607,17 +1208,17 @@ void PG::activate(ObjectStore::Transaction& t, m = new MOSDPGLog(get_osdmap()->get_epoch(), pi); // send some recent log, so that op dup detection works well. - m->log.copy_up_to(log, g_conf->osd_min_pg_log_entries); + m->log.copy_up_to(pg_log.get_log(), g_conf->osd_min_pg_log_entries); m->info.log_tail = m->log.tail; pi.log_tail = m->log.tail; // sigh... 
pm.clear(); } else { // catch up - assert(log.tail <= pi.last_update); + assert(pg_log.get_tail() <= pi.last_update); m = new MOSDPGLog(get_osdmap()->get_epoch(), info); // send new stuff to append to replicas log - m->log.copy_after(log, pi.last_update); + m->log.copy_after(pg_log.get_log(), pi.last_update); } // share past_intervals if we are creating the pg on the replica @@ -2047,47 +1648,6 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue) osd->osd->finish_recovery_op(this, soid, dequeue); } -void PG::IndexedLog::split_into( - pg_t child_pgid, - unsigned split_bits, - PG::IndexedLog *olog) -{ - list oldlog; - oldlog.swap(log); - - eversion_t old_tail; - olog->head = head; - olog->tail = tail; - unsigned mask = ~((~0)<::iterator i = oldlog.begin(); - i != oldlog.end(); - ) { - if ((i->soid.hash & mask) == child_pgid.m_seed) { - olog->log.push_back(*i); - if (log.empty()) - tail = i->version; - } else { - log.push_back(*i); - if (olog->empty()) - olog->tail = i->version; - } - oldlog.erase(i++); - } - - if (log.empty()) - tail = head; - else - head = log.rbegin()->version; - - if (olog->empty()) - olog->tail = olog->head; - else - olog->head = olog->log.rbegin()->version; - - olog->index(); - index(); -} - static void split_list( list *from, list *to, @@ -2149,22 +1709,19 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits) child->pool = pool; // Log - log.split_into(child_pgid, split_bits, &(child->log)); + pg_log.split_into(child_pgid, split_bits, &(child->pg_log)); child->info.last_complete = info.last_complete; - info.last_update = log.head; - child->info.last_update = child->log.head; - - info.log_tail = log.tail; - child->info.log_tail = child->log.tail; + info.last_update = pg_log.get_head(); + child->info.last_update = child->pg_log.get_head(); - if (info.last_complete < log.tail) - info.last_complete = log.tail; - if (child->info.last_complete < child->log.tail) - child->info.last_complete = child->log.tail; + info.log_tail = pg_log.get_tail(); + child->info.log_tail = child->pg_log.get_tail(); - // Missing - missing.split_into(child_pgid, split_bits, &(child->missing)); + if (info.last_complete < pg_log.get_tail()) + info.last_complete = pg_log.get_tail(); + if (child->info.last_complete < child->pg_log.get_tail()) + child->info.last_complete = child->pg_log.get_tail(); // Info child->info.history = info.history; @@ -2201,7 +1758,7 @@ void PG::clear_recovery_state() { dout(10) << "clear_recovery_state" << dendl; - log.reset_recovery_pointers(); + pg_log.reset_recovery_pointers(); finish_sync_event = 0; hobject_t soid; @@ -2339,10 +1896,10 @@ void PG::publish_stats_to_osd() info.stats.last_active = now; info.stats.last_unstale = now; - info.stats.log_size = ondisklog.length(); - info.stats.ondisk_log_size = ondisklog.length(); - info.stats.log_start = log.tail; - info.stats.ondisk_log_start = log.tail; + info.stats.log_size = pg_log.get_ondisklog().length(); + info.stats.ondisk_log_size = pg_log.get_ondisklog().length(); + info.stats.log_start = pg_log.get_tail(); + info.stats.ondisk_log_start = pg_log.get_tail(); pg_stats_publish_valid = true; pg_stats_publish = info.stats; @@ -2364,8 +1921,9 @@ void PG::publish_stats_to_osd() degraded += (target - acting.size()) * num_objects; // missing on primary - pg_stats_publish.stats.sum.num_objects_missing_on_primary = missing.num_missing(); - degraded += missing.num_missing(); + pg_stats_publish.stats.sum.num_objects_missing_on_primary = + pg_log.get_missing().num_missing(); + degraded += 
pg_log.get_missing().num_missing(); for (unsigned i=1; i keys_to_remove; - keys_to_remove.insert(get_epoch_key(pgid)); - keys_to_remove.insert(get_biginfo_key(pgid)); - keys_to_remove.insert(get_info_key(pgid)); - - t->remove(coll_t::META_COLL, log_oid); - t->omap_rmkeys(coll_t::META_COLL, infos_oid, keys_to_remove); -} - epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, hobject_t &infos_oid, bufferlist *bl) { assert(bl); @@ -2695,30 +2238,9 @@ epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, hobject_t &infos_oid return cur_epoch; } -void PG::_write_log(ObjectStore::Transaction& t, pg_log_t &log, - const hobject_t &log_oid, map &divergent_priors) -{ - //dout(10) << "write_log" << dendl; - t.remove(coll_t::META_COLL, log_oid); - t.touch(coll_t::META_COLL, log_oid); - map keys; - for (list::iterator p = log.log.begin(); - p != log.log.end(); - ++p) { - bufferlist bl(sizeof(*p) * 2); - p->encode_with_checksum(bl); - keys[p->get_key_name()].claim(bl); - } - //dout(10) << "write_log " << keys.size() << " keys" << dendl; - - ::encode(divergent_priors, keys["divergent_priors"]); - - t.omap_setkeys(coll_t::META_COLL, log_oid, keys); -} - void PG::write_log(ObjectStore::Transaction& t) { - _write_log(t, log, log_oid, ondisklog.divergent_priors); + pg_log.write_log(t, log_oid); dirty_log = false; } @@ -2730,23 +2252,6 @@ void PG::write_if_dirty(ObjectStore::Transaction& t) write_log(t); } -void PG::trim(ObjectStore::Transaction& t, eversion_t trim_to) -{ - // trim? - if (trim_to > log.tail) { - /* If we are trimming, we must be complete up to trim_to, time - * to throw out any divergent_priors - */ - ondisklog.divergent_priors.clear(); - // We shouldn't be trimming the log past last_complete - assert(trim_to <= info.last_complete); - - dout(10) << "trim " << log << " to " << trim_to << dendl; - log.trim(t, log_oid, trim_to); - info.log_tail = log.tail; - } -} - void PG::trim_peers() { calc_trim_to(); @@ -2771,7 +2276,7 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl) info.last_update = e.version; // log mutation - log.add(e); + pg_log.add(e); dout(10) << "add_log_entry " << e << dendl; e.encode_with_checksum(log_bl); @@ -2781,7 +2286,7 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl) void PG::append_log( vector& logv, eversion_t trim_to, ObjectStore::Transaction &t) { - dout(10) << "append_log " << log << " " << logv << dendl; + dout(10) << "append_log " << pg_log.get_log() << " " << logv << dendl; map keys; for (vector::iterator p = logv.begin(); @@ -2794,7 +2299,7 @@ void PG::append_log( dout(10) << "append_log adding " << keys.size() << " keys" << dendl; t.omap_setkeys(coll_t::META_COLL, log_oid, keys); - trim(t, trim_to); + pg_log.trim(t, trim_to, info, log_oid); // update the local pg, pg log dirty_info = true; @@ -2803,7 +2308,7 @@ void PG::append_log( bool PG::check_log_for_corruption(ObjectStore *store) { - OndiskLog bounds; + PGLog::OndiskLog bounds; bufferlist blb; store->collection_getattr(coll, "ondisklog", blb); bufferlist::iterator p = blb.begin(); @@ -2958,14 +2463,14 @@ void PG::read_state(ObjectStore *store, bufferlist &bl) assert(r >= 0); ostringstream oss; - if (read_log( + if (pg_log.read_log( store, coll, log_oid, info, - ondisklog, log, missing, oss, this)) { + oss)) { /* We don't want to leave the old format around in case the next log * write happens to be an append_log() */ ObjectStore::Transaction t; - write_log(t); + pg_log.write_log(t, log_oid); int r = osd->store->apply_transaction(t); assert(!r); } @@ -2978,36 
+2483,40 @@ void PG::read_state(ObjectStore *store, bufferlist &bl) void PG::log_weirdness() { - if (log.tail != info.log_tail) + if (pg_log.get_tail() != info.log_tail) osd->clog.error() << info.pgid - << " info mismatch, log.tail " << log.tail + << " info mismatch, log.tail " << pg_log.get_tail() << " != info.log_tail " << info.log_tail << "\n"; - if (log.head != info.last_update) + if (pg_log.get_head() != info.last_update) osd->clog.error() << info.pgid - << " info mismatch, log.head " << log.head + << " info mismatch, log.head " << pg_log.get_head() << " != info.last_update " << info.last_update << "\n"; - if (log.empty()) { + if (pg_log.get_log().empty()) { // shoudl it be? - if (log.head != log.tail) + if (pg_log.get_head() != pg_log.get_tail()) osd->clog.error() << info.pgid - << " log bound mismatch, empty but (" << log.tail << "," << log.head << "]\n"; + << " log bound mismatch, empty but (" << pg_log.get_tail() << "," + << pg_log.get_head() << "]\n"; } else { - if ((log.log.begin()->version <= log.tail) || // sloppy check - (log.log.rbegin()->version != log.head && !(log.head == log.tail))) + if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail()) || // sloppy check + (pg_log.get_log().log.rbegin()->version != pg_log.get_head() && + !(pg_log.get_head() == pg_log.get_tail()))) osd->clog.error() << info.pgid - << " log bound mismatch, info (" << log.tail << "," << log.head << "]" + << " log bound mismatch, info (" << pg_log.get_tail() << "," + << pg_log.get_head() << "]" << " actual [" - << log.log.begin()->version << "," << log.log.rbegin()->version << "]" + << pg_log.get_log().log.begin()->version << "," + << pg_log.get_log().log.rbegin()->version << "]" << "\n"; } - if (log.caller_ops.size() > log.log.size()) { + if (pg_log.get_log().caller_ops.size() > pg_log.get_log().log.size()) { osd->clog.error() << info.pgid - << " caller_ops.size " << log.caller_ops.size() - << " > log size " << log.log.size() + << " caller_ops.size " << pg_log.get_log().caller_ops.size() + << " > log size " << pg_log.get_log().log.size() << "\n"; } } @@ -3691,17 +3200,17 @@ void PG::build_inc_scrub_map( map.valid_through = last_update_applied; map.incr_since = v; vector ls; - list::iterator p; - if (v == log.tail) { - p = log.log.begin(); - } else if (v > log.tail) { - p = log.find_entry(v); + list::const_iterator p; + if (v == pg_log.get_tail()) { + p = pg_log.get_log().log.begin(); + } else if (v > pg_log.get_tail()) { + p = pg_log.get_log().find_entry(v); ++p; } else { assert(0); } - for (; p != log.log.end(); ++p) { + for (; p != pg_log.get_log().log.end(); ++p) { if (p->is_update()) { ls.push_back(p->soid); map.objects[p->soid].negative = false; @@ -3731,11 +3240,11 @@ void PG::repair_object(const hobject_t& soid, ScrubMap::object *po, int bad_peer // We should only be scrubbing if the PG is clean. 
assert(waiting_for_missing_object.empty()); - missing.add(soid, oi.version, eversion_t()); + pg_log.missing_add(soid, oi.version, eversion_t()); missing_loc[soid].insert(ok_peer); missing_loc_sources.insert(ok_peer); - log.last_requested = 0; + pg_log.set_last_requested(0); } } @@ -4010,7 +3519,7 @@ void PG::classic_scrub(ThreadPool::TPHandle &handle) return; } - if (scrubber.primary_scrubmap.valid_through != log.head) { + if (scrubber.primary_scrubmap.valid_through != pg_log.get_head()) { ScrubMap incr; build_inc_scrub_map(incr, scrubber.primary_scrubmap.valid_through, handle); scrubber.primary_scrubmap.merge_incr(incr); @@ -4179,9 +3688,9 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle) scrubber.block_writes = true; // walk the log to find the latest update that affects our chunk - scrubber.subset_last_update = log.tail; - for (list::iterator p = log.log.begin(); - p != log.log.end(); + scrubber.subset_last_update = pg_log.get_tail(); + for (list::const_iterator p = pg_log.get_log().log.begin(); + p != pg_log.get_log().log.end(); ++p) { if (p->soid >= scrubber.start && p->soid < scrubber.end) scrubber.subset_last_update = p->version; @@ -4324,7 +3833,7 @@ bool PG::scrub_gather_replica_maps() p != scrubber.received_maps.end(); ++p) { - if (scrubber.received_maps[p->first].valid_through != log.head) { + if (scrubber.received_maps[p->first].valid_through != pg_log.get_head()) { scrubber.waiting_on++; scrubber.waiting_on_whom.insert(p->first); // Need to request another incremental map @@ -4831,7 +4340,7 @@ void PG::share_pg_log() pg_info_t& pinfo(peer_info[peer]); MOSDPGLog *m = new MOSDPGLog(info.last_update.epoch, info); - m->log.copy_after(log, pinfo.last_update); + m->log.copy_after(pg_log.get_log(), pinfo.last_update); for (list::const_iterator i = m->log.log.begin(); i != m->log.log.end(); @@ -4871,23 +4380,23 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch) MOSDPGLog *mlog = new MOSDPGLog(get_osdmap()->get_epoch(), info, query_epoch); - mlog->missing = missing; + mlog->missing = pg_log.get_missing(); // primary -> other, when building master log if (query.type == pg_query_t::LOG) { dout(10) << " sending info+missing+log since " << query.since << dendl; - if (query.since != eversion_t() && query.since < log.tail) { + if (query.since != eversion_t() && query.since < pg_log.get_tail()) { osd->clog.error() << info.pgid << " got broken pg_query_t::LOG since " << query.since - << " when my log.tail is " << log.tail + << " when my log.tail is " << pg_log.get_tail() << ", sending full log instead\n"; - mlog->log = log; // primary should not have requested this!! + mlog->log = pg_log.get_log(); // primary should not have requested this!! } else - mlog->log.copy_after(log, query.since); + mlog->log.copy_after(pg_log.get_log(), query.since); } else if (query.type == pg_query_t::FULLLOG) { dout(10) << " sending info+missing+full log" << dendl; - mlog->log = log; + mlog->log = pg_log.get_log(); } dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl; @@ -5259,21 +4768,22 @@ ostream& operator<<(ostream& out, const PG& pg) if (pg.recovery_ops_active) out << " rops=" << pg.recovery_ops_active; - if (pg.log.tail != pg.info.log_tail || - pg.log.head != pg.info.last_update) - out << " (info mismatch, " << pg.log << ")"; + if (pg.pg_log.get_tail() != pg.info.log_tail || + pg.pg_log.get_head() != pg.info.last_update) + out << " (info mismatch, " << pg.pg_log.get_log() << ")"; - if (pg.log.empty()) { + if (pg.pg_log.get_log().empty()) { // shoudl it be? 
- if (pg.log.head.version - pg.log.tail.version != 0) { + if (pg.pg_log.get_head().version - pg.pg_log.get_tail().version != 0) { out << " (log bound mismatch, empty)"; } } else { - if ((pg.log.log.begin()->version <= pg.log.tail) || // sloppy check - (pg.log.log.rbegin()->version != pg.log.head && !(pg.log.head == pg.log.tail))) { + if ((pg.pg_log.get_log().log.begin()->version <= pg.pg_log.get_tail()) || // sloppy check + (pg.pg_log.get_log().log.rbegin()->version != pg.pg_log.get_head() && + !(pg.pg_log.get_head() == pg.pg_log.get_tail()))) { out << " (log bound mismatch, actual=[" - << pg.log.log.begin()->version << "," - << pg.log.log.rbegin()->version << "]"; + << pg.pg_log.get_log().log.begin()->version << "," + << pg.pg_log.get_log().log.rbegin()->version << "]"; //out << "len=" << pg.log.log.size(); out << ")"; } @@ -5300,9 +4810,9 @@ ostream& operator<<(ostream& out, const PG& pg) if (pg.scrubber.must_scrub) out << " MUST_SCRUB"; - //out << " (" << pg.log.tail << "," << pg.log.head << "]"; - if (pg.missing.num_missing()) { - out << " m=" << pg.missing.num_missing(); + //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]"; + if (pg.pg_log.get_missing().num_missing()) { + out << " m=" << pg.pg_log.get_missing().num_missing(); if (pg.is_primary()) { int unfound = pg.get_num_unfound(); if (unfound) @@ -5601,265 +5111,6 @@ std::ostream& operator<<(std::ostream& oss, return oss; } -/*---------------------------------------------------*/ -// Handle staitc function so it can use dout() -#undef dout_prefix -#define dout_prefix if (passedpg) _prefix(_dout, passedpg) - -bool PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, - const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, - pg_missing_t &missing, ostringstream &oss, const PG *passedpg) -{ - dout(10) << "read_log" << dendl; - bool rewrite_log = false; - - // legacy? 
- struct stat st; - int r = store->stat(coll_t::META_COLL, log_oid, &st); - assert(r == 0); - if (st.st_size > 0) { - read_log_old(store, coll, log_oid, info, ondisklog, log, missing, oss, passedpg); - rewrite_log = true; - } else { - log.tail = info.log_tail; - ObjectMap::ObjectMapIterator p = store->get_omap_iterator(coll_t::META_COLL, log_oid); - if (p) for (p->seek_to_first(); p->valid() ; p->next()) { - bufferlist bl = p->value();//Copy bufferlist before creating iterator - bufferlist::iterator bp = bl.begin(); - if (p->key() == "divergent_priors") { - ::decode(ondisklog.divergent_priors, bp); - dout(20) << "read_log " << ondisklog.divergent_priors.size() << " divergent_priors" << dendl; - } else { - pg_log_entry_t e; - e.decode_with_checksum(bp); - dout(20) << "read_log " << e << dendl; - if (!log.log.empty()) { - pg_log_entry_t last_e(log.log.back()); - assert(last_e.version.version < e.version.version); - assert(last_e.version.epoch <= e.version.epoch); - } - log.log.push_back(e); - log.head = e.version; - } - } - } - log.head = info.last_update; - log.index(); - - // build missing - if (info.last_complete < info.last_update) { - dout(10) << "read_log checking for missing items over interval (" << info.last_complete - << "," << info.last_update << "]" << dendl; - - set did; - for (list::reverse_iterator i = log.log.rbegin(); - i != log.log.rend(); - ++i) { - if (i->version <= info.last_complete) break; - if (did.count(i->soid)) continue; - did.insert(i->soid); - - if (i->is_delete()) continue; - - bufferlist bv; - int r = store->getattr(coll, i->soid, OI_ATTR, bv); - if (r >= 0) { - object_info_t oi(bv); - if (oi.version < i->version) { - dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl; - missing.add(i->soid, i->version, oi.version); - } - } else { - dout(15) << "read_log missing " << *i << dendl; - missing.add(i->soid, i->version, eversion_t()); - } - } - for (map::reverse_iterator i = - ondisklog.divergent_priors.rbegin(); - i != ondisklog.divergent_priors.rend(); - ++i) { - if (i->first <= info.last_complete) break; - if (did.count(i->second)) continue; - did.insert(i->second); - bufferlist bv; - int r = store->getattr(coll, i->second, OI_ATTR, bv); - if (r >= 0) { - object_info_t oi(bv); - /** - * 1) we see this entry in the divergent priors mapping - * 2) we didn't see an entry for this object in the log - * - * From 1 & 2 we know that either the object does not exist - * or it is at the version specified in the divergent_priors - * map since the object would have been deleted atomically - * with the addition of the divergent_priors entry, an older - * version would not have been recovered, and a newer version - * would show up in the log above. - */ - assert(oi.version == i->first); - } else { - dout(15) << "read_log missing " << *i << dendl; - missing.add(i->second, i->first, eversion_t()); - } - } - } - dout(10) << "read_log done" << dendl; - return rewrite_log; -} - -void PG::read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid, - const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, - pg_missing_t &missing, ostringstream &oss, const PG *passedpg) -{ - // load bounds, based on old OndiskLog encoding. 
- uint64_t ondisklog_tail = 0; - uint64_t ondisklog_head = 0; - uint64_t ondisklog_zero_to; - bool ondisklog_has_checksums; - - bufferlist blb; - store->collection_getattr(coll, "ondisklog", blb); - { - bufferlist::iterator bl = blb.begin(); - DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl); - ondisklog_has_checksums = (struct_v >= 2); - ::decode(ondisklog_tail, bl); - ::decode(ondisklog_head, bl); - if (struct_v >= 4) - ::decode(ondisklog_zero_to, bl); - else - ondisklog_zero_to = 0; - if (struct_v >= 5) - ::decode(ondisklog.divergent_priors, bl); - DECODE_FINISH(bl); - } - uint64_t ondisklog_length = ondisklog_head - ondisklog_tail; - dout(10) << "read_log " << ondisklog_tail << "~" << ondisklog_length << dendl; - - log.tail = info.log_tail; - - // In case of sobject_t based encoding, may need to list objects in the store - // to find hashes - vector ls; - - if (ondisklog_head > 0) { - // read - bufferlist bl; - store->read(coll_t::META_COLL, log_oid, ondisklog_tail, ondisklog_length, bl); - if (bl.length() < ondisklog_length) { - std::ostringstream oss; - oss << "read_log got " << bl.length() << " bytes, expected " - << ondisklog_head << "-" << ondisklog_tail << "=" - << ondisklog_length; - throw read_log_error(oss.str().c_str()); - } - - pg_log_entry_t e; - bufferlist::iterator p = bl.begin(); - assert(log.empty()); - eversion_t last; - bool reorder = false; - bool listed_collection = false; - - while (!p.end()) { - uint64_t pos = ondisklog_tail + p.get_off(); - if (ondisklog_has_checksums) { - bufferlist ebl; - ::decode(ebl, p); - __u32 crc; - ::decode(crc, p); - - __u32 got = ebl.crc32c(0); - if (crc == got) { - bufferlist::iterator q = ebl.begin(); - ::decode(e, q); - } else { - std::ostringstream oss; - oss << "read_log " << pos << " bad crc got " << got << " expected" << crc; - throw read_log_error(oss.str().c_str()); - } - } else { - ::decode(e, p); - } - dout(20) << "read_log " << pos << " " << e << dendl; - - // [repair] in order? - if (e.version < last) { - dout(0) << "read_log " << pos << " out of order entry " << e << " follows " << last << dendl; - oss << info.pgid << " log has out of order entry " - << e << " following " << last << "\n"; - reorder = true; - } - - if (e.version <= log.tail) { - dout(20) << "read_log ignoring entry at " << pos << " below log.tail" << dendl; - continue; - } - if (last.version == e.version.version) { - dout(0) << "read_log got dup " << e.version << " (last was " << last << ", dropping that one)" << dendl; - log.log.pop_back(); - oss << info.pgid << " read_log got dup " - << e.version << " after " << last << "\n"; - } - - if (e.invalid_hash) { - // We need to find the object in the store to get the hash - if (!listed_collection) { - store->collection_list(coll, ls); - listed_collection = true; - } - bool found = false; - for (vector::iterator i = ls.begin(); - i != ls.end(); - ++i) { - if (i->oid == e.soid.oid && i->snap == e.soid.snap) { - e.soid = *i; - found = true; - break; - } - } - if (!found) { - // Didn't find the correct hash - std::ostringstream oss; - oss << "Could not find hash for hoid " << e.soid << std::endl; - throw read_log_error(oss.str().c_str()); - } - } - - if (e.invalid_pool) { - e.soid.pool = info.pgid.pool(); - } - - e.offset = pos; - uint64_t endpos = ondisklog_tail + p.get_off(); - log.log.push_back(e); - last = e.version; - - // [repair] at end of log? 
- if (!p.end() && e.version == info.last_update) { - oss << info.pgid << " log has extra data at " - << endpos << "~" << (ondisklog_head-endpos) << " after " - << info.last_update << "\n"; - - dout(0) << "read_log " << endpos << " *** extra gunk at end of log, " - << "adjusting ondisklog_head" << dendl; - ondisklog_head = endpos; - break; - } - } - - if (reorder) { - dout(0) << "read_log reordering log" << dendl; - map m; - for (list::iterator p = log.log.begin(); p != log.log.end(); ++p) - m[p->version] = *p; - log.log.clear(); - for (map::iterator p = m.begin(); p != m.end(); ++p) - log.log.push_back(p->second); - } - } -} - /*------------ Recovery State Machine----------------*/ #undef dout_prefix #define dout_prefix (*_dout << context< RecoveryMachine >().pg->gen_prefix() \ @@ -6539,7 +5790,7 @@ PG::RecoveryState::Recovering::Recovering(my_context ctx) void PG::RecoveryState::Recovering::release_reservations() { PG *pg = context< RecoveryMachine >().pg; - assert(!pg->missing.have_missing()); + assert(!pg->pg_log.get_missing().have_missing()); pg->state_clear(PG_STATE_RECOVERING); // release remote reservations @@ -6718,7 +5969,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&) if (g_conf->osd_check_for_log_corruption) pg->check_log_for_corruption(pg->osd->store); - int unfound = pg->missing.num_missing() - pg->missing_loc.size(); + int unfound = pg->pg_log.get_missing().num_missing() - pg->missing_loc.size(); if (unfound > 0 && pg->all_unfound_are_queried_or_lost(pg->get_osdmap())) { if (g_conf->osd_auto_mark_unfound_lost) { @@ -6917,10 +6168,9 @@ boost::statechart::result PG::RecoveryState::ReplicaActive::react(const MLogRec& { PG *pg = context< RecoveryMachine >().pg; dout(10) << "received log from " << logevt.from << dendl; - pg->merge_log(*context().get_cur_transaction(), - logevt.msg->info, logevt.msg->log, logevt.from); - - assert(pg->log.head == pg->info.last_update); + ObjectStore::Transaction* t = context().get_cur_transaction(); + pg->merge_log(*t,logevt.msg->info, logevt.msg->log, logevt.from); + assert(pg->pg_log.get_head() == pg->info.last_update); return discard_event(); } @@ -6994,14 +6244,13 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MLogRec& logevt) pg->dirty_info = true; pg->dirty_big_info = true; // maybe. 
pg->dirty_log = true; - pg->log.claim_log(msg->log); - pg->missing.clear(); + pg->pg_log.claim_log(msg->log); } else { - pg->merge_log(*context().get_cur_transaction(), - msg->info, msg->log, logevt.from); + ObjectStore::Transaction* t = context().get_cur_transaction(); + pg->merge_log(*t, msg->info, msg->log, logevt.from); } - assert(pg->log.head == pg->info.last_update); + assert(pg->pg_log.get_head() == pg->info.last_update); post_event(Activate(logevt.msg->get_epoch())); return transit(); @@ -7014,13 +6263,13 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MInfoRec& infoev if (pg->info.last_update > infoevt.info.last_update) { // rewind divergent log entries - pg->rewind_divergent_log(*context< RecoveryMachine >().get_cur_transaction(), - infoevt.info.last_update); + ObjectStore::Transaction* t = context().get_cur_transaction(); + pg->rewind_divergent_log(*t, infoevt.info.last_update); pg->info.stats = infoevt.info.stats; } assert(infoevt.info.last_update == pg->info.last_update); - assert(pg->log.head == pg->info.last_update); + assert(pg->pg_log.get_head() == pg->info.last_update); post_event(Activate(infoevt.msg_epoch)); return transit(); @@ -7451,7 +6700,7 @@ PG::RecoveryState::GetMissing::GetMissing(my_context ctx) if (pi.is_empty()) continue; // no pg data, nothing divergent - if (pi.last_update < pg->log.tail) { + if (pi.last_update < pg->pg_log.get_tail()) { dout(10) << " osd." << *i << " is not contiguous, will restart backfill" << dendl; pg->peer_missing[*i]; continue; diff --git a/src/osd/PG.h b/src/osd/PG.h index 8d8ad5c4c45..134f5ec470f 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -36,6 +36,7 @@ #include "include/atomic.h" #include "SnapMapper.h" +#include "PGLog.h" #include "OpRequest.h" #include "OSDMap.h" #include "os/ObjectStore.h" @@ -155,212 +156,8 @@ struct PGPool { class PG { public: - /* Exceptions */ - class read_log_error : public buffer::error { - public: - explicit read_log_error(const char *what) { - snprintf(buf, sizeof(buf), "read_log_error: %s", what); - } - const char *what() const throw () { - return buf; - } - private: - char buf[512]; - }; - std::string gen_prefix() const; - - /** - * IndexLog - adds in-memory index of the log, by oid. - * plus some methods to manipulate it all. - */ - struct IndexedLog : public pg_log_t { - hash_map objects; // ptrs into log. be careful! 
- hash_map caller_ops; - - // recovery pointers - list::iterator complete_to; // not inclusive of referenced item - version_t last_requested; // last object requested by primary - - /****/ - IndexedLog() : last_requested(0) {} - - void claim_log(const pg_log_t& o) { - log = o.log; - head = o.head; - tail = o.tail; - index(); - } - - void split_into( - pg_t child_pgid, - unsigned split_bits, - IndexedLog *olog); - - void zero() { - unindex(); - pg_log_t::clear(); - reset_recovery_pointers(); - } - void reset_recovery_pointers() { - complete_to = log.end(); - last_requested = 0; - } - - bool logged_object(const hobject_t& oid) const { - return objects.count(oid); - } - bool logged_req(const osd_reqid_t &r) const { - return caller_ops.count(r); - } - eversion_t get_request_version(const osd_reqid_t &r) const { - hash_map::const_iterator p = caller_ops.find(r); - if (p == caller_ops.end()) - return eversion_t(); - return p->second->version; - } - - void index() { - objects.clear(); - caller_ops.clear(); - for (list::iterator i = log.begin(); - i != log.end(); - ++i) { - objects[i->soid] = &(*i); - if (i->reqid_is_indexed()) { - //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old - caller_ops[i->reqid] = &(*i); - } - } - } - - void index(pg_log_entry_t& e) { - if (objects.count(e.soid) == 0 || - objects[e.soid]->version < e.version) - objects[e.soid] = &e; - if (e.reqid_is_indexed()) { - //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old - caller_ops[e.reqid] = &e; - } - } - void unindex() { - objects.clear(); - caller_ops.clear(); - } - void unindex(pg_log_entry_t& e) { - // NOTE: this only works if we remove from the _tail_ of the log! - if (objects.count(e.soid) && objects[e.soid]->version == e.version) - objects.erase(e.soid); - if (e.reqid_is_indexed() && - caller_ops.count(e.reqid) && // divergent merge_log indexes new before unindexing old - caller_ops[e.reqid] == &e) - caller_ops.erase(e.reqid); - } - - // actors - void add(pg_log_entry_t& e) { - // add to log - log.push_back(e); - assert(e.version > head); - assert(head.version == 0 || e.version.version > head.version); - head = e.version; - - // to our index - objects[e.soid] = &(log.back()); - caller_ops[e.reqid] = &(log.back()); - } - - void trim(ObjectStore::Transaction &t, hobject_t& oid, eversion_t s); - - ostream& print(ostream& out) const; - }; - - - /** - * OndiskLog - some info about how we store the log on disk. - */ - class OndiskLog { - public: - // ok - uint64_t tail; // first byte of log. - uint64_t head; // byte following end of log. - uint64_t zero_to; // first non-zeroed byte of log. - bool has_checksums; - - /** - * We reconstruct the missing set by comparing the recorded log against - * the objects in the pg collection. Unfortunately, it's possible to - * have an object in the missing set which is not in the log due to - * a divergent operation with a prior_version pointing before the - * pg log tail. To deal with this, we store alongside the log a mapping - * of divergent priors to be checked along with the log during read_state. 
- */ - map divergent_priors; - void add_divergent_prior(eversion_t version, hobject_t obj) { - divergent_priors.insert(make_pair(version, obj)); - } - - OndiskLog() : tail(0), head(0), zero_to(0), - has_checksums(true) {} - - uint64_t length() { return head - tail; } - bool trim_to(eversion_t v, ObjectStore::Transaction& t); - - void zero() { - tail = 0; - head = 0; - zero_to = 0; - } - - void encode(bufferlist& bl) const { - ENCODE_START(5, 3, bl); - ::encode(tail, bl); - ::encode(head, bl); - ::encode(zero_to, bl); - ::encode(divergent_priors, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::iterator& bl) { - DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl); - has_checksums = (struct_v >= 2); - ::decode(tail, bl); - ::decode(head, bl); - if (struct_v >= 4) - ::decode(zero_to, bl); - else - zero_to = 0; - if (struct_v >= 5) - ::decode(divergent_priors, bl); - DECODE_FINISH(bl); - } - void dump(Formatter *f) const { - f->dump_unsigned("head", head); - f->dump_unsigned("tail", tail); - f->dump_unsigned("zero_to", zero_to); - f->open_array_section("divergent_priors"); - for (map::const_iterator p = divergent_priors.begin(); - p != divergent_priors.end(); - ++p) { - f->open_object_section("prior"); - f->dump_stream("version") << p->first; - f->dump_stream("object") << p->second; - f->close_section(); - } - f->close_section(); - } - static void generate_test_instances(list& o) { - o.push_back(new OndiskLog); - o.push_back(new OndiskLog); - o.back()->tail = 2; - o.back()->head = 3; - o.back()->zero_to = 1; - } - }; - WRITE_CLASS_ENCODER(OndiskLog) - - - /*** PG ****/ protected: OSDService *osd; @@ -466,7 +263,7 @@ public: const interval_set &snapcolls); const coll_t coll; - IndexedLog log; + PGLog pg_log; static string get_info_key(pg_t pgid) { return stringify(pgid) + "_info"; } @@ -478,8 +275,6 @@ public: } hobject_t log_oid; hobject_t biginfo_oid; - OndiskLog ondisklog; - pg_missing_t missing; map > missing_loc; set missing_loc_sources; // superset of missing_loc locations @@ -784,16 +579,6 @@ public: bool proc_replica_info(int from, const pg_info_t &info); void remove_snap_mapped_object( ObjectStore::Transaction& t, const hobject_t& soid); - bool merge_old_entry(ObjectStore::Transaction& t, pg_log_entry_t& oe); - - /** - * Merges authoratative log/info into current log/info/store - * - * @param [in,out] t used to delete obsolete objects - * @param [in,out] oinfo recieved authoritative info - * @param [in,out] olog recieved authoritative log - * @param [in] from peer which sent the information - */ void merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from); void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead); bool search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing, @@ -822,10 +607,10 @@ public: void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info); bool have_unfound() const { - return missing.num_missing() > missing_loc.size(); + return pg_log.get_missing().num_missing() > missing_loc.size(); } int get_num_unfound() const { - return missing.num_missing() - missing_loc.size(); + return pg_log.get_missing().num_missing() - missing_loc.size(); } virtual void clean_up_local(ObjectStore::Transaction& t) = 0; @@ -1878,35 +1663,18 @@ private: void write_log(ObjectStore::Transaction& t); public: - static void clear_info_log( - pg_t pgid, - const hobject_t &infos_oid, - const hobject_t &log_oid, - ObjectStore::Transaction *t); - static int _write_info(ObjectStore::Transaction& t, epoch_t epoch, pg_info_t 
&info, coll_t coll, map &past_intervals, interval_set &snap_collections, hobject_t &infos_oid, __u8 info_struct_v, bool dirty_big_info, bool force_ver = false); - static void _write_log(ObjectStore::Transaction& t, pg_log_t &log, - const hobject_t &log_oid, map &divergent_priors); void write_if_dirty(ObjectStore::Transaction& t); void add_log_entry(pg_log_entry_t& e, bufferlist& log_bl); void append_log( vector& logv, eversion_t trim_to, ObjectStore::Transaction &t); - - /// return true if the log should be rewritten - static bool read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, - const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, - pg_missing_t &missing, ostringstream &oss, const PG *passedpg = NULL); - static void read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid, - const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, - pg_missing_t &missing, ostringstream &oss, const PG *passedpg = NULL); bool check_log_for_corruption(ObjectStore *store); - void trim(ObjectStore::Transaction& t, eversion_t v); void trim_peers(); std::string get_corrupt_pg_log_name() const; @@ -2027,8 +1795,6 @@ public: virtual void on_shutdown() = 0; }; -WRITE_CLASS_ENCODER(PG::OndiskLog) - ostream& operator<<(ostream& out, const PG& pg); #endif diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc new file mode 100644 index 00000000000..638a78697db --- /dev/null +++ b/src/osd/PGLog.cc @@ -0,0 +1,789 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * Copyright (C) 2013 Cloudwatt + * + * Author: Loic Dachary + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "PGLog.h" +#include "PG.h" +#include "SnapMapper.h" + +#define dout_subsys ceph_subsys_osd + +//////////////////// PGLog //////////////////// + +void PGLog::IndexedLog::split_into( + pg_t child_pgid, + unsigned split_bits, + PGLog::IndexedLog *olog) +{ + list oldlog; + oldlog.swap(log); + + eversion_t old_tail; + olog->head = head; + olog->tail = tail; + unsigned mask = ~((~0)<::iterator i = oldlog.begin(); + i != oldlog.end(); + ) { + if ((i->soid.hash & mask) == child_pgid.m_seed) { + olog->log.push_back(*i); + if (log.empty()) + tail = i->version; + } else { + log.push_back(*i); + if (olog->empty()) + olog->tail = i->version; + } + oldlog.erase(i++); + } + + if (log.empty()) + tail = head; + else + head = log.rbegin()->version; + + if (olog->empty()) + olog->tail = olog->head; + else + olog->head = olog->log.rbegin()->version; + + olog->index(); + index(); +} + +void PGLog::IndexedLog::trim(ObjectStore::Transaction& t, hobject_t& log_oid, eversion_t s) +{ + if (complete_to != log.end() && + complete_to->version <= s) { + generic_dout(0) << " bad trim to " << s << " when complete_to is " << complete_to->version + << " on " << *this << dendl; + } + + set keys_to_rm; + while (!log.empty()) { + pg_log_entry_t &e = *log.begin(); + if (e.version > s) + break; + generic_dout(20) << "trim " << e << dendl; + unindex(e); // remove from index, + keys_to_rm.insert(e.get_key_name()); + log.pop_front(); // from log + } + t.omap_rmkeys(coll_t::META_COLL, log_oid, keys_to_rm); + + // raise tail? 
+ if (tail < s) + tail = s; +} + +ostream& PGLog::IndexedLog::print(ostream& out) const +{ + out << *this << std::endl; + for (list::const_iterator p = log.begin(); + p != log.end(); + ++p) { + out << *p << " " << (logged_object(p->soid) ? "indexed":"NOT INDEXED") << std::endl; + assert(!p->reqid_is_indexed() || logged_req(p->reqid)); + } + return out; +} + +//////////////////// PGLog //////////////////// + +void PGLog::clear_info_log( + pg_t pgid, + const hobject_t &infos_oid, + const hobject_t &log_oid, + ObjectStore::Transaction *t) { + + set keys_to_remove; + keys_to_remove.insert(PG::get_epoch_key(pgid)); + keys_to_remove.insert(PG::get_biginfo_key(pgid)); + keys_to_remove.insert(PG::get_info_key(pgid)); + + t->remove(coll_t::META_COLL, log_oid); + t->omap_rmkeys(coll_t::META_COLL, infos_oid, keys_to_remove); +} + +void PGLog::trim(ObjectStore::Transaction& t, eversion_t trim_to, pg_info_t &info, hobject_t &log_oid) +{ + // trim? + if (trim_to > log.tail) { + /* If we are trimming, we must be complete up to trim_to, time + * to throw out any divergent_priors + */ + ondisklog.divergent_priors.clear(); + // We shouldn't be trimming the log past last_complete + assert(trim_to <= info.last_complete); + + dout(10) << "trim " << log << " to " << trim_to << dendl; + log.trim(t, log_oid, trim_to); + info.log_tail = log.tail; + } +} + +void PGLog::proc_replica_log(ObjectStore::Transaction& t, + pg_info_t &oinfo, pg_log_t &olog, pg_missing_t& omissing, int from) +{ + dout(10) << "proc_replica_log for osd." << from << ": " + << oinfo << " " << olog << " " << omissing << dendl; + + /* + basically what we're doing here is rewinding the remote log, + dropping divergent entries, until we find something that matches + our master log. we then reset last_update to reflect the new + point up to which missing is accurate. + + later, in activate(), missing will get wound forward again and + we will send the peer enough log to arrive at the same state. + */ + + for (map::iterator i = omissing.missing.begin(); + i != omissing.missing.end(); + ++i) { + dout(20) << " before missing " << i->first << " need " << i->second.need + << " have " << i->second.have << dendl; + } + + list::const_reverse_iterator pp = olog.log.rbegin(); + eversion_t lu(oinfo.last_update); + while (true) { + if (pp == olog.log.rend()) { + if (pp != olog.log.rbegin()) // no last_update adjustment if we discard nothing! + lu = olog.tail; + break; + } + const pg_log_entry_t& oe = *pp; + + // don't continue past the tail of our log. + if (oe.version <= log.tail) + break; + + if (!log.objects.count(oe.soid)) { + dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl; + ++pp; + continue; + } + + pg_log_entry_t& ne = *log.objects[oe.soid]; + if (ne.version == oe.version) { + dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl; + lu = pp->version; + break; + } + + if (oe.soid > oinfo.last_backfill) { + // past backfill line, don't care + dout(10) << " had " << oe << " beyond last_backfill : skipping" << dendl; + ++pp; + continue; + } + + if (ne.version > oe.version) { + dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; + } else { + if (oe.is_delete()) { + if (ne.is_delete()) { + // old and new are delete + dout(10) << " had " << oe << " new " << ne << " : both deletes" << dendl; + } else { + // old delete, new update. 
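+	  // the divergent replica entry deleted the object while the
+	  // authoritative entry recreates it, so the replica has no
+	  // usable prior version: record it as missing from scratch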
+ dout(10) << " had " << oe << " new " << ne << " : missing" << dendl; + omissing.add(ne.soid, ne.version, eversion_t()); + } + } else { + if (ne.is_delete()) { + // old update, new delete + dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; + omissing.rm(oe.soid, oe.version); + } else { + // old update, new update + dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; + omissing.revise_need(ne.soid, ne.version); + } + } + } + + ++pp; + } + + if (lu < oinfo.last_update) { + dout(10) << " peer osd." << from << " last_update now " << lu << dendl; + oinfo.last_update = lu; + } + + if (omissing.have_missing()) { + eversion_t first_missing = + omissing.missing[omissing.rmissing.begin()->second].need; + oinfo.last_complete = eversion_t(); + list::const_iterator i = olog.log.begin(); + for (; + i != olog.log.end(); + ++i) { + if (i->version < first_missing) + oinfo.last_complete = i->version; + else + break; + } + } else { + oinfo.last_complete = oinfo.last_update; + } +} + +/* + * merge an old (possibly divergent) log entry into the new log. this + * happens _after_ new log items have been assimilated. thus, we assume + * the index already references newer entries (if present), and missing + * has been updated accordingly. + * + * return true if entry is not divergent. + */ +bool PGLog::merge_old_entry(ObjectStore::Transaction& t, pg_log_entry_t& oe, pg_info_t& info, list& remove_snap, bool &dirty_log) +{ + if (oe.soid > info.last_backfill) { + dout(20) << "merge_old_entry had " << oe << " : beyond last_backfill" << dendl; + return false; + } + if (log.objects.count(oe.soid)) { + pg_log_entry_t &ne = *log.objects[oe.soid]; // new(er?) entry + + if (ne.version > oe.version) { + dout(20) << "merge_old_entry had " << oe << " new " << ne << " : older, missing" << dendl; + assert(ne.is_delete() || missing.is_missing(ne.soid)); + return false; + } + if (ne.version == oe.version) { + dout(20) << "merge_old_entry had " << oe << " new " << ne << " : same" << dendl; + return true; + } + if (oe.is_delete()) { + if (ne.is_delete()) { + // old and new are delete + dout(20) << "merge_old_entry had " << oe << " new " << ne << " : both deletes" << dendl; + } else { + // old delete, new update. 
+ dout(20) << "merge_old_entry had " << oe << " new " << ne << " : missing" << dendl; + missing.revise_need(ne.soid, ne.version); + } + } else { + if (ne.is_delete()) { + // old update, new delete + dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new delete supercedes" << dendl; + missing.rm(oe.soid, oe.version); + } else { + // old update, new update + dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new item supercedes" << dendl; + missing.revise_need(ne.soid, ne.version); + } + } + } else if (oe.op == pg_log_entry_t::CLONE) { + assert(oe.soid.snap != CEPH_NOSNAP); + dout(20) << "merge_old_entry had " << oe + << ", clone with no non-divergent log entries, " + << "deleting" << dendl; + remove_snap.push_back(oe.soid); + if (missing.is_missing(oe.soid)) + missing.rm(oe.soid, missing.missing[oe.soid].need); + } else if (oe.prior_version > info.log_tail) { + /** + * oe.prior_version is a previously divergent log entry + * oe.soid must have already been handled and the missing + * set updated appropriately + */ + dout(20) << "merge_old_entry had oe " << oe + << " with divergent prior_version " << oe.prior_version + << " oe.soid " << oe.soid + << " must already have been merged" << dendl; + } else { + if (!oe.is_delete()) { + dout(20) << "merge_old_entry had " << oe << " deleting" << dendl; + remove_snap.push_back(oe.soid); + } + dout(20) << "merge_old_entry had " << oe << " updating missing to " + << oe.prior_version << dendl; + if (oe.prior_version > eversion_t()) { + ondisklog.add_divergent_prior(oe.prior_version, oe.soid); + dirty_log = true; + missing.revise_need(oe.soid, oe.prior_version); + } else if (missing.is_missing(oe.soid)) { + missing.rm(oe.soid, missing.missing[oe.soid].need); + } + } + return false; +} + +/** + * rewind divergent entries at the head of the log + * + * This rewinds entries off the head of our log that are divergent. + * This is used by replicas during activation. + * + * @param t transaction + * @param newhead new head to rewind to + */ +void PGLog::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead, + pg_info_t &info, list& remove_snap, + bool &dirty_log, bool &dirty_info, bool &dirty_big_info) +{ + dout(10) << "rewind_divergent_log truncate divergent future " << newhead << dendl; + assert(newhead > log.tail); + + list::iterator p = log.log.end(); + list divergent; + while (true) { + if (p == log.log.begin()) { + // yikes, the whole thing is divergent! + divergent.swap(log.log); + break; + } + --p; + if (p->version == newhead) { + ++p; + divergent.splice(divergent.begin(), log.log, p, log.log.end()); + break; + } + assert(p->version > newhead); + dout(10) << "rewind_divergent_log future divergent " << *p << dendl; + log.unindex(*p); + } + + log.head = newhead; + info.last_update = newhead; + if (info.last_complete > newhead) + info.last_complete = newhead; + + for (list::iterator d = divergent.begin(); d != divergent.end(); ++d) + merge_old_entry(t, *d, info, remove_snap, dirty_log); + + dirty_info = true; + dirty_big_info = true; + dirty_log = true; +} + +void PGLog::merge_log(ObjectStore::Transaction& t, + pg_info_t &oinfo, pg_log_t &olog, int fromosd, + pg_info_t &info, list& remove_snap, + bool &dirty_log, bool &dirty_info, bool &dirty_big_info) +{ + dout(10) << "merge_log " << olog << " from osd." << fromosd + << " into " << log << dendl; + + // Check preconditions + + // If our log is empty, the incoming log needs to have not been trimmed. 
+  assert(!log.null() || olog.tail == eversion_t());
+  // The logs must overlap.
+  assert(log.head >= olog.tail && olog.head >= log.tail);
+
+  for (map<hobject_t, pg_missing_t::item>::iterator i = missing.missing.begin();
+       i != missing.missing.end();
+       ++i) {
+    dout(20) << "pg_missing_t sobject: " << i->first << dendl;
+  }
+
+  bool changed = false;
+
+  // extend on tail?
+  //  this is just filling in history.  it does not affect our
+  //  missing set, as that should already be consistent with our
+  //  current log.
+  if (olog.tail < log.tail) {
+    dout(10) << "merge_log extending tail to " << olog.tail << dendl;
+    list<pg_log_entry_t>::iterator from = olog.log.begin();
+    list<pg_log_entry_t>::iterator to;
+    for (to = from;
+         to != olog.log.end();
+         ++to) {
+      if (to->version > log.tail)
+        break;
+      log.index(*to);
+      dout(15) << *to << dendl;
+    }
+    assert(to != olog.log.end() ||
+           (olog.head == info.last_update));
+
+    // splice into our log.
+    log.log.splice(log.log.begin(),
+                   olog.log, from, to);
+
+    info.log_tail = log.tail = olog.tail;
+    changed = true;
+  }
+
+  if (oinfo.stats.reported < info.stats.reported)   // make sure reported always increases
+    oinfo.stats.reported = info.stats.reported;
+  if (info.last_backfill.is_max())
+    info.stats = oinfo.stats;
+
+  // do we have divergent entries to throw out?
+  if (olog.head < log.head) {
+    rewind_divergent_log(t, olog.head, info, remove_snap, dirty_log, dirty_info, dirty_big_info);
+    changed = true;
+  }
+
+  // extend on head?
+  if (olog.head > log.head) {
+    dout(10) << "merge_log extending head to " << olog.head << dendl;
+
+    // find start point in olog
+    list<pg_log_entry_t>::iterator to = olog.log.end();
+    list<pg_log_entry_t>::iterator from = olog.log.end();
+    eversion_t lower_bound = olog.tail;
+    while (1) {
+      if (from == olog.log.begin())
+        break;
+      --from;
+      dout(20) << "  ? " << *from << dendl;
+      if (from->version <= log.head) {
+        dout(20) << "merge_log cut point (usually last shared) is " << *from << dendl;
+        lower_bound = from->version;
+        ++from;
+        break;
+      }
+    }
+
+    // index, update missing, delete deleted
+    for (list<pg_log_entry_t>::iterator p = from; p != to; ++p) {
+      pg_log_entry_t &ne = *p;
+      dout(20) << "merge_log " << ne << dendl;
+      log.index(ne);
+      if (ne.soid <= info.last_backfill) {
+        missing.add_next_event(ne);
+        if (ne.is_delete())
+          remove_snap.push_back(ne.soid);
+      }
+    }
+
+    // move aside divergent items
+    list<pg_log_entry_t> divergent;
+    while (!log.empty()) {
+      pg_log_entry_t &oe = *log.log.rbegin();
+      /*
+       * look at eversion.version here.  we want to avoid a situation like:
+       *  our log: 100'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529
+       *  new log: 122'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529
+       *  lower_bound = 100'9
+       * i.e., same request, different version.  If the eversion.version is > the
+       * lower_bound, it is divergent.
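+       * Comparing the bare counters (and ignoring the epoch) is what
+       * lets us treat our 100'10 as divergent even though the new log
+       * also contains an entry with counter 10.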
+ */ + if (oe.version.version <= lower_bound.version) + break; + dout(10) << "merge_log divergent " << oe << dendl; + divergent.push_front(oe); + log.unindex(oe); + log.log.pop_back(); + } + + // splice + log.log.splice(log.log.end(), + olog.log, from, to); + log.index(); + + info.last_update = log.head = olog.head; + info.purged_snaps = oinfo.purged_snaps; + + // process divergent items + if (!divergent.empty()) { + for (list::iterator d = divergent.begin(); d != divergent.end(); ++d) + merge_old_entry(t, *d, info, remove_snap, dirty_log); + } + + changed = true; + } + + dout(10) << "merge_log result " << log << " " << missing << " changed=" << changed << dendl; + + if (changed) { + dirty_info = true; + dirty_big_info = true; + dirty_log = true; + } +} + +void PGLog::write_log(ObjectStore::Transaction& t, pg_log_t &log, + const hobject_t &log_oid, map &divergent_priors) +{ + //dout(10) << "write_log" << dendl; + t.remove(coll_t::META_COLL, log_oid); + t.touch(coll_t::META_COLL, log_oid); + map keys; + for (list::iterator p = log.log.begin(); + p != log.log.end(); + ++p) { + bufferlist bl(sizeof(*p) * 2); + p->encode_with_checksum(bl); + keys[p->get_key_name()].claim(bl); + } + //dout(10) << "write_log " << keys.size() << " keys" << dendl; + + ::encode(divergent_priors, keys["divergent_priors"]); + + t.omap_setkeys(coll_t::META_COLL, log_oid, keys); +} + +bool PGLog::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, + const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, + pg_missing_t &missing, ostringstream &oss) +{ + dout(10) << "read_log" << dendl; + bool rewrite_log = false; + + // legacy? + struct stat st; + int r = store->stat(coll_t::META_COLL, log_oid, &st); + assert(r == 0); + if (st.st_size > 0) { + read_log_old(store, coll, log_oid, info, ondisklog, log, missing, oss); + rewrite_log = true; + } else { + log.tail = info.log_tail; + ObjectMap::ObjectMapIterator p = store->get_omap_iterator(coll_t::META_COLL, log_oid); + if (p) for (p->seek_to_first(); p->valid() ; p->next()) { + bufferlist bl = p->value();//Copy bufferlist before creating iterator + bufferlist::iterator bp = bl.begin(); + if (p->key() == "divergent_priors") { + ::decode(ondisklog.divergent_priors, bp); + dout(20) << "read_log " << ondisklog.divergent_priors.size() << " divergent_priors" << dendl; + } else { + pg_log_entry_t e; + e.decode_with_checksum(bp); + dout(20) << "read_log " << e << dendl; + if (!log.log.empty()) { + pg_log_entry_t last_e(log.log.back()); + assert(last_e.version.version < e.version.version); + assert(last_e.version.epoch <= e.version.epoch); + } + log.log.push_back(e); + log.head = e.version; + } + } + } + log.head = info.last_update; + log.index(); + + // build missing + if (info.last_complete < info.last_update) { + dout(10) << "read_log checking for missing items over interval (" << info.last_complete + << "," << info.last_update << "]" << dendl; + + set did; + for (list::reverse_iterator i = log.log.rbegin(); + i != log.log.rend(); + ++i) { + if (i->version <= info.last_complete) break; + if (did.count(i->soid)) continue; + did.insert(i->soid); + + if (i->is_delete()) continue; + + bufferlist bv; + int r = store->getattr(coll, i->soid, OI_ATTR, bv); + if (r >= 0) { + object_info_t oi(bv); + if (oi.version < i->version) { + dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl; + missing.add(i->soid, i->version, oi.version); + } + } else { + dout(15) << "read_log missing " << *i << dendl; + missing.add(i->soid, i->version, eversion_t()); + } 
+ } + for (map::reverse_iterator i = + ondisklog.divergent_priors.rbegin(); + i != ondisklog.divergent_priors.rend(); + ++i) { + if (i->first <= info.last_complete) break; + if (did.count(i->second)) continue; + did.insert(i->second); + bufferlist bv; + int r = store->getattr(coll, i->second, OI_ATTR, bv); + if (r >= 0) { + object_info_t oi(bv); + /** + * 1) we see this entry in the divergent priors mapping + * 2) we didn't see an entry for this object in the log + * + * From 1 & 2 we know that either the object does not exist + * or it is at the version specified in the divergent_priors + * map since the object would have been deleted atomically + * with the addition of the divergent_priors entry, an older + * version would not have been recovered, and a newer version + * would show up in the log above. + */ + assert(oi.version == i->first); + } else { + dout(15) << "read_log missing " << *i << dendl; + missing.add(i->second, i->first, eversion_t()); + } + } + } + dout(10) << "read_log done" << dendl; + return rewrite_log; +} + +void PGLog::read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid, + const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, + pg_missing_t &missing, ostringstream &oss) +{ + // load bounds, based on old OndiskLog encoding. + uint64_t ondisklog_tail = 0; + uint64_t ondisklog_head = 0; + uint64_t ondisklog_zero_to; + bool ondisklog_has_checksums; + + bufferlist blb; + store->collection_getattr(coll, "ondisklog", blb); + { + bufferlist::iterator bl = blb.begin(); + DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl); + ondisklog_has_checksums = (struct_v >= 2); + ::decode(ondisklog_tail, bl); + ::decode(ondisklog_head, bl); + if (struct_v >= 4) + ::decode(ondisklog_zero_to, bl); + else + ondisklog_zero_to = 0; + if (struct_v >= 5) + ::decode(ondisklog.divergent_priors, bl); + DECODE_FINISH(bl); + } + uint64_t ondisklog_length = ondisklog_head - ondisklog_tail; + dout(10) << "read_log " << ondisklog_tail << "~" << ondisklog_length << dendl; + + log.tail = info.log_tail; + + // In case of sobject_t based encoding, may need to list objects in the store + // to find hashes + vector ls; + + if (ondisklog_head > 0) { + // read + bufferlist bl; + store->read(coll_t::META_COLL, log_oid, ondisklog_tail, ondisklog_length, bl); + if (bl.length() < ondisklog_length) { + std::ostringstream oss; + oss << "read_log got " << bl.length() << " bytes, expected " + << ondisklog_head << "-" << ondisklog_tail << "=" + << ondisklog_length; + throw read_log_error(oss.str().c_str()); + } + + pg_log_entry_t e; + bufferlist::iterator p = bl.begin(); + assert(log.empty()); + eversion_t last; + bool reorder = false; + bool listed_collection = false; + + while (!p.end()) { + uint64_t pos = ondisklog_tail + p.get_off(); + if (ondisklog_has_checksums) { + bufferlist ebl; + ::decode(ebl, p); + __u32 crc; + ::decode(crc, p); + + __u32 got = ebl.crc32c(0); + if (crc == got) { + bufferlist::iterator q = ebl.begin(); + ::decode(e, q); + } else { + std::ostringstream oss; + oss << "read_log " << pos << " bad crc got " << got << " expected" << crc; + throw read_log_error(oss.str().c_str()); + } + } else { + ::decode(e, p); + } + dout(20) << "read_log " << pos << " " << e << dendl; + + // [repair] in order? 
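+      // entries were appended in version order, so a version that goes
+      // backwards means the on-disk log is damaged; note it and re-sort
+      // the whole log once decoding finishes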
+ if (e.version < last) { + dout(0) << "read_log " << pos << " out of order entry " << e << " follows " << last << dendl; + oss << info.pgid << " log has out of order entry " + << e << " following " << last << "\n"; + reorder = true; + } + + if (e.version <= log.tail) { + dout(20) << "read_log ignoring entry at " << pos << " below log.tail" << dendl; + continue; + } + if (last.version == e.version.version) { + dout(0) << "read_log got dup " << e.version << " (last was " << last << ", dropping that one)" << dendl; + log.log.pop_back(); + oss << info.pgid << " read_log got dup " + << e.version << " after " << last << "\n"; + } + + if (e.invalid_hash) { + // We need to find the object in the store to get the hash + if (!listed_collection) { + store->collection_list(coll, ls); + listed_collection = true; + } + bool found = false; + for (vector::iterator i = ls.begin(); + i != ls.end(); + ++i) { + if (i->oid == e.soid.oid && i->snap == e.soid.snap) { + e.soid = *i; + found = true; + break; + } + } + if (!found) { + // Didn't find the correct hash + std::ostringstream oss; + oss << "Could not find hash for hoid " << e.soid << std::endl; + throw read_log_error(oss.str().c_str()); + } + } + + if (e.invalid_pool) { + e.soid.pool = info.pgid.pool(); + } + + e.offset = pos; + uint64_t endpos = ondisklog_tail + p.get_off(); + log.log.push_back(e); + last = e.version; + + // [repair] at end of log? + if (!p.end() && e.version == info.last_update) { + oss << info.pgid << " log has extra data at " + << endpos << "~" << (ondisklog_head-endpos) << " after " + << info.last_update << "\n"; + + dout(0) << "read_log " << endpos << " *** extra gunk at end of log, " + << "adjusting ondisklog_head" << dendl; + ondisklog_head = endpos; + break; + } + } + + if (reorder) { + dout(0) << "read_log reordering log" << dendl; + map m; + for (list::iterator p = log.log.begin(); p != log.log.end(); ++p) + m[p->version] = *p; + log.log.clear(); + for (map::iterator p = m.begin(); p != m.end(); ++p) + log.log.push_back(p->second); + } + } +} diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h new file mode 100644 index 00000000000..cfb17f16ce2 --- /dev/null +++ b/src/osd/PGLog.h @@ -0,0 +1,391 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * Copyright (C) 2013 Cloudwatt + * + * Author: Loic Dachary + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_PG_LOG_H +#define CEPH_PG_LOG_H + +// re-include our assert to clobber boost's +#include "include/assert.h" +#include "osd_types.h" +#include "os/ObjectStore.h" +#include +using namespace std; + +struct PGLog { + ////////////////////////////// sub classes ////////////////////////////// + + /* Exceptions */ + class read_log_error : public buffer::error { + public: + explicit read_log_error(const char *what) { + snprintf(buf, sizeof(buf), "read_log_error: %s", what); + } + const char *what() const throw () { + return buf; + } + private: + char buf[512]; + }; + + /** + * IndexLog - adds in-memory index of the log, by oid. + * plus some methods to manipulate it all. + */ + struct IndexedLog : public pg_log_t { + hash_map objects; // ptrs into log. be careful! 
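+    // reqid -> most recent entry; used to detect duplicate client requests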
+ hash_map caller_ops; + + // recovery pointers + list::iterator complete_to; // not inclusive of referenced item + version_t last_requested; // last object requested by primary + + /****/ + IndexedLog() : last_requested(0) {} + + void claim_log(const pg_log_t& o) { + log = o.log; + head = o.head; + tail = o.tail; + index(); + } + + void split_into( + pg_t child_pgid, + unsigned split_bits, + IndexedLog *olog); + + void zero() { + unindex(); + pg_log_t::clear(); + reset_recovery_pointers(); + } + void reset_recovery_pointers() { + complete_to = log.end(); + last_requested = 0; + } + + bool logged_object(const hobject_t& oid) const { + return objects.count(oid); + } + bool logged_req(const osd_reqid_t &r) const { + return caller_ops.count(r); + } + eversion_t get_request_version(const osd_reqid_t &r) const { + hash_map::const_iterator p = caller_ops.find(r); + if (p == caller_ops.end()) + return eversion_t(); + return p->second->version; + } + + void index() { + objects.clear(); + caller_ops.clear(); + for (list::iterator i = log.begin(); + i != log.end(); + ++i) { + objects[i->soid] = &(*i); + if (i->reqid_is_indexed()) { + //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old + caller_ops[i->reqid] = &(*i); + } + } + } + + void index(pg_log_entry_t& e) { + if (objects.count(e.soid) == 0 || + objects[e.soid]->version < e.version) + objects[e.soid] = &e; + if (e.reqid_is_indexed()) { + //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old + caller_ops[e.reqid] = &e; + } + } + void unindex() { + objects.clear(); + caller_ops.clear(); + } + void unindex(pg_log_entry_t& e) { + // NOTE: this only works if we remove from the _tail_ of the log! + if (objects.count(e.soid) && objects[e.soid]->version == e.version) + objects.erase(e.soid); + if (e.reqid_is_indexed() && + caller_ops.count(e.reqid) && // divergent merge_log indexes new before unindexing old + caller_ops[e.reqid] == &e) + caller_ops.erase(e.reqid); + } + + // actors + void add(pg_log_entry_t& e) { + // add to log + log.push_back(e); + assert(e.version > head); + assert(head.version == 0 || e.version.version > head.version); + head = e.version; + + // to our index + objects[e.soid] = &(log.back()); + caller_ops[e.reqid] = &(log.back()); + } + + void trim(ObjectStore::Transaction &t, hobject_t& oid, eversion_t s); + + ostream& print(ostream& out) const; + }; + + /** + * OndiskLog - some info about how we store the log on disk. + */ + class OndiskLog { + public: + // ok + uint64_t tail; // first byte of log. + uint64_t head; // byte following end of log. + uint64_t zero_to; // first non-zeroed byte of log. + bool has_checksums; + + /** + * We reconstruct the missing set by comparing the recorded log against + * the objects in the pg collection. Unfortunately, it's possible to + * have an object in the missing set which is not in the log due to + * a divergent operation with a prior_version pointing before the + * pg log tail. To deal with this, we store alongside the log a mapping + * of divergent priors to be checked along with the log during read_state. 
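+     * The mapping is cleared on trim: once the log is trimmed past
+     * trim_to we are known to be complete up to it (see PGLog::trim).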
+ */ + map divergent_priors; + void add_divergent_prior(eversion_t version, hobject_t obj) { + divergent_priors.insert(make_pair(version, obj)); + } + + OndiskLog() : tail(0), head(0), zero_to(0), + has_checksums(true) {} + + uint64_t length() const { return head - tail; } + bool trim_to(eversion_t v, ObjectStore::Transaction& t); + + void zero() { + tail = 0; + head = 0; + zero_to = 0; + } + + void encode(bufferlist& bl) const { + ENCODE_START(5, 3, bl); + ::encode(tail, bl); + ::encode(head, bl); + ::encode(zero_to, bl); + ::encode(divergent_priors, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl); + has_checksums = (struct_v >= 2); + ::decode(tail, bl); + ::decode(head, bl); + if (struct_v >= 4) + ::decode(zero_to, bl); + else + zero_to = 0; + if (struct_v >= 5) + ::decode(divergent_priors, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const { + f->dump_unsigned("head", head); + f->dump_unsigned("tail", tail); + f->dump_unsigned("zero_to", zero_to); + f->open_array_section("divergent_priors"); + for (map::const_iterator p = divergent_priors.begin(); + p != divergent_priors.end(); + ++p) { + f->open_object_section("prior"); + f->dump_stream("version") << p->first; + f->dump_stream("object") << p->second; + f->close_section(); + } + f->close_section(); + } + static void generate_test_instances(list& o) { + o.push_back(new OndiskLog); + o.push_back(new OndiskLog); + o.back()->tail = 2; + o.back()->head = 3; + o.back()->zero_to = 1; + } + }; + WRITE_CLASS_ENCODER(OndiskLog) + +protected: + //////////////////// data members //////////////////// + + OndiskLog ondisklog; + pg_missing_t missing; + IndexedLog log; + +public: + //////////////////// get or set missing //////////////////// + + const pg_missing_t& get_missing() const { return missing; } + + void missing_got(map::const_iterator m) { + map::iterator p = missing.missing.find(m->first); + missing.got(p); + } + + void revise_have(hobject_t oid, eversion_t have) { + missing.revise_have(oid, have); + } + + void revise_need(hobject_t oid, eversion_t need) { + missing.revise_need(oid, need); + } + + void missing_add(const hobject_t& oid, eversion_t need, eversion_t have) { + missing.add(oid, need, have); + } + + void missing_rm(map::const_iterator m) { + map::iterator p = missing.missing.find(m->first); + missing.rm(p); + } + + //////////////////// get or set ondisklog //////////////////// + + const OndiskLog &get_ondisklog() const { return ondisklog; } + + //////////////////// get or set log //////////////////// + + const IndexedLog &get_log() const { return log; } + + const eversion_t &get_tail() const { return log.tail; } + + void set_tail(eversion_t tail) { log.tail = tail; } + + const eversion_t &get_head() const { return log.head; } + + void set_head(eversion_t head) { log.head = head; } + + void set_last_requested(version_t last_requested) { + log.last_requested = last_requested; + } + + void index() { log.index(); } + + void unindex() { log.unindex(); } + + void add(pg_log_entry_t& e) { log.add(e); } + + void reset_recovery_pointers() { log.reset_recovery_pointers(); } + + static void clear_info_log( + pg_t pgid, + const hobject_t &infos_oid, + const hobject_t &log_oid, + ObjectStore::Transaction *t); + + void trim(ObjectStore::Transaction& t, eversion_t trim_to, pg_info_t &info, hobject_t &log_oid); + + //////////////////// get or set log & missing //////////////////// + + void claim_log(const pg_log_t &o) { + log.claim_log(o); + missing.clear(); + } + + 
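+  /**
+   * Move the log entries and missing items that hash into child_pgid
+   * over to opg_log, keeping the rest; both logs are reindexed after
+   * the split.
+   */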
void split_into( + pg_t child_pgid, + unsigned split_bits, + PGLog *opg_log) { + log.split_into(child_pgid, split_bits, &(opg_log->log)); + missing.split_into(child_pgid, split_bits, &(opg_log->missing)); + } + + void recover_got(hobject_t oid, eversion_t v, pg_info_t &info) { + if (missing.is_missing(oid, v)) { + missing.got(oid, v); + + // raise last_complete? + if (missing.missing.empty()) { + log.complete_to = log.log.end(); + info.last_complete = info.last_update; + } + while (log.complete_to != log.log.end()) { + if (missing.missing[missing.rmissing.begin()->second].need <= + log.complete_to->version) + break; + if (info.last_complete < log.complete_to->version) + info.last_complete = log.complete_to->version; + log.complete_to++; + } + } + } + + void activate_not_complete(pg_info_t &info) { + log.complete_to = log.log.begin(); + while (log.complete_to->version < + missing.missing[missing.rmissing.begin()->second].need) + log.complete_to++; + assert(log.complete_to != log.log.end()); + if (log.complete_to == log.log.begin()) { + info.last_complete = eversion_t(); + } else { + log.complete_to--; + info.last_complete = log.complete_to->version; + log.complete_to++; + } + log.last_requested = 0; + } + + void proc_replica_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, + pg_missing_t& omissing, int from); + +protected: + bool merge_old_entry(ObjectStore::Transaction& t, pg_log_entry_t& oe, + pg_info_t& info, list& remove_snap, bool &dirty_log); +public: + void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead, + pg_info_t &info, list& remove_snap, + bool &dirty_log, bool &dirty_info, bool &dirty_big_info); + + void merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from, + pg_info_t &info, list& remove_snap, + bool &dirty_log, bool &dirty_info, bool &dirty_big_info); + + void write_log(ObjectStore::Transaction& t, const hobject_t &log_oid) { + write_log(t, log, log_oid, ondisklog.divergent_priors); + } + + static void write_log(ObjectStore::Transaction& t, pg_log_t &log, + const hobject_t &log_oid, map &divergent_priors); + + bool read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, + const pg_info_t &info, ostringstream &oss) { + return read_log(store, coll, log_oid, info, ondisklog, log, missing, oss); + } + + /// return true if the log should be rewritten + static bool read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, + const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, + pg_missing_t &missing, ostringstream &oss); + +protected: + static void read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid, + const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, + pg_missing_t &missing, ostringstream &oss); +}; + +WRITE_CLASS_ENCODER(PGLog::OndiskLog) + +#endif // CEPH_PG_LOG_H diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index de60a6a9205..91241fa26cb 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -94,13 +94,15 @@ bool ReplicatedPG::same_for_rep_modify_since(epoch_t e) bool ReplicatedPG::is_missing_object(const hobject_t& soid) { - return missing.missing.count(soid); + return pg_log.get_missing().missing.count(soid); } void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef op) { assert(is_missing_object(soid)); + const pg_missing_t &missing = pg_log.get_missing(); + // we don't have it (yet). 
map::const_iterator g = missing.missing.find(soid); assert(g != missing.missing.end()); @@ -128,7 +130,7 @@ void ReplicatedPG::wait_for_all_missing(OpRequestRef op) bool ReplicatedPG::is_degraded_object(const hobject_t& soid) { - if (missing.missing.count(soid)) + if (pg_log.get_missing().missing.count(soid)) return true; for (unsigned i = 1; i < acting.size(); i++) { int peer = acting[i]; @@ -262,6 +264,8 @@ int ReplicatedPG::get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilt int ReplicatedPG::do_command(vector& cmd, ostream& ss, bufferlist& idata, bufferlist& odata) { + const pg_missing_t &missing = pg_log.get_missing(); + if (cmd.size() && cmd[0] == "query") { JSONFormatter jsf(true); jsf.open_object_section("pg"); @@ -348,7 +352,7 @@ int ReplicatedPG::do_command(vector& cmd, ostream& ss, } jf.dump_int("num_missing", missing.num_missing()); jf.dump_int("num_unfound", get_num_unfound()); - map::iterator p = missing.missing.upper_bound(offset); + map::const_iterator p = missing.missing.upper_bound(offset); { jf.open_array_section("objects"); int32_t num = 0; @@ -391,7 +395,7 @@ int ReplicatedPG::do_command(vector& cmd, ostream& ss, bool ReplicatedPG::pg_op_must_wait(MOSDOp *op) { - if (missing.missing.empty()) + if (pg_log.get_missing().missing.empty()) return false; for (vector::iterator p = op->ops.begin(); p != op->ops.end(); ++p) { if (p->op.op == CEPH_OSD_OP_PGLS) { @@ -474,9 +478,9 @@ void ReplicatedPG::do_pg_op(OpRequestRef op) break; } - assert(snapid == CEPH_NOSNAP || missing.missing.empty()); - map::iterator missing_iter = - missing.missing.lower_bound(current); + assert(snapid == CEPH_NOSNAP || pg_log.get_missing().missing.empty()); + map::const_iterator missing_iter = + pg_log.get_missing().missing.lower_bound(current); vector::iterator ls_iter = sentries.begin(); while (1) { if (ls_iter == sentries.end()) { @@ -484,7 +488,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op) } hobject_t candidate; - if (missing_iter == missing.missing.end() || + if (missing_iter == pg_log.get_missing().missing.end() || *ls_iter < missing_iter->first) { candidate = *(ls_iter++); } else { @@ -529,7 +533,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op) candidate.get_key())); } if (next.is_max() && - missing_iter == missing.missing.end() && + missing_iter == pg_log.get_missing().missing.end() && ls_iter == sentries.end()) { result = 1; } @@ -577,9 +581,9 @@ void ReplicatedPG::calc_trim_to() if (min_last_complete_ondisk != eversion_t() && min_last_complete_ondisk != pg_trim_to && - log.approx_size() > target) { - size_t num_to_trim = log.approx_size() - target; - list::const_iterator it = log.log.begin(); + pg_log.get_log().approx_size() > target) { + size_t num_to_trim = pg_log.get_log().approx_size() - target; + list::const_iterator it = pg_log.get_log().log.begin(); eversion_t new_trim_to; for (size_t i = 0; i < num_to_trim; ++i) { new_trim_to = it->version; @@ -592,7 +596,7 @@ void ReplicatedPG::calc_trim_to() } dout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl; pg_trim_to = new_trim_to; - assert(pg_trim_to <= log.head); + assert(pg_trim_to <= pg_log.get_head()); assert(pg_trim_to <= min_last_complete_ondisk); } } @@ -915,7 +919,7 @@ void ReplicatedPG::do_op(OpRequestRef op) return; } - eversion_t oldv = log.get_request_version(ctx->reqid); + eversion_t oldv = pg_log.get_log().get_request_version(ctx->reqid); if (oldv != eversion_t()) { dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl; delete ctx; @@ -944,12 +948,12 @@ void 
ReplicatedPG::do_op(OpRequestRef op) op->mark_started(); // version - ctx->at_version = log.head; + ctx->at_version = pg_log.get_head(); ctx->at_version.epoch = get_osdmap()->get_epoch(); ctx->at_version.version++; assert(ctx->at_version > info.last_update); - assert(ctx->at_version > log.head); + assert(ctx->at_version > pg_log.get_head()); ctx->mtime = m->get_mtime(); @@ -968,7 +972,7 @@ void ReplicatedPG::do_op(OpRequestRef op) utime_t now = ceph_clock_now(g_ceph_context); // note some basic context for op replication that prepare_transaction may clobber - eversion_t old_last_update = log.head; + eversion_t old_last_update = pg_log.get_head(); bool old_exists = obc->obs.exists; uint64_t old_size = obc->obs.oi.size; eversion_t old_version = obc->obs.oi.version; @@ -1415,7 +1419,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) ctx->mtime = ceph_clock_now(g_ceph_context); ctx->at_version.epoch = get_osdmap()->get_epoch(); - ctx->at_version.version = log.head.version + 1; + ctx->at_version.version = pg_log.get_head().version + 1; RepGather *repop = new_repop(ctx, obc, rep_tid); @@ -4177,9 +4181,11 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc) { assert(is_active()); assert(!is_missing_object(obc->obs.oi.soid) || - (log.objects.count(obc->obs.oi.soid) && // or this is a revert... see recover_primary() - log.objects[obc->obs.oi.soid]->op == pg_log_entry_t::LOST_REVERT && - log.objects[obc->obs.oi.soid]->reverting_to == obc->obs.oi.version)); + (pg_log.get_log().objects.count(obc->obs.oi.soid) && // or this is a revert... see recover_primary() + pg_log.get_log().objects.find(obc->obs.oi.soid)->second->op == + pg_log_entry_t::LOST_REVERT && + pg_log.get_log().objects.find(obc->obs.oi.soid)->second->reverting_to == + obc->obs.oi.version)); dout(10) << "populate_obc_watchers " << obc->obs.oi.soid << dendl; assert(obc->watchers.empty()); @@ -4241,7 +4247,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) ctx->mtime = ceph_clock_now(g_ceph_context); ctx->at_version.epoch = get_osdmap()->get_epoch(); - ctx->at_version.version = log.head.version + 1; + ctx->at_version.version = pg_log.get_head().version + 1; entity_inst_t nobody; @@ -4257,7 +4263,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) obc->obs.oi.version, osd_reqid_t(), ctx->mtime)); - eversion_t old_last_update = log.head; + eversion_t old_last_update = pg_log.get_head(); bool old_exists = repop->obc->obs.exists; uint64_t old_size = repop->obc->obs.oi.size; eversion_t old_version = repop->obc->obs.oi.version; @@ -4465,7 +4471,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, put_snapset_context(ssc); // we're done with ssc ssc = 0; - if (missing.is_missing(soid)) { + if (pg_log.get_missing().is_missing(soid)) { dout(20) << "find_object_context " << soid << " missing, try again later" << dendl; if (psnapid) *psnapid = soid.snap; @@ -4656,7 +4662,7 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) assert(is_replica()); // we better not be missing this. 
- assert(!missing.is_missing(soid)); + assert(!pg_log.get_missing().is_missing(soid)); int ackerosd = acting[0]; @@ -4742,7 +4748,7 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) } else { // just trim the log if (m->pg_trim_to != eversion_t()) { - trim(rm->localt, m->pg_trim_to); + pg_log.trim(rm->localt, m->pg_trim_to, info, log_oid); rm->tls.push_back(&rm->localt); } } @@ -4915,7 +4921,7 @@ void ReplicatedPG::calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const } void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid, - pg_missing_t& missing, + const pg_missing_t& missing, const hobject_t &last_backfill, interval_set& data_subset, map >& clone_subsets) @@ -5030,13 +5036,13 @@ int ReplicatedPG::pull( << " at version " << peer_missing[fromosd].missing[soid].have << " rather than at version " << v << dendl; v = peer_missing[fromosd].missing[soid].have; - assert(log.objects.count(soid) && - log.objects[soid]->op == pg_log_entry_t::LOST_REVERT && - log.objects[soid]->reverting_to == v); + assert(pg_log.get_log().objects.count(soid) && + pg_log.get_log().objects.find(soid)->second->op == pg_log_entry_t::LOST_REVERT && + pg_log.get_log().objects.find(soid)->second->reverting_to == v); } dout(7) << "pull " << soid - << " v " << v + << " v " << v << " on osds " << missing_loc[soid] << " from osd." << fromosd << dendl; @@ -5048,24 +5054,24 @@ int ReplicatedPG::pull( // do we have the head and/or snapdir? hobject_t head = soid; head.snap = CEPH_NOSNAP; - if (missing.is_missing(head)) { + if (pg_log.get_missing().is_missing(head)) { if (pulling.count(head)) { dout(10) << " missing but already pulling head " << head << dendl; return PULL_NONE; } else { - int r = pull(head, missing.missing[head].need, priority); + int r = pull(head, pg_log.get_missing().missing.find(head)->second.need, priority); if (r != PULL_NONE) return PULL_OTHER; return PULL_NONE; } } head.snap = CEPH_SNAPDIR; - if (missing.is_missing(head)) { + if (pg_log.get_missing().is_missing(head)) { if (pulling.count(head)) { dout(10) << " missing but already pulling snapdir " << head << dendl; return PULL_NONE; } else { - int r = pull(head, missing.missing[head].need, priority); + int r = pull(head, pg_log.get_missing().missing.find(head)->second.need, priority); if (r != PULL_NONE) return PULL_OTHER; return PULL_NONE; @@ -5076,7 +5082,7 @@ int ReplicatedPG::pull( SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false); assert(ssc); dout(10) << " snapset " << ssc->snapset << dendl; - calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill, + calc_clone_subsets(ssc->snapset, soid, pg_log.get_missing(), info.last_backfill, recovery_info.copy_subset, recovery_info.clone_subset); put_snapset_context(ssc); @@ -5116,7 +5122,7 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer) dout(10) << "send_remove_op " << oid << " from osd." << peer << " tid " << tid << dendl; - + MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, oid, false, CEPH_OSD_FLAG_ACK, get_osdmap()->get_epoch(), tid, v); subop->ops = vector(1); @@ -5130,8 +5136,8 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer) * clones/heads and dup data ranges where possible. 
*/ void ReplicatedPG::push_to_replica( - ObjectContext *obc, const hobject_t& soid, int peer, - int prio) + ObjectContext *obc, const hobject_t& soid, int peer, + int prio) { const object_info_t& oi = obc->obs.oi; uint64_t size = obc->obs.oi.size; @@ -5140,7 +5146,7 @@ void ReplicatedPG::push_to_replica( map > clone_subsets; interval_set data_subset; - + // are we doing a clone on the replica? if (soid.snap && soid.snap < CEPH_NOSNAP) { hobject_t head = soid; @@ -5148,13 +5154,13 @@ void ReplicatedPG::push_to_replica( // try to base push off of clones that succeed/preceed poid // we need the head (and current SnapSet) locally to do that. - if (missing.is_missing(head)) { + if (pg_log.get_missing().is_missing(head)) { dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl; return push_start(prio, obc, soid, peer); } hobject_t snapdir = head; snapdir.snap = CEPH_SNAPDIR; - if (missing.is_missing(snapdir)) { + if (pg_log.get_missing().is_missing(snapdir)) { dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl; return push_start(prio, obc, soid, peer); } @@ -5267,7 +5273,7 @@ void ReplicatedPG::submit_push_data( ObjectStore::Transaction *t) { if (first) { - missing.revise_have(recovery_info.soid, eversion_t()); + pg_log.revise_have(recovery_info.soid, eversion_t()); remove_snap_mapped_object(*t, recovery_info.soid); t->remove(get_temp_coll(t), recovery_info.soid); t->touch(get_temp_coll(t), recovery_info.soid); @@ -5320,10 +5326,10 @@ void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info, &_t); } - if (missing.is_missing(recovery_info.soid) && - missing.missing[recovery_info.soid].need > recovery_info.version) { + if (pg_log.get_missing().is_missing(recovery_info.soid) && + pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) { assert(is_primary()); - pg_log_entry_t *latest = log.objects[recovery_info.soid]; + const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second; if (latest->op == pg_log_entry_t::LOST_REVERT && latest->reverting_to == recovery_info.version) { dout(10) << " got old revert version " << recovery_info.version @@ -5358,7 +5364,7 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove new_info.copy_subset.clear(); new_info.clone_subset.clear(); assert(ssc); - calc_clone_subsets(ssc->snapset, new_info.soid, missing, info.last_backfill, + calc_clone_subsets(ssc->snapset, new_info.soid, pg_log.get_missing(), info.last_backfill, new_info.copy_subset, new_info.clone_subset); put_snapset_context(ssc); return new_info; @@ -5500,7 +5506,7 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op) dout(20) << " kicking waiters on " << hoid << dendl; requeue_ops(waiting_for_missing_object[hoid]); waiting_for_missing_object.erase(hoid); - if (missing.missing.size() == 0) { + if (pg_log.get_missing().missing.size() == 0) { requeue_ops(waiting_for_all_missing); waiting_for_all_missing.clear(); } @@ -5911,36 +5917,22 @@ void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t void ReplicatedPG::recover_got(hobject_t oid, eversion_t v) { - if (missing.is_missing(oid, v)) { - dout(10) << "got missing " << oid << " v " << v << dendl; - missing.got(oid, v); - if (is_primary()) - missing_loc.erase(oid); - - // raise last_complete? 
- if (missing.missing.empty()) { - log.complete_to = log.log.end(); - info.last_complete = info.last_update; - } - while (log.complete_to != log.log.end()) { - if (missing.missing[missing.rmissing.begin()->second].need <= - log.complete_to->version) - break; - if (info.last_complete < log.complete_to->version) - info.last_complete = log.complete_to->version; - log.complete_to++; - } - if (log.complete_to != log.log.end()) { - dout(10) << "last_complete now " << info.last_complete - << " log.complete_to " << log.complete_to->version - << dendl; - } else { - dout(10) << "last_complete now " << info.last_complete - << " log.complete_to at end" << dendl; - //below is not true in the repair case. - //assert(missing.num_missing() == 0); // otherwise, complete_to was wrong. - assert(info.last_complete == info.last_update); - } + dout(10) << "got missing " << oid << " v " << v << dendl; + if (pg_log.get_missing().is_missing(oid, v)) { + if (is_primary()) + missing_loc.erase(oid); + } + pg_log.recover_got(oid, v, info); + if (pg_log.get_log().complete_to != pg_log.get_log().log.end()) { + dout(10) << "last_complete now " << info.last_complete + << " log.complete_to " << pg_log.get_log().complete_to->version + << dendl; + } else { + dout(10) << "last_complete now " << info.last_complete + << " log.complete_to at end" << dendl; + //below is not true in the repair case. + //assert(missing.num_missing() == 0); // otherwise, complete_to was wrong. + assert(info.last_complete == info.last_update); } } @@ -6047,8 +6039,8 @@ eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid) { eversion_t v; - assert(missing.is_missing(oid)); - v = missing.missing[oid].have; + assert(pg_log.get_missing().is_missing(oid)); + v = pg_log.get_missing().missing.find(oid)->second.have; dout(10) << "pick_newest_available " << oid << " " << v << " on osd." << osd->whoami << " (local)" << dendl; for (unsigned i=1; iget_epoch(); - map::iterator m = missing.missing.begin(); - map::iterator mend = missing.missing.end(); + const pg_missing_t &missing = pg_log.get_missing(); + map::const_iterator m = missing.missing.begin(); + map::const_iterator mend = missing.missing.end(); while (m != mend) { const hobject_t &oid(m->first); if (missing_loc.find(oid) != missing_loc.end()) { @@ -6145,7 +6138,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what) switch (what) { case pg_log_entry_t::LOST_MARK: obc = mark_object_lost(t, oid, m->second.need, mtime, pg_log_entry_t::LOST_MARK); - missing.got(m++); + pg_log.missing_got(m++); assert(0 == "actually, not implemented yet!"); // we need to be careful about how this is handled on the replica! break; @@ -6159,12 +6152,12 @@ void ReplicatedPG::mark_all_unfound_lost(int what) pg_log_entry_t::LOST_REVERT, oid, info.last_update, m->second.need, osd_reqid_t(), mtime); e.reverting_to = prev; - log.add(e); + pg_log.add(e); dout(10) << e << dendl; // we are now missing the new version; recovery code will sort it out. ++m; - missing.revise_need(oid, info.last_update); + pg_log.revise_need(oid, info.last_update); break; } /** fall-thru **/ @@ -6175,14 +6168,14 @@ void ReplicatedPG::mark_all_unfound_lost(int what) ++info.last_update.version; pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, info.last_update, m->second.need, osd_reqid_t(), mtime); - log.add(e); + pg_log.add(e); dout(10) << e << dendl; // delete local copy? NOT YET! FIXME if (m->second.have != eversion_t()) { assert(0 == "not implemented.. 
tho i'm not sure how useful it really would be."); } - missing.rm(m++); + pg_log.missing_rm(m++); } break; @@ -6195,7 +6188,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what) } dout(30) << __func__ << ": log after:\n"; - log.print(*_dout); + pg_log.get_log().print(*_dout); *_dout << dendl; if (missing.num_missing() == 0) { @@ -6432,7 +6425,7 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap) pulling.erase(*i); finish_recovery_op(*i); } - log.last_requested = 0; + pg_log.set_last_requested(0); pull_from_peer.erase(j++); } @@ -6498,6 +6491,8 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) return 0; } + const pg_missing_t &missing = pg_log.get_missing(); + int num_missing = missing.num_missing(); int num_unfound = get_num_unfound(); @@ -6607,6 +6602,8 @@ int ReplicatedPG::recover_primary(int max) { assert(is_primary()); + const pg_missing_t &missing = pg_log.get_missing(); + dout(10) << "recover_primary pulling " << pulling.size() << " in pg" << dendl; dout(10) << "recover_primary " << missing << dendl; dout(25) << "recover_primary " << missing.missing << dendl; @@ -6616,20 +6613,21 @@ int ReplicatedPG::recover_primary(int max) int started = 0; int skipped = 0; - map::iterator p = missing.rmissing.lower_bound(log.last_requested); + map::const_iterator p = + missing.rmissing.lower_bound(pg_log.get_log().last_requested); while (p != missing.rmissing.end()) { hobject_t soid; version_t v = p->first; - if (log.objects.count(p->second)) { - latest = log.objects[p->second]; + if (pg_log.get_log().objects.count(p->second)) { + latest = pg_log.get_log().objects.find(p->second)->second; assert(latest->is_update()); soid = latest->soid; } else { latest = 0; soid = p->second; } - pg_missing_t::item& item = missing.missing[p->second]; + const pg_missing_t::item& item = missing.missing.find(p->second)->second; ++p; hobject_t head = soid; @@ -6749,7 +6747,7 @@ int ReplicatedPG::recover_primary(int max) // only advance last_requested if we haven't skipped anything if (!skipped) - log.last_requested = v; + pg_log.set_last_requested(v); } return started; @@ -6763,7 +6761,7 @@ int ReplicatedPG::recover_object_replicas( // NOTE: we know we will get a valid oloc off of disk here. ObjectContext *obc = get_object_context(soid, OLOC_BLANK, false); if (!obc) { - missing.add(soid, v, eversion_t()); + pg_log.missing_add(soid, v, eversion_t()); bool uhoh = true; for (unsigned i=1; i= log.tail); // otherwise we need some help! + assert(info.last_update >= pg_log.get_tail()); // otherwise we need some help! // just scan the log. 
set did; - for (list::reverse_iterator p = log.log.rbegin(); - p != log.log.rend(); + for (list::const_reverse_iterator p = pg_log.get_log().log.rbegin(); + p != pg_log.get_log().log.rend(); ++p) { if (did.count(p->soid)) continue; @@ -7425,7 +7423,7 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&) assert(repop); repop->queue_snap_trimmer = true; - eversion_t old_last_update = pg->log.head; + eversion_t old_last_update = pg->pg_log.get_head(); bool old_exists = repop->obc->obs.exists; uint64_t old_size = repop->obc->obs.oi.size; eversion_t old_version = repop->obc->obs.oi.version; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 644a277f0dc..dcfecd3e61a 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -677,7 +677,7 @@ protected: const hobject_t &last_backfill, interval_set& data_subset, map >& clone_subsets); - void calc_clone_subsets(SnapSet& snapset, const hobject_t& poid, pg_missing_t& missing, + void calc_clone_subsets(SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing, const hobject_t &last_backfill, interval_set& data_subset, map >& clone_subsets); diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index c1b61d8b68f..44755bc7b1e 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1400,6 +1400,24 @@ struct pg_log_t { return head.version - tail.version; } + list::const_iterator find_entry(eversion_t v) const { + int fromhead = head.version - v.version; + int fromtail = v.version - tail.version; + list::const_iterator p; + if (fromhead < fromtail) { + p = log.end(); + --p; + while (p->version > v) + --p; + return p; + } else { + p = log.begin(); + while (p->version < v) + ++p; + return p; + } + } + list::iterator find_entry(eversion_t v) { int fromhead = head.version - v.version; int fromtail = v.version - tail.version; diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index a1d2773a946..1838586d84c 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -29,8 +29,8 @@ TYPEWITHSTRAYDATA(OSDMap::Incremental) #include "crush/CrushWrapper.h" TYPE(CrushWrapper) -#include "osd/PG.h" -TYPE(PG::OndiskLog) +#include "osd/PGLog.h" +TYPE(PGLog::OndiskLog) #include "osd/osd_types.h" TYPE(osd_reqid_t) diff --git a/src/test/test_osd_types.cc b/src/test/test_osd_types.cc index b696ace3ac2..d3f5364f793 100644 --- a/src/test/test_osd_types.cc +++ b/src/test/test_osd_types.cc @@ -216,3 +216,35 @@ TEST(pg_t, split) ASSERT_TRUE(s.count(pg_t(7, 0, -1))); } + +TEST(pg_missing_t, constructor) +{ + pg_missing_t missing; + EXPECT_EQ((unsigned int)0, missing.num_missing()); + EXPECT_FALSE(missing.have_missing()); +} + +TEST(pg_missing_t, add_next_event) +{ + pg_missing_t missing; + + // adding a DELETE entry + hobject_t oid(object_t("objname"), "key", 123, 456, 0); + eversion_t version(1,2); + eversion_t prior_version(3,4); + pg_log_entry_t e(pg_log_entry_t::DELETE, oid, version, prior_version, + osd_reqid_t(entity_name_t::CLIENT(777), 8, 999), utime_t(8,9)); + EXPECT_FALSE(missing.have_missing()); + missing.add_next_event(e); + EXPECT_FALSE(missing.have_missing()); + + // adding a MODIFY entry + e.op = pg_log_entry_t::MODIFY; + EXPECT_FALSE(missing.have_missing()); + missing.add_next_event(e); + EXPECT_TRUE(missing.have_missing()); +} + +// Local Variables: +// compile-command: "cd .. 
; make unittest_osd_types ; ./unittest_osd_types # --gtest_filter=pg_missing_t.constructor " +// End: diff --git a/src/tools/ceph-filestore-dump.cc b/src/tools/ceph-filestore-dump.cc index 90a7d40cba1..4953bb6062e 100644 --- a/src/tools/ceph-filestore-dump.cc +++ b/src/tools/ceph-filestore-dump.cc @@ -32,7 +32,7 @@ #include "os/FileStore.h" #include "common/perf_counters.h" #include "common/errno.h" -#include "osd/PG.h" +#include "osd/PGLog.h" #include "osd/OSD.h" namespace po = boost::program_options; @@ -322,12 +322,12 @@ static void invalid_path(string &path) } int get_log(ObjectStore *fs, coll_t coll, pg_t pgid, const pg_info_t &info, - PG::IndexedLog &log, pg_missing_t &missing) + PGLog::IndexedLog &log, pg_missing_t &missing) { - PG::OndiskLog ondisklog; + PGLog::OndiskLog ondisklog; try { ostringstream oss; - PG::read_log(fs, coll, log_oid, info, ondisklog, log, missing, oss); + PGLog::read_log(fs, coll, log_oid, info, ondisklog, log, missing, oss); if (debug && oss.str().size()) cerr << oss.str() << std::endl; } @@ -514,7 +514,7 @@ int write_info(ObjectStore::Transaction &t, epoch_t epoch, pg_info_t &info, void write_log(ObjectStore::Transaction &t, pg_log_t &log) { map divergent_priors; - PG::_write_log(t, log, log_oid, divergent_priors); + PGLog::write_log(t, log, log_oid, divergent_priors); } int write_pg(ObjectStore::Transaction &t, epoch_t epoch, pg_info_t &info, @@ -666,7 +666,7 @@ void write_super() int do_export(ObjectStore *fs, coll_t coll, pg_t pgid, pg_info_t &info, epoch_t map_epoch, __u8 struct_ver) { - PG::IndexedLog log; + PGLog::IndexedLog log; pg_missing_t missing; int ret = get_log(fs, coll, pgid, info, log, missing); @@ -913,7 +913,7 @@ int do_import(ObjectStore *store) { bufferlist ebl; pg_info_t info; - PG::IndexedLog log; + PGLog::IndexedLog log; uint64_t next_removal_seq = 0; //My local seq finish_remove_pgs(store, &next_removal_seq); @@ -1268,7 +1268,7 @@ int main(int argc, char **argv) formatter->flush(cout); cout << std::endl; } else if (type == "log") { - PG::IndexedLog log; + PGLog::IndexedLog log; pg_missing_t missing; ret = get_log(fs, coll, pgid, info, log, missing); if (ret > 0) -- cgit v1.2.1
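
Editor's note, not part of the patch: the calling convention this change establishes, const getters for reads and named wrapper methods for writes, can be illustrated with a minimal stand-alone sketch. Everything below (Entry, PgLogLike, and their members) is a simplified stand-in invented for illustration; these are not the Ceph types.

#include <cassert>
#include <list>
#include <map>
#include <string>

// Simplified stand-in for a log entry; not the Ceph pg_log_entry_t.
struct Entry {
  unsigned version;
  std::string oid;
};

// Models the PGLog shape: protected data members, const getters for
// reads, and a named wrapper method for every mutation.
class PgLogLike {
protected:
  std::list<Entry> log;                      // stands in for IndexedLog
  std::map<std::string, unsigned> missing;   // stands in for pg_missing_t

public:
  const std::list<Entry> &get_log() const { return log; }
  const std::map<std::string, unsigned> &get_missing() const { return missing; }

  void add(const Entry &e) {
    // new entries must advance the head, as IndexedLog::add asserts
    assert(log.empty() || e.version > log.back().version);
    log.push_back(e);
  }
  void missing_add(const std::string &oid, unsigned need) {
    missing[oid] = need;
  }
  void trim(unsigned trim_to) {
    // drop entries from the tail up to and including trim_to
    while (!log.empty() && log.front().version <= trim_to)
      log.pop_front();
  }
};

int main() {
  PgLogLike pg_log;
  pg_log.add(Entry{1, "a"});
  pg_log.add(Entry{2, "b"});
  pg_log.missing_add("b", 2);
  pg_log.trim(1);
  // reads that used to touch the bare data members now go through getters
  assert(pg_log.get_log().size() == 1);
  assert(pg_log.get_missing().count("b") == 1);
  return 0;
}

Keeping the members protected forces every mutation through a named method, so each side effect on the log and missing set has a single, testable entry point.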