diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-18 20:07:20 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-18 20:07:20 -0700 |
commit | 13e030216e0d0a7143fd527ac51dd66059cb4a0f (patch) | |
tree | 0ae77bb82ae70f70bd62bb20006ca34694e022ff | |
parent | 0b932bfdd99b58042e1aefe47569846ba01cb5a6 (diff) | |
download | ceph-13e030216e0d0a7143fd527ac51dd66059cb4a0f.tar.gz |
rgw: internal api for statelog objclass
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/Makefile.am | 6 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 143 | ||||
-rw-r--r-- | src/rgw/rgw_rados.h | 52 |
3 files changed, 198 insertions, 3 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 43a588b7841..9c261645002 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -405,7 +405,7 @@ librgw_a_CXXFLAGS = -Woverloaded-virtual ${AM_CXXFLAGS} noinst_LIBRARIES += librgw.a my_radosgw_ldadd = \ - libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a \ + libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a libcls_statelog_client.a \ libcls_lock_client.a libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \ $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS) @@ -956,14 +956,14 @@ bin_DEBUGPROGRAMS += ceph_test_cors unittest_rgw_meta_SOURCES = test/test_rgw_admin_meta.cc unittest_rgw_meta_LDFLAGS = libglobal.la unittest_rgw_meta_LDADD = librgw.a ${UNITTEST_LDADD} ${UNITTEST_STATIC_LDADD} -lcryptopp -lcurl -luuid -lexpat librados.la libcls_version_client.a \ - libcls_log_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a + libcls_log_client.a libcls_statelog_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a unittest_rgw_meta_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} check_PROGRAMS += unittest_rgw_meta unittest_rgw_log_SOURCES = test/test_rgw_admin_log.cc unittest_rgw_log_LDFLAGS = libglobal.la unittest_rgw_log_LDADD = librgw.a ${UNITTEST_LDADD} ${UNITTEST_STATIC_LDADD} -lcryptopp -lcurl -luuid -lexpat librados.la libcls_version_client.a \ - libcls_log_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a + libcls_log_client.a libcls_statelog_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a unittest_rgw_log_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} check_PROGRAMS += unittest_rgw_log endif diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index e65f8831c99..d90b35401c1 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -20,6 +20,7 @@ #include "cls/refcount/cls_refcount_client.h" #include "cls/version/cls_version_client.h" #include "cls/log/cls_log_client.h" +#include "cls/statelog/cls_statelog_client.h" #include "cls/lock/cls_lock_client.h" #include "rgw_tools.h" @@ -67,6 +68,8 @@ static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN; #define RGW_DEFAULT_ZONE_ROOT_POOL ".rgw.root" #define RGW_DEFAULT_REGION_ROOT_POOL ".rgw.root" +#define RGW_STATELOG_OBJ_PREFIX "statelog." + #define dout_subsys ceph_subsys_rgw @@ -5175,6 +5178,146 @@ int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid, return ret; } + +void RGWStateLog::oid_str(int shard, string& oid) { + oid = RGW_STATELOG_OBJ_PREFIX + module_name + "."; + char buf[16]; + snprintf(buf, sizeof(buf), "%d", shard); + oid += buf; +} + +int RGWStateLog::get_shard_num(const string& object) { + uint32_t val = ceph_str_hash_linux(object.c_str(), object.length()); + return val & num_shards; +} + +string RGWStateLog::get_oid(const string& object) { + int shard = get_shard_num(object); + string oid; + oid_str(shard, oid); + return oid; +} + +int RGWStateLog::open_ioctx(librados::IoCtx& ioctx) { + string pool_name; + store->get_log_pool_name(pool_name); + int r = store->rados->ioctx_create(pool_name.c_str(), ioctx); + if (r < 0) { + lderr(store->ctx()) << "ERROR: could not open rados pool" << dendl; + return r; + } + return 0; +} + +int RGWStateLog::store_entry(const string& client_id, const string& op_id, const string& object, + uint32_t state, bufferlist *bl, uint32_t *check_state) +{ + if (client_id.empty() || + op_id.empty() || + object.empty()) { + ldout(store->ctx(), 0) << "client_id / op_id / object is empty" << dendl; + } + + librados::IoCtx ioctx; + int r = open_ioctx(ioctx); + if (r < 0) + return r; + + string oid = get_oid(object); + + librados::ObjectWriteOperation op; + if (check_state) { + cls_statelog_check_state(op, client_id, op_id, object, *check_state); + } + utime_t ts = ceph_clock_now(store->ctx()); + bufferlist nobl; + cls_statelog_add(op, client_id, op_id, object, ts, state, (bl ? *bl : nobl)); + r = ioctx.operate(oid, &op); + if (r < 0) { + return r; + } + + return 0; +} + +void RGWStateLog::init_list_entries(const string& client_id, const string& op_id, const string& object, + void **handle) +{ + list_state *state = new list_state; + state->client_id = client_id; + state->op_id = op_id; + state->object = object; + if (object.empty()) { + state->cur_shard = 0; + state->max_shard = num_shards - 1; + } else { + state->cur_shard = state->max_shard = get_shard_num(object); + } + *handle = (void *)state; +} + +int RGWStateLog::list_entries(void *handle, int max_entries, + list<cls_statelog_entry>& entries) +{ + list_state *state = (list_state *)handle; + + librados::IoCtx ioctx; + int r = open_ioctx(ioctx); + if (r < 0) + return r; + + for (; state->cur_shard <= state->max_shard && max_entries > 0; ++state->cur_shard) { + string oid; + oid_str(state->cur_shard, oid); + + librados::ObjectReadOperation op; + list<cls_statelog_entry> ents; + bool truncated; + cls_statelog_list(op, state->client_id, state->op_id, state->object, state->marker, + max_entries, ents, &state->marker, &truncated); + bufferlist ibl; + r = ioctx.operate(oid, &op, &ibl); + if (r == -ENOENT) { + truncated = false; + r = 0; + } + if (r < 0) { + ldout(store->ctx(), 0) << "cls_statelog_list returned " << r << dendl; + return r; + } + + if (!truncated) { + state->marker.clear(); + state->cur_shard++; + } + + max_entries -= ents.size(); + + entries.splice(entries.end(), ents); + } + + return 0; +} + +void RGWStateLog::finish_list_entries(void *handle) +{ + list_state *state = (list_state *)handle; + delete state; +} + +void RGWStateLog::dump_entry(const cls_statelog_entry& entry, Formatter *f) +{ + f->open_object_section("statelog_entry"); + f->dump_string("client_id", entry.client_id); + f->dump_string("op_id", entry.op_id); + f->dump_string("object", entry.object); + entry.timestamp.gmtime(f->dump_stream("timestamp")); + if (!dump_entry_internal(entry, f)) { + f->dump_int("state", entry.state); + } + f->close_section(); +} + uint64_t RGWRados::instance_id() { return rados->get_instance_id(); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index c2ec16f42c9..fcfb64e49a8 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -8,6 +8,7 @@ #include "cls/rgw/cls_rgw_types.h" #include "cls/version/cls_version_types.h" #include "cls/log/cls_log_types.h" +#include "cls/statelog/cls_statelog_types.h" #include "rgw_log.h" #include "rgw_metadata.h" #include "rgw_rest_conn.h" @@ -563,9 +564,55 @@ WRITE_CLASS_ENCODER(RGWRegionMap); class RGWDataChangesLog; +class RGWStateLog { + RGWRados *store; + int num_shards; + string module_name; + + void oid_str(int shard, string& oid); + int get_shard_num(const string& object); + string get_oid(const string& object); + int open_ioctx(librados::IoCtx& ioctx); + + struct list_state { + int cur_shard; + int max_shard; + string marker; + string client_id; + string op_id; + string object; + + list_state() : cur_shard(0), max_shard(0) {} + }; + +protected: + virtual int dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) { + return false; + } + +public: + RGWStateLog(RGWRados *_store, int _num_shards, const string& _module_name) : + store(_store), num_shards(_num_shards), module_name(_module_name) {} + virtual ~RGWStateLog() {} + + int store_entry(const string& client_id, const string& op_id, const string& object, + uint32_t state, bufferlist *bl, uint32_t *check_state); + + void init_list_entries(const string& client_id, const string& op_id, const string& object, + void **handle); + + int list_entries(void *handle, int max_entries, list<cls_statelog_entry>& entries); + + void finish_list_entries(void *handle); + + virtual void dump_entry(const cls_statelog_entry& entry, Formatter *f); + +}; + class RGWRados { friend class RGWGC; + friend class RGWStateLog; /** Open the pool used as root for this gateway */ int open_root_pool_ctx(); @@ -1154,6 +1201,11 @@ public: return s; } + + void get_log_pool_name(string& name) { + name = zone.log_pool.name; + } + private: int process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge); |