diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-04-25 19:06:08 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-05-08 11:22:08 -0700 |
commit | 171b0bf267aaaf21b06008a58410ba209424585b (patch) | |
tree | f5f68e4e7bf3b6383e838af5dace7f1278eb25fb | |
parent | 988dab3e10e79c48de6ed54d5d6ecfcbcbc26a93 (diff) | |
download | ceph-171b0bf267aaaf21b06008a58410ba209424585b.tar.gz |
rgw: data changes log, naive implementation
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/rgw/rgw_admin.cc | 49 |
-rw-r--r-- | src/rgw/rgw_bucket.cc | 188 |
-rw-r--r-- | src/rgw/rgw_bucket.h | 72 |
-rw-r--r-- | src/rgw/rgw_rados.cc | 11 |
-rw-r--r-- | src/rgw/rgw_rados.h | 6 |
5 files changed, 322 insertions, 4 deletions
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 6418e4c8a29..bb6f69ed638 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -90,6 +90,7 @@ void _usage() cerr << " metadata list list metadata info\n"; cerr << " mdlog list list metadata log\n"; cerr << " bilog list list bucket index log\n"; + cerr << " datalog list list data log\n"; cerr << "options:\n"; cerr << " --uid=<id> user id\n"; cerr << " --subuser=<name> subuser name\n"; @@ -198,6 +199,7 @@ enum { OPT_METADATA_LIST, OPT_MDLOG_LIST, OPT_BILOG_LIST, + OPT_DATALOG_LIST, }; static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more) @@ -225,7 +227,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more) strcmp(cmd, "temp") == 0 || strcmp(cmd, "metadata") == 0 || strcmp(cmd, "mdlog") == 0 || - strcmp(cmd, "bilog") == 0) { + strcmp(cmd, "bilog") == 0 || + strcmp(cmd, "datalog") == 0) { *need_more = true; return 0; } @@ -366,6 +369,9 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more) } else if (strcmp(prev_cmd, "bilog") == 0) { if (strcmp(cmd, "list") == 0) return OPT_BILOG_LIST; + } else if (strcmp(prev_cmd, "datalog") == 0) { + if (strcmp(cmd, "list") == 0) + return OPT_DATALOG_LIST; } return -EINVAL; @@ -1797,6 +1803,47 @@ next: formatter->close_section(); formatter->flush(cout); } + + if (opt_cmd == OPT_DATALOG_LIST) { + formatter->open_array_section("entries"); + bool truncated; + int count = 0; + if (max_entries < 0) + max_entries = 1000; + + utime_t start_time, end_time; + + int ret = parse_date_str(start_date, start_time); + if (ret < 0) + return -ret; + + ret = parse_date_str(end_date, end_time); + if (ret < 0) + return -ret; + + RGWDataChangesLog *log = store->data_log; + RGWDataChangesLog::LogMarker marker; + + do { + list<rgw_data_change> entries; + ret = log->list_entries(start_time, end_time, max_entries - count, entries, marker, &truncated); + if (ret < 0) { + cerr << "ERROR: list_bi_log_entries(): " << 
cpp_strerror(-ret) << std::endl; + return -ret; + } + + count += entries.size(); + + for (list<rgw_data_change>::iterator iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_data_change& entry = *iter; + encode_json("entry", entry, formatter); + } + formatter->flush(cout); + } while (truncated && count < max_entries); + + formatter->close_section(); + formatter->flush(cout); + } return 0; } diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 223524f27fe..7d8a3b36e83 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -4,6 +4,7 @@ #include <map> #include "common/errno.h" +#include "common/ceph_json.h" #include "rgw_rados.h" #include "rgw_acl.h" #include "rgw_acl_s3.h" @@ -932,3 +933,190 @@ int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state, return 0; } + +#if 0 + +class CompletionMap { + map<rgw_bucket, RefCountedCond *> entries; + Mutex lock; + +public: + + void add(string& s) { + Mutex::Locker l(lock); + + entries[s] = new RefCountedObject; + } + + + bool wait(string& s) { + map<string, RefCountedCond *>::iterator iter; + l.Lock(); + iter = entries.find(s); + if (iter == entries.end()) { + l.Unlock(); + return false; + } + + RefCountedCond *rcc = iter->second; + rcc->get(); + l.Unlock(); + + rcc->wait(); + rcc->put(); + + return true; + + } + + void complete(string& s) { + lock.Lock(); + + map<string, RefCountedCond *>::iterator iter = entries.find(s); + if (iter == entries.end()) { + lock.Unlock(); + return; + } + + RefCountedCond *rcc = iter->second; + + entries.erase(iter); + + lock.Unlock(); + + rcc->complete(); + rcc->put(); + } + +}; + + +class RGWChangedBucketsTracker { + CephContext *cct; + RGWRados *store; + + map<rgw_bucket, utime_t> last_reported; + + struct PendingInfo : public RefCountedCond { + PendingInfo() {} + }; + + CompletionMap pending; + + Mutex lock; +public: + RGWChangedBucketsTracker(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWChanedBucketsTracker") 
{} + + int report_bucket_changed(rgw_bucket& bucket) { + lock.Lock(); + + map<rgw_bucket, utime_t>::iteartor iter = last_reported.find(bucket); + + bool exists = (iter != iter.end()); + if (exists) { + utime_t& t = iter->second; + utime_t now = ceph_clock_now(cct); + + if (now > t + get_resolution_sec()) + exists = false; + } + + lock.Unlock(); + + if (exists) + return true; + } + + uint32_t get_resolution_sec(); +}; + + +#endif + +void rgw_data_change::dump(Formatter *f) const +{ + string type; + switch (entity_type) { + case ENTITY_TYPE_BUCKET: + type = "bucket"; + break; + default: + type = "unknown"; + } + encode_json("entity_type", type, f); + encode_json("key", key, f); +} + + +int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) { + string& name = bucket.name; + uint32_t r = ceph_str_hash_linux(name.c_str(), name.size()) % num_shards; + + return (int)r; +} + +int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { + string& oid = oids[choose_oid(bucket)]; + + utime_t ut = ceph_clock_now(cct); + bufferlist bl; + rgw_data_change change; + change.entity_type = ENTITY_TYPE_BUCKET; + change.key = bucket.name; + ::encode(change, bl); + string section; + return store->time_log_add(oid, ut, section, change.key, bl); +} + +int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, + list<rgw_data_change>& entries, string& marker, bool *truncated) { + + list<cls_log_entry> log_entries; + + int ret = store->time_log_list(oids[shard], start_time, end_time, + max_entries, log_entries, marker, truncated); + if (ret < 0) + return ret; + + list<cls_log_entry>::iterator iter; + for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) { + rgw_data_change entry; + bufferlist::iterator liter = iter->data.begin(); + try { + ::decode(entry, liter); + } catch (buffer::error& err) { + lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl; + return -EIO; + } + entries.push_back(entry); + } + + return 0; +} + 
+int RGWDataChangesLog::list_entries(utime_t& start_time, utime_t& end_time, int max_entries, + list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated) { + bool truncated; + + entries.clear(); + + for (; marker.shard < num_shards && (int)entries.size() < max_entries; + marker.shard++, marker.marker.clear()) { + int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries, + marker.marker, &truncated); + if (ret == -ENOENT) { + continue; + } + if (ret < 0) { + return ret; + } + if (truncated) { + *ptruncated = true; + return 0; + } + } + + *ptruncated = (marker.shard < num_shards); + + return 0; +} diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 9eb16cf3707..ddca6122efc 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -219,4 +219,76 @@ public: static int info(RGWRados *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher); }; + +enum DataLogEntityType { + ENTITY_TYPE_BUCKET = 1, +}; + +struct rgw_data_change { + DataLogEntityType entity_type; + string key; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + uint8_t t = (uint8_t)entity_type; + ::encode(t, bl); + ::encode(key, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + uint8_t t; + ::decode(t, bl); + entity_type = (DataLogEntityType)t; + ::decode(key, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(rgw_data_change) + +class RGWDataChangesLog { + CephContext *cct; + RGWRados *store; + + int num_shards; + string *oids; + +public: + + RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store) { + num_shards = 128; /* FIXME */ + oids = new string[num_shards]; + + const char *prefix = "bucket_log"; /* FIXME */ + + for (int i = 0; i < num_shards; i++) { + char buf[16]; + snprintf(buf, sizeof(buf), "%s.%d", prefix, i); + oids[i] = buf; + } + } + + ~RGWDataChangesLog() { + delete[] oids; + } + + 
int choose_oid(rgw_bucket& bucket); + int add_entry(rgw_bucket& bucket); + int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, + list<rgw_data_change>& entries, string& marker, bool *truncated); + + struct LogMarker { + int shard; + string marker; + + LogMarker() : shard(0) {} + }; + int list_entries(utime_t& start_time, utime_t& end_time, int max_entries, + list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated); +}; + + #endif diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index ccd99363765..419b1fbac64 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -12,6 +12,7 @@ #include "rgw_cache.h" #include "rgw_acl.h" #include "rgw_metadata.h" +#include "rgw_bucket.h" #include "cls/rgw/cls_rgw_types.h" #include "cls/rgw/cls_rgw_client.h" @@ -497,6 +498,7 @@ void RGWRadosCtx::set_prefetch_data(rgw_obj& obj) { void RGWRados::finalize() { delete meta_mgr; + delete data_log; if (use_gc_thread) { gc->stop_processor(); delete gc; @@ -525,6 +527,7 @@ int RGWRados::init_rados() return ret; meta_mgr = new RGWMetadataManager(cct, this); + data_log = new RGWDataChangesLog(cct, this); return ret; } @@ -2886,6 +2889,12 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, if (bucket_is_system(bucket)) return 0; + int ret = data_log->add_entry(obj.bucket); + if (ret < 0) { + lderr(cct) << "ERROR: failed writing data log" << dendl; + return ret; + } + if (state && state->obj_tag.length()) { int len = state->obj_tag.length(); char buf[len + 1]; @@ -2897,7 +2906,7 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, append_rand_alpha(cct, tag, tag, 32); } } - int ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag, + ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag, obj.object, obj.key); return ret; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 057bf75e7c0..f3d095f92d4 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -456,7 +456,7 @@ 
struct RGWRegionMap { }; WRITE_CLASS_ENCODER(RGWRegionMap); - +class RGWDataChangesLog; class RGWRados { @@ -564,7 +564,7 @@ public: num_watchers(0), watchers(NULL), watch_handles(NULL), bucket_id_lock("rados_bucket_id"), max_bucket_id(0), cct(NULL), rados(NULL), - pools_initialized(false), meta_mgr(NULL) {} + pools_initialized(false), meta_mgr(NULL), data_log(NULL) {} void set_context(CephContext *_cct) { cct = _cct; @@ -583,6 +583,8 @@ public: RGWMetadataManager *meta_mgr; + RGWDataChangesLog *data_log; + virtual ~RGWRados() { if (rados) { rados->shutdown(); |