author    Samuel Just <sam.just@inktank.com>   2013-05-30 22:39:12 -0700
committer Samuel Just <sam.just@inktank.com>   2013-05-30 22:39:12 -0700
commit    fbf5a242d91e293e4e24fbb94e31e163374c7912 (patch)
tree      b169d33bf7c5f08bba58903379a0ac681f265e1a
parent    f4eddd7be92efa2f2935f97237964c2252f9023f (diff)
parent    ec7731f737bcd061d4d1189c391ccff2661ca7ee (diff)
Merge branch 'wip-5046'
Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--  src/Makefile.am                     2
-rw-r--r--  src/osd/OSD.cc                     12
-rw-r--r--  src/osd/PG.cc                     987
-rw-r--r--  src/osd/PG.h                      242
-rw-r--r--  src/osd/PGLog.cc                  789
-rw-r--r--  src/osd/PGLog.h                   391
-rw-r--r--  src/osd/ReplicatedPG.cc           202
-rw-r--r--  src/osd/ReplicatedPG.h              2
-rw-r--r--  src/osd/osd_types.h                18
-rw-r--r--  src/test/encoding/types.h           4
-rw-r--r--  src/test/test_osd_types.cc         32
-rw-r--r--  src/tools/ceph-filestore-dump.cc   16
12 files changed, 1472 insertions, 1225 deletions
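
This merge (wip-5046) moves the PG log machinery out of PG into a new PGLog class: IndexedLog, OndiskLog, and the missing set become private members reached through accessors, and helpers such as clear_info_log() and trim() move with them. Below is a minimal sketch of the resulting interface, reconstructed from the call sites in this diff; it is not a copy of src/osd/PGLog.h, which carries many more members and the full merge/read logic.

// Sketch only; the names come from call sites in the hunks below
// (pg_log.get_log(), pg_log.get_missing(), pg_log.trim(...), ...).
class PGLog {
  IndexedLog log;        // in-memory log plus object/request indexes
  OndiskLog ondisklog;   // on-disk log bounds and divergent_priors
  pg_missing_t missing;  // objects this replica still needs
public:
  const IndexedLog &get_log() const { return log; }
  const pg_missing_t &get_missing() const { return missing; }
  eversion_t get_tail() const { return log.tail; }
  eversion_t get_head() const { return log.head; }
  void reset_recovery_pointers() { log.reset_recovery_pointers(); }
  void missing_add(const hobject_t &oid, eversion_t need, eversion_t have) {
    missing.add(oid, need, have);
  }
  // state that used to be reachable as PG members is now passed in
  void trim(ObjectStore::Transaction &t, eversion_t trim_to,
            pg_info_t &info, hobject_t &log_oid);
};

Most of the hunks below are mechanical consequences: code that poked pg->log, pg->missing, or pg->ondisklog directly now goes through pg->pg_log.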
diff --git a/src/Makefile.am b/src/Makefile.am
index 5e176874b11..ad80eac4a74 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1497,6 +1497,7 @@ noinst_LIBRARIES += libos.a
libosd_a_SOURCES = \
osd/PG.cc \
+ osd/PGLog.cc \
osd/ReplicatedPG.cc \
osd/Ager.cc \
osd/OSD.cc \
@@ -1993,6 +1994,7 @@ noinst_HEADERS = \
osd/OpRequest.h\
osd/SnapMapper.h\
osd/PG.h\
+ osd/PGLog.h\
osd/ReplicatedPG.h\
osd/Watch.h\
osd/osd_types.h\
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 0ca3092372f..b6bdf2de409 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1829,7 +1829,7 @@ void OSD::load_pgs()
PG::RecoveryCtx rctx(0, 0, 0, 0, 0, 0);
pg->handle_loaded(&rctx);
- dout(10) << "load_pgs loaded " << *pg << " " << pg->log << dendl;
+ dout(10) << "load_pgs loaded " << *pg << " " << pg->pg_log.get_log() << dendl;
pg->unlock();
}
dout(10) << "load_pgs done" << dendl;
@@ -3034,7 +3034,7 @@ void OSD::RemoveWQ::_process(pair<PGRef, DeletingStateRef> item)
return;
ObjectStore::Transaction *t = new ObjectStore::Transaction;
- PG::clear_info_log(
+ PGLog::clear_info_log(
pg->info.pgid,
OSD::make_infos_oid(),
pg->log_oid,
@@ -3645,8 +3645,10 @@ void OSD::do_command(Connection *con, tid_t tid, vector<string>& cmd, bufferlist
pg->lock();
fout << *pg << std::endl;
- std::map<hobject_t, pg_missing_t::item>::iterator mend = pg->missing.missing.end();
- std::map<hobject_t, pg_missing_t::item>::iterator mi = pg->missing.missing.begin();
+ std::map<hobject_t, pg_missing_t::item>::const_iterator mend =
+ pg->pg_log.get_missing().missing.end();
+ std::map<hobject_t, pg_missing_t::item>::const_iterator mi =
+ pg->pg_log.get_missing().missing.begin();
for (; mi != mend; ++mi) {
fout << mi->first << " -> " << mi->second << std::endl;
map<hobject_t, set<int> >::const_iterator mli =
@@ -5756,7 +5758,7 @@ void OSD::handle_pg_trim(OpRequestRef op)
} else {
// primary is instructing us to trim
ObjectStore::Transaction *t = new ObjectStore::Transaction;
- pg->trim(*t, m->trim_to);
+ pg->pg_log.trim(*t, m->trim_to, pg->info, pg->log_oid);
pg->dirty_info = true;
pg->write_if_dirty(*t);
int tr = store->queue_transaction(pg->osr.get(), t,
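
The OSD.cc call sites show the two recurring patterns of this refactor: reads go through const accessors, which is why the do_command iterators above become const_iterator, and PGLog methods that need PG state now take it as arguments. A hypothetical helper condensing the handle_pg_trim hunk (trim_pg_log is not in the patch, just an illustration):

// trim() takes info and log_oid explicitly because PGLog can no longer
// reach them as members of the enclosing PG.
void trim_pg_log(PG *pg, ObjectStore::Transaction &t, eversion_t trim_to)
{
  pg->pg_log.trim(t, trim_to, pg->info, pg->log_oid);
  pg->dirty_info = true;
  pg->write_if_dirty(t);
}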
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index da6a68ed387..94c10d0ab6e 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -211,33 +211,6 @@ std::string PG::gen_prefix() const
return out.str();
}
-
-
-void PG::IndexedLog::trim(ObjectStore::Transaction& t, hobject_t& log_oid, eversion_t s)
-{
- if (complete_to != log.end() &&
- complete_to->version <= s) {
- generic_dout(0) << " bad trim to " << s << " when complete_to is " << complete_to->version
- << " on " << *this << dendl;
- }
-
- set<string> keys_to_rm;
- while (!log.empty()) {
- pg_log_entry_t &e = *log.begin();
- if (e.version > s)
- break;
- generic_dout(20) << "trim " << e << dendl;
- unindex(e); // remove from index,
- keys_to_rm.insert(e.get_key_name());
- log.pop_front(); // from log
- }
- t.omap_rmkeys(coll_t::META_COLL, log_oid, keys_to_rm);
-
- // raise tail?
- if (tail < s)
- tail = s;
-}
-
/********* PG **********/
void PG::proc_master_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, pg_missing_t& omissing, int from)
@@ -264,106 +237,7 @@ void PG::proc_replica_log(ObjectStore::Transaction& t,
dout(10) << "proc_replica_log for osd." << from << ": "
<< oinfo << " " << olog << " " << omissing << dendl;
- /*
- basically what we're doing here is rewinding the remote log,
- dropping divergent entries, until we find something that matches
- our master log. we then reset last_update to reflect the new
- point up to which missing is accurate.
-
- later, in activate(), missing will get wound forward again and
- we will send the peer enough log to arrive at the same state.
- */
-
- for (map<hobject_t, pg_missing_t::item>::iterator i = omissing.missing.begin();
- i != omissing.missing.end();
- ++i) {
- dout(20) << " before missing " << i->first << " need " << i->second.need
- << " have " << i->second.have << dendl;
- }
-
- list<pg_log_entry_t>::const_reverse_iterator pp = olog.log.rbegin();
- eversion_t lu(oinfo.last_update);
- while (true) {
- if (pp == olog.log.rend()) {
- if (pp != olog.log.rbegin()) // no last_update adjustment if we discard nothing!
- lu = olog.tail;
- break;
- }
- const pg_log_entry_t& oe = *pp;
-
- // don't continue past the tail of our log.
- if (oe.version <= log.tail)
- break;
-
- if (!log.objects.count(oe.soid)) {
- dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl;
- ++pp;
- continue;
- }
-
- pg_log_entry_t& ne = *log.objects[oe.soid];
- if (ne.version == oe.version) {
- dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl;
- lu = pp->version;
- break;
- }
-
- if (oe.soid > oinfo.last_backfill) {
- // past backfill line, don't care
- dout(10) << " had " << oe << " beyond last_backfill : skipping" << dendl;
- ++pp;
- continue;
- }
-
- if (ne.version > oe.version) {
- dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
- } else {
- if (oe.is_delete()) {
- if (ne.is_delete()) {
- // old and new are delete
- dout(10) << " had " << oe << " new " << ne << " : both deletes" << dendl;
- } else {
- // old delete, new update.
- dout(10) << " had " << oe << " new " << ne << " : missing" << dendl;
- omissing.add(ne.soid, ne.version, eversion_t());
- }
- } else {
- if (ne.is_delete()) {
- // old update, new delete
- dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
- omissing.rm(oe.soid, oe.version);
- } else {
- // old update, new update
- dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
- omissing.revise_need(ne.soid, ne.version);
- }
- }
- }
-
- ++pp;
- }
-
- if (lu < oinfo.last_update) {
- dout(10) << " peer osd." << from << " last_update now " << lu << dendl;
- oinfo.last_update = lu;
- }
-
- if (omissing.have_missing()) {
- eversion_t first_missing =
- omissing.missing[omissing.rmissing.begin()->second].need;
- oinfo.last_complete = eversion_t();
- list<pg_log_entry_t>::const_iterator i = olog.log.begin();
- for (;
- i != olog.log.end();
- ++i) {
- if (i->version < first_missing)
- oinfo.last_complete = i->version;
- else
- break;
- }
- } else {
- oinfo.last_complete = oinfo.last_update;
- }
+ pg_log.proc_replica_log(t, oinfo, olog, omissing, from);
peer_info[from] = oinfo;
dout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl;
@@ -429,271 +303,24 @@ void PG::remove_snap_mapped_object(
}
}
-
-/*
- * merge an old (possibly divergent) log entry into the new log. this
- * happens _after_ new log items have been assimilated. thus, we assume
- * the index already references newer entries (if present), and missing
- * has been updated accordingly.
- *
- * return true if entry is not divergent.
- */
-bool PG::merge_old_entry(ObjectStore::Transaction& t, pg_log_entry_t& oe)
+void PG::merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from)
{
- if (oe.soid > info.last_backfill) {
- dout(20) << "merge_old_entry had " << oe << " : beyond last_backfill" << dendl;
- return false;
- }
- if (log.objects.count(oe.soid)) {
- pg_log_entry_t &ne = *log.objects[oe.soid]; // new(er?) entry
-
- if (ne.version > oe.version) {
- dout(20) << "merge_old_entry had " << oe << " new " << ne << " : older, missing" << dendl;
- assert(ne.is_delete() || missing.is_missing(ne.soid));
- return false;
- }
- if (ne.version == oe.version) {
- dout(20) << "merge_old_entry had " << oe << " new " << ne << " : same" << dendl;
- return true;
- }
- if (oe.is_delete()) {
- if (ne.is_delete()) {
- // old and new are delete
- dout(20) << "merge_old_entry had " << oe << " new " << ne << " : both deletes" << dendl;
- } else {
- // old delete, new update.
- dout(20) << "merge_old_entry had " << oe << " new " << ne << " : missing" << dendl;
- missing.revise_need(ne.soid, ne.version);
- }
- } else {
- if (ne.is_delete()) {
- // old update, new delete
- dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new delete supercedes" << dendl;
- missing.rm(oe.soid, oe.version);
- } else {
- // old update, new update
- dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new item supercedes" << dendl;
- missing.revise_need(ne.soid, ne.version);
- }
- }
- } else if (oe.op == pg_log_entry_t::CLONE) {
- assert(oe.soid.snap != CEPH_NOSNAP);
- dout(20) << "merge_old_entry had " << oe
- << ", clone with no non-divergent log entries, "
- << "deleting" << dendl;
- remove_snap_mapped_object(t, oe.soid);
- if (missing.is_missing(oe.soid))
- missing.rm(oe.soid, missing.missing[oe.soid].need);
- } else if (oe.prior_version > info.log_tail) {
- /**
- * oe.prior_version is a previously divergent log entry
- * oe.soid must have already been handled and the missing
- * set updated appropriately
- */
- dout(20) << "merge_old_entry had oe " << oe
- << " with divergent prior_version " << oe.prior_version
- << " oe.soid " << oe.soid
- << " must already have been merged" << dendl;
- } else {
- if (!oe.is_delete()) {
- dout(20) << "merge_old_entry had " << oe << " deleting" << dendl;
- remove_snap_mapped_object(t, oe.soid);
- }
- dout(20) << "merge_old_entry had " << oe << " updating missing to "
- << oe.prior_version << dendl;
- if (oe.prior_version > eversion_t()) {
- ondisklog.add_divergent_prior(oe.prior_version, oe.soid);
- dirty_log = true;
- missing.revise_need(oe.soid, oe.prior_version);
- } else if (missing.is_missing(oe.soid)) {
- missing.rm(oe.soid, missing.missing[oe.soid].need);
- }
- }
- return false;
+ list<hobject_t> to_remove;
+ pg_log.merge_log(t, oinfo, olog, from, info, to_remove, dirty_log, dirty_info, dirty_big_info);
+ for(list<hobject_t>::iterator i = to_remove.begin();
+ i != to_remove.end();
+ i++)
+ remove_snap_mapped_object(t, *i);
}
-/**
- * rewind divergent entries at the head of the log
- *
- * This rewinds entries off the head of our log that are divergent.
- * This is used by replicas during activation.
- *
- * @param t transaction
- * @param newhead new head to rewind to
- */
void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead)
{
- dout(10) << "rewind_divergent_log truncate divergent future " << newhead << dendl;
- assert(newhead > log.tail);
-
- list<pg_log_entry_t>::iterator p = log.log.end();
- list<pg_log_entry_t> divergent;
- while (true) {
- if (p == log.log.begin()) {
- // yikes, the whole thing is divergent!
- divergent.swap(log.log);
- break;
- }
- --p;
- if (p->version == newhead) {
- ++p;
- divergent.splice(divergent.begin(), log.log, p, log.log.end());
- break;
- }
- assert(p->version > newhead);
- dout(10) << "rewind_divergent_log future divergent " << *p << dendl;
- log.unindex(*p);
- }
-
- log.head = newhead;
- info.last_update = newhead;
- if (info.last_complete > newhead)
- info.last_complete = newhead;
-
- for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d)
- merge_old_entry(t, *d);
-
- dirty_info = true;
- dirty_big_info = true;
- dirty_log = true;
-}
-
-void PG::merge_log(ObjectStore::Transaction& t,
- pg_info_t &oinfo, pg_log_t &olog, int fromosd)
-{
- dout(10) << "merge_log " << olog << " from osd." << fromosd
- << " into " << log << dendl;
-
- // Check preconditions
-
- // If our log is empty, the incoming log needs to have not been trimmed.
- assert(!log.null() || olog.tail == eversion_t());
- // The logs must overlap.
- assert(log.head >= olog.tail && olog.head >= log.tail);
-
- for (map<hobject_t, pg_missing_t::item>::iterator i = missing.missing.begin();
- i != missing.missing.end();
- ++i) {
- dout(20) << "pg_missing_t sobject: " << i->first << dendl;
- }
-
- bool changed = false;
-
- // extend on tail?
- // this is just filling in history. it does not affect our
- // missing set, as that should already be consistent with our
- // current log.
- if (olog.tail < log.tail) {
- dout(10) << "merge_log extending tail to " << olog.tail << dendl;
- list<pg_log_entry_t>::iterator from = olog.log.begin();
- list<pg_log_entry_t>::iterator to;
- for (to = from;
- to != olog.log.end();
- ++to) {
- if (to->version > log.tail)
- break;
- log.index(*to);
- dout(15) << *to << dendl;
- }
- assert(to != olog.log.end() ||
- (olog.head == info.last_update));
-
- // splice into our log.
- log.log.splice(log.log.begin(),
- olog.log, from, to);
-
- info.log_tail = log.tail = olog.tail;
- changed = true;
- }
-
- if (oinfo.stats.reported < info.stats.reported) // make sure reported always increases
- oinfo.stats.reported = info.stats.reported;
- if (info.last_backfill.is_max())
- info.stats = oinfo.stats;
-
- // do we have divergent entries to throw out?
- if (olog.head < log.head) {
- rewind_divergent_log(t, olog.head);
- changed = true;
- }
-
- // extend on head?
- if (olog.head > log.head) {
- dout(10) << "merge_log extending head to " << olog.head << dendl;
-
- // find start point in olog
- list<pg_log_entry_t>::iterator to = olog.log.end();
- list<pg_log_entry_t>::iterator from = olog.log.end();
- eversion_t lower_bound = olog.tail;
- while (1) {
- if (from == olog.log.begin())
- break;
- --from;
- dout(20) << " ? " << *from << dendl;
- if (from->version <= log.head) {
- dout(20) << "merge_log cut point (usually last shared) is " << *from << dendl;
- lower_bound = from->version;
- ++from;
- break;
- }
- }
-
- // index, update missing, delete deleted
- for (list<pg_log_entry_t>::iterator p = from; p != to; ++p) {
- pg_log_entry_t &ne = *p;
- dout(20) << "merge_log " << ne << dendl;
- log.index(ne);
- if (ne.soid <= info.last_backfill) {
- missing.add_next_event(ne);
- if (ne.is_delete())
- remove_snap_mapped_object(t, ne.soid);
- }
- }
-
- // move aside divergent items
- list<pg_log_entry_t> divergent;
- while (!log.empty()) {
- pg_log_entry_t &oe = *log.log.rbegin();
- /*
- * look at eversion.version here. we want to avoid a situation like:
- * our log: 100'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529
- * new log: 122'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529
- * lower_bound = 100'9
- * i.e, same request, different version. If the eversion.version is > the
- * lower_bound, we it is divergent.
- */
- if (oe.version.version <= lower_bound.version)
- break;
- dout(10) << "merge_log divergent " << oe << dendl;
- divergent.push_front(oe);
- log.unindex(oe);
- log.log.pop_back();
- }
-
- // splice
- log.log.splice(log.log.end(),
- olog.log, from, to);
- log.index();
-
- info.last_update = log.head = olog.head;
- info.purged_snaps = oinfo.purged_snaps;
-
- // process divergent items
- if (!divergent.empty()) {
- for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d)
- merge_old_entry(t, *d);
- }
-
- changed = true;
- }
-
- dout(10) << "merge_log result " << log << " " << missing << " changed=" << changed << dendl;
-
- if (changed) {
- dirty_info = true;
- dirty_big_info = true;
- dirty_log = true;
- }
+ list<hobject_t> to_remove;
+ pg_log.rewind_divergent_log(t, newhead, info, to_remove, dirty_log, dirty_info, dirty_big_info);
+ for(list<hobject_t>::iterator i = to_remove.begin();
+ i != to_remove.end();
+ i++)
+ remove_snap_mapped_object(t, *i);
}
/*
@@ -714,8 +341,8 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing
peer_missing[fromosd];
// found items?
- for (map<hobject_t,pg_missing_t::item>::iterator p = missing.missing.begin();
- p != missing.missing.end();
+ for (map<hobject_t,pg_missing_t::item>::const_iterator p = pg_log.get_missing().missing.begin();
+ p != pg_log.get_missing().missing.end();
++p) {
const hobject_t &soid(p->first);
eversion_t need = p->second.need;
@@ -775,12 +402,13 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing
publish_stats_to_osd();
}
- dout(20) << "search_for_missing missing " << missing.missing << dendl;
+ dout(20) << "search_for_missing missing " << pg_log.get_missing().missing << dendl;
return found_missing;
}
void PG::discover_all_missing(map< int, map<pg_t,pg_query_t> > &query_map)
{
+ const pg_missing_t &missing = pg_log.get_missing();
assert(missing.have_missing());
dout(10) << __func__ << " "
@@ -826,23 +454,6 @@ void PG::discover_all_missing(map< int, map<pg_t,pg_query_t> > &query_map)
}
}
-
-ostream& PG::IndexedLog::print(ostream& out) const
-{
- out << *this << std::endl;
- for (list<pg_log_entry_t>::const_iterator p = log.begin();
- p != log.end();
- ++p) {
- out << *p << " " << (logged_object(p->soid) ? "indexed":"NOT INDEXED") << std::endl;
- assert(!p->reqid_is_indexed() || logged_req(p->reqid));
- }
- return out;
-}
-
-
-
-
-
/******* PG ***********/
bool PG::needs_recovery() const
{
@@ -850,6 +461,8 @@ bool PG::needs_recovery() const
bool ret = false;
+ const pg_missing_t &missing = pg_log.get_missing();
+
if (missing.num_missing()) {
dout(10) << __func__ << " primary has " << missing.num_missing() << dendl;
ret = true;
@@ -1120,7 +733,7 @@ void PG::clear_primary_state()
missing_loc.clear();
missing_loc_sources.clear();
- log.reset_recovery_pointers();
+ pg_log.reset_recovery_pointers();
scrubber.reserved_peers.clear();
scrub_after_recovery = false;
@@ -1486,6 +1099,8 @@ void PG::activate(ObjectStore::Transaction& t,
info.last_epoch_started = query_epoch;
+ const pg_missing_t &missing = pg_log.get_missing();
+
if (is_primary()) {
// If necessary, create might_have_unfound to help us find our unfound objects.
// NOTE: It's important that we build might_have_unfound before trimming the
@@ -1530,24 +1145,10 @@ void PG::activate(ObjectStore::Transaction& t,
dout(10) << "activate - no missing, moving last_complete " << info.last_complete
<< " -> " << info.last_update << dendl;
info.last_complete = info.last_update;
- log.reset_recovery_pointers();
+ pg_log.reset_recovery_pointers();
} else {
dout(10) << "activate - not complete, " << missing << dendl;
- log.complete_to = log.log.begin();
- while (log.complete_to->version <
- missing.missing[missing.rmissing.begin()->second].need)
- log.complete_to++;
- assert(log.complete_to != log.log.end());
- if (log.complete_to == log.log.begin()) {
- info.last_complete = eversion_t();
- } else {
- log.complete_to--;
- info.last_complete = log.complete_to->version;
- log.complete_to++;
- }
- log.last_requested = 0;
- dout(10) << "activate - complete_to = " << log.complete_to->version
- << dendl;
+ pg_log.activate_not_complete(info);
if (is_primary()) {
dout(10) << "activate - starting recovery" << dendl;
osd->queue_for_recovery(this);
@@ -1592,7 +1193,7 @@ void PG::activate(ObjectStore::Transaction& t,
dout(10) << "activate peer osd." << peer << " is up to date, but sending pg_log anyway" << dendl;
m = new MOSDPGLog(get_osdmap()->get_epoch(), info);
}
- } else if (log.tail > pi.last_update || pi.last_backfill == hobject_t()) {
+ } else if (pg_log.get_tail() > pi.last_update || pi.last_backfill == hobject_t()) {
// backfill
osd->clog.info() << info.pgid << " restarting backfill on osd." << peer
<< " from (" << pi.log_tail << "," << pi.last_update << "] " << pi.last_backfill
@@ -1607,17 +1208,17 @@ void PG::activate(ObjectStore::Transaction& t,
m = new MOSDPGLog(get_osdmap()->get_epoch(), pi);
// send some recent log, so that op dup detection works well.
- m->log.copy_up_to(log, g_conf->osd_min_pg_log_entries);
+ m->log.copy_up_to(pg_log.get_log(), g_conf->osd_min_pg_log_entries);
m->info.log_tail = m->log.tail;
pi.log_tail = m->log.tail; // sigh...
pm.clear();
} else {
// catch up
- assert(log.tail <= pi.last_update);
+ assert(pg_log.get_tail() <= pi.last_update);
m = new MOSDPGLog(get_osdmap()->get_epoch(), info);
// send new stuff to append to replicas log
- m->log.copy_after(log, pi.last_update);
+ m->log.copy_after(pg_log.get_log(), pi.last_update);
}
// share past_intervals if we are creating the pg on the replica
@@ -2047,47 +1648,6 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
osd->osd->finish_recovery_op(this, soid, dequeue);
}
-void PG::IndexedLog::split_into(
- pg_t child_pgid,
- unsigned split_bits,
- PG::IndexedLog *olog)
-{
- list<pg_log_entry_t> oldlog;
- oldlog.swap(log);
-
- eversion_t old_tail;
- olog->head = head;
- olog->tail = tail;
- unsigned mask = ~((~0)<<split_bits);
- for (list<pg_log_entry_t>::iterator i = oldlog.begin();
- i != oldlog.end();
- ) {
- if ((i->soid.hash & mask) == child_pgid.m_seed) {
- olog->log.push_back(*i);
- if (log.empty())
- tail = i->version;
- } else {
- log.push_back(*i);
- if (olog->empty())
- olog->tail = i->version;
- }
- oldlog.erase(i++);
- }
-
- if (log.empty())
- tail = head;
- else
- head = log.rbegin()->version;
-
- if (olog->empty())
- olog->tail = olog->head;
- else
- olog->head = olog->log.rbegin()->version;
-
- olog->index();
- index();
-}
-
static void split_list(
list<OpRequestRef> *from,
list<OpRequestRef> *to,
@@ -2149,22 +1709,19 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
child->pool = pool;
// Log
- log.split_into(child_pgid, split_bits, &(child->log));
+ pg_log.split_into(child_pgid, split_bits, &(child->pg_log));
child->info.last_complete = info.last_complete;
- info.last_update = log.head;
- child->info.last_update = child->log.head;
-
- info.log_tail = log.tail;
- child->info.log_tail = child->log.tail;
+ info.last_update = pg_log.get_head();
+ child->info.last_update = child->pg_log.get_head();
- if (info.last_complete < log.tail)
- info.last_complete = log.tail;
- if (child->info.last_complete < child->log.tail)
- child->info.last_complete = child->log.tail;
+ info.log_tail = pg_log.get_tail();
+ child->info.log_tail = child->pg_log.get_tail();
- // Missing
- missing.split_into(child_pgid, split_bits, &(child->missing));
+ if (info.last_complete < pg_log.get_tail())
+ info.last_complete = pg_log.get_tail();
+ if (child->info.last_complete < child->pg_log.get_tail())
+ child->info.last_complete = child->pg_log.get_tail();
// Info
child->info.history = info.history;
@@ -2201,7 +1758,7 @@ void PG::clear_recovery_state()
{
dout(10) << "clear_recovery_state" << dendl;
- log.reset_recovery_pointers();
+ pg_log.reset_recovery_pointers();
finish_sync_event = 0;
hobject_t soid;
@@ -2339,10 +1896,10 @@ void PG::publish_stats_to_osd()
info.stats.last_active = now;
info.stats.last_unstale = now;
- info.stats.log_size = ondisklog.length();
- info.stats.ondisk_log_size = ondisklog.length();
- info.stats.log_start = log.tail;
- info.stats.ondisk_log_start = log.tail;
+ info.stats.log_size = pg_log.get_ondisklog().length();
+ info.stats.ondisk_log_size = pg_log.get_ondisklog().length();
+ info.stats.log_start = pg_log.get_tail();
+ info.stats.ondisk_log_start = pg_log.get_tail();
pg_stats_publish_valid = true;
pg_stats_publish = info.stats;
@@ -2364,8 +1921,9 @@ void PG::publish_stats_to_osd()
degraded += (target - acting.size()) * num_objects;
// missing on primary
- pg_stats_publish.stats.sum.num_objects_missing_on_primary = missing.num_missing();
- degraded += missing.num_missing();
+ pg_stats_publish.stats.sum.num_objects_missing_on_primary =
+ pg_log.get_missing().num_missing();
+ degraded += pg_log.get_missing().num_missing();
for (unsigned i=1; i<acting.size(); i++) {
assert(peer_missing.count(acting[i]));
@@ -2648,21 +2206,6 @@ void PG::write_info(ObjectStore::Transaction& t)
dirty_big_info = false;
}
-void PG::clear_info_log(
- pg_t pgid,
- const hobject_t &infos_oid,
- const hobject_t &log_oid,
- ObjectStore::Transaction *t) {
-
- set<string> keys_to_remove;
- keys_to_remove.insert(get_epoch_key(pgid));
- keys_to_remove.insert(get_biginfo_key(pgid));
- keys_to_remove.insert(get_info_key(pgid));
-
- t->remove(coll_t::META_COLL, log_oid);
- t->omap_rmkeys(coll_t::META_COLL, infos_oid, keys_to_remove);
-}
-
epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, hobject_t &infos_oid, bufferlist *bl)
{
assert(bl);
@@ -2695,30 +2238,9 @@ epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, hobject_t &infos_oid
return cur_epoch;
}
-void PG::_write_log(ObjectStore::Transaction& t, pg_log_t &log,
- const hobject_t &log_oid, map<eversion_t, hobject_t> &divergent_priors)
-{
- //dout(10) << "write_log" << dendl;
- t.remove(coll_t::META_COLL, log_oid);
- t.touch(coll_t::META_COLL, log_oid);
- map<string,bufferlist> keys;
- for (list<pg_log_entry_t>::iterator p = log.log.begin();
- p != log.log.end();
- ++p) {
- bufferlist bl(sizeof(*p) * 2);
- p->encode_with_checksum(bl);
- keys[p->get_key_name()].claim(bl);
- }
- //dout(10) << "write_log " << keys.size() << " keys" << dendl;
-
- ::encode(divergent_priors, keys["divergent_priors"]);
-
- t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
-}
-
void PG::write_log(ObjectStore::Transaction& t)
{
- _write_log(t, log, log_oid, ondisklog.divergent_priors);
+ pg_log.write_log(t, log_oid);
dirty_log = false;
}
@@ -2730,23 +2252,6 @@ void PG::write_if_dirty(ObjectStore::Transaction& t)
write_log(t);
}
-void PG::trim(ObjectStore::Transaction& t, eversion_t trim_to)
-{
- // trim?
- if (trim_to > log.tail) {
- /* If we are trimming, we must be complete up to trim_to, time
- * to throw out any divergent_priors
- */
- ondisklog.divergent_priors.clear();
- // We shouldn't be trimming the log past last_complete
- assert(trim_to <= info.last_complete);
-
- dout(10) << "trim " << log << " to " << trim_to << dendl;
- log.trim(t, log_oid, trim_to);
- info.log_tail = log.tail;
- }
-}
-
void PG::trim_peers()
{
calc_trim_to();
@@ -2771,7 +2276,7 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl)
info.last_update = e.version;
// log mutation
- log.add(e);
+ pg_log.add(e);
dout(10) << "add_log_entry " << e << dendl;
e.encode_with_checksum(log_bl);
@@ -2781,7 +2286,7 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl)
void PG::append_log(
vector<pg_log_entry_t>& logv, eversion_t trim_to, ObjectStore::Transaction &t)
{
- dout(10) << "append_log " << log << " " << logv << dendl;
+ dout(10) << "append_log " << pg_log.get_log() << " " << logv << dendl;
map<string,bufferlist> keys;
for (vector<pg_log_entry_t>::iterator p = logv.begin();
@@ -2794,7 +2299,7 @@ void PG::append_log(
dout(10) << "append_log adding " << keys.size() << " keys" << dendl;
t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
- trim(t, trim_to);
+ pg_log.trim(t, trim_to, info, log_oid);
// update the local pg, pg log
dirty_info = true;
@@ -2803,7 +2308,7 @@ void PG::append_log(
bool PG::check_log_for_corruption(ObjectStore *store)
{
- OndiskLog bounds;
+ PGLog::OndiskLog bounds;
bufferlist blb;
store->collection_getattr(coll, "ondisklog", blb);
bufferlist::iterator p = blb.begin();
@@ -2958,14 +2463,14 @@ void PG::read_state(ObjectStore *store, bufferlist &bl)
assert(r >= 0);
ostringstream oss;
- if (read_log(
+ if (pg_log.read_log(
store, coll, log_oid, info,
- ondisklog, log, missing, oss, this)) {
+ oss)) {
/* We don't want to leave the old format around in case the next log
* write happens to be an append_log()
*/
ObjectStore::Transaction t;
- write_log(t);
+ pg_log.write_log(t, log_oid);
int r = osd->store->apply_transaction(t);
assert(!r);
}
@@ -2978,36 +2483,40 @@ void PG::read_state(ObjectStore *store, bufferlist &bl)
void PG::log_weirdness()
{
- if (log.tail != info.log_tail)
+ if (pg_log.get_tail() != info.log_tail)
osd->clog.error() << info.pgid
- << " info mismatch, log.tail " << log.tail
+ << " info mismatch, log.tail " << pg_log.get_tail()
<< " != info.log_tail " << info.log_tail
<< "\n";
- if (log.head != info.last_update)
+ if (pg_log.get_head() != info.last_update)
osd->clog.error() << info.pgid
- << " info mismatch, log.head " << log.head
+ << " info mismatch, log.head " << pg_log.get_head()
<< " != info.last_update " << info.last_update
<< "\n";
- if (log.empty()) {
+ if (pg_log.get_log().empty()) {
// shoudl it be?
- if (log.head != log.tail)
+ if (pg_log.get_head() != pg_log.get_tail())
osd->clog.error() << info.pgid
- << " log bound mismatch, empty but (" << log.tail << "," << log.head << "]\n";
+ << " log bound mismatch, empty but (" << pg_log.get_tail() << ","
+ << pg_log.get_head() << "]\n";
} else {
- if ((log.log.begin()->version <= log.tail) || // sloppy check
- (log.log.rbegin()->version != log.head && !(log.head == log.tail)))
+ if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail()) || // sloppy check
+ (pg_log.get_log().log.rbegin()->version != pg_log.get_head() &&
+ !(pg_log.get_head() == pg_log.get_tail())))
osd->clog.error() << info.pgid
- << " log bound mismatch, info (" << log.tail << "," << log.head << "]"
+ << " log bound mismatch, info (" << pg_log.get_tail() << ","
+ << pg_log.get_head() << "]"
<< " actual ["
- << log.log.begin()->version << "," << log.log.rbegin()->version << "]"
+ << pg_log.get_log().log.begin()->version << ","
+ << pg_log.get_log().log.rbegin()->version << "]"
<< "\n";
}
- if (log.caller_ops.size() > log.log.size()) {
+ if (pg_log.get_log().caller_ops.size() > pg_log.get_log().log.size()) {
osd->clog.error() << info.pgid
- << " caller_ops.size " << log.caller_ops.size()
- << " > log size " << log.log.size()
+ << " caller_ops.size " << pg_log.get_log().caller_ops.size()
+ << " > log size " << pg_log.get_log().log.size()
<< "\n";
}
}
@@ -3691,17 +3200,17 @@ void PG::build_inc_scrub_map(
map.valid_through = last_update_applied;
map.incr_since = v;
vector<hobject_t> ls;
- list<pg_log_entry_t>::iterator p;
- if (v == log.tail) {
- p = log.log.begin();
- } else if (v > log.tail) {
- p = log.find_entry(v);
+ list<pg_log_entry_t>::const_iterator p;
+ if (v == pg_log.get_tail()) {
+ p = pg_log.get_log().log.begin();
+ } else if (v > pg_log.get_tail()) {
+ p = pg_log.get_log().find_entry(v);
++p;
} else {
assert(0);
}
- for (; p != log.log.end(); ++p) {
+ for (; p != pg_log.get_log().log.end(); ++p) {
if (p->is_update()) {
ls.push_back(p->soid);
map.objects[p->soid].negative = false;
@@ -3731,11 +3240,11 @@ void PG::repair_object(const hobject_t& soid, ScrubMap::object *po, int bad_peer
// We should only be scrubbing if the PG is clean.
assert(waiting_for_missing_object.empty());
- missing.add(soid, oi.version, eversion_t());
+ pg_log.missing_add(soid, oi.version, eversion_t());
missing_loc[soid].insert(ok_peer);
missing_loc_sources.insert(ok_peer);
- log.last_requested = 0;
+ pg_log.set_last_requested(0);
}
}
@@ -4010,7 +3519,7 @@ void PG::classic_scrub(ThreadPool::TPHandle &handle)
return;
}
- if (scrubber.primary_scrubmap.valid_through != log.head) {
+ if (scrubber.primary_scrubmap.valid_through != pg_log.get_head()) {
ScrubMap incr;
build_inc_scrub_map(incr, scrubber.primary_scrubmap.valid_through, handle);
scrubber.primary_scrubmap.merge_incr(incr);
@@ -4179,9 +3688,9 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
scrubber.block_writes = true;
// walk the log to find the latest update that affects our chunk
- scrubber.subset_last_update = log.tail;
- for (list<pg_log_entry_t>::iterator p = log.log.begin();
- p != log.log.end();
+ scrubber.subset_last_update = pg_log.get_tail();
+ for (list<pg_log_entry_t>::const_iterator p = pg_log.get_log().log.begin();
+ p != pg_log.get_log().log.end();
++p) {
if (p->soid >= scrubber.start && p->soid < scrubber.end)
scrubber.subset_last_update = p->version;
@@ -4324,7 +3833,7 @@ bool PG::scrub_gather_replica_maps()
p != scrubber.received_maps.end();
++p) {
- if (scrubber.received_maps[p->first].valid_through != log.head) {
+ if (scrubber.received_maps[p->first].valid_through != pg_log.get_head()) {
scrubber.waiting_on++;
scrubber.waiting_on_whom.insert(p->first);
// Need to request another incremental map
@@ -4831,7 +4340,7 @@ void PG::share_pg_log()
pg_info_t& pinfo(peer_info[peer]);
MOSDPGLog *m = new MOSDPGLog(info.last_update.epoch, info);
- m->log.copy_after(log, pinfo.last_update);
+ m->log.copy_after(pg_log.get_log(), pinfo.last_update);
for (list<pg_log_entry_t>::const_iterator i = m->log.log.begin();
i != m->log.log.end();
@@ -4871,23 +4380,23 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch)
MOSDPGLog *mlog = new MOSDPGLog(get_osdmap()->get_epoch(),
info, query_epoch);
- mlog->missing = missing;
+ mlog->missing = pg_log.get_missing();
// primary -> other, when building master log
if (query.type == pg_query_t::LOG) {
dout(10) << " sending info+missing+log since " << query.since
<< dendl;
- if (query.since != eversion_t() && query.since < log.tail) {
+ if (query.since != eversion_t() && query.since < pg_log.get_tail()) {
osd->clog.error() << info.pgid << " got broken pg_query_t::LOG since " << query.since
- << " when my log.tail is " << log.tail
+ << " when my log.tail is " << pg_log.get_tail()
<< ", sending full log instead\n";
- mlog->log = log; // primary should not have requested this!!
+ mlog->log = pg_log.get_log(); // primary should not have requested this!!
} else
- mlog->log.copy_after(log, query.since);
+ mlog->log.copy_after(pg_log.get_log(), query.since);
}
else if (query.type == pg_query_t::FULLLOG) {
dout(10) << " sending info+missing+full log" << dendl;
- mlog->log = log;
+ mlog->log = pg_log.get_log();
}
dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
@@ -5259,21 +4768,22 @@ ostream& operator<<(ostream& out, const PG& pg)
if (pg.recovery_ops_active)
out << " rops=" << pg.recovery_ops_active;
- if (pg.log.tail != pg.info.log_tail ||
- pg.log.head != pg.info.last_update)
- out << " (info mismatch, " << pg.log << ")";
+ if (pg.pg_log.get_tail() != pg.info.log_tail ||
+ pg.pg_log.get_head() != pg.info.last_update)
+ out << " (info mismatch, " << pg.pg_log.get_log() << ")";
- if (pg.log.empty()) {
+ if (pg.pg_log.get_log().empty()) {
// shoudl it be?
- if (pg.log.head.version - pg.log.tail.version != 0) {
+ if (pg.pg_log.get_head().version - pg.pg_log.get_tail().version != 0) {
out << " (log bound mismatch, empty)";
}
} else {
- if ((pg.log.log.begin()->version <= pg.log.tail) || // sloppy check
- (pg.log.log.rbegin()->version != pg.log.head && !(pg.log.head == pg.log.tail))) {
+ if ((pg.pg_log.get_log().log.begin()->version <= pg.pg_log.get_tail()) || // sloppy check
+ (pg.pg_log.get_log().log.rbegin()->version != pg.pg_log.get_head() &&
+ !(pg.pg_log.get_head() == pg.pg_log.get_tail()))) {
out << " (log bound mismatch, actual=["
- << pg.log.log.begin()->version << ","
- << pg.log.log.rbegin()->version << "]";
+ << pg.pg_log.get_log().log.begin()->version << ","
+ << pg.pg_log.get_log().log.rbegin()->version << "]";
//out << "len=" << pg.log.log.size();
out << ")";
}
@@ -5300,9 +4810,9 @@ ostream& operator<<(ostream& out, const PG& pg)
if (pg.scrubber.must_scrub)
out << " MUST_SCRUB";
- //out << " (" << pg.log.tail << "," << pg.log.head << "]";
- if (pg.missing.num_missing()) {
- out << " m=" << pg.missing.num_missing();
+ //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
+ if (pg.pg_log.get_missing().num_missing()) {
+ out << " m=" << pg.pg_log.get_missing().num_missing();
if (pg.is_primary()) {
int unfound = pg.get_num_unfound();
if (unfound)
@@ -5601,265 +5111,6 @@ std::ostream& operator<<(std::ostream& oss,
return oss;
}
-/*---------------------------------------------------*/
-// Handle staitc function so it can use dout()
-#undef dout_prefix
-#define dout_prefix if (passedpg) _prefix(_dout, passedpg)
-
-bool PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
- const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
- pg_missing_t &missing, ostringstream &oss, const PG *passedpg)
-{
- dout(10) << "read_log" << dendl;
- bool rewrite_log = false;
-
- // legacy?
- struct stat st;
- int r = store->stat(coll_t::META_COLL, log_oid, &st);
- assert(r == 0);
- if (st.st_size > 0) {
- read_log_old(store, coll, log_oid, info, ondisklog, log, missing, oss, passedpg);
- rewrite_log = true;
- } else {
- log.tail = info.log_tail;
- ObjectMap::ObjectMapIterator p = store->get_omap_iterator(coll_t::META_COLL, log_oid);
- if (p) for (p->seek_to_first(); p->valid() ; p->next()) {
- bufferlist bl = p->value();//Copy bufferlist before creating iterator
- bufferlist::iterator bp = bl.begin();
- if (p->key() == "divergent_priors") {
- ::decode(ondisklog.divergent_priors, bp);
- dout(20) << "read_log " << ondisklog.divergent_priors.size() << " divergent_priors" << dendl;
- } else {
- pg_log_entry_t e;
- e.decode_with_checksum(bp);
- dout(20) << "read_log " << e << dendl;
- if (!log.log.empty()) {
- pg_log_entry_t last_e(log.log.back());
- assert(last_e.version.version < e.version.version);
- assert(last_e.version.epoch <= e.version.epoch);
- }
- log.log.push_back(e);
- log.head = e.version;
- }
- }
- }
- log.head = info.last_update;
- log.index();
-
- // build missing
- if (info.last_complete < info.last_update) {
- dout(10) << "read_log checking for missing items over interval (" << info.last_complete
- << "," << info.last_update << "]" << dendl;
-
- set<hobject_t> did;
- for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
- i != log.log.rend();
- ++i) {
- if (i->version <= info.last_complete) break;
- if (did.count(i->soid)) continue;
- did.insert(i->soid);
-
- if (i->is_delete()) continue;
-
- bufferlist bv;
- int r = store->getattr(coll, i->soid, OI_ATTR, bv);
- if (r >= 0) {
- object_info_t oi(bv);
- if (oi.version < i->version) {
- dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl;
- missing.add(i->soid, i->version, oi.version);
- }
- } else {
- dout(15) << "read_log missing " << *i << dendl;
- missing.add(i->soid, i->version, eversion_t());
- }
- }
- for (map<eversion_t, hobject_t>::reverse_iterator i =
- ondisklog.divergent_priors.rbegin();
- i != ondisklog.divergent_priors.rend();
- ++i) {
- if (i->first <= info.last_complete) break;
- if (did.count(i->second)) continue;
- did.insert(i->second);
- bufferlist bv;
- int r = store->getattr(coll, i->second, OI_ATTR, bv);
- if (r >= 0) {
- object_info_t oi(bv);
- /**
- * 1) we see this entry in the divergent priors mapping
- * 2) we didn't see an entry for this object in the log
- *
- * From 1 & 2 we know that either the object does not exist
- * or it is at the version specified in the divergent_priors
- * map since the object would have been deleted atomically
- * with the addition of the divergent_priors entry, an older
- * version would not have been recovered, and a newer version
- * would show up in the log above.
- */
- assert(oi.version == i->first);
- } else {
- dout(15) << "read_log missing " << *i << dendl;
- missing.add(i->second, i->first, eversion_t());
- }
- }
- }
- dout(10) << "read_log done" << dendl;
- return rewrite_log;
-}
-
-void PG::read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid,
- const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
- pg_missing_t &missing, ostringstream &oss, const PG *passedpg)
-{
- // load bounds, based on old OndiskLog encoding.
- uint64_t ondisklog_tail = 0;
- uint64_t ondisklog_head = 0;
- uint64_t ondisklog_zero_to;
- bool ondisklog_has_checksums;
-
- bufferlist blb;
- store->collection_getattr(coll, "ondisklog", blb);
- {
- bufferlist::iterator bl = blb.begin();
- DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
- ondisklog_has_checksums = (struct_v >= 2);
- ::decode(ondisklog_tail, bl);
- ::decode(ondisklog_head, bl);
- if (struct_v >= 4)
- ::decode(ondisklog_zero_to, bl);
- else
- ondisklog_zero_to = 0;
- if (struct_v >= 5)
- ::decode(ondisklog.divergent_priors, bl);
- DECODE_FINISH(bl);
- }
- uint64_t ondisklog_length = ondisklog_head - ondisklog_tail;
- dout(10) << "read_log " << ondisklog_tail << "~" << ondisklog_length << dendl;
-
- log.tail = info.log_tail;
-
- // In case of sobject_t based encoding, may need to list objects in the store
- // to find hashes
- vector<hobject_t> ls;
-
- if (ondisklog_head > 0) {
- // read
- bufferlist bl;
- store->read(coll_t::META_COLL, log_oid, ondisklog_tail, ondisklog_length, bl);
- if (bl.length() < ondisklog_length) {
- std::ostringstream oss;
- oss << "read_log got " << bl.length() << " bytes, expected "
- << ondisklog_head << "-" << ondisklog_tail << "="
- << ondisklog_length;
- throw read_log_error(oss.str().c_str());
- }
-
- pg_log_entry_t e;
- bufferlist::iterator p = bl.begin();
- assert(log.empty());
- eversion_t last;
- bool reorder = false;
- bool listed_collection = false;
-
- while (!p.end()) {
- uint64_t pos = ondisklog_tail + p.get_off();
- if (ondisklog_has_checksums) {
- bufferlist ebl;
- ::decode(ebl, p);
- __u32 crc;
- ::decode(crc, p);
-
- __u32 got = ebl.crc32c(0);
- if (crc == got) {
- bufferlist::iterator q = ebl.begin();
- ::decode(e, q);
- } else {
- std::ostringstream oss;
- oss << "read_log " << pos << " bad crc got " << got << " expected" << crc;
- throw read_log_error(oss.str().c_str());
- }
- } else {
- ::decode(e, p);
- }
- dout(20) << "read_log " << pos << " " << e << dendl;
-
- // [repair] in order?
- if (e.version < last) {
- dout(0) << "read_log " << pos << " out of order entry " << e << " follows " << last << dendl;
- oss << info.pgid << " log has out of order entry "
- << e << " following " << last << "\n";
- reorder = true;
- }
-
- if (e.version <= log.tail) {
- dout(20) << "read_log ignoring entry at " << pos << " below log.tail" << dendl;
- continue;
- }
- if (last.version == e.version.version) {
- dout(0) << "read_log got dup " << e.version << " (last was " << last << ", dropping that one)" << dendl;
- log.log.pop_back();
- oss << info.pgid << " read_log got dup "
- << e.version << " after " << last << "\n";
- }
-
- if (e.invalid_hash) {
- // We need to find the object in the store to get the hash
- if (!listed_collection) {
- store->collection_list(coll, ls);
- listed_collection = true;
- }
- bool found = false;
- for (vector<hobject_t>::iterator i = ls.begin();
- i != ls.end();
- ++i) {
- if (i->oid == e.soid.oid && i->snap == e.soid.snap) {
- e.soid = *i;
- found = true;
- break;
- }
- }
- if (!found) {
- // Didn't find the correct hash
- std::ostringstream oss;
- oss << "Could not find hash for hoid " << e.soid << std::endl;
- throw read_log_error(oss.str().c_str());
- }
- }
-
- if (e.invalid_pool) {
- e.soid.pool = info.pgid.pool();
- }
-
- e.offset = pos;
- uint64_t endpos = ondisklog_tail + p.get_off();
- log.log.push_back(e);
- last = e.version;
-
- // [repair] at end of log?
- if (!p.end() && e.version == info.last_update) {
- oss << info.pgid << " log has extra data at "
- << endpos << "~" << (ondisklog_head-endpos) << " after "
- << info.last_update << "\n";
-
- dout(0) << "read_log " << endpos << " *** extra gunk at end of log, "
- << "adjusting ondisklog_head" << dendl;
- ondisklog_head = endpos;
- break;
- }
- }
-
- if (reorder) {
- dout(0) << "read_log reordering log" << dendl;
- map<eversion_t, pg_log_entry_t> m;
- for (list<pg_log_entry_t>::iterator p = log.log.begin(); p != log.log.end(); ++p)
- m[p->version] = *p;
- log.log.clear();
- for (map<eversion_t, pg_log_entry_t>::iterator p = m.begin(); p != m.end(); ++p)
- log.log.push_back(p->second);
- }
- }
-}
-
/*------------ Recovery State Machine----------------*/
#undef dout_prefix
#define dout_prefix (*_dout << context< RecoveryMachine >().pg->gen_prefix() \
@@ -6539,7 +5790,7 @@ PG::RecoveryState::Recovering::Recovering(my_context ctx)
void PG::RecoveryState::Recovering::release_reservations()
{
PG *pg = context< RecoveryMachine >().pg;
- assert(!pg->missing.have_missing());
+ assert(!pg->pg_log.get_missing().have_missing());
pg->state_clear(PG_STATE_RECOVERING);
// release remote reservations
@@ -6718,7 +5969,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
if (g_conf->osd_check_for_log_corruption)
pg->check_log_for_corruption(pg->osd->store);
- int unfound = pg->missing.num_missing() - pg->missing_loc.size();
+ int unfound = pg->pg_log.get_missing().num_missing() - pg->missing_loc.size();
if (unfound > 0 &&
pg->all_unfound_are_queried_or_lost(pg->get_osdmap())) {
if (g_conf->osd_auto_mark_unfound_lost) {
@@ -6917,10 +6168,9 @@ boost::statechart::result PG::RecoveryState::ReplicaActive::react(const MLogRec&
{
PG *pg = context< RecoveryMachine >().pg;
dout(10) << "received log from " << logevt.from << dendl;
- pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(),
- logevt.msg->info, logevt.msg->log, logevt.from);
-
- assert(pg->log.head == pg->info.last_update);
+ ObjectStore::Transaction* t = context<RecoveryMachine>().get_cur_transaction();
+ pg->merge_log(*t,logevt.msg->info, logevt.msg->log, logevt.from);
+ assert(pg->pg_log.get_head() == pg->info.last_update);
return discard_event();
}
@@ -6994,14 +6244,13 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MLogRec& logevt)
pg->dirty_info = true;
pg->dirty_big_info = true; // maybe.
pg->dirty_log = true;
- pg->log.claim_log(msg->log);
- pg->missing.clear();
+ pg->pg_log.claim_log(msg->log);
} else {
- pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(),
- msg->info, msg->log, logevt.from);
+ ObjectStore::Transaction* t = context<RecoveryMachine>().get_cur_transaction();
+ pg->merge_log(*t, msg->info, msg->log, logevt.from);
}
- assert(pg->log.head == pg->info.last_update);
+ assert(pg->pg_log.get_head() == pg->info.last_update);
post_event(Activate(logevt.msg->get_epoch()));
return transit<ReplicaActive>();
@@ -7014,13 +6263,13 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MInfoRec& infoev
if (pg->info.last_update > infoevt.info.last_update) {
// rewind divergent log entries
- pg->rewind_divergent_log(*context< RecoveryMachine >().get_cur_transaction(),
- infoevt.info.last_update);
+ ObjectStore::Transaction* t = context<RecoveryMachine>().get_cur_transaction();
+ pg->rewind_divergent_log(*t, infoevt.info.last_update);
pg->info.stats = infoevt.info.stats;
}
assert(infoevt.info.last_update == pg->info.last_update);
- assert(pg->log.head == pg->info.last_update);
+ assert(pg->pg_log.get_head() == pg->info.last_update);
post_event(Activate(infoevt.msg_epoch));
return transit<ReplicaActive>();
@@ -7451,7 +6700,7 @@ PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
if (pi.is_empty())
continue; // no pg data, nothing divergent
- if (pi.last_update < pg->log.tail) {
+ if (pi.last_update < pg->pg_log.get_tail()) {
dout(10) << " osd." << *i << " is not contiguous, will restart backfill" << dendl;
pg->peer_missing[*i];
continue;
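
The bulk of the PG.cc churn replaces the bodies of proc_replica_log, merge_log, rewind_divergent_log, read_log, and read_log_old with delegations to PGLog. Because PGLog holds no SnapMapper or OSD service, anything needing PG context crosses the boundary as parameters: objects to delete come back in a to_remove list and the dirty flags travel by reference. The assumed shape of the new entry point, condensed from the PG::merge_log wrapper above (the real declaration lives in src/osd/PGLog.h):

// Assumed signature; matches the call in the PG::merge_log hunk above.
void PGLog::merge_log(ObjectStore::Transaction &t,
                      pg_info_t &oinfo, pg_log_t &olog, int from,
                      pg_info_t &info, list<hobject_t> &to_remove,
                      bool &dirty_log, bool &dirty_info, bool &dirty_big_info);

PG then deletes the returned objects itself via remove_snap_mapped_object(), since only PG can keep the snap mapper consistent.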
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 8d8ad5c4c45..134f5ec470f 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -36,6 +36,7 @@
#include "include/atomic.h"
#include "SnapMapper.h"
+#include "PGLog.h"
#include "OpRequest.h"
#include "OSDMap.h"
#include "os/ObjectStore.h"
@@ -155,212 +156,8 @@ struct PGPool {
class PG {
public:
- /* Exceptions */
- class read_log_error : public buffer::error {
- public:
- explicit read_log_error(const char *what) {
- snprintf(buf, sizeof(buf), "read_log_error: %s", what);
- }
- const char *what() const throw () {
- return buf;
- }
- private:
- char buf[512];
- };
-
std::string gen_prefix() const;
-
- /**
- * IndexLog - adds in-memory index of the log, by oid.
- * plus some methods to manipulate it all.
- */
- struct IndexedLog : public pg_log_t {
- hash_map<hobject_t,pg_log_entry_t*> objects; // ptrs into log. be careful!
- hash_map<osd_reqid_t,pg_log_entry_t*> caller_ops;
-
- // recovery pointers
- list<pg_log_entry_t>::iterator complete_to; // not inclusive of referenced item
- version_t last_requested; // last object requested by primary
-
- /****/
- IndexedLog() : last_requested(0) {}
-
- void claim_log(const pg_log_t& o) {
- log = o.log;
- head = o.head;
- tail = o.tail;
- index();
- }
-
- void split_into(
- pg_t child_pgid,
- unsigned split_bits,
- IndexedLog *olog);
-
- void zero() {
- unindex();
- pg_log_t::clear();
- reset_recovery_pointers();
- }
- void reset_recovery_pointers() {
- complete_to = log.end();
- last_requested = 0;
- }
-
- bool logged_object(const hobject_t& oid) const {
- return objects.count(oid);
- }
- bool logged_req(const osd_reqid_t &r) const {
- return caller_ops.count(r);
- }
- eversion_t get_request_version(const osd_reqid_t &r) const {
- hash_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p = caller_ops.find(r);
- if (p == caller_ops.end())
- return eversion_t();
- return p->second->version;
- }
-
- void index() {
- objects.clear();
- caller_ops.clear();
- for (list<pg_log_entry_t>::iterator i = log.begin();
- i != log.end();
- ++i) {
- objects[i->soid] = &(*i);
- if (i->reqid_is_indexed()) {
- //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
- caller_ops[i->reqid] = &(*i);
- }
- }
- }
-
- void index(pg_log_entry_t& e) {
- if (objects.count(e.soid) == 0 ||
- objects[e.soid]->version < e.version)
- objects[e.soid] = &e;
- if (e.reqid_is_indexed()) {
- //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
- caller_ops[e.reqid] = &e;
- }
- }
- void unindex() {
- objects.clear();
- caller_ops.clear();
- }
- void unindex(pg_log_entry_t& e) {
- // NOTE: this only works if we remove from the _tail_ of the log!
- if (objects.count(e.soid) && objects[e.soid]->version == e.version)
- objects.erase(e.soid);
- if (e.reqid_is_indexed() &&
- caller_ops.count(e.reqid) && // divergent merge_log indexes new before unindexing old
- caller_ops[e.reqid] == &e)
- caller_ops.erase(e.reqid);
- }
-
- // actors
- void add(pg_log_entry_t& e) {
- // add to log
- log.push_back(e);
- assert(e.version > head);
- assert(head.version == 0 || e.version.version > head.version);
- head = e.version;
-
- // to our index
- objects[e.soid] = &(log.back());
- caller_ops[e.reqid] = &(log.back());
- }
-
- void trim(ObjectStore::Transaction &t, hobject_t& oid, eversion_t s);
-
- ostream& print(ostream& out) const;
- };
-
-
- /**
- * OndiskLog - some info about how we store the log on disk.
- */
- class OndiskLog {
- public:
- // ok
- uint64_t tail; // first byte of log.
- uint64_t head; // byte following end of log.
- uint64_t zero_to; // first non-zeroed byte of log.
- bool has_checksums;
-
- /**
- * We reconstruct the missing set by comparing the recorded log against
- * the objects in the pg collection. Unfortunately, it's possible to
- * have an object in the missing set which is not in the log due to
- * a divergent operation with a prior_version pointing before the
- * pg log tail. To deal with this, we store alongside the log a mapping
- * of divergent priors to be checked along with the log during read_state.
- */
- map<eversion_t, hobject_t> divergent_priors;
- void add_divergent_prior(eversion_t version, hobject_t obj) {
- divergent_priors.insert(make_pair(version, obj));
- }
-
- OndiskLog() : tail(0), head(0), zero_to(0),
- has_checksums(true) {}
-
- uint64_t length() { return head - tail; }
- bool trim_to(eversion_t v, ObjectStore::Transaction& t);
-
- void zero() {
- tail = 0;
- head = 0;
- zero_to = 0;
- }
-
- void encode(bufferlist& bl) const {
- ENCODE_START(5, 3, bl);
- ::encode(tail, bl);
- ::encode(head, bl);
- ::encode(zero_to, bl);
- ::encode(divergent_priors, bl);
- ENCODE_FINISH(bl);
- }
- void decode(bufferlist::iterator& bl) {
- DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
- has_checksums = (struct_v >= 2);
- ::decode(tail, bl);
- ::decode(head, bl);
- if (struct_v >= 4)
- ::decode(zero_to, bl);
- else
- zero_to = 0;
- if (struct_v >= 5)
- ::decode(divergent_priors, bl);
- DECODE_FINISH(bl);
- }
- void dump(Formatter *f) const {
- f->dump_unsigned("head", head);
- f->dump_unsigned("tail", tail);
- f->dump_unsigned("zero_to", zero_to);
- f->open_array_section("divergent_priors");
- for (map<eversion_t, hobject_t>::const_iterator p = divergent_priors.begin();
- p != divergent_priors.end();
- ++p) {
- f->open_object_section("prior");
- f->dump_stream("version") << p->first;
- f->dump_stream("object") << p->second;
- f->close_section();
- }
- f->close_section();
- }
- static void generate_test_instances(list<OndiskLog*>& o) {
- o.push_back(new OndiskLog);
- o.push_back(new OndiskLog);
- o.back()->tail = 2;
- o.back()->head = 3;
- o.back()->zero_to = 1;
- }
- };
- WRITE_CLASS_ENCODER(OndiskLog)
-
-
-
/*** PG ****/
protected:
OSDService *osd;
@@ -466,7 +263,7 @@ public:
const interval_set<snapid_t> &snapcolls);
const coll_t coll;
- IndexedLog log;
+ PGLog pg_log;
static string get_info_key(pg_t pgid) {
return stringify(pgid) + "_info";
}
@@ -478,8 +275,6 @@ public:
}
hobject_t log_oid;
hobject_t biginfo_oid;
- OndiskLog ondisklog;
- pg_missing_t missing;
map<hobject_t, set<int> > missing_loc;
set<int> missing_loc_sources; // superset of missing_loc locations
@@ -784,16 +579,6 @@ public:
bool proc_replica_info(int from, const pg_info_t &info);
void remove_snap_mapped_object(
ObjectStore::Transaction& t, const hobject_t& soid);
- bool merge_old_entry(ObjectStore::Transaction& t, pg_log_entry_t& oe);
-
- /**
- * Merges authoratative log/info into current log/info/store
- *
- * @param [in,out] t used to delete obsolete objects
- * @param [in,out] oinfo recieved authoritative info
- * @param [in,out] olog recieved authoritative log
- * @param [in] from peer which sent the information
- */
void merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from);
void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead);
bool search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing,
@@ -822,10 +607,10 @@ public:
void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info);
bool have_unfound() const {
- return missing.num_missing() > missing_loc.size();
+ return pg_log.get_missing().num_missing() > missing_loc.size();
}
int get_num_unfound() const {
- return missing.num_missing() - missing_loc.size();
+ return pg_log.get_missing().num_missing() - missing_loc.size();
}
virtual void clean_up_local(ObjectStore::Transaction& t) = 0;
@@ -1878,35 +1663,18 @@ private:
void write_log(ObjectStore::Transaction& t);
public:
- static void clear_info_log(
- pg_t pgid,
- const hobject_t &infos_oid,
- const hobject_t &log_oid,
- ObjectStore::Transaction *t);
-
static int _write_info(ObjectStore::Transaction& t, epoch_t epoch,
pg_info_t &info, coll_t coll,
map<epoch_t,pg_interval_t> &past_intervals,
interval_set<snapid_t> &snap_collections,
hobject_t &infos_oid,
__u8 info_struct_v, bool dirty_big_info, bool force_ver = false);
- static void _write_log(ObjectStore::Transaction& t, pg_log_t &log,
- const hobject_t &log_oid, map<eversion_t, hobject_t> &divergent_priors);
void write_if_dirty(ObjectStore::Transaction& t);
void add_log_entry(pg_log_entry_t& e, bufferlist& log_bl);
void append_log(
vector<pg_log_entry_t>& logv, eversion_t trim_to, ObjectStore::Transaction &t);
-
- /// return true if the log should be rewritten
- static bool read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
- const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
- pg_missing_t &missing, ostringstream &oss, const PG *passedpg = NULL);
- static void read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid,
- const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
- pg_missing_t &missing, ostringstream &oss, const PG *passedpg = NULL);
bool check_log_for_corruption(ObjectStore *store);
- void trim(ObjectStore::Transaction& t, eversion_t v);
void trim_peers();
std::string get_corrupt_pg_log_name() const;
@@ -2027,8 +1795,6 @@ public:
virtual void on_shutdown() = 0;
};
-WRITE_CLASS_ENCODER(PG::OndiskLog)
-
ostream& operator<<(ostream& out, const PG& pg);
#endif
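
With the static helpers (read_log, read_log_old, _write_log, clear_info_log) deleted from PG.h, the read path slims down accordingly; the read_state hunk in PG.cc above shows the new call. Before and after, with the new signature inferred from that call site:

// before: all log state passed in and out of a static helper
//   PG::read_log(store, coll, log_oid, info,
//                ondisklog, log, missing, oss, this);
// after: PGLog owns ondisklog, log, and missing
//   bool rewrite = pg_log.read_log(store, coll, log_oid, info, oss);
//   // returns true if the old on-disk format was found and the log
//   // should be rewritten via pg_log.write_log()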
diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc
new file mode 100644
index 00000000000..638a78697db
--- /dev/null
+++ b/src/osd/PGLog.cc
@@ -0,0 +1,789 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "PGLog.h"
+#include "PG.h"
+#include "SnapMapper.h"
+
+#define dout_subsys ceph_subsys_osd
+
+//////////////////// PGLog ////////////////////
+
+void PGLog::IndexedLog::split_into(
+ pg_t child_pgid,
+ unsigned split_bits,
+ PGLog::IndexedLog *olog)
+{
+ list<pg_log_entry_t> oldlog;
+ oldlog.swap(log);
+
+ eversion_t old_tail;
+ olog->head = head;
+ olog->tail = tail;
+ unsigned mask = ~((~0)<<split_bits);
+ for (list<pg_log_entry_t>::iterator i = oldlog.begin();
+ i != oldlog.end();
+ ) {
+ if ((i->soid.hash & mask) == child_pgid.m_seed) {
+ olog->log.push_back(*i);
+ if (log.empty())
+ tail = i->version;
+ } else {
+ log.push_back(*i);
+ if (olog->empty())
+ olog->tail = i->version;
+ }
+ oldlog.erase(i++);
+ }
+
+ if (log.empty())
+ tail = head;
+ else
+ head = log.rbegin()->version;
+
+ if (olog->empty())
+ olog->tail = olog->head;
+ else
+ olog->head = olog->log.rbegin()->version;
+
+ olog->index();
+ index();
+}
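
The selection above hinges on one mask: an entry follows the child PG when the low split_bits bits of its object hash equal the child's seed. A minimal standalone sketch of that predicate (hypothetical values, no Ceph types):

#include <cstdint>
#include <cassert>

// Keep an object in the child PG when the low split_bits bits of its
// hash equal the child's seed -- the same predicate split_into uses.
static bool goes_to_child(uint32_t hash, unsigned split_bits, uint32_t child_seed)
{
  uint32_t mask = ~((~0u) << split_bits); // low split_bits bits set
  return (hash & mask) == child_seed;
}

int main()
{
  // With split_bits = 2 the mask is 0b11: hashes ending in 01 follow
  // a child whose seed is 1.
  assert(goes_to_child(0x5, 2, 0x1));  // 0b101 & 0b11 == 0b01
  assert(!goes_to_child(0x4, 2, 0x1)); // 0b100 & 0b11 == 0b00
  return 0;
}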
+
+void PGLog::IndexedLog::trim(ObjectStore::Transaction& t, hobject_t& log_oid, eversion_t s)
+{
+ if (complete_to != log.end() &&
+ complete_to->version <= s) {
+ generic_dout(0) << " bad trim to " << s << " when complete_to is " << complete_to->version
+ << " on " << *this << dendl;
+ }
+
+ set<string> keys_to_rm;
+ while (!log.empty()) {
+ pg_log_entry_t &e = *log.begin();
+ if (e.version > s)
+ break;
+ generic_dout(20) << "trim " << e << dendl;
+ unindex(e); // remove from index,
+ keys_to_rm.insert(e.get_key_name());
+ log.pop_front(); // from log
+ }
+ t.omap_rmkeys(coll_t::META_COLL, log_oid, keys_to_rm);
+
+ // raise tail?
+ if (tail < s)
+ tail = s;
+}
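
trim pops entries off the front of the list while their version is at or below the trim point, collects their omap keys for removal in one transaction, and finally raises the tail. A toy model of the front-trimming loop, assuming a simplified entry type with integer versions:

#include <list>
#include <cassert>

struct Entry { unsigned version; };

// Drop entries with version <= s from the front; return the new tail,
// mirroring how PGLog::IndexedLog::trim raises its tail.
static unsigned trim_front(std::list<Entry>& log, unsigned tail, unsigned s)
{
  while (!log.empty() && log.front().version <= s)
    log.pop_front(); // in the real code: unindex + collect omap key
  return tail < s ? s : tail;
}

int main()
{
  std::list<Entry> log{{1}, {2}, {3}, {4}};
  unsigned tail = trim_front(log, 0, 2);
  assert(tail == 2 && log.size() == 2 && log.front().version == 3);
  return 0;
}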
+
+ostream& PGLog::IndexedLog::print(ostream& out) const
+{
+ out << *this << std::endl;
+ for (list<pg_log_entry_t>::const_iterator p = log.begin();
+ p != log.end();
+ ++p) {
+ out << *p << " " << (logged_object(p->soid) ? "indexed":"NOT INDEXED") << std::endl;
+ assert(!p->reqid_is_indexed() || logged_req(p->reqid));
+ }
+ return out;
+}
+
+//////////////////// PGLog ////////////////////
+
+void PGLog::clear_info_log(
+ pg_t pgid,
+ const hobject_t &infos_oid,
+ const hobject_t &log_oid,
+ ObjectStore::Transaction *t) {
+
+ set<string> keys_to_remove;
+ keys_to_remove.insert(PG::get_epoch_key(pgid));
+ keys_to_remove.insert(PG::get_biginfo_key(pgid));
+ keys_to_remove.insert(PG::get_info_key(pgid));
+
+ t->remove(coll_t::META_COLL, log_oid);
+ t->omap_rmkeys(coll_t::META_COLL, infos_oid, keys_to_remove);
+}
+
+void PGLog::trim(ObjectStore::Transaction& t, eversion_t trim_to, pg_info_t &info, hobject_t &log_oid)
+{
+ // trim?
+ if (trim_to > log.tail) {
+ /* If we are trimming, we must be complete up to trim_to, time
+ * to throw out any divergent_priors
+ */
+ ondisklog.divergent_priors.clear();
+ // We shouldn't be trimming the log past last_complete
+ assert(trim_to <= info.last_complete);
+
+ dout(10) << "trim " << log << " to " << trim_to << dendl;
+ log.trim(t, log_oid, trim_to);
+ info.log_tail = log.tail;
+ }
+}
+
+void PGLog::proc_replica_log(ObjectStore::Transaction& t,
+ pg_info_t &oinfo, pg_log_t &olog, pg_missing_t& omissing, int from)
+{
+ dout(10) << "proc_replica_log for osd." << from << ": "
+ << oinfo << " " << olog << " " << omissing << dendl;
+
+ /*
+ basically what we're doing here is rewinding the remote log,
+ dropping divergent entries, until we find something that matches
+ our master log. we then reset last_update to reflect the new
+ point up to which missing is accurate.
+
+ later, in activate(), missing will get wound forward again and
+ we will send the peer enough log to arrive at the same state.
+ */
+
+ for (map<hobject_t, pg_missing_t::item>::iterator i = omissing.missing.begin();
+ i != omissing.missing.end();
+ ++i) {
+ dout(20) << " before missing " << i->first << " need " << i->second.need
+ << " have " << i->second.have << dendl;
+ }
+
+ list<pg_log_entry_t>::const_reverse_iterator pp = olog.log.rbegin();
+ eversion_t lu(oinfo.last_update);
+ while (true) {
+ if (pp == olog.log.rend()) {
+ if (pp != olog.log.rbegin()) // no last_update adjustment if we discard nothing!
+ lu = olog.tail;
+ break;
+ }
+ const pg_log_entry_t& oe = *pp;
+
+ // don't continue past the tail of our log.
+ if (oe.version <= log.tail)
+ break;
+
+ if (!log.objects.count(oe.soid)) {
+ dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl;
+ ++pp;
+ continue;
+ }
+
+ pg_log_entry_t& ne = *log.objects[oe.soid];
+ if (ne.version == oe.version) {
+ dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl;
+ lu = pp->version;
+ break;
+ }
+
+ if (oe.soid > oinfo.last_backfill) {
+ // past backfill line, don't care
+ dout(10) << " had " << oe << " beyond last_backfill : skipping" << dendl;
+ ++pp;
+ continue;
+ }
+
+ if (ne.version > oe.version) {
+ dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+ } else {
+ if (oe.is_delete()) {
+ if (ne.is_delete()) {
+ // old and new are delete
+ dout(10) << " had " << oe << " new " << ne << " : both deletes" << dendl;
+ } else {
+ // old delete, new update.
+ dout(10) << " had " << oe << " new " << ne << " : missing" << dendl;
+ omissing.add(ne.soid, ne.version, eversion_t());
+ }
+ } else {
+ if (ne.is_delete()) {
+ // old update, new delete
+ dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+ omissing.rm(oe.soid, oe.version);
+ } else {
+ // old update, new update
+ dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+ omissing.revise_need(ne.soid, ne.version);
+ }
+ }
+ }
+
+ ++pp;
+ }
+
+ if (lu < oinfo.last_update) {
+ dout(10) << " peer osd." << from << " last_update now " << lu << dendl;
+ oinfo.last_update = lu;
+ }
+
+ if (omissing.have_missing()) {
+ eversion_t first_missing =
+ omissing.missing[omissing.rmissing.begin()->second].need;
+ oinfo.last_complete = eversion_t();
+ list<pg_log_entry_t>::const_iterator i = olog.log.begin();
+ for (;
+ i != olog.log.end();
+ ++i) {
+ if (i->version < first_missing)
+ oinfo.last_complete = i->version;
+ else
+ break;
+ }
+ } else {
+ oinfo.last_complete = oinfo.last_update;
+ }
+}
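
The rewind above scans the replica's log from newest to oldest and stops at the first entry the primary also has; everything newer is divergent, and the replica's last_update is clamped to the stopping point. A toy version with plain integer versions (assumed, not Ceph eversions):

#include <vector>
#include <cassert>

int main()
{
  // Replica log versions, oldest first; the primary's matching entry is 10,
  // so 13 and 14 are divergent and must be discarded from consideration.
  std::vector<int> replica_log = {8, 9, 10, 13, 14};
  int last_shared = 10;        // newest version the primary also has
  int lu = replica_log.back(); // replica's claimed last_update
  for (std::vector<int>::reverse_iterator it = replica_log.rbegin();
       it != replica_log.rend(); ++it) {
    if (*it <= last_shared) { lu = *it; break; }
  }
  assert(lu == 10); // last_update rewound past the divergence
  return 0;
}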
+
+/*
+ * merge an old (possibly divergent) log entry into the new log. this
+ * happens _after_ new log items have been assimilated. thus, we assume
+ * the index already references newer entries (if present), and missing
+ * has been updated accordingly.
+ *
+ * return true if entry is not divergent.
+ */
+bool PGLog::merge_old_entry(ObjectStore::Transaction& t, pg_log_entry_t& oe, pg_info_t& info, list<hobject_t>& remove_snap, bool &dirty_log)
+{
+ if (oe.soid > info.last_backfill) {
+ dout(20) << "merge_old_entry had " << oe << " : beyond last_backfill" << dendl;
+ return false;
+ }
+ if (log.objects.count(oe.soid)) {
+ pg_log_entry_t &ne = *log.objects[oe.soid]; // new(er?) entry
+
+ if (ne.version > oe.version) {
+ dout(20) << "merge_old_entry had " << oe << " new " << ne << " : older, missing" << dendl;
+ assert(ne.is_delete() || missing.is_missing(ne.soid));
+ return false;
+ }
+ if (ne.version == oe.version) {
+ dout(20) << "merge_old_entry had " << oe << " new " << ne << " : same" << dendl;
+ return true;
+ }
+ if (oe.is_delete()) {
+ if (ne.is_delete()) {
+ // old and new are delete
+ dout(20) << "merge_old_entry had " << oe << " new " << ne << " : both deletes" << dendl;
+ } else {
+ // old delete, new update.
+ dout(20) << "merge_old_entry had " << oe << " new " << ne << " : missing" << dendl;
+ missing.revise_need(ne.soid, ne.version);
+ }
+ } else {
+ if (ne.is_delete()) {
+ // old update, new delete
+ dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new delete supercedes" << dendl;
+ missing.rm(oe.soid, oe.version);
+ } else {
+ // old update, new update
+ dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new item supercedes" << dendl;
+ missing.revise_need(ne.soid, ne.version);
+ }
+ }
+ } else if (oe.op == pg_log_entry_t::CLONE) {
+ assert(oe.soid.snap != CEPH_NOSNAP);
+ dout(20) << "merge_old_entry had " << oe
+ << ", clone with no non-divergent log entries, "
+ << "deleting" << dendl;
+ remove_snap.push_back(oe.soid);
+ if (missing.is_missing(oe.soid))
+ missing.rm(oe.soid, missing.missing[oe.soid].need);
+ } else if (oe.prior_version > info.log_tail) {
+ /**
+ * oe.prior_version is a previously divergent log entry
+ * oe.soid must have already been handled and the missing
+ * set updated appropriately
+ */
+ dout(20) << "merge_old_entry had oe " << oe
+ << " with divergent prior_version " << oe.prior_version
+ << " oe.soid " << oe.soid
+ << " must already have been merged" << dendl;
+ } else {
+ if (!oe.is_delete()) {
+ dout(20) << "merge_old_entry had " << oe << " deleting" << dendl;
+ remove_snap.push_back(oe.soid);
+ }
+ dout(20) << "merge_old_entry had " << oe << " updating missing to "
+ << oe.prior_version << dendl;
+ if (oe.prior_version > eversion_t()) {
+ ondisklog.add_divergent_prior(oe.prior_version, oe.soid);
+ dirty_log = true;
+ missing.revise_need(oe.soid, oe.prior_version);
+ } else if (missing.is_missing(oe.soid)) {
+ missing.rm(oe.soid, missing.missing[oe.soid].need);
+ }
+ }
+ return false;
+}
+
+/**
+ * rewind divergent entries at the head of the log
+ *
+ * This rewinds entries off the head of our log that are divergent.
+ * This is used by replicas during activation.
+ *
+ * @param t transaction
+ * @param newhead new head to rewind to
+ */
+void PGLog::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead,
+ pg_info_t &info, list<hobject_t>& remove_snap,
+ bool &dirty_log, bool &dirty_info, bool &dirty_big_info)
+{
+ dout(10) << "rewind_divergent_log truncate divergent future " << newhead << dendl;
+ assert(newhead > log.tail);
+
+ list<pg_log_entry_t>::iterator p = log.log.end();
+ list<pg_log_entry_t> divergent;
+ while (true) {
+ if (p == log.log.begin()) {
+ // yikes, the whole thing is divergent!
+ divergent.swap(log.log);
+ break;
+ }
+ --p;
+ if (p->version == newhead) {
+ ++p;
+ divergent.splice(divergent.begin(), log.log, p, log.log.end());
+ break;
+ }
+ assert(p->version > newhead);
+ dout(10) << "rewind_divergent_log future divergent " << *p << dendl;
+ log.unindex(*p);
+ }
+
+ log.head = newhead;
+ info.last_update = newhead;
+ if (info.last_complete > newhead)
+ info.last_complete = newhead;
+
+ for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d)
+ merge_old_entry(t, *d, info, remove_snap, dirty_log);
+
+ dirty_info = true;
+ dirty_big_info = true;
+ dirty_log = true;
+}
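
The cut itself is a single splice: walk back from the head until newhead is found, then move everything after it into the divergent list (or move the whole log if newhead is never reached). A minimal sketch of the cut over a std::list of integer versions:

#include <list>
#include <cassert>

int main()
{
  std::list<int> log = {5, 6, 7, 8, 9};
  std::list<int> divergent;
  int newhead = 7;
  std::list<int>::iterator p = log.end();
  while (p != log.begin()) {
    --p;
    if (*p == newhead) { ++p; break; } // keep newhead, cut after it
  }
  // If p reached begin() without a match, the whole log is divergent.
  divergent.splice(divergent.begin(), log, p, log.end());
  assert(log.back() == 7 && divergent.front() == 8 && divergent.size() == 2);
  return 0;
}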
+
+void PGLog::merge_log(ObjectStore::Transaction& t,
+ pg_info_t &oinfo, pg_log_t &olog, int fromosd,
+ pg_info_t &info, list<hobject_t>& remove_snap,
+ bool &dirty_log, bool &dirty_info, bool &dirty_big_info)
+{
+ dout(10) << "merge_log " << olog << " from osd." << fromosd
+ << " into " << log << dendl;
+
+ // Check preconditions
+
+ // If our log is empty, the incoming log must not have been trimmed.
+ assert(!log.null() || olog.tail == eversion_t());
+ // The logs must overlap.
+ assert(log.head >= olog.tail && olog.head >= log.tail);
+
+ for (map<hobject_t, pg_missing_t::item>::iterator i = missing.missing.begin();
+ i != missing.missing.end();
+ ++i) {
+ dout(20) << "pg_missing_t sobject: " << i->first << dendl;
+ }
+
+ bool changed = false;
+
+ // extend on tail?
+ // this is just filling in history. it does not affect our
+ // missing set, as that should already be consistent with our
+ // current log.
+ if (olog.tail < log.tail) {
+ dout(10) << "merge_log extending tail to " << olog.tail << dendl;
+ list<pg_log_entry_t>::iterator from = olog.log.begin();
+ list<pg_log_entry_t>::iterator to;
+ for (to = from;
+ to != olog.log.end();
+ ++to) {
+ if (to->version > log.tail)
+ break;
+ log.index(*to);
+ dout(15) << *to << dendl;
+ }
+ assert(to != olog.log.end() ||
+ (olog.head == info.last_update));
+
+ // splice into our log.
+ log.log.splice(log.log.begin(),
+ olog.log, from, to);
+
+ info.log_tail = log.tail = olog.tail;
+ changed = true;
+ }
+
+ if (oinfo.stats.reported < info.stats.reported) // make sure reported always increases
+ oinfo.stats.reported = info.stats.reported;
+ if (info.last_backfill.is_max())
+ info.stats = oinfo.stats;
+
+ // do we have divergent entries to throw out?
+ if (olog.head < log.head) {
+ rewind_divergent_log(t, olog.head, info, remove_snap, dirty_log, dirty_info, dirty_big_info);
+ changed = true;
+ }
+
+ // extend on head?
+ if (olog.head > log.head) {
+ dout(10) << "merge_log extending head to " << olog.head << dendl;
+
+ // find start point in olog
+ list<pg_log_entry_t>::iterator to = olog.log.end();
+ list<pg_log_entry_t>::iterator from = olog.log.end();
+ eversion_t lower_bound = olog.tail;
+ while (1) {
+ if (from == olog.log.begin())
+ break;
+ --from;
+ dout(20) << " ? " << *from << dendl;
+ if (from->version <= log.head) {
+ dout(20) << "merge_log cut point (usually last shared) is " << *from << dendl;
+ lower_bound = from->version;
+ ++from;
+ break;
+ }
+ }
+
+ // index, update missing, delete deleted
+ for (list<pg_log_entry_t>::iterator p = from; p != to; ++p) {
+ pg_log_entry_t &ne = *p;
+ dout(20) << "merge_log " << ne << dendl;
+ log.index(ne);
+ if (ne.soid <= info.last_backfill) {
+ missing.add_next_event(ne);
+ if (ne.is_delete())
+ remove_snap.push_back(ne.soid);
+ }
+ }
+
+ // move aside divergent items
+ list<pg_log_entry_t> divergent;
+ while (!log.empty()) {
+ pg_log_entry_t &oe = *log.log.rbegin();
+ /*
+ * look at eversion.version here. we want to avoid a situation like:
+ * our log: 100'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529
+ * new log: 122'10 (0'0) m 10000004d3a.00000000/head by client4225.1:18529
+ * lower_bound = 100'9
+ * i.e., same request, different version. If the eversion.version is > the
+ * lower_bound, it is divergent.
+ */
+ if (oe.version.version <= lower_bound.version)
+ break;
+ dout(10) << "merge_log divergent " << oe << dendl;
+ divergent.push_front(oe);
+ log.unindex(oe);
+ log.log.pop_back();
+ }
+
+ // splice
+ log.log.splice(log.log.end(),
+ olog.log, from, to);
+ log.index();
+
+ info.last_update = log.head = olog.head;
+ info.purged_snaps = oinfo.purged_snaps;
+
+ // process divergent items
+ if (!divergent.empty()) {
+ for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d)
+ merge_old_entry(t, *d, info, remove_snap, dirty_log);
+ }
+
+ changed = true;
+ }
+
+ dout(10) << "merge_log result " << log << " " << missing << " changed=" << changed << dendl;
+
+ if (changed) {
+ dirty_info = true;
+ dirty_big_info = true;
+ dirty_log = true;
+ }
+}
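
The overlap precondition asserted at the top is what makes the whole merge well defined: our head must be at or past the incoming tail and vice versa, otherwise there is no shared entry to anchor on. A small check of that condition with hypothetical (epoch, version) pairs standing in for eversion_t:

#include <cassert>
#include <utility>

typedef std::pair<int,int> ver; // (epoch, version), compared lexicographically

static bool logs_overlap(ver my_tail, ver my_head, ver otail, ver ohead)
{
  return my_head >= otail && ohead >= my_tail;
}

int main()
{
  assert(logs_overlap(ver(1,5), ver(2,10), ver(1,8), ver(2,12))); // ranges share entries
  assert(!logs_overlap(ver(1,5), ver(1,7), ver(2,1), ver(2,9)));  // disjoint: no anchor
  return 0;
}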
+
+void PGLog::write_log(ObjectStore::Transaction& t, pg_log_t &log,
+ const hobject_t &log_oid, map<eversion_t, hobject_t> &divergent_priors)
+{
+ //dout(10) << "write_log" << dendl;
+ t.remove(coll_t::META_COLL, log_oid);
+ t.touch(coll_t::META_COLL, log_oid);
+ map<string,bufferlist> keys;
+ for (list<pg_log_entry_t>::iterator p = log.log.begin();
+ p != log.log.end();
+ ++p) {
+ bufferlist bl(sizeof(*p) * 2);
+ p->encode_with_checksum(bl);
+ keys[p->get_key_name()].claim(bl);
+ }
+ //dout(10) << "write_log " << keys.size() << " keys" << dendl;
+
+ ::encode(divergent_priors, keys["divergent_priors"]);
+
+ t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
+}
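
write_log rewrites the whole log object in one transaction: one omap key per entry plus a reserved "divergent_priors" key. A sketch of the resulting key layout, with a plain map standing in for the omap and hypothetical key names in the spirit of pg_log_entry_t::get_key_name() (zero-padded so lexicographic order matches version order):

#include <cstdio>
#include <map>
#include <string>

int main()
{
  std::map<std::string, std::string> omap;
  omap["0000000001.00000000000000000005"] = "<encoded entry 1'5>";
  omap["0000000001.00000000000000000006"] = "<encoded entry 1'6>";
  omap["divergent_priors"] = "<encoded map<eversion_t, hobject_t>>";
  for (std::map<std::string, std::string>::iterator p = omap.begin();
       p != omap.end(); ++p)
    std::printf("%s\n", p->first.c_str()); // entries sort before the priors key
  return 0;
}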
+
+bool PGLog::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
+ const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
+ pg_missing_t &missing, ostringstream &oss)
+{
+ dout(10) << "read_log" << dendl;
+ bool rewrite_log = false;
+
+ // legacy?
+ struct stat st;
+ int r = store->stat(coll_t::META_COLL, log_oid, &st);
+ assert(r == 0);
+ if (st.st_size > 0) {
+ read_log_old(store, coll, log_oid, info, ondisklog, log, missing, oss);
+ rewrite_log = true;
+ } else {
+ log.tail = info.log_tail;
+ ObjectMap::ObjectMapIterator p = store->get_omap_iterator(coll_t::META_COLL, log_oid);
+ if (p) for (p->seek_to_first(); p->valid() ; p->next()) {
+ bufferlist bl = p->value();//Copy bufferlist before creating iterator
+ bufferlist::iterator bp = bl.begin();
+ if (p->key() == "divergent_priors") {
+ ::decode(ondisklog.divergent_priors, bp);
+ dout(20) << "read_log " << ondisklog.divergent_priors.size() << " divergent_priors" << dendl;
+ } else {
+ pg_log_entry_t e;
+ e.decode_with_checksum(bp);
+ dout(20) << "read_log " << e << dendl;
+ if (!log.log.empty()) {
+ pg_log_entry_t last_e(log.log.back());
+ assert(last_e.version.version < e.version.version);
+ assert(last_e.version.epoch <= e.version.epoch);
+ }
+ log.log.push_back(e);
+ log.head = e.version;
+ }
+ }
+ }
+ log.head = info.last_update;
+ log.index();
+
+ // build missing
+ if (info.last_complete < info.last_update) {
+ dout(10) << "read_log checking for missing items over interval (" << info.last_complete
+ << "," << info.last_update << "]" << dendl;
+
+ set<hobject_t> did;
+ for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
+ i != log.log.rend();
+ ++i) {
+ if (i->version <= info.last_complete) break;
+ if (did.count(i->soid)) continue;
+ did.insert(i->soid);
+
+ if (i->is_delete()) continue;
+
+ bufferlist bv;
+ int r = store->getattr(coll, i->soid, OI_ATTR, bv);
+ if (r >= 0) {
+ object_info_t oi(bv);
+ if (oi.version < i->version) {
+ dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl;
+ missing.add(i->soid, i->version, oi.version);
+ }
+ } else {
+ dout(15) << "read_log missing " << *i << dendl;
+ missing.add(i->soid, i->version, eversion_t());
+ }
+ }
+ for (map<eversion_t, hobject_t>::reverse_iterator i =
+ ondisklog.divergent_priors.rbegin();
+ i != ondisklog.divergent_priors.rend();
+ ++i) {
+ if (i->first <= info.last_complete) break;
+ if (did.count(i->second)) continue;
+ did.insert(i->second);
+ bufferlist bv;
+ int r = store->getattr(coll, i->second, OI_ATTR, bv);
+ if (r >= 0) {
+ object_info_t oi(bv);
+ /**
+ * 1) we see this entry in the divergent priors mapping
+ * 2) we didn't see an entry for this object in the log
+ *
+ * From 1 & 2 we know that either the object does not exist
+ * or it is at the version specified in the divergent_priors
+ * map since the object would have been deleted atomically
+ * with the addition of the divergent_priors entry, an older
+ * version would not have been recovered, and a newer version
+ * would show up in the log above.
+ */
+ assert(oi.version == i->first);
+ } else {
+ dout(15) << "read_log missing " << *i << dendl;
+ missing.add(i->second, i->first, eversion_t());
+ }
+ }
+ }
+ dout(10) << "read_log done" << dendl;
+ return rewrite_log;
+}
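
The missing set is rebuilt by replaying log entries newer than last_complete and comparing each against the object version actually on disk: an absent object or a stale version becomes a missing entry. A condensed sketch of that comparison, with a hypothetical map lookup in place of getattr/object_info_t:

#include <map>
#include <utility>
#include <cassert>

int main()
{
  std::map<int,int> on_disk;                          // object id -> stored version
  on_disk[1] = 5;
  std::map<int, std::pair<int,int> > missing;         // object id -> (need, have)

  // Hypothetical log entries newer than last_complete: (object, version).
  std::pair<int,int> log_entries[] = { std::make_pair(1, 7), std::make_pair(2, 3) };
  for (int i = 0; i < 2; ++i) {
    std::map<int,int>::iterator it = on_disk.find(log_entries[i].first);
    if (it == on_disk.end())
      missing[log_entries[i].first] = std::make_pair(log_entries[i].second, 0);
    else if (it->second < log_entries[i].second)      // stale: need the log version
      missing[log_entries[i].first] = std::make_pair(log_entries[i].second, it->second);
  }
  assert(missing[1] == std::make_pair(7, 5));         // have old version 5, need 7
  assert(missing[2] == std::make_pair(3, 0));         // object absent entirely
  return 0;
}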
+
+void PGLog::read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid,
+ const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
+ pg_missing_t &missing, ostringstream &oss)
+{
+ // load bounds, based on old OndiskLog encoding.
+ uint64_t ondisklog_tail = 0;
+ uint64_t ondisklog_head = 0;
+ uint64_t ondisklog_zero_to;
+ bool ondisklog_has_checksums;
+
+ bufferlist blb;
+ store->collection_getattr(coll, "ondisklog", blb);
+ {
+ bufferlist::iterator bl = blb.begin();
+ DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+ ondisklog_has_checksums = (struct_v >= 2);
+ ::decode(ondisklog_tail, bl);
+ ::decode(ondisklog_head, bl);
+ if (struct_v >= 4)
+ ::decode(ondisklog_zero_to, bl);
+ else
+ ondisklog_zero_to = 0;
+ if (struct_v >= 5)
+ ::decode(ondisklog.divergent_priors, bl);
+ DECODE_FINISH(bl);
+ }
+ uint64_t ondisklog_length = ondisklog_head - ondisklog_tail;
+ dout(10) << "read_log " << ondisklog_tail << "~" << ondisklog_length << dendl;
+
+ log.tail = info.log_tail;
+
+ // In case of sobject_t based encoding, may need to list objects in the store
+ // to find hashes
+ vector<hobject_t> ls;
+
+ if (ondisklog_head > 0) {
+ // read
+ bufferlist bl;
+ store->read(coll_t::META_COLL, log_oid, ondisklog_tail, ondisklog_length, bl);
+ if (bl.length() < ondisklog_length) {
+ std::ostringstream oss;
+ oss << "read_log got " << bl.length() << " bytes, expected "
+ << ondisklog_head << "-" << ondisklog_tail << "="
+ << ondisklog_length;
+ throw read_log_error(oss.str().c_str());
+ }
+
+ pg_log_entry_t e;
+ bufferlist::iterator p = bl.begin();
+ assert(log.empty());
+ eversion_t last;
+ bool reorder = false;
+ bool listed_collection = false;
+
+ while (!p.end()) {
+ uint64_t pos = ondisklog_tail + p.get_off();
+ if (ondisklog_has_checksums) {
+ bufferlist ebl;
+ ::decode(ebl, p);
+ __u32 crc;
+ ::decode(crc, p);
+
+ __u32 got = ebl.crc32c(0);
+ if (crc == got) {
+ bufferlist::iterator q = ebl.begin();
+ ::decode(e, q);
+ } else {
+ std::ostringstream oss;
+ oss << "read_log " << pos << " bad crc got " << got << " expected" << crc;
+ throw read_log_error(oss.str().c_str());
+ }
+ } else {
+ ::decode(e, p);
+ }
+ dout(20) << "read_log " << pos << " " << e << dendl;
+
+ // [repair] in order?
+ if (e.version < last) {
+ dout(0) << "read_log " << pos << " out of order entry " << e << " follows " << last << dendl;
+ oss << info.pgid << " log has out of order entry "
+ << e << " following " << last << "\n";
+ reorder = true;
+ }
+
+ if (e.version <= log.tail) {
+ dout(20) << "read_log ignoring entry at " << pos << " below log.tail" << dendl;
+ continue;
+ }
+ if (last.version == e.version.version) {
+ dout(0) << "read_log got dup " << e.version << " (last was " << last << ", dropping that one)" << dendl;
+ log.log.pop_back();
+ oss << info.pgid << " read_log got dup "
+ << e.version << " after " << last << "\n";
+ }
+
+ if (e.invalid_hash) {
+ // We need to find the object in the store to get the hash
+ if (!listed_collection) {
+ store->collection_list(coll, ls);
+ listed_collection = true;
+ }
+ bool found = false;
+ for (vector<hobject_t>::iterator i = ls.begin();
+ i != ls.end();
+ ++i) {
+ if (i->oid == e.soid.oid && i->snap == e.soid.snap) {
+ e.soid = *i;
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ // Didn't find the correct hash
+ std::ostringstream oss;
+ oss << "Could not find hash for hoid " << e.soid << std::endl;
+ throw read_log_error(oss.str().c_str());
+ }
+ }
+
+ if (e.invalid_pool) {
+ e.soid.pool = info.pgid.pool();
+ }
+
+ e.offset = pos;
+ uint64_t endpos = ondisklog_tail + p.get_off();
+ log.log.push_back(e);
+ last = e.version;
+
+ // [repair] at end of log?
+ if (!p.end() && e.version == info.last_update) {
+ oss << info.pgid << " log has extra data at "
+ << endpos << "~" << (ondisklog_head-endpos) << " after "
+ << info.last_update << "\n";
+
+ dout(0) << "read_log " << endpos << " *** extra gunk at end of log, "
+ << "adjusting ondisklog_head" << dendl;
+ ondisklog_head = endpos;
+ break;
+ }
+ }
+
+ if (reorder) {
+ dout(0) << "read_log reordering log" << dendl;
+ map<eversion_t, pg_log_entry_t> m;
+ for (list<pg_log_entry_t>::iterator p = log.log.begin(); p != log.log.end(); ++p)
+ m[p->version] = *p;
+ log.log.clear();
+ for (map<eversion_t, pg_log_entry_t>::iterator p = m.begin(); p != m.end(); ++p)
+ log.log.push_back(p->second);
+ }
+ }
+}
diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h
new file mode 100644
index 00000000000..cfb17f16ce2
--- /dev/null
+++ b/src/osd/PGLog.h
@@ -0,0 +1,391 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_PG_LOG_H
+#define CEPH_PG_LOG_H
+
+// re-include our assert to clobber boost's
+#include "include/assert.h"
+#include "osd_types.h"
+#include "os/ObjectStore.h"
+#include <list>
+using namespace std;
+
+struct PGLog {
+ ////////////////////////////// sub classes //////////////////////////////
+
+ /* Exceptions */
+ class read_log_error : public buffer::error {
+ public:
+ explicit read_log_error(const char *what) {
+ snprintf(buf, sizeof(buf), "read_log_error: %s", what);
+ }
+ const char *what() const throw () {
+ return buf;
+ }
+ private:
+ char buf[512];
+ };
+
+ /**
+ * IndexedLog - adds in-memory index of the log, by oid.
+ * plus some methods to manipulate it all.
+ */
+ struct IndexedLog : public pg_log_t {
+ hash_map<hobject_t,pg_log_entry_t*> objects; // ptrs into log. be careful!
+ hash_map<osd_reqid_t,pg_log_entry_t*> caller_ops;
+
+ // recovery pointers
+ list<pg_log_entry_t>::iterator complete_to; // not inclusive of referenced item
+ version_t last_requested; // last object requested by primary
+
+ /****/
+ IndexedLog() : last_requested(0) {}
+
+ void claim_log(const pg_log_t& o) {
+ log = o.log;
+ head = o.head;
+ tail = o.tail;
+ index();
+ }
+
+ void split_into(
+ pg_t child_pgid,
+ unsigned split_bits,
+ IndexedLog *olog);
+
+ void zero() {
+ unindex();
+ pg_log_t::clear();
+ reset_recovery_pointers();
+ }
+ void reset_recovery_pointers() {
+ complete_to = log.end();
+ last_requested = 0;
+ }
+
+ bool logged_object(const hobject_t& oid) const {
+ return objects.count(oid);
+ }
+ bool logged_req(const osd_reqid_t &r) const {
+ return caller_ops.count(r);
+ }
+ eversion_t get_request_version(const osd_reqid_t &r) const {
+ hash_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p = caller_ops.find(r);
+ if (p == caller_ops.end())
+ return eversion_t();
+ return p->second->version;
+ }
+
+ void index() {
+ objects.clear();
+ caller_ops.clear();
+ for (list<pg_log_entry_t>::iterator i = log.begin();
+ i != log.end();
+ ++i) {
+ objects[i->soid] = &(*i);
+ if (i->reqid_is_indexed()) {
+ //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
+ caller_ops[i->reqid] = &(*i);
+ }
+ }
+ }
+
+ void index(pg_log_entry_t& e) {
+ if (objects.count(e.soid) == 0 ||
+ objects[e.soid]->version < e.version)
+ objects[e.soid] = &e;
+ if (e.reqid_is_indexed()) {
+ //assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
+ caller_ops[e.reqid] = &e;
+ }
+ }
+ void unindex() {
+ objects.clear();
+ caller_ops.clear();
+ }
+ void unindex(pg_log_entry_t& e) {
+ // NOTE: this only works if we remove from the _tail_ of the log!
+ if (objects.count(e.soid) && objects[e.soid]->version == e.version)
+ objects.erase(e.soid);
+ if (e.reqid_is_indexed() &&
+ caller_ops.count(e.reqid) && // divergent merge_log indexes new before unindexing old
+ caller_ops[e.reqid] == &e)
+ caller_ops.erase(e.reqid);
+ }
+
+ // actors
+ void add(pg_log_entry_t& e) {
+ // add to log
+ log.push_back(e);
+ assert(e.version > head);
+ assert(head.version == 0 || e.version.version > head.version);
+ head = e.version;
+
+ // to our index
+ objects[e.soid] = &(log.back());
+ caller_ops[e.reqid] = &(log.back());
+ }
+
+ void trim(ObjectStore::Transaction &t, hobject_t& oid, eversion_t s);
+
+ ostream& print(ostream& out) const;
+ };
+
+ /**
+ * OndiskLog - some info about how we store the log on disk.
+ */
+ class OndiskLog {
+ public:
+ // ok
+ uint64_t tail; // first byte of log.
+ uint64_t head; // byte following end of log.
+ uint64_t zero_to; // first non-zeroed byte of log.
+ bool has_checksums;
+
+ /**
+ * We reconstruct the missing set by comparing the recorded log against
+ * the objects in the pg collection. Unfortunately, it's possible to
+ * have an object in the missing set which is not in the log due to
+ * a divergent operation with a prior_version pointing before the
+ * pg log tail. To deal with this, we store alongside the log a mapping
+ * of divergent priors to be checked along with the log during read_state.
+ */
+ map<eversion_t, hobject_t> divergent_priors;
+ void add_divergent_prior(eversion_t version, hobject_t obj) {
+ divergent_priors.insert(make_pair(version, obj));
+ }
+
+ OndiskLog() : tail(0), head(0), zero_to(0),
+ has_checksums(true) {}
+
+ uint64_t length() const { return head - tail; }
+ bool trim_to(eversion_t v, ObjectStore::Transaction& t);
+
+ void zero() {
+ tail = 0;
+ head = 0;
+ zero_to = 0;
+ }
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(5, 3, bl);
+ ::encode(tail, bl);
+ ::encode(head, bl);
+ ::encode(zero_to, bl);
+ ::encode(divergent_priors, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+ has_checksums = (struct_v >= 2);
+ ::decode(tail, bl);
+ ::decode(head, bl);
+ if (struct_v >= 4)
+ ::decode(zero_to, bl);
+ else
+ zero_to = 0;
+ if (struct_v >= 5)
+ ::decode(divergent_priors, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const {
+ f->dump_unsigned("head", head);
+ f->dump_unsigned("tail", tail);
+ f->dump_unsigned("zero_to", zero_to);
+ f->open_array_section("divergent_priors");
+ for (map<eversion_t, hobject_t>::const_iterator p = divergent_priors.begin();
+ p != divergent_priors.end();
+ ++p) {
+ f->open_object_section("prior");
+ f->dump_stream("version") << p->first;
+ f->dump_stream("object") << p->second;
+ f->close_section();
+ }
+ f->close_section();
+ }
+ static void generate_test_instances(list<OndiskLog*>& o) {
+ o.push_back(new OndiskLog);
+ o.push_back(new OndiskLog);
+ o.back()->tail = 2;
+ o.back()->head = 3;
+ o.back()->zero_to = 1;
+ }
+ };
+ WRITE_CLASS_ENCODER(OndiskLog)
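
OndiskLog::decode gates each field on struct_v, so logs written by older OSDs (before zero_to or divergent_priors existed) still parse with sane defaults. A minimal sketch of that version-gated pattern, independent of the Ceph encode macros (field values here are hypothetical):

#include <cassert>

struct Decoded {
  unsigned long tail, head, zero_to;
  bool have_priors;
};

// Flat stand-in for OndiskLog::decode: fields are honored only at or
// above the struct version that introduced them.
static Decoded decode(unsigned struct_v, unsigned long t, unsigned long h,
                      unsigned long z, bool priors)
{
  Decoded d;
  d.tail = t;
  d.head = h;
  d.zero_to = (struct_v >= 4) ? z : 0;
  d.have_priors = (struct_v >= 5) ? priors : false;
  return d;
}

int main()
{
  Decoded old = decode(3, 10, 20, 99, true);
  assert(old.zero_to == 0 && !old.have_priors); // a v3 stream lacks both fields
  return 0;
}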
+
+protected:
+ //////////////////// data members ////////////////////
+
+ OndiskLog ondisklog;
+ pg_missing_t missing;
+ IndexedLog log;
+
+public:
+ //////////////////// get or set missing ////////////////////
+
+ const pg_missing_t& get_missing() const { return missing; }
+
+ void missing_got(map<hobject_t, pg_missing_t::item>::const_iterator m) {
+ map<hobject_t, pg_missing_t::item>::iterator p = missing.missing.find(m->first);
+ missing.got(p);
+ }
+
+ void revise_have(hobject_t oid, eversion_t have) {
+ missing.revise_have(oid, have);
+ }
+
+ void revise_need(hobject_t oid, eversion_t need) {
+ missing.revise_need(oid, need);
+ }
+
+ void missing_add(const hobject_t& oid, eversion_t need, eversion_t have) {
+ missing.add(oid, need, have);
+ }
+
+ void missing_rm(map<hobject_t, pg_missing_t::item>::const_iterator m) {
+ map<hobject_t, pg_missing_t::item>::iterator p = missing.missing.find(m->first);
+ missing.rm(p);
+ }
+
+ //////////////////// get or set ondisklog ////////////////////
+
+ const OndiskLog &get_ondisklog() const { return ondisklog; }
+
+ //////////////////// get or set log ////////////////////
+
+ const IndexedLog &get_log() const { return log; }
+
+ const eversion_t &get_tail() const { return log.tail; }
+
+ void set_tail(eversion_t tail) { log.tail = tail; }
+
+ const eversion_t &get_head() const { return log.head; }
+
+ void set_head(eversion_t head) { log.head = head; }
+
+ void set_last_requested(version_t last_requested) {
+ log.last_requested = last_requested;
+ }
+
+ void index() { log.index(); }
+
+ void unindex() { log.unindex(); }
+
+ void add(pg_log_entry_t& e) { log.add(e); }
+
+ void reset_recovery_pointers() { log.reset_recovery_pointers(); }
+
+ static void clear_info_log(
+ pg_t pgid,
+ const hobject_t &infos_oid,
+ const hobject_t &log_oid,
+ ObjectStore::Transaction *t);
+
+ void trim(ObjectStore::Transaction& t, eversion_t trim_to, pg_info_t &info, hobject_t &log_oid);
+
+ //////////////////// get or set log & missing ////////////////////
+
+ void claim_log(const pg_log_t &o) {
+ log.claim_log(o);
+ missing.clear();
+ }
+
+ void split_into(
+ pg_t child_pgid,
+ unsigned split_bits,
+ PGLog *opg_log) {
+ log.split_into(child_pgid, split_bits, &(opg_log->log));
+ missing.split_into(child_pgid, split_bits, &(opg_log->missing));
+ }
+
+ void recover_got(hobject_t oid, eversion_t v, pg_info_t &info) {
+ if (missing.is_missing(oid, v)) {
+ missing.got(oid, v);
+
+ // raise last_complete?
+ if (missing.missing.empty()) {
+ log.complete_to = log.log.end();
+ info.last_complete = info.last_update;
+ }
+ while (log.complete_to != log.log.end()) {
+ if (missing.missing[missing.rmissing.begin()->second].need <=
+ log.complete_to->version)
+ break;
+ if (info.last_complete < log.complete_to->version)
+ info.last_complete = log.complete_to->version;
+ log.complete_to++;
+ }
+ }
+ }
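
recover_got advances last_complete by walking complete_to forward until it reaches the first still-missing version. A toy model of that walk over integer versions (assumed, not the real complete_to iterator):

#include <vector>
#include <cassert>

int main()
{
  std::vector<int> log = {4, 5, 6, 7, 8};
  int first_still_missing = 7; // oldest version still in the missing set
  int last_complete = 3;
  std::size_t complete_to = 0;
  while (complete_to < log.size() && log[complete_to] < first_still_missing)
    last_complete = log[complete_to++];
  assert(last_complete == 6 && log[complete_to] == 7); // stops at the gap
  return 0;
}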
+
+ void activate_not_complete(pg_info_t &info) {
+ log.complete_to = log.log.begin();
+ while (log.complete_to->version <
+ missing.missing[missing.rmissing.begin()->second].need)
+ log.complete_to++;
+ assert(log.complete_to != log.log.end());
+ if (log.complete_to == log.log.begin()) {
+ info.last_complete = eversion_t();
+ } else {
+ log.complete_to--;
+ info.last_complete = log.complete_to->version;
+ log.complete_to++;
+ }
+ log.last_requested = 0;
+ }
+
+ void proc_replica_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
+ pg_missing_t& omissing, int from);
+
+protected:
+ bool merge_old_entry(ObjectStore::Transaction& t, pg_log_entry_t& oe,
+ pg_info_t& info, list<hobject_t>& remove_snap, bool &dirty_log);
+public:
+ void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead,
+ pg_info_t &info, list<hobject_t>& remove_snap,
+ bool &dirty_log, bool &dirty_info, bool &dirty_big_info);
+
+ void merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from,
+ pg_info_t &info, list<hobject_t>& remove_snap,
+ bool &dirty_log, bool &dirty_info, bool &dirty_big_info);
+
+ void write_log(ObjectStore::Transaction& t, const hobject_t &log_oid) {
+ write_log(t, log, log_oid, ondisklog.divergent_priors);
+ }
+
+ static void write_log(ObjectStore::Transaction& t, pg_log_t &log,
+ const hobject_t &log_oid, map<eversion_t, hobject_t> &divergent_priors);
+
+ bool read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
+ const pg_info_t &info, ostringstream &oss) {
+ return read_log(store, coll, log_oid, info, ondisklog, log, missing, oss);
+ }
+
+ /// return true if the log should be rewritten
+ static bool read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
+ const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
+ pg_missing_t &missing, ostringstream &oss);
+
+protected:
+ static void read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid,
+ const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
+ pg_missing_t &missing, ostringstream &oss);
+};
+
+WRITE_CLASS_ENCODER(PGLog::OndiskLog)
+
+#endif // CEPH_PG_LOG_H
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index de60a6a9205..91241fa26cb 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -94,13 +94,15 @@ bool ReplicatedPG::same_for_rep_modify_since(epoch_t e)
bool ReplicatedPG::is_missing_object(const hobject_t& soid)
{
- return missing.missing.count(soid);
+ return pg_log.get_missing().missing.count(soid);
}
void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef op)
{
assert(is_missing_object(soid));
+ const pg_missing_t &missing = pg_log.get_missing();
+
// we don't have it (yet).
map<hobject_t, pg_missing_t::item>::const_iterator g = missing.missing.find(soid);
assert(g != missing.missing.end());
@@ -128,7 +130,7 @@ void ReplicatedPG::wait_for_all_missing(OpRequestRef op)
bool ReplicatedPG::is_degraded_object(const hobject_t& soid)
{
- if (missing.missing.count(soid))
+ if (pg_log.get_missing().missing.count(soid))
return true;
for (unsigned i = 1; i < acting.size(); i++) {
int peer = acting[i];
@@ -262,6 +264,8 @@ int ReplicatedPG::get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilt
int ReplicatedPG::do_command(vector<string>& cmd, ostream& ss,
bufferlist& idata, bufferlist& odata)
{
+ const pg_missing_t &missing = pg_log.get_missing();
+
if (cmd.size() && cmd[0] == "query") {
JSONFormatter jsf(true);
jsf.open_object_section("pg");
@@ -348,7 +352,7 @@ int ReplicatedPG::do_command(vector<string>& cmd, ostream& ss,
}
jf.dump_int("num_missing", missing.num_missing());
jf.dump_int("num_unfound", get_num_unfound());
- map<hobject_t,pg_missing_t::item>::iterator p = missing.missing.upper_bound(offset);
+ map<hobject_t,pg_missing_t::item>::const_iterator p = missing.missing.upper_bound(offset);
{
jf.open_array_section("objects");
int32_t num = 0;
@@ -391,7 +395,7 @@ int ReplicatedPG::do_command(vector<string>& cmd, ostream& ss,
bool ReplicatedPG::pg_op_must_wait(MOSDOp *op)
{
- if (missing.missing.empty())
+ if (pg_log.get_missing().missing.empty())
return false;
for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
if (p->op.op == CEPH_OSD_OP_PGLS) {
@@ -474,9 +478,9 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
break;
}
- assert(snapid == CEPH_NOSNAP || missing.missing.empty());
- map<hobject_t, pg_missing_t::item>::iterator missing_iter =
- missing.missing.lower_bound(current);
+ assert(snapid == CEPH_NOSNAP || pg_log.get_missing().missing.empty());
+ map<hobject_t, pg_missing_t::item>::const_iterator missing_iter =
+ pg_log.get_missing().missing.lower_bound(current);
vector<hobject_t>::iterator ls_iter = sentries.begin();
while (1) {
if (ls_iter == sentries.end()) {
@@ -484,7 +488,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
}
hobject_t candidate;
- if (missing_iter == missing.missing.end() ||
+ if (missing_iter == pg_log.get_missing().missing.end() ||
*ls_iter < missing_iter->first) {
candidate = *(ls_iter++);
} else {
@@ -529,7 +533,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
candidate.get_key()));
}
if (next.is_max() &&
- missing_iter == missing.missing.end() &&
+ missing_iter == pg_log.get_missing().missing.end() &&
ls_iter == sentries.end()) {
result = 1;
}
@@ -577,9 +581,9 @@ void ReplicatedPG::calc_trim_to()
if (min_last_complete_ondisk != eversion_t() &&
min_last_complete_ondisk != pg_trim_to &&
- log.approx_size() > target) {
- size_t num_to_trim = log.approx_size() - target;
- list<pg_log_entry_t>::const_iterator it = log.log.begin();
+ pg_log.get_log().approx_size() > target) {
+ size_t num_to_trim = pg_log.get_log().approx_size() - target;
+ list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
eversion_t new_trim_to;
for (size_t i = 0; i < num_to_trim; ++i) {
new_trim_to = it->version;
@@ -592,7 +596,7 @@ void ReplicatedPG::calc_trim_to()
}
dout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
pg_trim_to = new_trim_to;
- assert(pg_trim_to <= log.head);
+ assert(pg_trim_to <= pg_log.get_head());
assert(pg_trim_to <= min_last_complete_ondisk);
}
}
@@ -915,7 +919,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
return;
}
- eversion_t oldv = log.get_request_version(ctx->reqid);
+ eversion_t oldv = pg_log.get_log().get_request_version(ctx->reqid);
if (oldv != eversion_t()) {
dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
delete ctx;
@@ -944,12 +948,12 @@ void ReplicatedPG::do_op(OpRequestRef op)
op->mark_started();
// version
- ctx->at_version = log.head;
+ ctx->at_version = pg_log.get_head();
ctx->at_version.epoch = get_osdmap()->get_epoch();
ctx->at_version.version++;
assert(ctx->at_version > info.last_update);
- assert(ctx->at_version > log.head);
+ assert(ctx->at_version > pg_log.get_head());
ctx->mtime = m->get_mtime();
@@ -968,7 +972,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
utime_t now = ceph_clock_now(g_ceph_context);
// note some basic context for op replication that prepare_transaction may clobber
- eversion_t old_last_update = log.head;
+ eversion_t old_last_update = pg_log.get_head();
bool old_exists = obc->obs.exists;
uint64_t old_size = obc->obs.oi.size;
eversion_t old_version = obc->obs.oi.version;
@@ -1415,7 +1419,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
ctx->mtime = ceph_clock_now(g_ceph_context);
ctx->at_version.epoch = get_osdmap()->get_epoch();
- ctx->at_version.version = log.head.version + 1;
+ ctx->at_version.version = pg_log.get_head().version + 1;
RepGather *repop = new_repop(ctx, obc, rep_tid);
@@ -4177,9 +4181,11 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
{
assert(is_active());
assert(!is_missing_object(obc->obs.oi.soid) ||
- (log.objects.count(obc->obs.oi.soid) && // or this is a revert... see recover_primary()
- log.objects[obc->obs.oi.soid]->op == pg_log_entry_t::LOST_REVERT &&
- log.objects[obc->obs.oi.soid]->reverting_to == obc->obs.oi.version));
+ (pg_log.get_log().objects.count(obc->obs.oi.soid) && // or this is a revert... see recover_primary()
+ pg_log.get_log().objects.find(obc->obs.oi.soid)->second->op ==
+ pg_log_entry_t::LOST_REVERT &&
+ pg_log.get_log().objects.find(obc->obs.oi.soid)->second->reverting_to ==
+ obc->obs.oi.version));
dout(10) << "populate_obc_watchers " << obc->obs.oi.soid << dendl;
assert(obc->watchers.empty());
@@ -4241,7 +4247,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
ctx->mtime = ceph_clock_now(g_ceph_context);
ctx->at_version.epoch = get_osdmap()->get_epoch();
- ctx->at_version.version = log.head.version + 1;
+ ctx->at_version.version = pg_log.get_head().version + 1;
entity_inst_t nobody;
@@ -4257,7 +4263,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
obc->obs.oi.version,
osd_reqid_t(), ctx->mtime));
- eversion_t old_last_update = log.head;
+ eversion_t old_last_update = pg_log.get_head();
bool old_exists = repop->obc->obs.exists;
uint64_t old_size = repop->obc->obs.oi.size;
eversion_t old_version = repop->obc->obs.oi.version;
@@ -4465,7 +4471,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
put_snapset_context(ssc); // we're done with ssc
ssc = 0;
- if (missing.is_missing(soid)) {
+ if (pg_log.get_missing().is_missing(soid)) {
dout(20) << "find_object_context " << soid << " missing, try again later" << dendl;
if (psnapid)
*psnapid = soid.snap;
@@ -4656,7 +4662,7 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
assert(is_replica());
// we better not be missing this.
- assert(!missing.is_missing(soid));
+ assert(!pg_log.get_missing().is_missing(soid));
int ackerosd = acting[0];
@@ -4742,7 +4748,7 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
} else {
// just trim the log
if (m->pg_trim_to != eversion_t()) {
- trim(rm->localt, m->pg_trim_to);
+ pg_log.trim(rm->localt, m->pg_trim_to, info, log_oid);
rm->tls.push_back(&rm->localt);
}
}
@@ -4915,7 +4921,7 @@ void ReplicatedPG::calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const
}
void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid,
- pg_missing_t& missing,
+ const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t> >& clone_subsets)
@@ -5030,13 +5036,13 @@ int ReplicatedPG::pull(
<< " at version " << peer_missing[fromosd].missing[soid].have
<< " rather than at version " << v << dendl;
v = peer_missing[fromosd].missing[soid].have;
- assert(log.objects.count(soid) &&
- log.objects[soid]->op == pg_log_entry_t::LOST_REVERT &&
- log.objects[soid]->reverting_to == v);
+ assert(pg_log.get_log().objects.count(soid) &&
+ pg_log.get_log().objects.find(soid)->second->op == pg_log_entry_t::LOST_REVERT &&
+ pg_log.get_log().objects.find(soid)->second->reverting_to == v);
}
dout(7) << "pull " << soid
- << " v " << v
+ << " v " << v
<< " on osds " << missing_loc[soid]
<< " from osd." << fromosd
<< dendl;
@@ -5048,24 +5054,24 @@ int ReplicatedPG::pull(
// do we have the head and/or snapdir?
hobject_t head = soid;
head.snap = CEPH_NOSNAP;
- if (missing.is_missing(head)) {
+ if (pg_log.get_missing().is_missing(head)) {
if (pulling.count(head)) {
dout(10) << " missing but already pulling head " << head << dendl;
return PULL_NONE;
} else {
- int r = pull(head, missing.missing[head].need, priority);
+ int r = pull(head, pg_log.get_missing().missing.find(head)->second.need, priority);
if (r != PULL_NONE)
return PULL_OTHER;
return PULL_NONE;
}
}
head.snap = CEPH_SNAPDIR;
- if (missing.is_missing(head)) {
+ if (pg_log.get_missing().is_missing(head)) {
if (pulling.count(head)) {
dout(10) << " missing but already pulling snapdir " << head << dendl;
return PULL_NONE;
} else {
- int r = pull(head, missing.missing[head].need, priority);
+ int r = pull(head, pg_log.get_missing().missing.find(head)->second.need, priority);
if (r != PULL_NONE)
return PULL_OTHER;
return PULL_NONE;
@@ -5076,7 +5082,7 @@ int ReplicatedPG::pull(
SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
assert(ssc);
dout(10) << " snapset " << ssc->snapset << dendl;
- calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill,
+ calc_clone_subsets(ssc->snapset, soid, pg_log.get_missing(), info.last_backfill,
recovery_info.copy_subset,
recovery_info.clone_subset);
put_snapset_context(ssc);
@@ -5116,7 +5122,7 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
dout(10) << "send_remove_op " << oid << " from osd." << peer
<< " tid " << tid << dendl;
-
+
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, oid, false, CEPH_OSD_FLAG_ACK,
get_osdmap()->get_epoch(), tid, v);
subop->ops = vector<OSDOp>(1);
@@ -5130,8 +5136,8 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
* clones/heads and dup data ranges where possible.
*/
void ReplicatedPG::push_to_replica(
- ObjectContext *obc, const hobject_t& soid, int peer,
- int prio)
+ ObjectContext *obc, const hobject_t& soid, int peer,
+ int prio)
{
const object_info_t& oi = obc->obs.oi;
uint64_t size = obc->obs.oi.size;
@@ -5140,7 +5146,7 @@ void ReplicatedPG::push_to_replica(
map<hobject_t, interval_set<uint64_t> > clone_subsets;
interval_set<uint64_t> data_subset;
-
+
// are we doing a clone on the replica?
if (soid.snap && soid.snap < CEPH_NOSNAP) {
hobject_t head = soid;
@@ -5148,13 +5154,13 @@ void ReplicatedPG::push_to_replica(
// try to base push off of clones that succeed/precede poid
// we need the head (and current SnapSet) locally to do that.
- if (missing.is_missing(head)) {
+ if (pg_log.get_missing().is_missing(head)) {
dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
return push_start(prio, obc, soid, peer);
}
hobject_t snapdir = head;
snapdir.snap = CEPH_SNAPDIR;
- if (missing.is_missing(snapdir)) {
+ if (pg_log.get_missing().is_missing(snapdir)) {
dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
return push_start(prio, obc, soid, peer);
}
@@ -5267,7 +5273,7 @@ void ReplicatedPG::submit_push_data(
ObjectStore::Transaction *t)
{
if (first) {
- missing.revise_have(recovery_info.soid, eversion_t());
+ pg_log.revise_have(recovery_info.soid, eversion_t());
remove_snap_mapped_object(*t, recovery_info.soid);
t->remove(get_temp_coll(t), recovery_info.soid);
t->touch(get_temp_coll(t), recovery_info.soid);
@@ -5320,10 +5326,10 @@ void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
&_t);
}
- if (missing.is_missing(recovery_info.soid) &&
- missing.missing[recovery_info.soid].need > recovery_info.version) {
+ if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+ pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
assert(is_primary());
- pg_log_entry_t *latest = log.objects[recovery_info.soid];
+ const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
if (latest->op == pg_log_entry_t::LOST_REVERT &&
latest->reverting_to == recovery_info.version) {
dout(10) << " got old revert version " << recovery_info.version
@@ -5358,7 +5364,7 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove
new_info.copy_subset.clear();
new_info.clone_subset.clear();
assert(ssc);
- calc_clone_subsets(ssc->snapset, new_info.soid, missing, info.last_backfill,
+ calc_clone_subsets(ssc->snapset, new_info.soid, pg_log.get_missing(), info.last_backfill,
new_info.copy_subset, new_info.clone_subset);
put_snapset_context(ssc);
return new_info;
@@ -5500,7 +5506,7 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
dout(20) << " kicking waiters on " << hoid << dendl;
requeue_ops(waiting_for_missing_object[hoid]);
waiting_for_missing_object.erase(hoid);
- if (missing.missing.size() == 0) {
+ if (pg_log.get_missing().missing.size() == 0) {
requeue_ops(waiting_for_all_missing);
waiting_for_all_missing.clear();
}
@@ -5911,36 +5917,22 @@ void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t
void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
{
- if (missing.is_missing(oid, v)) {
- dout(10) << "got missing " << oid << " v " << v << dendl;
- missing.got(oid, v);
- if (is_primary())
- missing_loc.erase(oid);
-
- // raise last_complete?
- if (missing.missing.empty()) {
- log.complete_to = log.log.end();
- info.last_complete = info.last_update;
- }
- while (log.complete_to != log.log.end()) {
- if (missing.missing[missing.rmissing.begin()->second].need <=
- log.complete_to->version)
- break;
- if (info.last_complete < log.complete_to->version)
- info.last_complete = log.complete_to->version;
- log.complete_to++;
- }
- if (log.complete_to != log.log.end()) {
- dout(10) << "last_complete now " << info.last_complete
- << " log.complete_to " << log.complete_to->version
- << dendl;
- } else {
- dout(10) << "last_complete now " << info.last_complete
- << " log.complete_to at end" << dendl;
- //below is not true in the repair case.
- //assert(missing.num_missing() == 0); // otherwise, complete_to was wrong.
- assert(info.last_complete == info.last_update);
- }
+ dout(10) << "got missing " << oid << " v " << v << dendl;
+ if (pg_log.get_missing().is_missing(oid, v)) {
+ if (is_primary())
+ missing_loc.erase(oid);
+ }
+ pg_log.recover_got(oid, v, info);
+ if (pg_log.get_log().complete_to != pg_log.get_log().log.end()) {
+ dout(10) << "last_complete now " << info.last_complete
+ << " log.complete_to " << pg_log.get_log().complete_to->version
+ << dendl;
+ } else {
+ dout(10) << "last_complete now " << info.last_complete
+ << " log.complete_to at end" << dendl;
+ //below is not true in the repair case.
+ //assert(missing.num_missing() == 0); // otherwise, complete_to was wrong.
+ assert(info.last_complete == info.last_update);
}
}
@@ -6047,8 +6039,8 @@ eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid)
{
eversion_t v;
- assert(missing.is_missing(oid));
- v = missing.missing[oid].have;
+ assert(pg_log.get_missing().is_missing(oid));
+ v = pg_log.get_missing().missing.find(oid)->second.have;
dout(10) << "pick_newest_available " << oid << " " << v << " on osd." << osd->whoami << " (local)" << dendl;
for (unsigned i=1; i<acting.size(); ++i) {
@@ -6085,7 +6077,7 @@ ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
// Add log entry
++info.last_update.version;
pg_log_entry_t e(what, oid, info.last_update, version, osd_reqid_t(), mtime);
- log.add(e);
+ pg_log.add(e);
object_locator_t oloc;
oloc.pool = info.pgid.pool();
@@ -6121,7 +6113,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl;
dout(30) << __func__ << ": log before:\n";
- log.print(*_dout);
+ pg_log.get_log().print(*_dout);
*_dout << dendl;
ObjectStore::Transaction *t = new ObjectStore::Transaction;
@@ -6129,8 +6121,9 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
utime_t mtime = ceph_clock_now(g_ceph_context);
info.last_update.epoch = get_osdmap()->get_epoch();
- map<hobject_t, pg_missing_t::item>::iterator m = missing.missing.begin();
- map<hobject_t, pg_missing_t::item>::iterator mend = missing.missing.end();
+ const pg_missing_t &missing = pg_log.get_missing();
+ map<hobject_t, pg_missing_t::item>::const_iterator m = missing.missing.begin();
+ map<hobject_t, pg_missing_t::item>::const_iterator mend = missing.missing.end();
while (m != mend) {
const hobject_t &oid(m->first);
if (missing_loc.find(oid) != missing_loc.end()) {
@@ -6145,7 +6138,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
switch (what) {
case pg_log_entry_t::LOST_MARK:
obc = mark_object_lost(t, oid, m->second.need, mtime, pg_log_entry_t::LOST_MARK);
- missing.got(m++);
+ pg_log.missing_got(m++);
assert(0 == "actually, not implemented yet!");
// we need to be careful about how this is handled on the replica!
break;
@@ -6159,12 +6152,12 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
pg_log_entry_t::LOST_REVERT, oid, info.last_update,
m->second.need, osd_reqid_t(), mtime);
e.reverting_to = prev;
- log.add(e);
+ pg_log.add(e);
dout(10) << e << dendl;
// we are now missing the new version; recovery code will sort it out.
++m;
- missing.revise_need(oid, info.last_update);
+ pg_log.revise_need(oid, info.last_update);
break;
}
/** fall-thru **/
@@ -6175,14 +6168,14 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
++info.last_update.version;
pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, info.last_update, m->second.need,
osd_reqid_t(), mtime);
- log.add(e);
+ pg_log.add(e);
dout(10) << e << dendl;
// delete local copy? NOT YET! FIXME
if (m->second.have != eversion_t()) {
assert(0 == "not implemented.. tho i'm not sure how useful it really would be.");
}
- missing.rm(m++);
+ pg_log.missing_rm(m++);
}
break;
@@ -6195,7 +6188,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
}
dout(30) << __func__ << ": log after:\n";
- log.print(*_dout);
+ pg_log.get_log().print(*_dout);
*_dout << dendl;
if (missing.num_missing() == 0) {
@@ -6432,7 +6425,7 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
pulling.erase(*i);
finish_recovery_op(*i);
}
- log.last_requested = 0;
+ pg_log.set_last_requested(0);
pull_from_peer.erase(j++);
}
@@ -6498,6 +6491,8 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
return 0;
}
+ const pg_missing_t &missing = pg_log.get_missing();
+
int num_missing = missing.num_missing();
int num_unfound = get_num_unfound();
@@ -6607,6 +6602,8 @@ int ReplicatedPG::recover_primary(int max)
{
assert(is_primary());
+ const pg_missing_t &missing = pg_log.get_missing();
+
dout(10) << "recover_primary pulling " << pulling.size() << " in pg" << dendl;
dout(10) << "recover_primary " << missing << dendl;
dout(25) << "recover_primary " << missing.missing << dendl;
@@ -6616,20 +6613,21 @@ int ReplicatedPG::recover_primary(int max)
int started = 0;
int skipped = 0;
- map<version_t, hobject_t>::iterator p = missing.rmissing.lower_bound(log.last_requested);
+ map<version_t, hobject_t>::const_iterator p =
+ missing.rmissing.lower_bound(pg_log.get_log().last_requested);
while (p != missing.rmissing.end()) {
hobject_t soid;
version_t v = p->first;
- if (log.objects.count(p->second)) {
- latest = log.objects[p->second];
+ if (pg_log.get_log().objects.count(p->second)) {
+ latest = pg_log.get_log().objects.find(p->second)->second;
assert(latest->is_update());
soid = latest->soid;
} else {
latest = 0;
soid = p->second;
}
- pg_missing_t::item& item = missing.missing[p->second];
+ const pg_missing_t::item& item = missing.missing.find(p->second)->second;
++p;
hobject_t head = soid;
@@ -6749,7 +6747,7 @@ int ReplicatedPG::recover_primary(int max)
// only advance last_requested if we haven't skipped anything
if (!skipped)
- log.last_requested = v;
+ pg_log.set_last_requested(v);
}
return started;
@@ -6763,7 +6761,7 @@ int ReplicatedPG::recover_object_replicas(
// NOTE: we know we will get a valid oloc off of disk here.
ObjectContext *obc = get_object_context(soid, OLOC_BLANK, false);
if (!obc) {
- missing.add(soid, v, eversion_t());
+ pg_log.missing_add(soid, v, eversion_t());
bool uhoh = true;
for (unsigned i=1; i<acting.size(); i++) {
int peer = acting[i];
@@ -6834,7 +6832,7 @@ int ReplicatedPG::recover_replicas(int max)
continue;
}
- if (missing.is_missing(soid)) {
+ if (pg_log.get_missing().is_missing(soid)) {
if (missing_loc.find(soid) == missing_loc.end())
dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
else
@@ -7112,12 +7110,12 @@ void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
{
dout(10) << "clean_up_local" << dendl;
- assert(info.last_update >= log.tail); // otherwise we need some help!
+ assert(info.last_update >= pg_log.get_tail()); // otherwise we need some help!
// just scan the log.
set<hobject_t> did;
- for (list<pg_log_entry_t>::reverse_iterator p = log.log.rbegin();
- p != log.log.rend();
+ for (list<pg_log_entry_t>::const_reverse_iterator p = pg_log.get_log().log.rbegin();
+ p != pg_log.get_log().log.rend();
++p) {
if (did.count(p->soid))
continue;
@@ -7425,7 +7423,7 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
assert(repop);
repop->queue_snap_trimmer = true;
- eversion_t old_last_update = pg->log.head;
+ eversion_t old_last_update = pg->pg_log.get_head();
bool old_exists = repop->obc->obs.exists;
uint64_t old_size = repop->obc->obs.oi.size;
eversion_t old_version = repop->obc->obs.oi.version;
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 644a277f0dc..dcfecd3e61a 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -677,7 +677,7 @@ protected:
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t> >& clone_subsets);
- void calc_clone_subsets(SnapSet& snapset, const hobject_t& poid, pg_missing_t& missing,
+ void calc_clone_subsets(SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t> >& clone_subsets);
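
The const added to calc_clone_subsets' missing parameter is not cosmetic: once get_missing() returns a const pg_missing_t&, that reference cannot bind to the old non-const parameter, so the signature has to follow. In miniature, with stand-in types:

    #include <set>
    #include <string>

    struct MissingSketch { std::set<std::string> objs; };

    struct LogSketch {
      MissingSketch m;
      const MissingSketch &get_missing() const { return m; }
    };

    // const& binds to get_missing()'s result; a plain MissingSketch& would not.
    bool calc_clone_subsets_sketch(const MissingSketch &missing,
                                   const std::string &oid) {
      return missing.objs.count(oid) != 0;
    }

    int main() {
      LogSketch pg_log;
      return calc_clone_subsets_sketch(pg_log.get_missing(), "head") ? 1 : 0;
    }
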
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index c1b61d8b68f..44755bc7b1e 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -1400,6 +1400,24 @@ struct pg_log_t {
return head.version - tail.version;
}
+ list<pg_log_entry_t>::const_iterator find_entry(eversion_t v) const {
+ int fromhead = head.version - v.version;
+ int fromtail = v.version - tail.version;
+ list<pg_log_entry_t>::const_iterator p;
+ if (fromhead < fromtail) {
+ p = log.end();
+ --p;
+ while (p->version > v)
+ --p;
+ return p;
+ } else {
+ p = log.begin();
+ while (p->version < v)
+ ++p;
+ return p;
+ }
+ }
+
list<pg_log_entry_t>::iterator find_entry(eversion_t v) {
int fromhead = head.version - v.version;
int fromtail = v.version - tail.version;
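
find_entry now comes in const and non-const flavours with the same search: compare v's distance from head against its distance from tail, then walk the std::list from whichever end is nearer, since list iterators cannot jump. A standalone version of that bidirectional scan (element type simplified to int; like the real code, it assumes the list is non-empty, sorted, and contains v):

    #include <cassert>
    #include <list>

    // Scan a sorted list from whichever end is closer to v.
    std::list<int>::const_iterator find_entry_sketch(const std::list<int> &log,
                                                     int v) {
      if (log.back() - v < v - log.front()) {
        auto p = log.end();
        --p;
        while (*p > v) --p;    // backward from head
        return p;
      }
      auto p = log.begin();
      while (*p < v) ++p;      // forward from tail
      return p;
    }

    int main() {
      std::list<int> log{1, 2, 3, 5, 8};
      assert(*find_entry_sketch(log, 8) == 8);  // near head: backward scan
      assert(*find_entry_sketch(log, 2) == 2);  // near tail: forward scan
      return 0;
    }
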
diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h
index a1d2773a946..1838586d84c 100644
--- a/src/test/encoding/types.h
+++ b/src/test/encoding/types.h
@@ -29,8 +29,8 @@ TYPEWITHSTRAYDATA(OSDMap::Incremental)
#include "crush/CrushWrapper.h"
TYPE(CrushWrapper)
-#include "osd/PG.h"
-TYPE(PG::OndiskLog)
+#include "osd/PGLog.h"
+TYPE(PGLog::OndiskLog)
#include "osd/osd_types.h"
TYPE(osd_reqid_t)
diff --git a/src/test/test_osd_types.cc b/src/test/test_osd_types.cc
index b696ace3ac2..d3f5364f793 100644
--- a/src/test/test_osd_types.cc
+++ b/src/test/test_osd_types.cc
@@ -216,3 +216,35 @@ TEST(pg_t, split)
ASSERT_TRUE(s.count(pg_t(7, 0, -1)));
}
+
+TEST(pg_missing_t, constructor)
+{
+ pg_missing_t missing;
+ EXPECT_EQ((unsigned int)0, missing.num_missing());
+ EXPECT_FALSE(missing.have_missing());
+}
+
+TEST(pg_missing_t, add_next_event)
+{
+ pg_missing_t missing;
+
+ // adding a DELETE entry
+ hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+ eversion_t version(1,2);
+ eversion_t prior_version(3,4);
+ pg_log_entry_t e(pg_log_entry_t::DELETE, oid, version, prior_version,
+ osd_reqid_t(entity_name_t::CLIENT(777), 8, 999), utime_t(8,9));
+ EXPECT_FALSE(missing.have_missing());
+ missing.add_next_event(e);
+ EXPECT_FALSE(missing.have_missing());
+
+ // adding a MODIFY entry
+ e.op = pg_log_entry_t::MODIFY;
+ EXPECT_FALSE(missing.have_missing());
+ missing.add_next_event(e);
+ EXPECT_TRUE(missing.have_missing());
+}
+
+// Local Variables:
+// compile-command: "cd .. ; make unittest_osd_types ; ./unittest_osd_types # --gtest_filter=pg_missing_t.constructor "
+// End:
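
The two new tests pin down add_next_event's basic contract: a DELETE entry never creates a missing record (a deleted object needs no recovery), while a MODIFY of the same object does. A toy model of that rule (the real pg_missing_t also tracks have/need versions, prior_version handling, and the rmissing reverse index, all omitted here):

    #include <cassert>
    #include <map>
    #include <string>

    enum OpSketch { DELETE_OP, MODIFY_OP };
    struct EntrySketch { OpSketch op; std::string soid; unsigned version; };

    struct MissingModel {
      std::map<std::string, unsigned> need;  // object -> version to recover
      void add_next_event(const EntrySketch &e) {
        if (e.op == DELETE_OP)
          need.erase(e.soid);                // deletions clear, never add
        else
          need[e.soid] = e.version;          // updates mark the object missing
      }
      bool have_missing() const { return !need.empty(); }
    };

    int main() {
      MissingModel m;
      m.add_next_event({DELETE_OP, "objname", 2});
      assert(!m.have_missing());
      m.add_next_event({MODIFY_OP, "objname", 2});
      assert(m.have_missing());
      return 0;
    }
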
diff --git a/src/tools/ceph-filestore-dump.cc b/src/tools/ceph-filestore-dump.cc
index 90a7d40cba1..4953bb6062e 100644
--- a/src/tools/ceph-filestore-dump.cc
+++ b/src/tools/ceph-filestore-dump.cc
@@ -32,7 +32,7 @@
#include "os/FileStore.h"
#include "common/perf_counters.h"
#include "common/errno.h"
-#include "osd/PG.h"
+#include "osd/PGLog.h"
#include "osd/OSD.h"
namespace po = boost::program_options;
@@ -322,12 +322,12 @@ static void invalid_path(string &path)
}
int get_log(ObjectStore *fs, coll_t coll, pg_t pgid, const pg_info_t &info,
- PG::IndexedLog &log, pg_missing_t &missing)
+ PGLog::IndexedLog &log, pg_missing_t &missing)
{
- PG::OndiskLog ondisklog;
+ PGLog::OndiskLog ondisklog;
try {
ostringstream oss;
- PG::read_log(fs, coll, log_oid, info, ondisklog, log, missing, oss);
+ PGLog::read_log(fs, coll, log_oid, info, ondisklog, log, missing, oss);
if (debug && oss.str().size())
cerr << oss.str() << std::endl;
}
@@ -514,7 +514,7 @@ int write_info(ObjectStore::Transaction &t, epoch_t epoch, pg_info_t &info,
void write_log(ObjectStore::Transaction &t, pg_log_t &log)
{
map<eversion_t, hobject_t> divergent_priors;
- PG::_write_log(t, log, log_oid, divergent_priors);
+ PGLog::write_log(t, log, log_oid, divergent_priors);
}
int write_pg(ObjectStore::Transaction &t, epoch_t epoch, pg_info_t &info,
@@ -666,7 +666,7 @@ void write_super()
int do_export(ObjectStore *fs, coll_t coll, pg_t pgid, pg_info_t &info,
epoch_t map_epoch, __u8 struct_ver)
{
- PG::IndexedLog log;
+ PGLog::IndexedLog log;
pg_missing_t missing;
int ret = get_log(fs, coll, pgid, info, log, missing);
@@ -913,7 +913,7 @@ int do_import(ObjectStore *store)
{
bufferlist ebl;
pg_info_t info;
- PG::IndexedLog log;
+ PGLog::IndexedLog log;
uint64_t next_removal_seq = 0; //My local seq
finish_remove_pgs(store, &next_removal_seq);
@@ -1268,7 +1268,7 @@ int main(int argc, char **argv)
formatter->flush(cout);
cout << std::endl;
} else if (type == "log") {
- PG::IndexedLog log;
+ PGLog::IndexedLog log;
pg_missing_t missing;
ret = get_log(fs, coll, pgid, info, log, missing);
if (ret > 0)
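
For ceph-filestore-dump the payoff of the split is that every log-handling entry point it needs (read_log, write_log, IndexedLog, OndiskLog) is now a static or nested member of PGLog, so the tool includes osd/PGLog.h and never constructs a PG. The calling shape, reduced to a stand-in (names and signatures below are illustrative, not Ceph's real API):

    #include <iostream>
    #include <string>

    struct PGLogToolSketch {
      struct IndexedLog { std::string entries; };
      // class-level static: callable without any PG instance, which is
      // exactly what an offline dump tool wants
      static int read_log(const std::string &store_path, IndexedLog *out) {
        out->entries = "log read from " + store_path;
        return 0;
      }
    };

    int main() {
      PGLogToolSketch::IndexedLog log;
      if (PGLogToolSketch::read_log("osd-store", &log) == 0)
        std::cout << log.entries << std::endl;
      return 0;
    }
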