diff options
author | Sage Weil <sage@inktank.com> | 2013-10-11 16:07:55 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-10-22 13:45:39 -0700 |
commit | 5a2142ee433c8a1dcda857863be78da6eb96a56a (patch) | |
tree | eab952b91dc1239c046c095705fe081d42f08f49 | |
parent | 057bdb92a065abcc9d3b51a3aea4c24246621a5b (diff) | |
download | ceph-5a2142ee433c8a1dcda857863be78da6eb96a56a.tar.gz |
librados, osd: list and get HitSets via librados
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/common/ceph_strings.cc | 2 | ||||
-rw-r--r-- | src/include/rados.h | 5 | ||||
-rw-r--r-- | src/include/rados/librados.hpp | 22 | ||||
-rw-r--r-- | src/librados/IoCtxImpl.cc | 31 | ||||
-rw-r--r-- | src/librados/IoCtxImpl.h | 5 | ||||
-rw-r--r-- | src/librados/librados.cc | 16 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 50 | ||||
-rw-r--r-- | src/osd/osd_types.cc | 5 | ||||
-rw-r--r-- | src/osdc/Objecter.h | 79 | ||||
-rw-r--r-- | src/test/Makefile.am | 3 | ||||
-rw-r--r-- | src/test/librados/tier.cc | 113 |
11 files changed, 322 insertions, 9 deletions
diff --git a/src/common/ceph_strings.cc b/src/common/ceph_strings.cc index 221fb059740..2749fbb0a3b 100644 --- a/src/common/ceph_strings.cc +++ b/src/common/ceph_strings.cc @@ -86,6 +86,8 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_PGLS: return "pgls"; case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter"; + case CEPH_OSD_OP_PG_HITSET_LS: return "pg-hitset-ls"; + case CEPH_OSD_OP_PG_HITSET_GET: return "pg-hitset-get"; case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys"; case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals"; case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header"; diff --git a/src/include/rados.h b/src/include/rados.h index e7a32b5afef..70c04d4167d 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -265,6 +265,8 @@ enum { /** pg **/ CEPH_OSD_OP_PGLS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 1, CEPH_OSD_OP_PGLS_FILTER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 2, + CEPH_OSD_OP_PG_HITSET_LS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 3, + CEPH_OSD_OP_PG_HITSET_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 4, }; static inline int ceph_osd_op_type_lock(int op) @@ -417,6 +419,9 @@ struct ceph_osd_op { __le64 snapid; __le64 src_version; } __attribute__ ((packed)) copy_from; + struct { + struct ceph_timespec stamp; + } __attribute__ ((packed)) hit_set_get; }; __le32 payload_len; } __attribute__ ((packed)); diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index 58a64996705..a6449c58d9b 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -422,7 +422,6 @@ namespace librados * @param prval [out] place error code in prval upon completion */ void is_dirty(bool *isdirty, int *prval); - }; /* IoCtx : This is a context in which we can perform I/O. @@ -598,6 +597,27 @@ namespace librados /// Iterator indicating the end of a pool const ObjectIterator& objects_end() const; + /** + * List available hit set objects + * + * @param uint32_t [in] hash position to query + * @param c [in] completion + * @param pls [out] list of available intervals + */ + int hit_set_list(uint32_t hash, AioCompletion *c, + std::list< std::pair<time_t, time_t> > *pls); + + /** + * Retrieve hit set for a given hash, and time + * + * @param uint32_t [in] hash position + * @param c [in] completion + * @param stamp [in] time interval that falls within the hit set's interval + * @param pbl [out] buffer to store the result in + */ + int hit_set_get(uint32_t hash, AioCompletion *c, time_t stamp, + bufferlist *pbl); + uint64_t get_last_version(); int aio_read(const std::string& oid, AioCompletion *c, diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index 49a61173cc1..b31bf86f758 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -808,6 +808,37 @@ int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c, return 0; } +int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c, + std::list< std::pair<time_t, time_t> > *pls) +{ + Context *onack = new C_aio_Ack(c); + c->is_read = true; + c->io = this; + + Mutex::Locker l(*lock); + ::ObjectOperation rd; + rd.hit_set_ls(pls, NULL); + object_locator_t oloc(poolid); + objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL); + return 0; +} + +int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c, + time_t stamp, + bufferlist *pbl) +{ + Context *onack = new C_aio_Ack(c); + c->is_read = true; + c->io = this; + + Mutex::Locker l(*lock); + ::ObjectOperation rd; + rd.hit_set_get(utime_t(stamp, 0), pbl, 0); + object_locator_t oloc(poolid); + objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL); + return 0; +} + int librados::IoCtxImpl::remove(const object_t& oid) { ::ObjectOperation op; diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index 7d7cf3722c7..d748db43b95 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -187,6 +187,11 @@ struct librados::IoCtxImpl { int pool_change_auid(unsigned long long auid); int pool_change_auid_async(unsigned long long auid, PoolAsyncCompletionImpl *c); + int hit_set_list(uint32_t hash, AioCompletionImpl *c, + std::list< std::pair<time_t, time_t> > *pls); + int hit_set_get(uint32_t hash, AioCompletionImpl *c, time_t stamp, + bufferlist *pbl); + void set_sync_op_version(version_t ver); int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx); int unwatch(const object_t& oid, uint64_t cookie); diff --git a/src/librados/librados.cc b/src/librados/librados.cc index a9f317f06cc..002e58f9f4c 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -275,8 +275,6 @@ void librados::ObjectReadOperation::is_dirty(bool *is_dirty, int *prval) o->is_dirty(is_dirty, prval); } - - int librados::IoCtx::omap_get_vals(const std::string& oid, const std::string& start_after, const std::string& filter_prefix, @@ -1158,6 +1156,20 @@ const librados::ObjectIterator& librados::IoCtx::objects_end() const return ObjectIterator::__EndObjectIterator; } +int librados::IoCtx::hit_set_list(uint32_t hash, AioCompletion *c, + std::list< std::pair<time_t, time_t> > *pls) +{ + return io_ctx_impl->hit_set_list(hash, c->pc, pls); +} + +int librados::IoCtx::hit_set_get(uint32_t hash, AioCompletion *c, time_t stamp, + bufferlist *pbl) +{ + return io_ctx_impl->hit_set_get(hash, c->pc, stamp, pbl); +} + + + uint64_t librados::IoCtx::get_last_version() { return io_ctx_impl->last_version(); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index ae93db3035e..3995ae0f320 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -731,6 +731,56 @@ void ReplicatedPG::do_pg_op(OpRequestRef op) } break; + case CEPH_OSD_OP_PG_HITSET_LS: + { + list< pair<utime_t,utime_t> > ls; + for (list<pg_hit_set_info_t>::const_iterator p = info.hit_set.history.begin(); + p != info.hit_set.history.end(); + ++p) + ls.push_back(make_pair(p->begin, p->end)); + if (info.hit_set.current_info.begin) + ls.push_back(make_pair(info.hit_set.current_info.begin, utime_t())); + else if (hit_set) + ls.push_back(make_pair(hit_set_start_stamp, utime_t())); + ::encode(ls, osd_op.outdata); + } + break; + + case CEPH_OSD_OP_PG_HITSET_GET: + { + utime_t stamp(osd_op.op.hit_set_get.stamp); + if ((info.hit_set.current_info.begin && + stamp >= info.hit_set.current_info.begin) || + stamp >= hit_set_start_stamp) { + // read the current in-memory HitSet, not the version we've + // checkpointed. + if (!hit_set) { + result= -ENOENT; + break; + } + ::encode(*hit_set, osd_op.outdata); + result = osd_op.outdata.length(); + } else { + // read an archived HitSet. + hobject_t oid; + for (list<pg_hit_set_info_t>::const_iterator p = info.hit_set.history.begin(); + p != info.hit_set.history.end(); + ++p) { + if (stamp >= p->begin && stamp <= p->end) { + oid = get_hit_set_archive_object(p->begin, p->end); + break; + } + } + if (oid == hobject_t()) { + result = -ENOENT; + break; + } + result = osd->store->read(coll, oid, 0, 0, osd_op.outdata); + } + } + break; + + default: result = -EINVAL; break; diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 65f6f27e29f..b441d3fc8c7 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -3743,6 +3743,11 @@ ostream& operator<<(ostream& out, const OSDOp& op) case CEPH_OSD_OP_PGLS_FILTER: out << " start_epoch " << op.op.pgls.start_epoch; break; + case CEPH_OSD_OP_PG_HITSET_LS: + break; + case CEPH_OSD_OP_PG_HITSET_GET: + out << " " << utime_t(op.op.hit_set_get.stamp); + break; } } else if (ceph_osd_op_type_multi(op.op.op)) { switch (op.op.op) { diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index b24349499a1..40331f2afb4 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -676,6 +676,85 @@ struct ObjectOperation { out_handler[p] = h; } + struct C_ObjectOperation_hit_set_ls : public Context { + bufferlist bl; + std::list< std::pair<time_t, time_t> > *ptls; + std::list< std::pair<utime_t, utime_t> > *putls; + int *prval; + C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t, + std::list< std::pair<utime_t, utime_t> > *ut, + int *r) + : ptls(t), putls(ut), prval(r) {} + void finish(int r) { + if (r < 0) + return; + try { + bufferlist::iterator p = bl.begin(); + std::list< std::pair<utime_t, utime_t> > ls; + ::decode(ls, p); + if (ptls) { + ptls->clear(); + for (list< pair<utime_t,utime_t> >::iterator p = ls.begin(); p != ls.end(); ++p) + // round initial timestamp up to the next full second to keep this a valid interval. + ptls->push_back(make_pair(p->first.usec() ? p->first.sec() + 1 : p->first.sec(), p->second.sec())); + } + if (putls) + putls->swap(ls); + } catch (buffer::error& e) { + r = -EIO; + } + if (prval) + *prval = r; + } + }; + + /** + * list available HitSets. + * + * We will get back a list of time intervals. Note that the most recent range may have + * an empty end timestamp if it is still accumulating. + * + * @param pls [out] list of time intervals + * @param prval [out] return value + */ + void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) { + add_op(CEPH_OSD_OP_PG_HITSET_LS); + unsigned p = ops.size() - 1; + out_rval[p] = prval; + C_ObjectOperation_hit_set_ls *h = + new C_ObjectOperation_hit_set_ls(pls, NULL, prval); + out_bl[p] = &h->bl; + out_handler[p] = h; + } + void hit_set_ls(std::list< std::pair<utime_t, utime_t> > *pls, int *prval) { + add_op(CEPH_OSD_OP_PG_HITSET_LS); + unsigned p = ops.size() - 1; + out_rval[p] = prval; + C_ObjectOperation_hit_set_ls *h = + new C_ObjectOperation_hit_set_ls(NULL, pls, prval); + out_bl[p] = &h->bl; + out_handler[p] = h; + } + + /** + * get HitSet + * + * Return an encoded HitSet that includes the provided time + * interval. + * + * @param stamp [in] timestamp + * @param pbl [out] target buffer for encoded HitSet + * @param prval [out] return value + */ + void hit_set_get(utime_t stamp, bufferlist *pbl, int *prval) { + OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET); + op.op.hit_set_get.stamp.tv_sec = stamp.sec(); + op.op.hit_set_get.stamp.tv_nsec = stamp.nsec(); + unsigned p = ops.size() - 1; + out_rval[p] = prval; + out_bl[p] = pbl; + } + void omap_get_header(bufferlist *bl, int *prval) { add_op(CEPH_OSD_OP_OMAPGETHEADER); unsigned p = ops.size() - 1; diff --git a/src/test/Makefile.am b/src/test/Makefile.am index 3b4dc04b3e0..9d033467b01 100644 --- a/src/test/Makefile.am +++ b/src/test/Makefile.am @@ -769,7 +769,8 @@ bin_DEBUGPROGRAMS += ceph_test_rados_api_misc ceph_test_rados_api_tier_SOURCES = \ test/librados/tier.cc \ - test/librados/test.cc + test/librados/test.cc \ + osd/HitSet.cc ceph_test_rados_api_tier_LDADD = $(LIBRADOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL) ceph_test_rados_api_tier_CXXFLAGS = $(UNITTEST_CXXFLAGS) bin_DEBUGPROGRAMS += ceph_test_rados_api_tier diff --git a/src/test/librados/tier.cc b/src/test/librados/tier.cc index 3485f2f91ce..24c08e21c9e 100644 --- a/src/test/librados/tier.cc +++ b/src/test/librados/tier.cc @@ -8,12 +8,15 @@ #include "include/rados/librados.h" #include "include/rados/librados.hpp" #include "include/stringify.h" +#include "include/types.h" #include "global/global_context.h" #include "global/global_init.h" #include "common/ceph_argparse.h" #include "common/common_init.h" #include "test/librados/test.h" +#include "osd/HitSet.h" + #include <errno.h> #include <map> #include <sstream> @@ -79,7 +82,7 @@ TEST(LibRadosMisc, Dirty) { ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); } -TEST(LibRadosMisc, HitSet) { +TEST(LibRadosMisc, HitSetNone) { Rados cluster; std::string pool_name = get_temp_pool_name(); ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); @@ -87,11 +90,111 @@ TEST(LibRadosMisc, HitSet) { ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx)); { - ObjectReadOperation op; list< pair<time_t,time_t> > ls; - int rval; - op.hit_set_ls(123, &ls, &rval); - ASSERT_EQ(0, ioctx.operate("foo", &op, NULL)); + AioCompletion *c = librados::Rados::aio_create_completion(); + ASSERT_EQ(0, ioctx.hit_set_list(123, c, &ls)); + c->wait_for_complete(); + ASSERT_EQ(0, c->get_return_value()); + ASSERT_TRUE(ls.empty()); + c->release(); + } + { + bufferlist bl; + AioCompletion *c = librados::Rados::aio_create_completion(); + ASSERT_EQ(0, ioctx.hit_set_get(123, c, 12345, &bl)); + c->wait_for_complete(); + ASSERT_EQ(-ENOENT, c->get_return_value()); + c->release(); + } + + ioctx.close(); + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + +string set_pool_str(string pool, string var, string val) +{ + return string("{\"prefix\": \"osd pool set\",\"pool\":\"") + pool + + string("\",\"var\": \"") + var + string("\",\"val\": \"") + + val + string("\"}"); +} + +string set_pool_str(string pool, string var, int val) +{ + return string("{\"prefix\": \"osd pool set\",\"pool\":\"") + pool + + string("\",\"var\": \"") + var + string("\",\"val\": ") + + stringify(val) + string("}"); +} + +TEST(LibRadosMisc, HitSetRead) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx)); + + // FIXME: detect num pgs + int num_pg = 8; + + // enable hitset tracking for this pool + bufferlist inbl; + ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_count", 8), + inbl, NULL, NULL)); + ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_period", 60), + inbl, NULL, NULL)); + ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_type", "bloom"), + inbl, NULL, NULL)); + ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_fpp", ".01"), + inbl, NULL, NULL)); + + // wait for maps to settle + cluster.wait_for_latest_map(); + + // do a bunch of reads + for (int i=0; i<1000; ++i) { + bufferlist bl; + ASSERT_EQ(-ENOENT, ioctx.read(stringify(i), bl, 1, 0)); + } + + // get HitSets + std::map<int,HitSet> hitsets; + for (int i=0; i<num_pg; ++i) { + list< pair<time_t,time_t> > ls; + AioCompletion *c = librados::Rados::aio_create_completion(); + ASSERT_EQ(0, ioctx.hit_set_list(i, c, &ls)); + c->wait_for_complete(); + c->release(); + std::cout << "pg " << i << " ls " << ls << std::endl; + ASSERT_FALSE(ls.empty()); + + // get the latest + c = librados::Rados::aio_create_completion(); + bufferlist bl; + ASSERT_EQ(0, ioctx.hit_set_get(i, c, ls.back().first, &bl)); + c->wait_for_complete(); + c->release(); + + //std::cout << "bl len is " << bl.length() << "\n"; + //bl.hexdump(std::cout); + //std::cout << std::endl; + + bufferlist::iterator p = bl.begin(); + ::decode(hitsets[i], p); + } + + for (int i=0; i<1000; ++i) { + string n = stringify(i); + uint32_t hash = ioctx.get_object_hash_position(n); + hobject_t oid(sobject_t(n, CEPH_NOSNAP), "", hash, -1, ""); + int pg = ioctx.get_object_pg_hash_position(n); + std::cout << "checking for " << oid << ", should be in pg " << pg << std::endl; + bool found = false; + for (int p=0; p<num_pg; ++p) { + if (hitsets[p].contains(oid)) { + found = true; + break; + } + } + ASSERT_TRUE(found); } ioctx.close(); |