diff options
author | Sage Weil <sage@inktank.com> | 2013-10-11 16:06:07 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-10-22 13:45:39 -0700 |
commit | 057bdb92a065abcc9d3b51a3aea4c24246621a5b (patch) | |
tree | 0180243c7b107fb594a256831b535fd823702ed6 | |
parent | f8054bb39345b934e7954e02d33415cc0c5c9833 (diff) | |
download | ceph-057bdb92a065abcc9d3b51a3aea4c24246621a5b.tar.gz |
osd/ReplicatedPG: add basic HitSet tracking
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/common/config_opts.h | 1 | ||||
-rw-r--r-- | src/osd/PG.cc | 2 | ||||
-rw-r--r-- | src/osd/PG.h | 1 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 218 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 19 |
5 files changed, 231 insertions, 10 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 0b3938ecb9e..dea23bae02f 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -393,6 +393,7 @@ OPTION(osd_pool_default_pg_num, OPT_INT, 8) // number of PGs for new pools. Conf OPTION(osd_pool_default_pgp_num, OPT_INT, 8) // number of PGs for placement purposes. Should be equal to pg_num OPTION(osd_pool_default_flags, OPT_INT, 0) // default flags for new pools OPTION(osd_pool_default_flag_hashpspool, OPT_BOOL, false) // use new pg hashing to prevent pool/pg overlap +OPTION(osd_hit_set_min_size, OPT_INT, 1000) // min target size for a HitSet OPTION(osd_map_dedup, OPT_BOOL, true) OPTION(osd_map_cache_size, OPT_INT, 500) OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 8f7d3ccb684..91e71236819 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -4980,6 +4980,8 @@ void PG::handle_advance_map(OSDMapRef osdmap, OSDMapRef lastmap, dout(10) << "handle_advance_map " << newup << "/" << newacting << dendl; update_osdmap_ref(osdmap); pool.update(osdmap); + if (pool.info.last_change == osdmap_ref->get_epoch()) + on_pool_change(); AdvMap evt(osdmap, lastmap, newup, newacting); recovery_state.handle_event(evt, rctx); } diff --git a/src/osd/PG.h b/src/osd/PG.h index de7f0b82cac..08bf2f91610 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1828,6 +1828,7 @@ public: virtual bool same_for_rep_modify_since(epoch_t e) = 0; virtual void on_role_change() = 0; + virtual void on_pool_change() = 0; virtual void on_change(ObjectStore::Transaction *t) = 0; virtual void on_activate() = 0; virtual void on_flushed() = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 775147d667f..ae93db3035e 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -940,14 +940,13 @@ void ReplicatedPG::do_op(OpRequestRef op) ObjectContextRef obc; bool can_create = op->may_write(); snapid_t snapid; - int r = find_object_context( - hobject_t(m->get_oid(), - m->get_object_locator().key, - m->get_snapid(), - m->get_pg().ps(), - m->get_object_locator().get_pool(), - m->get_object_locator().nspace), - &obc, can_create, &snapid); + hobject_t oid(m->get_oid(), + m->get_object_locator().key, + m->get_snapid(), + m->get_pg().ps(), + m->get_object_locator().get_pool(), + m->get_object_locator().nspace); + int r = find_object_context(oid, &obc, can_create, &snapid); if (r == -EAGAIN) { // If we're not the primary of this OSD, and we have @@ -966,6 +965,14 @@ void ReplicatedPG::do_op(OpRequestRef op) } } + if (hit_set) { + hit_set->insert(oid); + if (hit_set->insert_count() >= info.hit_set.current_info.target_size || + hit_set_start_stamp + pool.info.hit_set_period <= m->get_recv_stamp()) { + hit_set_persist(); + } + } + if (maybe_handle_cache(op, obc, r)) return; @@ -7309,12 +7316,19 @@ void ReplicatedPG::on_activate() << " from " << backfill_pos << dendl; } } + + hit_set_setup(); } void ReplicatedPG::on_change(ObjectStore::Transaction *t) { dout(10) << "on_change" << dendl; + if (hit_set && hit_set->insert_count() == 0) { + dout(20) << " discarding empty hit_set" << dendl; + hit_set_clear(); + } + // requeue everything in the reverse order they should be // reexamined. @@ -7372,8 +7386,17 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t) void ReplicatedPG::on_role_change() { dout(10) << "on_role_change" << dendl; + if (get_role() != 0 && hit_set) { + dout(10) << " clearing hit set" << dendl; + hit_set_clear(); + } } +void ReplicatedPG::on_pool_change() +{ + dout(10) << __func__ << dendl; + hit_set_setup(); +} // clear state. called on recovery completion AND cancellation. void ReplicatedPG::_clear_recovery_state() @@ -8254,6 +8277,183 @@ void ReplicatedPG::check_local() +// =========================== +// hit sets + +hobject_t ReplicatedPG::get_hit_set_current_object(utime_t stamp) +{ + ostringstream ss; + ss << "hit_set_" << info.pgid << "_current_" << stamp; + hobject_t hoid(sobject_t(ss.str(), CEPH_NOSNAP), "", + info.pgid.ps(), info.pgid.pool(), ""); + dout(20) << __func__ << " " << hoid << dendl; + return hoid; +} + +hobject_t ReplicatedPG::get_hit_set_archive_object(utime_t start, utime_t end) +{ + ostringstream ss; + ss << "hit_set_" << info.pgid << "_archive_" << start << "_" << end; + hobject_t hoid(sobject_t(ss.str(), CEPH_NOSNAP), "", + info.pgid.ps(), info.pgid.pool(), ""); + dout(20) << __func__ << " " << hoid << dendl; + return hoid; +} + +void ReplicatedPG::hit_set_clear() +{ + hit_set.reset(NULL); + hit_set_start_stats.reset(NULL); + hit_set_start_stamp = utime_t(); +} + +void ReplicatedPG::hit_set_setup() +{ + if (!pool.info.hit_set_count || + !pool.info.hit_set_period || + pool.info.hit_set_fpp_micro == 0 || + pool.info.hit_set_type == HitSet::TYPE_NONE) { + hit_set_clear(); + return; + } + + // FIXME: discard any previous data for now + hit_set_create(); +} + +void ReplicatedPG::hit_set_create() +{ + utime_t now = ceph_clock_now(NULL); + + int target_size = 0; + double actual_fpp = (double)pool.info.hit_set_fpp_micro / 1000000.0; + double bin_fpp = actual_fpp / pool.info.hit_set_count; + + // oh yay, estimate target size based on the previous bin! + if (hit_set) { + /* + pg_stat_t cur = info.stats; + cur.sub(*hit_set_start_stats); + double ops = cur.stats.sum.num_rd + cur.stats.sum.num_wr; + */ + utime_t dur = now - hit_set_start_stamp; + unsigned unique = hit_set->approx_unique_insert_count(); + dout(20) << __func__ << " previous set had approx " << unique << " unique items over " << dur << " seconds" << dendl; + target_size = (double)unique * (double)pool.info.hit_set_period / (double)dur; + } + if (target_size < g_conf->osd_hit_set_min_size) + target_size = g_conf->osd_hit_set_min_size; + + dout(10) << __func__ << " target_size " << target_size + << " fpp " << bin_fpp << dendl; + hit_set.reset(new HitSet(pool.info.hit_set_type, target_size, + bin_fpp, now.sec())); + hit_set_start_stats.reset(new pg_stat_t(info.stats)); + hit_set_start_stamp = now; + info.hit_set.current_info = pg_hit_set_info_t(target_size, now); +} + +/** + * apply log entries to set + * + * this would only happen after peering, to at least capture writes + * during an interval that was potentially lost. + */ +bool ReplicatedPG::hit_set_apply_log() +{ + if (!hit_set) + return false; + + eversion_t to = info.last_update; + eversion_t from = info.hit_set.current_last_update; + if (to <= from) { + dout(20) << __func__ << " no update" << dendl; + return false; + } + + dout(20) << __func__ << " " << to << " .. " << info.last_update << dendl; + list<pg_log_entry_t>::const_reverse_iterator p = pg_log.get_log().log.rbegin(); + while (p != pg_log.get_log().log.rend() && p->version > to) + ++p; + while (p != pg_log.get_log().log.rend() && p->version > from) { + hit_set->insert(p->soid); + ++p; + } + + info.hit_set.current_last_update = to; + info.hit_set.current_info.size = hit_set->insert_count(); + return true; +} + +void ReplicatedPG::hit_set_persist() +{ + dout(10) << __func__ << dendl; + bufferlist bl; + hit_set->optimize(); + ::encode(*hit_set, bl); + + utime_t now = ceph_clock_now(cct); + RepGather *repop; + hobject_t oid; + bool reset = false; + + if (!info.hit_set.current_info.begin) + info.hit_set.current_info.begin = hit_set_start_stamp; + info.hit_set.current_info.size = hit_set->insert_count(); + if (info.hit_set.current_info.is_full()) { + // archive + info.hit_set.current_info.end = now; + info.hit_set.history.push_back(info.hit_set.current_info); + oid = get_hit_set_archive_object(info.hit_set.current_info.begin, + info.hit_set.current_info.end); + dout(20) << __func__ << " archive " << oid << dendl; + reset = true; + } else { + // persist snapshot of current hitset + oid = get_hit_set_current_object(now); + dout(20) << __func__ << " checkpoint " << oid << dendl; + } + info.hit_set.current_last_update = info.last_update; + info.hit_set.current_last_stamp = now; + if (reset) + hit_set_create(); + + ObjectContextRef obc = get_object_context(oid, true); + repop = simple_repop_create(obc); + repop->ctx->op_t.write(coll, oid, 0, bl.length(), bl); + repop->ctx->at_version = get_next_version(); + repop->ctx->log.push_back( + pg_log_entry_t( + pg_log_entry_t::MODIFY, + oid, + repop->ctx->at_version, + repop->ctx->obs->oi.version, + 0, + osd_reqid_t(), + repop->ctx->mtime) + ); + + if (info.hit_set.current_last_stamp != utime_t()) { + // FIXME: we cheat slightly here by bundling in a remove on a object + // other the RepGather object. we aren't carrying an ObjectContext for + // the deleted object over this period. + hobject_t old_obj = + get_hit_set_current_object(info.hit_set.current_last_stamp); + repop->ctx->op_t.remove(coll, old_obj); + ++repop->ctx->at_version.version; + repop->ctx->log.push_back( + pg_log_entry_t(pg_log_entry_t::DELETE, + old_obj, + repop->ctx->at_version, + info.hit_set.current_last_update, + 0, + osd_reqid_t(), + repop->ctx->mtime)); + } + + simple_repop_submit(repop); +} + // ========================================================================================== // SCRUB @@ -8554,7 +8754,7 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&) assert(repop); repop->queue_snap_trimmer = true; repops.insert(repop->get()); - simple_repop_submit(repop); + pg->simple_repop_submit(repop); return discard_event(); } /* WaitingOnReplicasObjects */ diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 0ec316fe1b2..bbe3f56be5b 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -23,8 +23,9 @@ #include "include/assert.h" #include "common/cmdparse.h" -#include "PG.h" +#include "HitSet.h" #include "OSD.h" +#include "PG.h" #include "Watch.h" #include "OpRequest.h" @@ -646,6 +647,21 @@ protected: RepGather *simple_repop_create(ObjectContextRef obc); void simple_repop_submit(RepGather *repop); + // hot/cold tracking + boost::scoped_ptr<HitSet> hit_set; ///< currently accumulating HitSet + utime_t hit_set_start_stamp; ///< time the current HitSet started recording + boost::scoped_ptr<pg_stat_t> hit_set_start_stats; + + void hit_set_clear(); ///< discard any HitSet state + void hit_set_setup(); ///< initialize HitSet state + void hit_set_create(); ///< create a new HitSet + void hit_set_check(); + void hit_set_persist(); ///< persist hit info + bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet + + hobject_t get_hit_set_current_object(utime_t stamp); + hobject_t get_hit_set_archive_object(utime_t start, utime_t end); + /// true if we can send an ondisk/commit for v bool already_complete(eversion_t v) { for (xlist<RepGather*>::iterator i = repop_queue.begin(); @@ -1137,6 +1153,7 @@ public: void _finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs); void on_role_change(); + void on_pool_change(); void on_change(ObjectStore::Transaction *t); void on_activate(); void on_flushed() { |