summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-10-11 16:06:07 -0700
committerSage Weil <sage@inktank.com>2013-10-22 13:45:39 -0700
commit057bdb92a065abcc9d3b51a3aea4c24246621a5b (patch)
tree0180243c7b107fb594a256831b535fd823702ed6
parentf8054bb39345b934e7954e02d33415cc0c5c9833 (diff)
downloadceph-057bdb92a065abcc9d3b51a3aea4c24246621a5b.tar.gz
osd/ReplicatedPG: add basic HitSet tracking
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/common/config_opts.h1
-rw-r--r--src/osd/PG.cc2
-rw-r--r--src/osd/PG.h1
-rw-r--r--src/osd/ReplicatedPG.cc218
-rw-r--r--src/osd/ReplicatedPG.h19
5 files changed, 231 insertions, 10 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 0b3938ecb9e..dea23bae02f 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -393,6 +393,7 @@ OPTION(osd_pool_default_pg_num, OPT_INT, 8) // number of PGs for new pools. Conf
OPTION(osd_pool_default_pgp_num, OPT_INT, 8) // number of PGs for placement purposes. Should be equal to pg_num
OPTION(osd_pool_default_flags, OPT_INT, 0) // default flags for new pools
OPTION(osd_pool_default_flag_hashpspool, OPT_BOOL, false) // use new pg hashing to prevent pool/pg overlap
+OPTION(osd_hit_set_min_size, OPT_INT, 1000) // min target size for a HitSet
OPTION(osd_map_dedup, OPT_BOOL, true)
OPTION(osd_map_cache_size, OPT_INT, 500)
OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 8f7d3ccb684..91e71236819 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -4980,6 +4980,8 @@ void PG::handle_advance_map(OSDMapRef osdmap, OSDMapRef lastmap,
dout(10) << "handle_advance_map " << newup << "/" << newacting << dendl;
update_osdmap_ref(osdmap);
pool.update(osdmap);
+ if (pool.info.last_change == osdmap_ref->get_epoch())
+ on_pool_change();
AdvMap evt(osdmap, lastmap, newup, newacting);
recovery_state.handle_event(evt, rctx);
}
diff --git a/src/osd/PG.h b/src/osd/PG.h
index de7f0b82cac..08bf2f91610 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -1828,6 +1828,7 @@ public:
virtual bool same_for_rep_modify_since(epoch_t e) = 0;
virtual void on_role_change() = 0;
+ virtual void on_pool_change() = 0;
virtual void on_change(ObjectStore::Transaction *t) = 0;
virtual void on_activate() = 0;
virtual void on_flushed() = 0;
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 775147d667f..ae93db3035e 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -940,14 +940,13 @@ void ReplicatedPG::do_op(OpRequestRef op)
ObjectContextRef obc;
bool can_create = op->may_write();
snapid_t snapid;
- int r = find_object_context(
- hobject_t(m->get_oid(),
- m->get_object_locator().key,
- m->get_snapid(),
- m->get_pg().ps(),
- m->get_object_locator().get_pool(),
- m->get_object_locator().nspace),
- &obc, can_create, &snapid);
+ hobject_t oid(m->get_oid(),
+ m->get_object_locator().key,
+ m->get_snapid(),
+ m->get_pg().ps(),
+ m->get_object_locator().get_pool(),
+ m->get_object_locator().nspace);
+ int r = find_object_context(oid, &obc, can_create, &snapid);
if (r == -EAGAIN) {
// If we're not the primary of this OSD, and we have
@@ -966,6 +965,14 @@ void ReplicatedPG::do_op(OpRequestRef op)
}
}
+ if (hit_set) {
+ hit_set->insert(oid);
+ if (hit_set->insert_count() >= info.hit_set.current_info.target_size ||
+ hit_set_start_stamp + pool.info.hit_set_period <= m->get_recv_stamp()) {
+ hit_set_persist();
+ }
+ }
+
if (maybe_handle_cache(op, obc, r))
return;
@@ -7309,12 +7316,19 @@ void ReplicatedPG::on_activate()
<< " from " << backfill_pos << dendl;
}
}
+
+ hit_set_setup();
}
void ReplicatedPG::on_change(ObjectStore::Transaction *t)
{
dout(10) << "on_change" << dendl;
+ if (hit_set && hit_set->insert_count() == 0) {
+ dout(20) << " discarding empty hit_set" << dendl;
+ hit_set_clear();
+ }
+
// requeue everything in the reverse order they should be
// reexamined.
@@ -7372,8 +7386,17 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
void ReplicatedPG::on_role_change()
{
dout(10) << "on_role_change" << dendl;
+ // HitSet state is dropped whenever get_role() is nonzero (role 0 is
+ // presumably the primary -- confirm); it is rebuilt via hit_set_setup().
+ if (get_role() != 0 && hit_set) {
+ dout(10) << " clearing hit set" << dendl;
+ hit_set_clear();
+ }
}
+// Pool-change hook.  PG::handle_advance_map() calls this when
+// pool.info.last_change equals the epoch of the new osdmap; it re-runs
+// hit_set_setup() so HitSet tracking follows the pool's hit_set_* settings.
+void ReplicatedPG::on_pool_change()
+{
+ dout(10) << __func__ << dendl;
+ hit_set_setup();
+}
// clear state. called on recovery completion AND cancellation.
void ReplicatedPG::_clear_recovery_state()
@@ -8254,6 +8277,183 @@ void ReplicatedPG::check_local()
+// ===========================
+// hit sets
+
+// Name of the checkpoint ("current") HitSet object for this PG:
+// "hit_set_<pgid>_current_<stamp>" at CEPH_NOSNAP, located by this PG's
+// ps() and pool().
+hobject_t ReplicatedPG::get_hit_set_current_object(utime_t stamp)
+{
+  ostringstream name;
+  name << "hit_set_" << info.pgid << "_current_" << stamp;
+
+  const sobject_t sobj(name.str(), CEPH_NOSNAP);
+  hobject_t target(sobj, "", info.pgid.ps(), info.pgid.pool(), "");
+
+  dout(20) << __func__ << " " << target << dendl;
+  return target;
+}
+
+// Name of the archived HitSet object covering [start, end] for this PG:
+// "hit_set_<pgid>_archive_<start>_<end>" at CEPH_NOSNAP, located by this
+// PG's ps() and pool().
+hobject_t ReplicatedPG::get_hit_set_archive_object(utime_t start, utime_t end)
+{
+  ostringstream name;
+  name << "hit_set_" << info.pgid << "_archive_" << start << "_" << end;
+
+  const sobject_t sobj(name.str(), CEPH_NOSNAP);
+  hobject_t target(sobj, "", info.pgid.ps(), info.pgid.pool(), "");
+
+  dout(20) << __func__ << " " << target << dendl;
+  return target;
+}
+
+// Forget all in-memory HitSet tracking state: the accumulating set, the
+// stats snapshot taken when it was created, and its start time.
+void ReplicatedPG::hit_set_clear()
+{
+  hit_set_start_stamp = utime_t();
+  hit_set_start_stats.reset();
+  hit_set.reset();
+}
+
+// (Re)initialize HitSet tracking from the pool settings.  Tracking is on
+// only when count, period and fpp are all nonzero and a HitSet type is
+// configured; otherwise any existing state is discarded.
+void ReplicatedPG::hit_set_setup()
+{
+  const bool tracking_enabled =
+    pool.info.hit_set_count &&
+    pool.info.hit_set_period &&
+    pool.info.hit_set_fpp_micro != 0 &&
+    pool.info.hit_set_type != HitSet::TYPE_NONE;
+
+  if (tracking_enabled) {
+    // FIXME: discard any previous data for now
+    hit_set_create();
+  } else {
+    hit_set_clear();
+  }
+}
+
+// Start a new, empty HitSet, replacing any existing one.
+//
+// The per-set false positive probability is the pool's hit_set_fpp_micro
+// (in millionths) divided by hit_set_count.  The target size is
+// extrapolated from the previous set's approximate unique insert count,
+// scaled to one hit_set_period, and never goes below osd_hit_set_min_size.
+// Also snapshots info.stats and resets info.hit_set.current_info.
+void ReplicatedPG::hit_set_create()
+{
+  // use cct like hit_set_persist() does, so stamps come from the same clock
+  utime_t now = ceph_clock_now(cct);
+
+  int target_size = 0;
+  double actual_fpp = (double)pool.info.hit_set_fpp_micro / 1000000.0;
+  double bin_fpp = actual_fpp / pool.info.hit_set_count;
+
+  // oh yay, estimate target size based on the previous bin!
+  if (hit_set) {
+    /*
+    pg_stat_t cur = info.stats;
+    cur.sub(*hit_set_start_stats);
+    double ops = cur.stats.sum.num_rd + cur.stats.sum.num_wr;
+    */
+    utime_t dur = now - hit_set_start_stamp;
+    unsigned unique = hit_set->approx_unique_insert_count();
+    dout(20) << __func__ << " previous set had approx " << unique << " unique items over " << dur << " seconds" << dendl;
+    // A zero-length interval would divide by zero (inf/NaN), and converting
+    // that to int is undefined; fall back to the configured minimum instead.
+    double elapsed = (double)dur;
+    if (elapsed > 0.0)
+      target_size = (double)unique * (double)pool.info.hit_set_period / elapsed;
+  }
+  if (target_size < g_conf->osd_hit_set_min_size)
+    target_size = g_conf->osd_hit_set_min_size;
+
+  dout(10) << __func__ << " target_size " << target_size
+	   << " fpp " << bin_fpp << dendl;
+  hit_set.reset(new HitSet(pool.info.hit_set_type, target_size,
+			   bin_fpp, now.sec()));
+  hit_set_start_stats.reset(new pg_stat_t(info.stats));
+  hit_set_start_stamp = now;
+  info.hit_set.current_info = pg_hit_set_info_t(target_size, now);
+}
+
+/**
+ * apply log entries to set
+ *
+ * this would only happen after peering, to at least capture writes
+ * during an interval that was potentially lost.
+ *
+ * inserts the soid of every pg log entry whose version is in
+ * (info.hit_set.current_last_update, info.last_update], then advances
+ * current_last_update to info.last_update and refreshes current_info.size.
+ *
+ * @return true if the log range was applied; false if there is no
+ *         HitSet or nothing newer than current_last_update
+ */
+bool ReplicatedPG::hit_set_apply_log()
+{
+  if (!hit_set)
+    return false;
+
+  eversion_t to = info.last_update;
+  eversion_t from = info.hit_set.current_last_update;
+  if (to <= from) {
+    dout(20) << __func__ << " no update" << dendl;
+    return false;
+  }
+
+  // log the range actually being applied: (from, to]
+  dout(20) << __func__ << " " << from << " .. " << to << dendl;
+  // walk the log newest-to-oldest: skip anything past 'to', then insert
+  // until we reach entries already covered by 'from'
+  list<pg_log_entry_t>::const_reverse_iterator p = pg_log.get_log().log.rbegin();
+  while (p != pg_log.get_log().log.rend() && p->version > to)
+    ++p;
+  while (p != pg_log.get_log().log.rend() && p->version > from) {
+    hit_set->insert(p->soid);
+    ++p;
+  }
+
+  info.hit_set.current_last_update = to;
+  info.hit_set.current_info.size = hit_set->insert_count();
+  return true;
+}
+
+
+/**
+ * persist the current in-memory HitSet
+ *
+ * Encodes the HitSet and writes it through a simple repop, either as an
+ * archive object (when current_info.is_full(); a fresh HitSet is then
+ * created) or as a checkpoint "current" object named by the current time.
+ * The checkpoint object recorded by the previous call, if any, is removed
+ * in the same transaction.
+ *
+ * hit_set is dereferenced unconditionally; callers must check it first.
+ */
+void ReplicatedPG::hit_set_persist()
+{
+  dout(10) << __func__ << dendl;
+  bufferlist bl;
+  hit_set->optimize();
+  ::encode(*hit_set, bl);
+
+  utime_t now = ceph_clock_now(cct);
+  hobject_t oid;
+  bool reset = false;
+
+  // Capture what the previous call recorded *before* it is overwritten
+  // below; these identify the old checkpoint object that must be removed.
+  const utime_t prev_stamp = info.hit_set.current_last_stamp;
+  const eversion_t prev_update = info.hit_set.current_last_update;
+
+  if (!info.hit_set.current_info.begin)
+    info.hit_set.current_info.begin = hit_set_start_stamp;
+  info.hit_set.current_info.size = hit_set->insert_count();
+  if (info.hit_set.current_info.is_full()) {
+    // archive
+    info.hit_set.current_info.end = now;
+    info.hit_set.history.push_back(info.hit_set.current_info);
+    oid = get_hit_set_archive_object(info.hit_set.current_info.begin,
+				     info.hit_set.current_info.end);
+    dout(20) << __func__ << " archive " << oid << dendl;
+    reset = true;
+  } else {
+    // persist snapshot of current hitset
+    oid = get_hit_set_current_object(now);
+    dout(20) << __func__ << " checkpoint " << oid << dendl;
+  }
+  info.hit_set.current_last_update = info.last_update;
+  info.hit_set.current_last_stamp = now;
+  if (reset)
+    hit_set_create();
+
+  ObjectContextRef obc = get_object_context(oid, true);
+  RepGather *repop = simple_repop_create(obc);
+  repop->ctx->op_t.write(coll, oid, 0, bl.length(), bl);
+  repop->ctx->at_version = get_next_version();
+  repop->ctx->log.push_back(
+    pg_log_entry_t(
+      pg_log_entry_t::MODIFY,
+      oid,
+      repop->ctx->at_version,
+      repop->ctx->obs->oi.version,
+      0,
+      osd_reqid_t(),
+      repop->ctx->mtime)
+    );
+
+  // Remove the previous checkpoint, unless it has the same name as the
+  // checkpoint just written (same stamp, non-archive path), in which case
+  // the write above already replaced it.
+  // NOTE(review): after an archive pass no checkpoint object exists for the
+  // recorded stamp, yet the next call will still issue a remove for it --
+  // confirm how that should be handled.
+  if (prev_stamp != utime_t() && (reset || prev_stamp != now)) {
+    // FIXME: we cheat slightly here by bundling in a remove on an object
+    // other than the RepGather object.  we aren't carrying an ObjectContext
+    // for the deleted object over this period.
+    hobject_t old_obj = get_hit_set_current_object(prev_stamp);
+    repop->ctx->op_t.remove(coll, old_obj);
+    ++repop->ctx->at_version.version;
+    repop->ctx->log.push_back(
+      pg_log_entry_t(pg_log_entry_t::DELETE,
+		     old_obj,
+		     repop->ctx->at_version,
+		     prev_update,
+		     0,
+		     osd_reqid_t(),
+		     repop->ctx->mtime));
+  }
+
+  simple_repop_submit(repop);
+}
+
// ==========================================================================================
// SCRUB
@@ -8554,7 +8754,7 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
assert(repop);
repop->queue_snap_trimmer = true;
repops.insert(repop->get());
- simple_repop_submit(repop);
+ pg->simple_repop_submit(repop);
return discard_event();
}
/* WaitingOnReplicasObjects */
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 0ec316fe1b2..bbe3f56be5b 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -23,8 +23,9 @@
#include "include/assert.h"
#include "common/cmdparse.h"
-#include "PG.h"
+#include "HitSet.h"
#include "OSD.h"
+#include "PG.h"
#include "Watch.h"
#include "OpRequest.h"
@@ -646,6 +647,21 @@ protected:
RepGather *simple_repop_create(ObjectContextRef obc);
void simple_repop_submit(RepGather *repop);
+ // hot/cold tracking
+ boost::scoped_ptr<HitSet> hit_set; ///< currently accumulating HitSet
+ utime_t hit_set_start_stamp; ///< time the current HitSet started recording
+ boost::scoped_ptr<pg_stat_t> hit_set_start_stats; ///< copy of info.stats taken when the current HitSet was created
+
+ void hit_set_clear(); ///< discard any HitSet state
+ void hit_set_setup(); ///< initialize HitSet state
+ void hit_set_create(); ///< create a new HitSet
+ void hit_set_check(); ///< NOTE(review): declared here, but no definition appears in this patch
+ void hit_set_persist(); ///< persist hit info
+ bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
+
+ hobject_t get_hit_set_current_object(utime_t stamp); ///< object name for the checkpoint written at stamp
+ hobject_t get_hit_set_archive_object(utime_t start, utime_t end); ///< object name for the archived set covering start..end
+
/// true if we can send an ondisk/commit for v
bool already_complete(eversion_t v) {
for (xlist<RepGather*>::iterator i = repop_queue.begin();
@@ -1137,6 +1153,7 @@ public:
void _finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs);
void on_role_change();
+ void on_pool_change();
void on_change(ObjectStore::Transaction *t);
void on_activate();
void on_flushed() {