diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-05-01 21:34:38 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-05-08 11:22:08 -0700 |
commit | f28df17be9c365d64a51d41a6cf244c0c905cd5e (patch) | |
tree | e0923793c34660de869f681044c0b1ab0f3bfc85 | |
parent | 39b258c2d823a6478f8d4fcc541cc0081fd2c6e6 (diff) | |
download | ceph-f28df17be9c365d64a51d41a6cf244c0c905cd5e.tar.gz |
rgw: changed data log renew thread
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/cls/log/cls_log.cc | 17 | ||||
-rw-r--r-- | src/cls/log/cls_log_client.cc | 22 | ||||
-rw-r--r-- | src/cls/log/cls_log_client.h | 4 | ||||
-rw-r--r-- | src/cls/log/cls_log_ops.h | 6 | ||||
-rw-r--r-- | src/rgw/rgw_bucket.cc | 177 | ||||
-rw-r--r-- | src/rgw/rgw_bucket.h | 28 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 30 | ||||
-rw-r--r-- | src/rgw/rgw_rados.h | 2 |
8 files changed, 170 insertions, 116 deletions
diff --git a/src/cls/log/cls_log.cc b/src/cls/log/cls_log.cc index c2d50fb6a34..ac5efc4f0b5 100644 --- a/src/cls/log/cls_log.cc +++ b/src/cls/log/cls_log.cc @@ -70,17 +70,20 @@ static int cls_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *ou return -EINVAL; } - cls_log_entry& entry = op.entry; + for (list<cls_log_entry>::iterator iter = op.entries.begin(); + iter != op.entries.end(); ++iter) { + cls_log_entry& entry = *iter; - string index; + string index; - get_index(hctx, entry.timestamp, index); + get_index(hctx, entry.timestamp, index); - CLS_LOG(0, "storing entry at %s", index.c_str()); + CLS_LOG(0, "storing entry at %s", index.c_str()); - int ret = write_log_entry(hctx, index, entry); - if (ret < 0) - return ret; + int ret = write_log_entry(hctx, index, entry); + if (ret < 0) + return ret; + } return 0; } diff --git a/src/cls/log/cls_log_client.cc b/src/cls/log/cls_log_client.cc index d1c199ba263..c551f407358 100644 --- a/src/cls/log/cls_log_client.cc +++ b/src/cls/log/cls_log_client.cc @@ -9,25 +9,39 @@ using namespace librados; +void cls_log_add(librados::ObjectWriteOperation& op, list<cls_log_entry>& entries) +{ + bufferlist in; + cls_log_add_op call; + call.entries = entries; + ::encode(call, in); + op.exec("log", "add", in); +} + void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry) { bufferlist in; cls_log_add_op call; - call.entry = entry; + call.entries.push_back(entry); ::encode(call, in); op.exec("log", "add", in); } -void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, +void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp, const string& section, const string& name, bufferlist& bl) { - cls_log_entry entry; - entry.timestamp = timestamp; entry.section = section; entry.name = name; entry.data = bl; +} + +void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, + const string& section, const string& name, bufferlist& bl) +{ + cls_log_entry entry; + cls_log_add_prepare_entry(entry, timestamp, section, name, bl); cls_log_add(op, entry); } diff --git a/src/cls/log/cls_log_client.h b/src/cls/log/cls_log_client.h index 6c0046b26f2..4171adbda11 100644 --- a/src/cls/log/cls_log_client.h +++ b/src/cls/log/cls_log_client.h @@ -9,6 +9,10 @@ * log objclass */ +void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp, + const string& section, const string& name, bufferlist& bl); + +void cls_log_add(librados::ObjectWriteOperation& op, list<cls_log_entry>& entry); void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry); void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, const string& section, const string& name, bufferlist& bl); diff --git a/src/cls/log/cls_log_ops.h b/src/cls/log/cls_log_ops.h index 6dc457ed5fd..6a5366f5a66 100644 --- a/src/cls/log/cls_log_ops.h +++ b/src/cls/log/cls_log_ops.h @@ -8,19 +8,19 @@ #include "cls_log_types.h" struct cls_log_add_op { - cls_log_entry entry; + list<cls_log_entry> entries; cls_log_add_op() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - ::encode(entry, bl); + ::encode(entries, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { DECODE_START(1, bl); - ::decode(entry, bl); + ::decode(entries, bl); DECODE_FINISH(bl); } }; diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index b256ceb93f4..df4ef93aa3c 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -934,105 +934,6 @@ int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state, } -#if 0 - -class CompletionMap { - map<rgw_bucket, RefCountedCond *> entries; - Mutex lock; - -public: - - void add(string& s) { - Mutex::Locker l(lock); - - entries[s] = new RefCountedObject; - } - - - bool wait(string& s) { - map<string, RefCountedCond *>::iterator iter; - l.Lock(); - iter = entries.find(s); - if (iter == entries.end()) { - l.Unlock(); - return false; - } - - RefCountedCond *rcc = iter->second; - rcc->get(); - l.Unlock(); - - rcc->wait(); - rcc->put(); - - return true; - - } - - void complete(string& s) { - lock.Lock(); - - map<string, RefCountedCond *>::iterator iter = entries.find(s); - if (iter == entries.end()) { - lock.Unlock(); - return; - } - - RefCountedCond *rcc = iter->second; - - entries.erase(iter); - - lock.Unlock(); - - rcc->complete(); - rcc->put(); - } - -}; - - -class RGWChangedBucketsTracker { - CephContext *cct; - RGWRados *store; - - map<rgw_bucket, utime_t> last_reported; - - struct PendingInfo : public RefCountedCond { - PendingInfo() {} - }; - - CompletionMap pending; - - Mutex lock; -public: - RGWChangedBucketsTracker(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWChanedBucketsTracker") {} - - int report_bucket_changed(rgw_bucket& bucket) { - lock.Lock(); - - map<rgw_bucket, utime_t>::iteartor iter = last_reported.find(bucket); - - bool exists = (iter != iter.end()); - if (exists) { - utime_t& t = iter->second; - utime_t now = ceph_clock_now(cct); - - if (now > t + get_resolution_sec()) - exists = false; - } - - lock.Unlock(); - - if (exists) - return true; - } - - uint32_t get_resolution_sec(); -}; - - -#endif - void rgw_data_change::dump(Formatter *f) const { string type; @@ -1055,6 +956,44 @@ int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) { return (int)r; } +int RGWDataChangesLog::renew_entries() +{ + map<int, list<cls_log_entry> > m; + + map<string, rgw_bucket>::iterator iter; + string section; + utime_t ut = ceph_clock_now(cct); + for (iter = cur_cycle.begin(); iter != cur_cycle.end(); ++iter) { + rgw_bucket& bucket = iter->second; + int index = choose_oid(bucket); + + cls_log_entry entry; + + rgw_data_change change; + bufferlist bl; + change.entity_type = ENTITY_TYPE_BUCKET; + change.key = bucket.name; + ::encode(change, bl); + + store->time_log_prepare_entry(entry, ut, section, bucket.name, bl); + + m[index].push_back(entry); + } + + map<int, list<cls_log_entry> >::iterator miter; + for (miter = m.begin(); miter != m.end(); ++miter) { + list<cls_log_entry>& entries = miter->second; + + int ret = store->time_log_add(oids[miter->first], entries); + if (ret < 0) { + lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl; + return ret; + } + } + + return 0; +} + int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { lock.Lock(); @@ -1064,6 +1003,8 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { changes.add(bucket.name, status); } + cur_cycle[bucket.name] = bucket; + lock.Unlock(); utime_t now = ceph_clock_now(cct); @@ -1179,3 +1120,41 @@ int RGWDataChangesLog::list_entries(utime_t& start_time, utime_t& end_time, int return 0; } + +bool RGWDataChangesLog::going_down() +{ + return (down_flag.read() != 0); +} + +RGWDataChangesLog::~RGWDataChangesLog() { + down_flag.set(1); + renew_thread->stop(); + renew_thread->join(); + delete[] oids; +} + +void *RGWDataChangesLog::ChangesRenewThread::entry() { + do { + dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl; + int r = log->renew_entries(); + if (r < 0) { + dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl; + } + + if (log->going_down()) + break; + + lock.Lock(); + cond.WaitInterval(cct, lock, utime_t(20, 0)); + lock.Unlock(); + } while (!log->going_down()); + + return NULL; +} + +void RGWDataChangesLog::ChangesRenewThread::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} + diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index ff4992a8455..ecc4cb693a5 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -260,6 +260,8 @@ class RGWDataChangesLog { Mutex lock; + atomic_t down_flag; + struct ChangeStatus { utime_t cur_expiration; utime_t cur_sent; @@ -280,6 +282,22 @@ class RGWDataChangesLog { lru_map<string, ChangeStatusPtr> changes; + map<string, rgw_bucket> cur_cycle; + + class ChangesRenewThread : public Thread { + CephContext *cct; + RGWDataChangesLog *log; + Mutex lock; + Cond cond; + + public: + ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log), lock("ChangesRenewThread") {} + void *entry(); + void stop(); + }; + + ChangesRenewThread *renew_thread; + public: RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog"), @@ -294,14 +312,16 @@ public: snprintf(buf, sizeof(buf), "%s.%d", prefix, i); oids[i] = buf; } - } - ~RGWDataChangesLog() { - delete[] oids; + renew_thread = new ChangesRenewThread(cct, this); + renew_thread->create(); } + ~RGWDataChangesLog(); + int choose_oid(rgw_bucket& bucket); int add_entry(rgw_bucket& bucket); + int renew_entries(); int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, list<rgw_data_change>& entries, string& marker, bool *truncated); @@ -313,6 +333,8 @@ public: }; int list_entries(utime_t& start_time, utime_t& end_time, int max_entries, list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated); + + bool going_down(); }; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 419b1fbac64..0b818cacd5c 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1096,6 +1096,11 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, string& sec name = prefix + buf; } +void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl) +{ + cls_log_add_prepare_entry(entry, ut, section, key, bl); +} + int RGWRados::time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl) { librados::IoCtx io_ctx; @@ -1121,6 +1126,31 @@ int RGWRados::time_log_add(const string& oid, const utime_t& ut, string& section return r; } +int RGWRados::time_log_add(const string& oid, list<cls_log_entry>& entries) +{ + librados::IoCtx io_ctx; + + const char *log_pool = zone.log_pool.name.c_str(); + int r = rados->ioctx_create(log_pool, io_ctx); + if (r == -ENOENT) { + rgw_bucket pool(log_pool); + r = create_pool(pool); + if (r < 0) + return r; + + // retry + r = rados->ioctx_create(log_pool, io_ctx); + } + if (r < 0) + return r; + + ObjectWriteOperation op; + cls_log_add(op, entries); + + r = io_ctx.operate(oid, &op); + return r; +} + int RGWRados::time_log_list(const string& oid, utime_t& start_time, utime_t& end_time, int max_entries, list<cls_log_entry>& entries, string& marker, bool *truncated) { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index f3d095f92d4..5d59f9a2a7b 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -987,6 +987,8 @@ public: void shard_name(const string& prefix, unsigned max_shards, string& key, string& name); void shard_name(const string& prefix, unsigned max_shards, string& section, string& key, string& name); + void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl); + int time_log_add(const string& oid, list<cls_log_entry>& entries); int time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl); int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time, int max_entries, list<cls_log_entry>& entries, string& marker, bool *truncated); |