From 990d0cf91fbc9f612421783a40e5c93fe2b0bdb3 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Sat, 29 Jun 2013 00:25:09 -0700 Subject: rgw: rework replica log + RESTful api simplifying. not compiling yet Signed-off-by: Yehuda Sadeh --- src/cls/replica_log/cls_replica_log_types.cc | 43 ++++++--- src/cls/replica_log/cls_replica_log_types.h | 5 ++ src/rgw/rgw_replica_log.cc | 24 +++-- src/rgw/rgw_replica_log.h | 24 +++-- src/rgw/rgw_rest_replica_log.cc | 126 +++++---------------------- src/rgw/rgw_rest_replica_log.h | 8 +- 6 files changed, 94 insertions(+), 136 deletions(-) diff --git a/src/cls/replica_log/cls_replica_log_types.cc b/src/cls/replica_log/cls_replica_log_types.cc index ba89fd42ae3..ac547213260 100644 --- a/src/cls/replica_log/cls_replica_log_types.cc +++ b/src/cls/replica_log/cls_replica_log_types.cc @@ -15,12 +15,17 @@ void cls_replica_log_item_marker::dump(Formatter *f) const { - f->dump_string("item name", item_name); - f->dump_stream("item timestamp") << item_timestamp; + f->dump_string("name", item_name); + f->dump_stream("timestamp") << item_timestamp; } -void cls_replica_log_item_marker:: -generate_test_instances(std::list& ls) +void cls_replica_log_item_marker::decode_json(JSONObj *obj) +{ + JSONDecoder::decode_json("name", item_name, obj); + JSONDecoder::decode_json("timestamp", item_timestamp, obj); +} + +void generate_test_instances(std::list& ls) { ls.push_back(new cls_replica_log_item_marker); ls.back()->item_name = "test_item_1"; @@ -32,12 +37,20 @@ generate_test_instances(std::list& ls) void cls_replica_log_progress_marker::dump(Formatter *f) const { - f->dump_string("entity", entity_id); - f->dump_string("position_marker", position_marker); - position_time.gmtime(f->dump_stream("position_time")); + encode_json("entity", entity_id, f); + encode_json("position_marker", position_marker, f); + encode_json("position_time", position_time, f); encode_json("items_in_progress", items, f); } +void cls_replica_log_progress_marker::decode_json(JSONObj *obj) +{ + JSONDecoder::decode_json("entity", entity_id, obj); + JSONDecoder::decode_json("position_marker", position_marker, obj); + JSONDecoder::decode_json("position_time", position_time, obj); + JSONDecoder::decode_json("items_in_progress", items, obj); +} + void cls_replica_log_progress_marker:: generate_test_instances(std::list& ls) { @@ -57,14 +70,24 @@ generate_test_instances(std::list& ls) void cls_replica_log_bound::dump(Formatter *f) const { - f->dump_string("position_marker", position_marker); - position_time.gmtime(f->dump_stream("position_time")); - f->dump_string("marker_exists", marker_exists ? "yes" : "no"); + encode_json("position_marker", position_marker, f); + encode_json("position_time", position_time, f); + encode_json("marker_exists", marker_exists, f); if (marker_exists) { encode_json("marker", marker, f); //progress marker } } +void cls_replica_log_bound::decode_json(JSONObj *obj) +{ + JSONDecoder::decode_json("position_marker", position_marker, obj); + JSONDecoder::decode_json("position_time", position_time, obj); + JSONDecoder::decode_json("marker_exists", marker_exists, obj); + if (marker_exists) { + JSONDecoder::decode_json("marker", marker, obj); //progress marker + } +} + void cls_replica_log_bound:: generate_test_instances(std::list& ls) { diff --git a/src/cls/replica_log/cls_replica_log_types.h b/src/cls/replica_log/cls_replica_log_types.h index 39dc22e4456..29f29d6604d 100644 --- a/src/cls/replica_log/cls_replica_log_types.h +++ b/src/cls/replica_log/cls_replica_log_types.h @@ -17,6 +17,8 @@ #include "include/types.h" #include +class JSONObj; + struct cls_replica_log_item_marker { string item_name; // the name of the item we're marking utime_t item_timestamp; // the time stamp at which the item was outdated @@ -40,6 +42,7 @@ struct cls_replica_log_item_marker { } void dump(Formatter *f) const; + void decode_json(JSONObj *obj);; static void generate_test_instances(std::list& ls); }; WRITE_CLASS_ENCODER(cls_replica_log_item_marker) @@ -82,6 +85,7 @@ struct cls_replica_log_progress_marker { } void dump(Formatter *f) const; + void decode_json(JSONObj *obj);; static void generate_test_instances(std::list& ls); }; WRITE_CLASS_ENCODER(cls_replica_log_progress_marker) @@ -182,6 +186,7 @@ public: } void dump(Formatter *f) const; + void decode_json(JSONObj *obj);; static void generate_test_instances(std::list& ls); }; WRITE_CLASS_ENCODER(cls_replica_log_bound); diff --git a/src/rgw/rgw_replica_log.cc b/src/rgw/rgw_replica_log.cc index 6d12b1e9ea1..1f74a8a1df3 100644 --- a/src/rgw/rgw_replica_log.cc +++ b/src/rgw/rgw_replica_log.cc @@ -8,10 +8,26 @@ * Copyright 2013 Inktank */ +#include "common/ceph_json.h" + #include "rgw_replica_log.h" #include "cls/replica_log/cls_replica_log_client.h" #include "rgw_rados.h" + +void RGWReplicaBounds::dump(Formatter *f) const +{ + encode_json("marker", marker, f); + encode_json("oldest_time", oldest_time, f); + encode_json("markers", markers, f); +} + +void RGWReplicaBounds::decode_json(JSONObj *obj) { + JSONDecoder::decode_json("marker", marker, obj); + JSONDecoder::decode_json("oldest_time", oldest_time, obj); + JSONDecoder::decode_json("markers", markers, obj); +}; + RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) : cct(_store->cct), store(_store) {} @@ -19,8 +35,7 @@ int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool) { int r = store->rados->ioctx_create(pool.c_str(), ctx); if (r < 0) { - lderr(cct) << "ERROR: could not open rados pool " - << pool << dendl; + lderr(cct) << "ERROR: could not open rados pool " << pool << dendl; } return r; } @@ -60,8 +75,7 @@ int RGWReplicaLogger::delete_bound(const string& oid, const string& pool, } int RGWReplicaLogger::get_bounds(const string& oid, const string& pool, - string& marker, utime_t& oldest_time, - list& markers) + RGWReplicaBounds& bounds) { librados::IoCtx ioctx; int r = open_ioctx(ioctx, pool); @@ -69,7 +83,7 @@ int RGWReplicaLogger::get_bounds(const string& oid, const string& pool, return r; } - return cls_replica_log_get_bounds(ioctx, oid, marker, oldest_time, markers); + return cls_replica_log_get_bounds(ioctx, oid, bounds.marker, bounds.oldest_time, bounds.markers); } void RGWReplicaLogger::get_bound_info( diff --git a/src/rgw/rgw_replica_log.h b/src/rgw/rgw_replica_log.h index 131a7efa98a..c82327433a1 100644 --- a/src/rgw/rgw_replica_log.h +++ b/src/rgw/rgw_replica_log.h @@ -25,6 +25,16 @@ using namespace std; #define META_REPLICA_LOG_OBJ_PREFIX "meta.replicalog." #define DATA_REPLICA_LOG_OBJ_PREFIX "data.replicalog." + +struct RGWReplicaBounds { + string marker; + utime_t oldest_time; + list markers; + + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; + class RGWReplicaLogger { protected: CephContext *cct; @@ -40,8 +50,7 @@ protected: int delete_bound(const string& oid, const string& pool, const string& daemon_id); int get_bounds(const string& oid, const string& pool, - string& marker, utime_t& oldest_time, - list& markers); + RGWReplicaBounds& bounds); public: static void get_bound_info(const cls_replica_log_progress_marker& progress, @@ -80,12 +89,10 @@ public: return RGWReplicaLogger::delete_bound(oid, pool, daemon_id); } - int get_bounds(int shard, string& marker, utime_t& oldest_time, - list& markers) { + int get_bounds(int shard, RGWReplicaBounds& bounds) { string oid; get_shard_oid(shard, oid); - return RGWReplicaLogger::get_bounds(oid, pool, - marker, oldest_time, markers); + return RGWReplicaLogger::get_bounds(oid, pool, bounds); } }; @@ -104,10 +111,9 @@ public: return RGWReplicaLogger::delete_bound(prefix+bucket.name, pool, daemon_id); } - int get_bounds(const rgw_bucket& bucket, string& marker, utime_t& oldest_time, - list& markers) { + int get_bounds(const rgw_bucket& bucket, RGWReplicaBounds& bounds) { return RGWReplicaLogger::get_bounds(prefix+bucket.name, pool, - marker, oldest_time, markers); + bounds); } }; diff --git a/src/rgw/rgw_rest_replica_log.cc b/src/rgw/rgw_rest_replica_log.cc index fc1ef9c52cb..6b279c4e601 100644 --- a/src/rgw/rgw_rest_replica_log.cc +++ b/src/rgw/rgw_rest_replica_log.cc @@ -37,46 +37,9 @@ static int parse_to_utime(string& in, utime_t& out) { return 0; } -static int parse_input_list(const char *data, int data_len, - const char *el_name, list >& out) { - JSONParser parser; - - if (!parser.parse(data, data_len)) { - return -EINVAL; - } - if (!parser.is_array()) { - dout(5) << "Should have been an array" << dendl; - return -EINVAL; - } - - vector l; - - l = parser.get_array_elements(); - for (vector::iterator it = l.begin(); - it != l.end(); it++) { - JSONParser el_parser; - - if (!el_parser.parse((*it).c_str(), (*it).length())) { - dout(5) << "Error parsing an array element" << dendl; - return -EINVAL; - } - - string name, time; - - JSONDecoder::decode_json(el_name, name, (JSONObj *)&el_parser); - JSONDecoder::decode_json("time", time, (JSONObj *)&el_parser); - - utime_t ut; - if (parse_to_utime(time, ut) < 0) { - return -EINVAL; - } - out.push_back(make_pair(name, ut)); - } - - return 0; -} -static int get_input_list(req_state *s, const char *element_name, list >& out) { +template +static int get_input(req_state *s, T& out) { int rv, data_len; char *data; @@ -84,47 +47,20 @@ static int get_input_list(req_state *s, const char *element_name, listopen_object_section(name); - f->dump_string(el_name, val.item_name); - encode_json("time", val.item_timestamp, f); - f->close_section(); -} - -static void progress_encode_json(const char *name, - const char *sub_array_name, - const char *sub_array_el_name, - cls_replica_log_progress_marker &val, - Formatter *f) { - f->open_object_section(name); - f->dump_string("daemon_id", val.entity_id); - f->dump_string("marker", val.position_marker); - encode_json("time", val.position_time, f); - - f->open_array_section(sub_array_name); - for (list::iterator it = val.items.begin(); - it != val.items.end(); it++) { - cls_replica_log_item_marker& entry = (*it); - - item_encode_json(sub_array_name, sub_array_el_name, entry, f); - } - f->close_section(); - f->close_section(); -} - void RGWOp_OBJLog_SetBounds::execute() { string id_str = s->info.args.get("id"), marker = s->info.args.get("marker"), @@ -159,13 +95,13 @@ void RGWOp_OBJLog_SetBounds::execute() { string pool; RGWReplicaObjectLogger rl(store, pool, prefix); bufferlist bl; - list > entries; + list markers; - if ((http_ret = get_input_list(s, "bucket", entries)) < 0) { + if ((http_ret = get_input(s, markers)) < 0) { return; } - http_ret = rl.update_bound(shard, daemon_id, marker, ut, &entries); + http_ret = rl.update_bound(shard, daemon_id, marker, ut, &markers); } void RGWOp_OBJLog_GetBounds::execute() { @@ -189,7 +125,7 @@ void RGWOp_OBJLog_GetBounds::execute() { string pool; RGWReplicaObjectLogger rl(store, pool, prefix); - http_ret = rl.get_bounds(shard, lowest_bound, oldest_time, entries); + http_ret = rl.get_bounds(shard, bounds); } void RGWOp_OBJLog_GetBounds::send_response() { @@ -200,18 +136,7 @@ void RGWOp_OBJLog_GetBounds::send_response() { if (http_ret < 0) return; - s->formatter->open_object_section("container"); - s->formatter->open_array_section("items"); - for (list::iterator it = entries.begin(); - it != entries.end(); it++) { - cls_replica_log_progress_marker entry = (*it); - progress_encode_json("entry", "buckets", "bucket", entry, s->formatter); - flusher.flush(); - } - s->formatter->close_section(); - s->formatter->dump_string("lowest_bound", lowest_bound); - encode_json("oldest_time", oldest_time, s->formatter); - s->formatter->close_section(); + encode_json("bounds", bounds, s->formatter); flusher.flush(); } @@ -283,13 +208,13 @@ void RGWOp_BILog_SetBounds::execute() { RGWReplicaBucketLogger rl(store); bufferlist bl; - list > entries; + list markers; - if ((http_ret = get_input_list(s, "object", entries)) < 0) { + if ((http_ret = get_input(s, markers)) < 0) { return; } - http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &entries); + http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &markers); } void RGWOp_BILog_GetBounds::execute() { @@ -306,7 +231,7 @@ void RGWOp_BILog_GetBounds::execute() { return; RGWReplicaBucketLogger rl(store); - http_ret = rl.get_bounds(bucket, lowest_bound, oldest_time, entries); + http_ret = rl.get_bounds(bucket, bounds); } void RGWOp_BILog_GetBounds::send_response() { @@ -317,18 +242,7 @@ void RGWOp_BILog_GetBounds::send_response() { if (http_ret < 0) return; - s->formatter->open_object_section("container"); - s->formatter->open_array_section("entries"); - for (list::iterator it = entries.begin(); - it != entries.end(); it++) { - cls_replica_log_progress_marker entry = (*it); - progress_encode_json("entry", "objects", "object", entry, s->formatter); - flusher.flush(); - } - s->formatter->close_section(); - s->formatter->dump_string("lowest_bound", lowest_bound); - encode_json("oldest_time", oldest_time, s->formatter); - s->formatter->close_section(); + encode_json("bounds", bounds, s->formatter); flusher.flush(); } diff --git a/src/rgw/rgw_rest_replica_log.h b/src/rgw/rgw_rest_replica_log.h index dacabbe4f7c..91e3d614062 100644 --- a/src/rgw/rgw_rest_replica_log.h +++ b/src/rgw/rgw_rest_replica_log.h @@ -17,9 +17,7 @@ class RGWOp_OBJLog_GetBounds : public RGWRESTOp { string prefix; string obj_type; - utime_t oldest_time; - string lowest_bound; - list entries; + RGWReplicaBounds bounds; public: RGWOp_OBJLog_GetBounds(const char *_prefix, const char *type) @@ -83,9 +81,7 @@ public: }; class RGWOp_BILog_GetBounds : public RGWRESTOp { - utime_t oldest_time; - string lowest_bound; - list entries; + RGWReplicaBounds bounds; public: RGWOp_BILog_GetBounds() {} ~RGWOp_BILog_GetBounds() {} -- cgit v1.2.1