diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-05-06 13:41:29 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-05-06 13:41:29 -0700 |
commit | fff440ba53e01732806f0ea6a6d62d028787454e (patch) | |
tree | 7585cc134be3480865e674a1356c8bfa2f0c77f9 | |
parent | 8d09db614002a49011a372bc5e28fe294019f4b5 (diff) | |
download | ceph-fff440ba53e01732806f0ea6a6d62d028787454e.tar.gz |
rgw: more data changes log implementation
Remove a bunch of hard-coded values. Make the renew thread update the
expiration. Only renew if there was more than one request through the
current window.
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/common/config_opts.h | 5 | ||||
-rw-r--r-- | src/rgw/rgw_bucket.cc | 79 | ||||
-rw-r--r-- | src/rgw/rgw_bucket.h | 17 |
3 files changed, 79 insertions, 22 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index d407e829f3b..f37b7f50be5 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -581,6 +581,11 @@ OPTION(rgw_get_obj_max_req_size, OPT_INT, 4 << 20) // max length of a single get OPTION(rgw_relaxed_s3_bucket_names, OPT_BOOL, false) // enable relaxed bucket name rules for US region buckets OPTION(rgw_md_log_max_shards, OPT_INT, 64) // max shards for metadata log +OPTION(rgw_data_log_window, OPT_INT, 30) // data log entries window (in seconds) +OPTION(rgw_data_log_changes_size, OPT_INT, 1000) // number of in-memory entries to hold for data changes log +OPTION(rgw_data_log_num_shards, OPT_INT, 128) // number of objects to keep data changes log on +OPTION(rgw_data_log_obj_prefix, OPT_STR, "data_log") // + OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter // This will be set to true when it is safe to start threads. diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 89e63791442..acdd594f0c9 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -997,12 +997,19 @@ int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) { int RGWDataChangesLog::renew_entries() { - map<int, list<cls_log_entry> > m; + /* we can't keep the bucket name as part of the cls_log_entry, and we need + * it later, so we keep two lists under the map */ + map<int, pair<list<string>, list<cls_log_entry> > > m; + + lock.Lock(); + map<string, rgw_bucket> entries; + entries.swap(cur_cycle); + lock.Unlock(); map<string, rgw_bucket>::iterator iter; string section; utime_t ut = ceph_clock_now(cct); - for (iter = cur_cycle.begin(); iter != cur_cycle.end(); ++iter) { + for (iter = entries.begin(); iter != entries.end(); ++iter) { rgw_bucket& bucket = iter->second; int index = choose_oid(bucket); @@ -1016,33 +1023,65 @@ int RGWDataChangesLog::renew_entries() store->time_log_prepare_entry(entry, ut, section, bucket.name, bl); - m[index].push_back(entry); + 
m[index].second.push_back(entry); } - map<int, list<cls_log_entry> >::iterator miter; + map<int, pair<list<string>, list<cls_log_entry> > >::iterator miter; for (miter = m.begin(); miter != m.end(); ++miter) { - list<cls_log_entry>& entries = miter->second; + list<cls_log_entry>& entries = miter->second.second; + + utime_t now = ceph_clock_now(cct); int ret = store->time_log_add(oids[miter->first], entries); if (ret < 0) { + /* we don't really need to have a special handling for failed cases here, + * as this is just an optimization. */ lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl; return ret; } + + utime_t expiration = now; + expiration += utime_t(cct->_conf->rgw_data_log_window, 0); + + list<string>& buckets = miter->second.first; + list<string>::iterator liter; + for (liter = buckets.begin(); liter != buckets.end(); ++liter) { + update_renewed(*liter, expiration); + } } return 0; } -int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { - lock.Lock(); - - ChangeStatusPtr status; - if (!changes.find(bucket.name, status)) { +void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status) +{ + assert(lock.is_locked()); + if (!changes.find(bucket_name, status)) { status = ChangeStatusPtr(new ChangeStatus); - changes.add(bucket.name, status); + changes.add(bucket_name, status); } +} +void RGWDataChangesLog::register_renew(rgw_bucket& bucket) +{ + Mutex::Locker l(lock); cur_cycle[bucket.name] = bucket; +} + +void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration) +{ + Mutex::Locker l(lock); + ChangeStatusPtr status; + _get_change(bucket_name, status); + + status->cur_expiration = expiration; +} + +int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { + lock.Lock(); + + ChangeStatusPtr status; + _get_change(bucket.name, status); lock.Unlock(); @@ -1050,10 +1089,11 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { status->lock->Lock(); -#warning FIXME delta config if (now < 
status->cur_expiration) { /* no need to send, recently completed */ status->lock->Unlock(); + + register_renew(bucket); return 0; } @@ -1067,10 +1107,12 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { status->cond->get(); status->lock->Unlock(); - cond->wait(); + int ret = cond->wait(); cond->put(); -#warning FIXME need to return actual status - return 0; + if (!ret) { + register_renew(bucket); + } + return ret; } status->cond = new RefCountedCond; @@ -1097,11 +1139,11 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { status->pending = false; status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */ - status->cur_expiration += utime_t(5, 0); + status->cur_expiration += utime_t(cct->_conf->rgw_data_log_window, 0); status->cond = NULL; status->lock->Unlock(); - cond->done(); + cond->done(ret); cond->put(); return ret; @@ -1183,8 +1225,9 @@ void *RGWDataChangesLog::ChangesRenewThread::entry() { if (log->going_down()) break; + int interval = cct->_conf->rgw_data_log_window * 3 / 4; lock.Lock(); - cond.WaitInterval(cct, lock, utime_t(20, 0)); + cond.WaitInterval(cct, lock, utime_t(interval, 0)); lock.Unlock(); } while (!log->going_down()); diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 6868a1bca46..bd728a46b12 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -287,6 +287,10 @@ class RGWDataChangesLog { map<string, rgw_bucket> cur_cycle; + void _get_change(string& bucket_name, ChangeStatusPtr& status); + void register_renew(rgw_bucket& bucket); + void update_renewed(string& bucket_name, utime_t& expiration); + class ChangesRenewThread : public Thread { CephContext *cct; RGWDataChangesLog *log; @@ -304,15 +308,20 @@ class RGWDataChangesLog { public: RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog"), - changes(1000) /* FIXME */ { - num_shards = 128; /* FIXME */ + changes(cct->_conf->rgw_data_log_changes_size) { + num_shards = 
cct->_conf->rgw_data_log_num_shards; + oids = new string[num_shards]; - const char *prefix = "bucket_log"; /* FIXME */ + string prefix = cct->_conf->rgw_data_log_obj_prefix; + + if (prefix.empty()) { + prefix = "data_log"; + } for (int i = 0; i < num_shards; i++) { char buf[16]; - snprintf(buf, sizeof(buf), "%s.%d", prefix, i); + snprintf(buf, sizeof(buf), "%s.%d", prefix.c_str(), i); oids[i] = buf; } |