diff options
author | Babu Shanmugam <anbu@enovance.com> | 2013-06-26 18:56:33 +0530 |
---|---|---|
committer | Babu Shanmugam <anbu@enovance.com> | 2013-06-26 18:56:33 +0530 |
commit | caf94b72d242a8d665785f34afbe4b739bbc6ef0 (patch) | |
tree | 3fa29fdff84c6e7e01294ebfe4864406b8490c92 | |
parent | c103031ec6b259c70e3e33609dbb292f5b04d3fe (diff) | |
download | ceph-caf94b72d242a8d665785f34afbe4b739bbc6ef0.tar.gz |
RESTful API implementation for replica_log
Signed-off-by: Babu Shanmugam <anbu@enovance.com>
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/rgw/rgw_common.h | 4 | ||||
-rw-r--r-- | src/rgw/rgw_main.cc | 3 | ||||
-rw-r--r-- | src/rgw/rgw_rest_replica_log.cc | 449 | ||||
-rw-r--r-- | src/rgw/rgw_rest_replica_log.h | 160 |
5 files changed, 616 insertions, 2 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index dff7489de43..ecc5b5641a9 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -424,6 +424,7 @@ radosgw_SOURCES = \ rgw/rgw_rest_metadata.cc \ rgw/rgw_replica_log.cc \ rgw/rgw_rest_log.cc \ + rgw/rgw_rest_replica_log.cc \ rgw/rgw_http_client.cc \ rgw/rgw_swift.cc \ rgw/rgw_swift_auth.cc \ @@ -2143,6 +2144,7 @@ noinst_HEADERS = \ rgw/rgw_tools.h\ rgw/rgw_rest_metadata.h\ rgw/rgw_rest_log.h\ + rgw/rgw_rest_replica_log.h\ rgw/rgw_usage.h\ rgw/rgw_user.h\ rgw/rgw_bucket.h\ diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index bafe10d46db..8f213638f82 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -614,7 +614,7 @@ struct RGWBucketInfo ::encode(owner, bl); ::encode(flags, bl); ::encode(region, bl); - ::encode(creation_time, bl); + ::encode((uint32_t)creation_time, bl); ::encode(placement_rule, bl); ENCODE_FINISH(bl); } @@ -628,7 +628,7 @@ struct RGWBucketInfo if (struct_v >= 5) ::decode(region, bl); if (struct_v >= 6) - ::decode(creation_time, bl); + ::decode((uint32_t&)creation_time, bl); if (struct_v >= 7) ::decode(placement_rule, bl); DECODE_FINISH(bl); diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index ba894334444..547cb275a16 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -42,6 +42,8 @@ #include "rgw_rest_bucket.h" #include "rgw_rest_metadata.h" #include "rgw_rest_log.h" +#include "rgw_replica_log.h" +#include "rgw_rest_replica_log.h" #include "rgw_swift_auth.h" #include "rgw_swift.h" #include "rgw_log.h" @@ -506,6 +508,7 @@ int main(int argc, const char **argv) /*Registering resource for /admin/metadata */ admin_resource->register_resource("metadata", new RGWRESTMgr_Metadata); admin_resource->register_resource("log", new RGWRESTMgr_Log); + admin_resource->register_resource("replica_log", new RGWRESTMgr_ReplicaLog); rest.register_resource(g_conf->rgw_admin_entry, admin_resource); } diff --git a/src/rgw/rgw_rest_replica_log.cc b/src/rgw/rgw_rest_replica_log.cc new file mode 100644 index 00000000000..5dcf498ae46 --- /dev/null +++ b/src/rgw/rgw_rest_replica_log.cc @@ -0,0 +1,449 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 eNovance SAS <licensing@enovance.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include "common/ceph_json.h" +#include "common/strtol.h" +#include "rgw_rest.h" +#include "rgw_op.h" +#include "rgw_rest_s3.h" +#include "rgw_replica_log.h" +#include "rgw_metadata.h" +#include "rgw_bucket.h" +#include "rgw_rest_replica_log.h" +#include "rgw_client_io.h" +#include "common/errno.h" + +#define dout_subsys ceph_subsys_rgw + +static int get_data(req_state *s, bufferlist& bl) { + size_t cl = 0; + char *data; + int read_len; + + if (s->length) + cl = atoll(s->length); + if (cl) { + data = (char *)malloc(cl + 1); + if (!data) { + return -ENOMEM; + } + int r = s->cio->read(data, cl, &read_len); + if (cl != (size_t)read_len) { + dout(10) << "cio->read incomplete" << dendl; + } + if (r < 0) { + free(data); + return r; + } + bl.append(data, read_len); + } else { + int chunk_size = CEPH_PAGE_SIZE; + const char *enc = s->info.env->get("HTTP_TRANSFER_ENCODING"); + if (!enc || strcmp(enc, "chunked")) { + return -ERR_LENGTH_REQUIRED; + } + data = (char *)malloc(chunk_size); + if (!data) { + return -ENOMEM; + } + do { + int r = s->cio->read(data, chunk_size, &read_len); + if (r < 0) { + free(data); + return r; + } + bl.append(data, read_len); + } while ((read_len == chunk_size)); + } + + free(data); + return 0; +} + +static int parse_to_utime(string& in, utime_t& out) { + struct tm tm; + + if (!parse_iso8601(in.c_str(), &tm)) + return -EINVAL; + + time_t tt = mktime(&tm); + out = utime_t(tt, 0); + return 0; +} + +static int parse_input_list(bufferlist& bl, const char *el_name, list<pair<string, utime_t> >& out) { + JSONParser parser; + + if (!parser.parse(bl.c_str(), bl.length())) { + return -EINVAL; + } + if (!parser.is_array()) { + dout(5) << "Should have been an array" << dendl; + return -EINVAL; + } + + vector<string> l; + + l = parser.get_array_elements(); + for (vector<string>::iterator it = l.begin(); + it != l.end(); it++) { + JSONParser el_parser; + + if (!el_parser.parse((*it).c_str(), (*it).length())) { + dout(5) << "Error parsing an array element" << dendl; + return -EINVAL; + } + + string name, time; + + JSONDecoder::decode_json(el_name, name, (JSONObj *)&el_parser); + JSONDecoder::decode_json("time", time, (JSONObj *)&el_parser); + + utime_t ut; + if (parse_to_utime(time, ut) < 0) { + return -EINVAL; + } + out.push_back(make_pair(name, ut)); + } + + return 0; +} + +static int get_input_list(req_state *s, const char *element_name, list<pair<string, utime_t> >& out) { + bufferlist bl; + int rv; + + if ((rv = get_data(s, bl)) < 0) { + dout(5) << "Error - reading input data - " << rv << dendl; + return rv; + } + + if ((rv = parse_input_list(bl, element_name, out)) < 0) { + dout(5) << "Error parsing input list - " << rv << dendl; + return rv; + } + + return 0; +} + +static void item_encode_json(const char *name, + const char *el_name, + cls_replica_log_item_marker& val, + Formatter *f) { + f->open_object_section(name); + f->dump_string(el_name, val.item_name); + encode_json("time", val.item_timestamp, f); + f->close_section(); +} + +static void progress_encode_json(const char *name, + const char *sub_array_name, + const char *sub_array_el_name, + cls_replica_log_progress_marker &val, + Formatter *f) { + f->open_object_section(name); + f->dump_string("daemon_id", val.entity_id); + f->dump_string("marker", val.position_marker); + encode_json("time", val.position_time, f); + + f->open_array_section(sub_array_name); + for (list<cls_replica_log_item_marker>::iterator it = val.items.begin(); + it != val.items.end(); it++) { + cls_replica_log_item_marker& entry = (*it); + + item_encode_json(sub_array_name, sub_array_el_name, entry, f); + } + f->close_section(); + f->close_section(); +} + +void RGWOp_OBJLog_SetBounds::execute() { + string id_str = s->info.args.get("id"), + marker = s->info.args.get("marker"), + time = s->info.args.get("time"), + daemon_id = s->info.args.get("daemon_id"); + + if (id_str.empty() || + marker.empty() || + time.empty() || + daemon_id.empty()) { + dout(5) << "Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + int shard; + string err; + utime_t ut; + + shard = (int)strict_strtol(id_str.c_str(), 10, &err); + if (!err.empty()) { + dout(5) << "Error parsing id parameter - " << id_str << ", err " << err << dendl; + http_ret = -EINVAL; + return; + } + + if (parse_to_utime(time, ut) < 0) { + http_ret = -EINVAL; + return; + } + + string pool; + RGWReplicaObjectLogger rl(store, pool, prefix); + bufferlist bl; + list<pair<string, utime_t> > entries; + + if ((http_ret = get_input_list(s, "bucket", entries)) < 0) { + return; + } + + http_ret = rl.update_bound(shard, daemon_id, marker, ut, &entries); +} + +void RGWOp_OBJLog_GetBounds::execute() { + string id = s->info.args.get("id"); + + if (id.empty()) { + dout(5) << " Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + int shard; + string err; + + shard = (int)strict_strtol(id.c_str(), 10, &err); + if (!err.empty()) { + dout(5) << "Error parsing id parameter - " << id << ", err " << err << dendl; + http_ret = -EINVAL; + return; + } + + string pool; + RGWReplicaObjectLogger rl(store, pool, prefix); + http_ret = rl.get_bounds(shard, lowest_bound, oldest_time, entries); +} + +void RGWOp_OBJLog_GetBounds::send_response() { + set_req_state_err(s, http_ret); + dump_errno(s); + end_header(s); + + if (http_ret < 0) + return; + + s->formatter->open_object_section("container"); + s->formatter->open_array_section("items"); + for (list<cls_replica_log_progress_marker>::iterator it = entries.begin(); + it != entries.end(); it++) { + cls_replica_log_progress_marker entry = (*it); + progress_encode_json("entry", "buckets", "bucket", entry, s->formatter); + flusher.flush(); + } + s->formatter->close_section(); + s->formatter->dump_string("lowest_bound", lowest_bound); + encode_json("oldest_time", oldest_time, s->formatter); + s->formatter->close_section(); + flusher.flush(); +} + +void RGWOp_OBJLog_DeleteBounds::execute() { + string id = s->info.args.get("id"), + daemon_id = s->info.args.get("daemon_id"); + + if (id.empty() || + daemon_id.empty()) { + dout(5) << "Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + int shard; + string err; + + shard = (int)strict_strtol(id.c_str(), 10, &err); + if (!err.empty()) { + dout(5) << "Error parsing id parameter - " << id << ", err " << err << dendl; + http_ret = -EINVAL; + } + + string pool; + RGWReplicaObjectLogger rl(store, pool, prefix); + http_ret = rl.delete_bound(shard, daemon_id); +} + +static int bucket_name_to_bucket(RGWRados *store, string& bucket_str, rgw_bucket& bucket) { + RGWBucketInfo bucket_info; + RGWObjVersionTracker objv_tracker; + time_t mtime; + + int r = store->get_bucket_info(NULL, bucket_str, bucket_info, &objv_tracker, &mtime); + if (r < 0) { + dout(5) << "could not get bucket info for bucket=" << bucket_str << dendl; + return -EINVAL; + } + + bucket = bucket_info.bucket; + return 0; +} + +void RGWOp_BILog_SetBounds::execute() { + string bucket_str = s->info.args.get("bucket"), + marker = s->info.args.get("marker"), + time = s->info.args.get("time"), + daemon_id = s->info.args.get("daemon_id"); + + if (bucket_str.empty() || + marker.empty() || + time.empty() || + daemon_id.empty()) { + dout(5) << "Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + utime_t ut; + + if (parse_to_utime(time, ut) < 0) { + http_ret = -EINVAL; + return; + } + + rgw_bucket bucket; + if ((http_ret = bucket_name_to_bucket(store, bucket_str, bucket)) < 0) + return; + + RGWReplicaBucketLogger rl(store); + bufferlist bl; + list<pair<string, utime_t> > entries; + + if ((http_ret = get_input_list(s, "object", entries)) < 0) { + return; + } + + http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &entries); +} + +void RGWOp_BILog_GetBounds::execute() { + string bucket_str = s->info.args.get("bucket"); + + if (bucket_str.empty()) { + dout(5) << " Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + rgw_bucket bucket; + if ((http_ret = bucket_name_to_bucket(store, bucket_str, bucket)) < 0) + return; + + RGWReplicaBucketLogger rl(store); + http_ret = rl.get_bounds(bucket, lowest_bound, oldest_time, entries); +} + +void RGWOp_BILog_GetBounds::send_response() { + set_req_state_err(s, http_ret); + dump_errno(s); + end_header(s); + + if (http_ret < 0) + return; + + s->formatter->open_object_section("container"); + s->formatter->open_array_section("entries"); + for (list<cls_replica_log_progress_marker>::iterator it = entries.begin(); + it != entries.end(); it++) { + cls_replica_log_progress_marker entry = (*it); + progress_encode_json("entry", "objects", "object", entry, s->formatter); + flusher.flush(); + } + s->formatter->close_section(); + s->formatter->dump_string("lowest_bound", lowest_bound); + encode_json("oldest_time", oldest_time, s->formatter); + s->formatter->close_section(); + flusher.flush(); +} + +void RGWOp_BILog_DeleteBounds::execute() { + string bucket_str = s->info.args.get("bucket"), + daemon_id = s->info.args.get("daemon_id"); + + if (bucket_str.empty() || + daemon_id.empty()) { + dout(5) << "Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + rgw_bucket bucket; + if ((http_ret = bucket_name_to_bucket(store, bucket_str, bucket)) < 0) + return; + + RGWReplicaBucketLogger rl(store); + http_ret = rl.delete_bound(bucket, daemon_id); +} + +RGWOp *RGWHandler_ReplicaLog::op_get() { + bool exists; + string type = s->info.args.get("type", &exists); + + if (!exists) { + return NULL; + } + + if (type.compare("metadata") == 0) { + return new RGWOp_OBJLog_GetBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog"); + } else if (type.compare("bucket-index") == 0) { + return new RGWOp_BILog_GetBounds; + } else if (type.compare("data") == 0) { + return new RGWOp_OBJLog_GetBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog"); + } + return NULL; +} + +RGWOp *RGWHandler_ReplicaLog::op_delete() { + bool exists; + string type = s->info.args.get("type", &exists); + + if (!exists) { + return NULL; + } + + if (type.compare("metadata") == 0) + return new RGWOp_OBJLog_DeleteBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog"); + else if (type.compare("bucket-index") == 0) + return new RGWOp_BILog_DeleteBounds; + else if (type.compare("data") == 0) + return new RGWOp_OBJLog_DeleteBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog"); + + return NULL; +} + +RGWOp *RGWHandler_ReplicaLog::op_post() { + bool exists; + string type = s->info.args.get("type", &exists); + + if (!exists) { + return NULL; + } + + if (type.compare("metadata") == 0) { + return new RGWOp_OBJLog_SetBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog"); + } else if (type.compare("bucket-index") == 0) { + return new RGWOp_BILog_SetBounds; + } else if (type.compare("data") == 0) { + return new RGWOp_OBJLog_SetBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog"); + } + return NULL; +} + diff --git a/src/rgw/rgw_rest_replica_log.h b/src/rgw/rgw_rest_replica_log.h new file mode 100644 index 00000000000..0095e6771b9 --- /dev/null +++ b/src/rgw/rgw_rest_replica_log.h @@ -0,0 +1,160 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 eNovance SAS <licensing@enovance.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_RGW_REST_REPLICA_LOG_H +#define CEPH_RGW_REST_REPLICA_LOG_H + +class RGWOp_OBJLog_GetBounds : public RGWRESTOp { + int http_ret; + string prefix; + string obj_type; + utime_t oldest_time; + string lowest_bound; + list<cls_replica_log_progress_marker> entries; + +public: + RGWOp_OBJLog_GetBounds(const char *_prefix, const char *type) + : http_ret(0) , prefix(_prefix), obj_type(type){} + ~RGWOp_OBJLog_GetBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap(obj_type.c_str(), RGW_CAP_READ); + } + int verify_permission() { + return check_caps(s->user.caps); + } + void execute(); + virtual void send_response(); + virtual const char *name() { + string s = "replica"; + s.append(obj_type); + s.append("_getbounds"); + return s.c_str(); + } +}; + +class RGWOp_OBJLog_SetBounds : public RGWRESTOp { + string prefix; + string obj_type; +public: + RGWOp_OBJLog_SetBounds(const char *_prefix, const char *type) + : prefix(_prefix), obj_type(type){} + ~RGWOp_OBJLog_SetBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap(obj_type.c_str(), RGW_CAP_WRITE); + } + void execute(); + virtual const char *name() { + string s = "replica"; + s.append(obj_type); + s.append("_updatebounds"); + return s.c_str(); + } +}; + +class RGWOp_OBJLog_DeleteBounds : public RGWRESTOp { + string prefix; + string obj_type; +public: + RGWOp_OBJLog_DeleteBounds(const char *_prefix, const char *type) + : prefix(_prefix), obj_type(type){} + ~RGWOp_OBJLog_DeleteBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap(obj_type.c_str(), RGW_CAP_WRITE); + } + void execute(); + virtual const char *name() { + string s = "replica"; + s.append(obj_type); + s.append("_deletebound"); + return s.c_str(); + } +}; + +class RGWOp_BILog_GetBounds : public RGWRESTOp { + int http_ret; + utime_t oldest_time; + string lowest_bound; + list<cls_replica_log_progress_marker> entries; +public: + RGWOp_BILog_GetBounds() : http_ret(0) {} + ~RGWOp_BILog_GetBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap("bilog", RGW_CAP_READ); + } + int verify_permission() { + return check_caps(s->user.caps); + } + void execute(); + virtual void send_response(); + virtual const char *name() { + return "replicabilog_getbounds"; + } +}; + +class RGWOp_BILog_SetBounds : public RGWRESTOp { +public: + RGWOp_BILog_SetBounds() {} + ~RGWOp_BILog_SetBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap("bilog", RGW_CAP_WRITE); + } + void execute(); + virtual const char *name() { + return "replicabilog_updatebounds"; + } +}; + +class RGWOp_BILog_DeleteBounds : public RGWRESTOp { +public: + RGWOp_BILog_DeleteBounds() {} + ~RGWOp_BILog_DeleteBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap("bilog", RGW_CAP_WRITE); + } + void execute(); + virtual const char *name() { + return "replicabilog_deletebound"; + } +}; + +class RGWHandler_ReplicaLog : public RGWHandler_Auth_S3 { +protected: + RGWOp *op_get(); + RGWOp *op_delete(); + RGWOp *op_post(); + + int read_permissions(RGWOp*) { + return 0; + } +public: + RGWHandler_ReplicaLog() : RGWHandler_Auth_S3() {} + virtual ~RGWHandler_ReplicaLog() {} +}; + +class RGWRESTMgr_ReplicaLog : public RGWRESTMgr { +public: + RGWRESTMgr_ReplicaLog() {} + virtual ~RGWRESTMgr_ReplicaLog() {} + + virtual RGWHandler *get_handler(struct req_state *s){ + return new RGWHandler_ReplicaLog; + } +}; + +#endif /*!CEPH_RGW_REST_REPLICA_LOG_H*/ |