summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-06-18 20:07:20 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-06-18 20:07:20 -0700
commit13e030216e0d0a7143fd527ac51dd66059cb4a0f (patch)
tree0ae77bb82ae70f70bd62bb20006ca34694e022ff
parent0b932bfdd99b58042e1aefe47569846ba01cb5a6 (diff)
downloadceph-13e030216e0d0a7143fd527ac51dd66059cb4a0f.tar.gz
rgw: internal api for statelog objclass
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/Makefile.am6
-rw-r--r--src/rgw/rgw_rados.cc143
-rw-r--r--src/rgw/rgw_rados.h52
3 files changed, 198 insertions, 3 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 43a588b7841..9c261645002 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -405,7 +405,7 @@ librgw_a_CXXFLAGS = -Woverloaded-virtual ${AM_CXXFLAGS}
noinst_LIBRARIES += librgw.a
my_radosgw_ldadd = \
- libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a \
+ libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a libcls_statelog_client.a \
libcls_lock_client.a libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \
$(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
@@ -956,14 +956,14 @@ bin_DEBUGPROGRAMS += ceph_test_cors
unittest_rgw_meta_SOURCES = test/test_rgw_admin_meta.cc
unittest_rgw_meta_LDFLAGS = libglobal.la
unittest_rgw_meta_LDADD = librgw.a ${UNITTEST_LDADD} ${UNITTEST_STATIC_LDADD} -lcryptopp -lcurl -luuid -lexpat librados.la libcls_version_client.a \
- libcls_log_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a
+ libcls_log_client.a libcls_statelog_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a
unittest_rgw_meta_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
check_PROGRAMS += unittest_rgw_meta
unittest_rgw_log_SOURCES = test/test_rgw_admin_log.cc
unittest_rgw_log_LDFLAGS = libglobal.la
unittest_rgw_log_LDADD = librgw.a ${UNITTEST_LDADD} ${UNITTEST_STATIC_LDADD} -lcryptopp -lcurl -luuid -lexpat librados.la libcls_version_client.a \
- libcls_log_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a
+ libcls_log_client.a libcls_statelog_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a
unittest_rgw_log_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
check_PROGRAMS += unittest_rgw_log
endif
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index e65f8831c99..d90b35401c1 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -20,6 +20,7 @@
#include "cls/refcount/cls_refcount_client.h"
#include "cls/version/cls_version_client.h"
#include "cls/log/cls_log_client.h"
+#include "cls/statelog/cls_statelog_client.h"
#include "cls/lock/cls_lock_client.h"
#include "rgw_tools.h"
@@ -67,6 +68,8 @@ static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN;
#define RGW_DEFAULT_ZONE_ROOT_POOL ".rgw.root"
#define RGW_DEFAULT_REGION_ROOT_POOL ".rgw.root"
+#define RGW_STATELOG_OBJ_PREFIX "statelog."
+
#define dout_subsys ceph_subsys_rgw
@@ -5175,6 +5178,146 @@ int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid,
return ret;
}
+
+void RGWStateLog::oid_str(int shard, string& oid) {
+ oid = RGW_STATELOG_OBJ_PREFIX + module_name + ".";
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", shard);
+ oid += buf;
+}
+
+int RGWStateLog::get_shard_num(const string& object) {
+ uint32_t val = ceph_str_hash_linux(object.c_str(), object.length());
+ return val & num_shards;
+}
+
+string RGWStateLog::get_oid(const string& object) {
+ int shard = get_shard_num(object);
+ string oid;
+ oid_str(shard, oid);
+ return oid;
+}
+
+int RGWStateLog::open_ioctx(librados::IoCtx& ioctx) {
+ string pool_name;
+ store->get_log_pool_name(pool_name);
+ int r = store->rados->ioctx_create(pool_name.c_str(), ioctx);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: could not open rados pool" << dendl;
+ return r;
+ }
+ return 0;
+}
+
+int RGWStateLog::store_entry(const string& client_id, const string& op_id, const string& object,
+ uint32_t state, bufferlist *bl, uint32_t *check_state)
+{
+ if (client_id.empty() ||
+ op_id.empty() ||
+ object.empty()) {
+ ldout(store->ctx(), 0) << "client_id / op_id / object is empty" << dendl;
+ }
+
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx);
+ if (r < 0)
+ return r;
+
+ string oid = get_oid(object);
+
+ librados::ObjectWriteOperation op;
+ if (check_state) {
+ cls_statelog_check_state(op, client_id, op_id, object, *check_state);
+ }
+ utime_t ts = ceph_clock_now(store->ctx());
+ bufferlist nobl;
+ cls_statelog_add(op, client_id, op_id, object, ts, state, (bl ? *bl : nobl));
+ r = ioctx.operate(oid, &op);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+void RGWStateLog::init_list_entries(const string& client_id, const string& op_id, const string& object,
+ void **handle)
+{
+ list_state *state = new list_state;
+ state->client_id = client_id;
+ state->op_id = op_id;
+ state->object = object;
+ if (object.empty()) {
+ state->cur_shard = 0;
+ state->max_shard = num_shards - 1;
+ } else {
+ state->cur_shard = state->max_shard = get_shard_num(object);
+ }
+ *handle = (void *)state;
+}
+
+int RGWStateLog::list_entries(void *handle, int max_entries,
+ list<cls_statelog_entry>& entries)
+{
+ list_state *state = (list_state *)handle;
+
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx);
+ if (r < 0)
+ return r;
+
+ for (; state->cur_shard <= state->max_shard && max_entries > 0; ++state->cur_shard) {
+ string oid;
+ oid_str(state->cur_shard, oid);
+
+ librados::ObjectReadOperation op;
+ list<cls_statelog_entry> ents;
+ bool truncated;
+ cls_statelog_list(op, state->client_id, state->op_id, state->object, state->marker,
+ max_entries, ents, &state->marker, &truncated);
+ bufferlist ibl;
+ r = ioctx.operate(oid, &op, &ibl);
+ if (r == -ENOENT) {
+ truncated = false;
+ r = 0;
+ }
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "cls_statelog_list returned " << r << dendl;
+ return r;
+ }
+
+ if (!truncated) {
+ state->marker.clear();
+ state->cur_shard++;
+ }
+
+ max_entries -= ents.size();
+
+ entries.splice(entries.end(), ents);
+ }
+
+ return 0;
+}
+
+void RGWStateLog::finish_list_entries(void *handle)
+{
+ list_state *state = (list_state *)handle;
+ delete state;
+}
+
+void RGWStateLog::dump_entry(const cls_statelog_entry& entry, Formatter *f)
+{
+ f->open_object_section("statelog_entry");
+ f->dump_string("client_id", entry.client_id);
+ f->dump_string("op_id", entry.op_id);
+ f->dump_string("object", entry.object);
+ entry.timestamp.gmtime(f->dump_stream("timestamp"));
+ if (!dump_entry_internal(entry, f)) {
+ f->dump_int("state", entry.state);
+ }
+ f->close_section();
+}
+
uint64_t RGWRados::instance_id()
{
return rados->get_instance_id();
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index c2ec16f42c9..fcfb64e49a8 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -8,6 +8,7 @@
#include "cls/rgw/cls_rgw_types.h"
#include "cls/version/cls_version_types.h"
#include "cls/log/cls_log_types.h"
+#include "cls/statelog/cls_statelog_types.h"
#include "rgw_log.h"
#include "rgw_metadata.h"
#include "rgw_rest_conn.h"
@@ -563,9 +564,55 @@ WRITE_CLASS_ENCODER(RGWRegionMap);
class RGWDataChangesLog;
+class RGWStateLog {
+ RGWRados *store;
+ int num_shards;
+ string module_name;
+
+ void oid_str(int shard, string& oid);
+ int get_shard_num(const string& object);
+ string get_oid(const string& object);
+ int open_ioctx(librados::IoCtx& ioctx);
+
+ struct list_state {
+ int cur_shard;
+ int max_shard;
+ string marker;
+ string client_id;
+ string op_id;
+ string object;
+
+ list_state() : cur_shard(0), max_shard(0) {}
+ };
+
+protected:
+ virtual int dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) {
+ return false;
+ }
+
+public:
+ RGWStateLog(RGWRados *_store, int _num_shards, const string& _module_name) :
+ store(_store), num_shards(_num_shards), module_name(_module_name) {}
+ virtual ~RGWStateLog() {}
+
+ int store_entry(const string& client_id, const string& op_id, const string& object,
+ uint32_t state, bufferlist *bl, uint32_t *check_state);
+
+ void init_list_entries(const string& client_id, const string& op_id, const string& object,
+ void **handle);
+
+ int list_entries(void *handle, int max_entries, list<cls_statelog_entry>& entries);
+
+ void finish_list_entries(void *handle);
+
+ virtual void dump_entry(const cls_statelog_entry& entry, Formatter *f);
+
+};
+
class RGWRados
{
friend class RGWGC;
+ friend class RGWStateLog;
/** Open the pool used as root for this gateway */
int open_root_pool_ctx();
@@ -1154,6 +1201,11 @@ public:
return s;
}
+
+ void get_log_pool_name(string& name) {
+ name = zone.log_pool.name;
+ }
+
private:
int process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge);