summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-10-11 16:07:55 -0700
committerSage Weil <sage@inktank.com>2013-10-22 13:45:39 -0700
commit5a2142ee433c8a1dcda857863be78da6eb96a56a (patch)
treeeab952b91dc1239c046c095705fe081d42f08f49
parent057bdb92a065abcc9d3b51a3aea4c24246621a5b (diff)
downloadceph-5a2142ee433c8a1dcda857863be78da6eb96a56a.tar.gz
librados, osd: list and get HitSets via librados
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/common/ceph_strings.cc2
-rw-r--r--src/include/rados.h5
-rw-r--r--src/include/rados/librados.hpp22
-rw-r--r--src/librados/IoCtxImpl.cc31
-rw-r--r--src/librados/IoCtxImpl.h5
-rw-r--r--src/librados/librados.cc16
-rw-r--r--src/osd/ReplicatedPG.cc50
-rw-r--r--src/osd/osd_types.cc5
-rw-r--r--src/osdc/Objecter.h79
-rw-r--r--src/test/Makefile.am3
-rw-r--r--src/test/librados/tier.cc113
11 files changed, 322 insertions, 9 deletions
diff --git a/src/common/ceph_strings.cc b/src/common/ceph_strings.cc
index 221fb059740..2749fbb0a3b 100644
--- a/src/common/ceph_strings.cc
+++ b/src/common/ceph_strings.cc
@@ -86,6 +86,8 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_PGLS: return "pgls";
case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
+ case CEPH_OSD_OP_PG_HITSET_LS: return "pg-hitset-ls";
+ case CEPH_OSD_OP_PG_HITSET_GET: return "pg-hitset-get";
case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
diff --git a/src/include/rados.h b/src/include/rados.h
index e7a32b5afef..70c04d4167d 100644
--- a/src/include/rados.h
+++ b/src/include/rados.h
@@ -265,6 +265,8 @@ enum {
/** pg **/
CEPH_OSD_OP_PGLS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 1,
CEPH_OSD_OP_PGLS_FILTER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 2,
+ CEPH_OSD_OP_PG_HITSET_LS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 3,
+ CEPH_OSD_OP_PG_HITSET_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 4,
};
static inline int ceph_osd_op_type_lock(int op)
@@ -417,6 +419,9 @@ struct ceph_osd_op {
__le64 snapid;
__le64 src_version;
} __attribute__ ((packed)) copy_from;
+ struct {
+ struct ceph_timespec stamp;
+ } __attribute__ ((packed)) hit_set_get;
};
__le32 payload_len;
} __attribute__ ((packed));
diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp
index 58a64996705..a6449c58d9b 100644
--- a/src/include/rados/librados.hpp
+++ b/src/include/rados/librados.hpp
@@ -422,7 +422,6 @@ namespace librados
* @param prval [out] place error code in prval upon completion
*/
void is_dirty(bool *isdirty, int *prval);
-
};
/* IoCtx : This is a context in which we can perform I/O.
@@ -598,6 +597,27 @@ namespace librados
/// Iterator indicating the end of a pool
const ObjectIterator& objects_end() const;
+ /**
+ * List available hit set objects
+ *
+ * @param uint32_t [in] hash position to query
+ * @param c [in] completion
+ * @param pls [out] list of available intervals
+ */
+ int hit_set_list(uint32_t hash, AioCompletion *c,
+ std::list< std::pair<time_t, time_t> > *pls);
+
+ /**
+ * Retrieve hit set for a given hash, and time
+ *
+ * @param uint32_t [in] hash position
+ * @param c [in] completion
+ * @param stamp [in] time interval that falls within the hit set's interval
+ * @param pbl [out] buffer to store the result in
+ */
+ int hit_set_get(uint32_t hash, AioCompletion *c, time_t stamp,
+ bufferlist *pbl);
+
uint64_t get_last_version();
int aio_read(const std::string& oid, AioCompletion *c,
diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc
index 49a61173cc1..b31bf86f758 100644
--- a/src/librados/IoCtxImpl.cc
+++ b/src/librados/IoCtxImpl.cc
@@ -808,6 +808,37 @@ int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c,
return 0;
}
+int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
+ std::list< std::pair<time_t, time_t> > *pls)
+{
+ Context *onack = new C_aio_Ack(c);
+ c->is_read = true;
+ c->io = this;
+
+ Mutex::Locker l(*lock);
+ ::ObjectOperation rd;
+ rd.hit_set_ls(pls, NULL);
+ object_locator_t oloc(poolid);
+ objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL);
+ return 0;
+}
+
+int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
+ time_t stamp,
+ bufferlist *pbl)
+{
+ Context *onack = new C_aio_Ack(c);
+ c->is_read = true;
+ c->io = this;
+
+ Mutex::Locker l(*lock);
+ ::ObjectOperation rd;
+ rd.hit_set_get(utime_t(stamp, 0), pbl, 0);
+ object_locator_t oloc(poolid);
+ objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL);
+ return 0;
+}
+
int librados::IoCtxImpl::remove(const object_t& oid)
{
::ObjectOperation op;
diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h
index 7d7cf3722c7..d748db43b95 100644
--- a/src/librados/IoCtxImpl.h
+++ b/src/librados/IoCtxImpl.h
@@ -187,6 +187,11 @@ struct librados::IoCtxImpl {
int pool_change_auid(unsigned long long auid);
int pool_change_auid_async(unsigned long long auid, PoolAsyncCompletionImpl *c);
+ int hit_set_list(uint32_t hash, AioCompletionImpl *c,
+ std::list< std::pair<time_t, time_t> > *pls);
+ int hit_set_get(uint32_t hash, AioCompletionImpl *c, time_t stamp,
+ bufferlist *pbl);
+
void set_sync_op_version(version_t ver);
int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
int unwatch(const object_t& oid, uint64_t cookie);
diff --git a/src/librados/librados.cc b/src/librados/librados.cc
index a9f317f06cc..002e58f9f4c 100644
--- a/src/librados/librados.cc
+++ b/src/librados/librados.cc
@@ -275,8 +275,6 @@ void librados::ObjectReadOperation::is_dirty(bool *is_dirty, int *prval)
o->is_dirty(is_dirty, prval);
}
-
-
int librados::IoCtx::omap_get_vals(const std::string& oid,
const std::string& start_after,
const std::string& filter_prefix,
@@ -1158,6 +1156,20 @@ const librados::ObjectIterator& librados::IoCtx::objects_end() const
return ObjectIterator::__EndObjectIterator;
}
+int librados::IoCtx::hit_set_list(uint32_t hash, AioCompletion *c,
+ std::list< std::pair<time_t, time_t> > *pls)
+{
+ return io_ctx_impl->hit_set_list(hash, c->pc, pls);
+}
+
+int librados::IoCtx::hit_set_get(uint32_t hash, AioCompletion *c, time_t stamp,
+ bufferlist *pbl)
+{
+ return io_ctx_impl->hit_set_get(hash, c->pc, stamp, pbl);
+}
+
+
+
uint64_t librados::IoCtx::get_last_version()
{
return io_ctx_impl->last_version();
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index ae93db3035e..3995ae0f320 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -731,6 +731,56 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
}
break;
+ case CEPH_OSD_OP_PG_HITSET_LS:
+ {
+ list< pair<utime_t,utime_t> > ls;
+ for (list<pg_hit_set_info_t>::const_iterator p = info.hit_set.history.begin();
+ p != info.hit_set.history.end();
+ ++p)
+ ls.push_back(make_pair(p->begin, p->end));
+ if (info.hit_set.current_info.begin)
+ ls.push_back(make_pair(info.hit_set.current_info.begin, utime_t()));
+ else if (hit_set)
+ ls.push_back(make_pair(hit_set_start_stamp, utime_t()));
+ ::encode(ls, osd_op.outdata);
+ }
+ break;
+
+ case CEPH_OSD_OP_PG_HITSET_GET:
+ {
+ utime_t stamp(osd_op.op.hit_set_get.stamp);
+ if ((info.hit_set.current_info.begin &&
+ stamp >= info.hit_set.current_info.begin) ||
+ stamp >= hit_set_start_stamp) {
+ // read the current in-memory HitSet, not the version we've
+ // checkpointed.
+ if (!hit_set) {
+ result= -ENOENT;
+ break;
+ }
+ ::encode(*hit_set, osd_op.outdata);
+ result = osd_op.outdata.length();
+ } else {
+ // read an archived HitSet.
+ hobject_t oid;
+ for (list<pg_hit_set_info_t>::const_iterator p = info.hit_set.history.begin();
+ p != info.hit_set.history.end();
+ ++p) {
+ if (stamp >= p->begin && stamp <= p->end) {
+ oid = get_hit_set_archive_object(p->begin, p->end);
+ break;
+ }
+ }
+ if (oid == hobject_t()) {
+ result = -ENOENT;
+ break;
+ }
+ result = osd->store->read(coll, oid, 0, 0, osd_op.outdata);
+ }
+ }
+ break;
+
+
default:
result = -EINVAL;
break;
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index 65f6f27e29f..b441d3fc8c7 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -3743,6 +3743,11 @@ ostream& operator<<(ostream& out, const OSDOp& op)
case CEPH_OSD_OP_PGLS_FILTER:
out << " start_epoch " << op.op.pgls.start_epoch;
break;
+ case CEPH_OSD_OP_PG_HITSET_LS:
+ break;
+ case CEPH_OSD_OP_PG_HITSET_GET:
+ out << " " << utime_t(op.op.hit_set_get.stamp);
+ break;
}
} else if (ceph_osd_op_type_multi(op.op.op)) {
switch (op.op.op) {
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index b24349499a1..40331f2afb4 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -676,6 +676,85 @@ struct ObjectOperation {
out_handler[p] = h;
}
+ struct C_ObjectOperation_hit_set_ls : public Context {
+ bufferlist bl;
+ std::list< std::pair<time_t, time_t> > *ptls;
+ std::list< std::pair<utime_t, utime_t> > *putls;
+ int *prval;
+ C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
+ std::list< std::pair<utime_t, utime_t> > *ut,
+ int *r)
+ : ptls(t), putls(ut), prval(r) {}
+ void finish(int r) {
+ if (r < 0)
+ return;
+ try {
+ bufferlist::iterator p = bl.begin();
+ std::list< std::pair<utime_t, utime_t> > ls;
+ ::decode(ls, p);
+ if (ptls) {
+ ptls->clear();
+ for (list< pair<utime_t,utime_t> >::iterator p = ls.begin(); p != ls.end(); ++p)
+ // round initial timestamp up to the next full second to keep this a valid interval.
+ ptls->push_back(make_pair(p->first.usec() ? p->first.sec() + 1 : p->first.sec(), p->second.sec()));
+ }
+ if (putls)
+ putls->swap(ls);
+ } catch (buffer::error& e) {
+ r = -EIO;
+ }
+ if (prval)
+ *prval = r;
+ }
+ };
+
+ /**
+ * list available HitSets.
+ *
+ * We will get back a list of time intervals. Note that the most recent range may have
+ * an empty end timestamp if it is still accumulating.
+ *
+ * @param pls [out] list of time intervals
+ * @param prval [out] return value
+ */
+ void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
+ add_op(CEPH_OSD_OP_PG_HITSET_LS);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ C_ObjectOperation_hit_set_ls *h =
+ new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
+ out_bl[p] = &h->bl;
+ out_handler[p] = h;
+ }
+ void hit_set_ls(std::list< std::pair<utime_t, utime_t> > *pls, int *prval) {
+ add_op(CEPH_OSD_OP_PG_HITSET_LS);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ C_ObjectOperation_hit_set_ls *h =
+ new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
+ out_bl[p] = &h->bl;
+ out_handler[p] = h;
+ }
+
+ /**
+ * get HitSet
+ *
+ * Return an encoded HitSet that includes the provided time
+ * interval.
+ *
+ * @param stamp [in] timestamp
+ * @param pbl [out] target buffer for encoded HitSet
+ * @param prval [out] return value
+ */
+ void hit_set_get(utime_t stamp, bufferlist *pbl, int *prval) {
+ OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
+ op.op.hit_set_get.stamp.tv_sec = stamp.sec();
+ op.op.hit_set_get.stamp.tv_nsec = stamp.nsec();
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ out_bl[p] = pbl;
+ }
+
void omap_get_header(bufferlist *bl, int *prval) {
add_op(CEPH_OSD_OP_OMAPGETHEADER);
unsigned p = ops.size() - 1;
diff --git a/src/test/Makefile.am b/src/test/Makefile.am
index 3b4dc04b3e0..9d033467b01 100644
--- a/src/test/Makefile.am
+++ b/src/test/Makefile.am
@@ -769,7 +769,8 @@ bin_DEBUGPROGRAMS += ceph_test_rados_api_misc
ceph_test_rados_api_tier_SOURCES = \
test/librados/tier.cc \
- test/librados/test.cc
+ test/librados/test.cc \
+ osd/HitSet.cc
ceph_test_rados_api_tier_LDADD = $(LIBRADOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL)
ceph_test_rados_api_tier_CXXFLAGS = $(UNITTEST_CXXFLAGS)
bin_DEBUGPROGRAMS += ceph_test_rados_api_tier
diff --git a/src/test/librados/tier.cc b/src/test/librados/tier.cc
index 3485f2f91ce..24c08e21c9e 100644
--- a/src/test/librados/tier.cc
+++ b/src/test/librados/tier.cc
@@ -8,12 +8,15 @@
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
#include "include/stringify.h"
+#include "include/types.h"
#include "global/global_context.h"
#include "global/global_init.h"
#include "common/ceph_argparse.h"
#include "common/common_init.h"
#include "test/librados/test.h"
+#include "osd/HitSet.h"
+
#include <errno.h>
#include <map>
#include <sstream>
@@ -79,7 +82,7 @@ TEST(LibRadosMisc, Dirty) {
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
}
-TEST(LibRadosMisc, HitSet) {
+TEST(LibRadosMisc, HitSetNone) {
Rados cluster;
std::string pool_name = get_temp_pool_name();
ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
@@ -87,11 +90,111 @@ TEST(LibRadosMisc, HitSet) {
ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx));
{
- ObjectReadOperation op;
list< pair<time_t,time_t> > ls;
- int rval;
- op.hit_set_ls(123, &ls, &rval);
- ASSERT_EQ(0, ioctx.operate("foo", &op, NULL));
+ AioCompletion *c = librados::Rados::aio_create_completion();
+ ASSERT_EQ(0, ioctx.hit_set_list(123, c, &ls));
+ c->wait_for_complete();
+ ASSERT_EQ(0, c->get_return_value());
+ ASSERT_TRUE(ls.empty());
+ c->release();
+ }
+ {
+ bufferlist bl;
+ AioCompletion *c = librados::Rados::aio_create_completion();
+ ASSERT_EQ(0, ioctx.hit_set_get(123, c, 12345, &bl));
+ c->wait_for_complete();
+ ASSERT_EQ(-ENOENT, c->get_return_value());
+ c->release();
+ }
+
+ ioctx.close();
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
+}
+
+string set_pool_str(string pool, string var, string val)
+{
+ return string("{\"prefix\": \"osd pool set\",\"pool\":\"") + pool
+ + string("\",\"var\": \"") + var + string("\",\"val\": \"")
+ + val + string("\"}");
+}
+
+string set_pool_str(string pool, string var, int val)
+{
+ return string("{\"prefix\": \"osd pool set\",\"pool\":\"") + pool
+ + string("\",\"var\": \"") + var + string("\",\"val\": ")
+ + stringify(val) + string("}");
+}
+
+TEST(LibRadosMisc, HitSetRead) {
+ Rados cluster;
+ std::string pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
+ IoCtx ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx));
+
+ // FIXME: detect num pgs
+ int num_pg = 8;
+
+ // enable hitset tracking for this pool
+ bufferlist inbl;
+ ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_count", 8),
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_period", 60),
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_type", "bloom"),
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_fpp", ".01"),
+ inbl, NULL, NULL));
+
+ // wait for maps to settle
+ cluster.wait_for_latest_map();
+
+ // do a bunch of reads
+ for (int i=0; i<1000; ++i) {
+ bufferlist bl;
+ ASSERT_EQ(-ENOENT, ioctx.read(stringify(i), bl, 1, 0));
+ }
+
+ // get HitSets
+ std::map<int,HitSet> hitsets;
+ for (int i=0; i<num_pg; ++i) {
+ list< pair<time_t,time_t> > ls;
+ AioCompletion *c = librados::Rados::aio_create_completion();
+ ASSERT_EQ(0, ioctx.hit_set_list(i, c, &ls));
+ c->wait_for_complete();
+ c->release();
+ std::cout << "pg " << i << " ls " << ls << std::endl;
+ ASSERT_FALSE(ls.empty());
+
+ // get the latest
+ c = librados::Rados::aio_create_completion();
+ bufferlist bl;
+ ASSERT_EQ(0, ioctx.hit_set_get(i, c, ls.back().first, &bl));
+ c->wait_for_complete();
+ c->release();
+
+ //std::cout << "bl len is " << bl.length() << "\n";
+ //bl.hexdump(std::cout);
+ //std::cout << std::endl;
+
+ bufferlist::iterator p = bl.begin();
+ ::decode(hitsets[i], p);
+ }
+
+ for (int i=0; i<1000; ++i) {
+ string n = stringify(i);
+ uint32_t hash = ioctx.get_object_hash_position(n);
+ hobject_t oid(sobject_t(n, CEPH_NOSNAP), "", hash, -1, "");
+ int pg = ioctx.get_object_pg_hash_position(n);
+ std::cout << "checking for " << oid << ", should be in pg " << pg << std::endl;
+ bool found = false;
+ for (int p=0; p<num_pg; ++p) {
+ if (hitsets[p].contains(oid)) {
+ found = true;
+ break;
+ }
+ }
+ ASSERT_TRUE(found);
}
ioctx.close();