author    Yehuda Sadeh <yehuda@inktank.com>  2013-05-06 13:41:29 -0700
committer Yehuda Sadeh <yehuda@inktank.com>  2013-05-08 11:22:09 -0700
commit    3385c2c5be176e65e3d4a74fca31075890659909 (patch)
tree      ed667a1e8548466f2f35c4c666a4dfb3521589d4
parent    a37092f9f31fbc11a101f1cc30baf15effabd8c6 (diff)
download  ceph-3385c2c5be176e65e3d4a74fca31075890659909.tar.gz
rgw: more data changes log implementation
Remove a bunch of hard-coded stuff. Make the renew thread update the expiration. Only renew if there was more than one request through the current window.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/common/config_opts.h5
-rw-r--r--src/rgw/rgw_bucket.cc79
-rw-r--r--src/rgw/rgw_bucket.h17
3 files changed, 79 insertions, 22 deletions
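Before the per-file diffs, here is the gist of the change as a self-contained sketch: a bucket's change is logged at most once per rgw_data_log_window, and further requests inside the window merely queue the bucket in cur_cycle so the renew thread can push its expiration forward. The locking is simplified and log_change() is a hypothetical stand-in for the cls log write; this sketches the idea, not the actual RGW code.

#include <chrono>
#include <map>
#include <mutex>
#include <string>

using Clock = std::chrono::steady_clock;

static int log_change(const std::string&) { return 0; }  // hypothetical stub

struct ChangeStatus {
  Clock::time_point cur_expiration{};  // entry considered fresh until then
};

static std::mutex lock_;
static std::map<std::string, ChangeStatus> changes;   // per-bucket status
static std::map<std::string, std::string> cur_cycle;  // renew thread's input

int add_entry(const std::string& bucket, int window_secs) {
  std::lock_guard<std::mutex> l(lock_);
  ChangeStatus& status = changes[bucket];
  auto now = Clock::now();
  if (now < status.cur_expiration) {
    // A second request inside the window: don't rewrite the log entry,
    // just park the bucket so the renew thread extends its expiration.
    cur_cycle[bucket] = bucket;
    return 0;
  }
  int ret = log_change(bucket);  // stands in for the batched cls write
  if (ret < 0)
    return ret;
  status.cur_expiration = now + std::chrono::seconds(window_secs);
  return 0;
}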
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 5160a611402..ed52297bd78 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -603,6 +603,11 @@ OPTION(rgw_relaxed_s3_bucket_names, OPT_BOOL, false) // enable relaxed bucket na
OPTION(rgw_list_buckets_max_chunk, OPT_INT, 1000) // max buckets to retrieve in a single op when listing user buckets
OPTION(rgw_md_log_max_shards, OPT_INT, 64) // max shards for metadata log
+OPTION(rgw_data_log_window, OPT_INT, 30) // data log entries window (in seconds)
+OPTION(rgw_data_log_changes_size, OPT_INT, 1000) // number of in-memory entries to hold for data changes log
+OPTION(rgw_data_log_num_shards, OPT_INT, 128) // number of objects to keep data changes log on
+OPTION(rgw_data_log_obj_prefix, OPT_STR, "data_log") // prefix for the data log shard object names
+
OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
// This will be set to true when it is safe to start threads.
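For reference, these options would be set in ceph.conf with the usual translation between underscores and spaces; the defaults above give a 30-second window across 128 shard objects. The section name below is only an example of a typical radosgw instance:

[client.radosgw.gateway]
rgw data log window = 30
rgw data log changes size = 1000
rgw data log num shards = 128
rgw data log obj prefix = data_log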
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index df4ef93aa3c..f1a7b08de30 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -958,12 +958,19 @@ int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
int RGWDataChangesLog::renew_entries()
{
- map<int, list<cls_log_entry> > m;
+ /* we can't keep the bucket name as part of the cls_log_entry, and we need
+ * it later, so we keep two lists under the map */
+ map<int, pair<list<string>, list<cls_log_entry> > > m;
+
+ lock.Lock();
+ map<string, rgw_bucket> entries;
+ entries.swap(cur_cycle);
+ lock.Unlock();
map<string, rgw_bucket>::iterator iter;
string section;
utime_t ut = ceph_clock_now(cct);
- for (iter = cur_cycle.begin(); iter != cur_cycle.end(); ++iter) {
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
rgw_bucket& bucket = iter->second;
int index = choose_oid(bucket);
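choose_oid() picks which of the shard objects receives a bucket's entries; the diff does not show its body. A plausible sketch, assuming a simple hash-mod scheme over the bucket name (RGW's actual hash function may differ):

#include <functional>
#include <string>

// Assumed scheme: hash the bucket name, reduce modulo the shard count.
// Deterministic, so a bucket's changes always land on the same shard.
int choose_shard(const std::string& bucket_name, int num_shards) {
  std::size_t h = std::hash<std::string>{}(bucket_name);
  return static_cast<int>(h % static_cast<std::size_t>(num_shards));
}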
@@ -977,33 +984,65 @@ int RGWDataChangesLog::renew_entries()
store->time_log_prepare_entry(entry, ut, section, bucket.name, bl);
- m[index].push_back(entry);
+ m[index].second.push_back(entry);
}
- map<int, list<cls_log_entry> >::iterator miter;
+ map<int, pair<list<string>, list<cls_log_entry> > >::iterator miter;
for (miter = m.begin(); miter != m.end(); ++miter) {
- list<cls_log_entry>& entries = miter->second;
+ list<cls_log_entry>& entries = miter->second.second;
+
+ utime_t now = ceph_clock_now(cct);
int ret = store->time_log_add(oids[miter->first], entries);
if (ret < 0) {
+ /* we don't really need special handling for failed cases here,
+ * as this is just an optimization. */
lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl;
return ret;
}
+
+ utime_t expiration = now;
+ expiration += utime_t(cct->_conf->rgw_data_log_window, 0);
+
+ list<string>& buckets = miter->second.first;
+ list<string>::iterator liter;
+ for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
+ update_renewed(*liter, expiration);
+ }
}
return 0;
}
-int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
- lock.Lock();
-
- ChangeStatusPtr status;
- if (!changes.find(bucket.name, status)) {
+void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status)
+{
+ assert(lock.is_locked());
+ if (!changes.find(bucket_name, status)) {
status = ChangeStatusPtr(new ChangeStatus);
- changes.add(bucket.name, status);
+ changes.add(bucket_name, status);
}
+}
+void RGWDataChangesLog::register_renew(rgw_bucket& bucket)
+{
+ Mutex::Locker l(lock);
cur_cycle[bucket.name] = bucket;
+}
+
+void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration)
+{
+ Mutex::Locker l(lock);
+ ChangeStatusPtr status;
+ _get_change(bucket_name, status);
+
+ status->cur_expiration = expiration;
+}
+
+int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
+ lock.Lock();
+
+ ChangeStatusPtr status;
+ _get_change(bucket.name, status);
lock.Unlock();
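The comment at the top of renew_entries() explains the slightly awkward map value type: cls_log_entry does not carry the bucket name, but the name is needed after a successful time_log_add() to push each bucket's expiration forward, so the map keeps a list of names alongside the list of entries. The same pattern, condensed, with a stand-in for cls_log_entry:

#include <list>
#include <map>
#include <string>
#include <utility>

struct log_entry {};  // stand-in for cls_log_entry, which lacks the name

// Per shard: bucket names (first) parallel to prepared entries (second),
// so expirations can be updated per bucket after the batched write.
using ShardBatch = std::pair<std::list<std::string>, std::list<log_entry>>;

void queue_entry(std::map<int, ShardBatch>& m, int shard,
                 const std::string& bucket_name, const log_entry& e) {
  m[shard].first.push_back(bucket_name);
  m[shard].second.push_back(e);
}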
@@ -1011,10 +1050,11 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
status->lock->Lock();
-#warning FIXME delta config
if (now < status->cur_expiration) {
/* no need to send, recently completed */
status->lock->Unlock();
+
+ register_renew(bucket);
return 0;
}
@@ -1028,10 +1068,12 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
status->cond->get();
status->lock->Unlock();
- cond->wait();
+ int ret = cond->wait();
cond->put();
-#warning FIXME need to return actual status
- return 0;
+ if (!ret) {
+ register_renew(bucket);
+ }
+ return ret;
}
status->cond = new RefCountedCond;
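With done(ret) now carrying the operation's status, every thread parked on the shared cond learns whether the first writer actually succeeded, and only then registers for renewal. A minimal sketch of a condition that hands its waiters a completion status (assumed semantics; the real RefCountedCond is also intrusively reference-counted via get()/put()):

#include <condition_variable>
#include <mutex>

class StatusCond {
  std::mutex m;
  std::condition_variable cv;
  bool complete = false;
  int status = 0;
public:
  int wait() {
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [this] { return complete; });
    return status;  // every waiter sees the completer's result
  }
  void done(int r) {
    std::lock_guard<std::mutex> l(m);
    status = r;
    complete = true;
    cv.notify_all();
  }
};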
@@ -1058,11 +1100,11 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
status->pending = false;
status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
- status->cur_expiration += utime_t(5, 0);
+ status->cur_expiration += utime_t(cct->_conf->rgw_data_log_window, 0);
status->cond = NULL;
status->lock->Unlock();
- cond->done();
+ cond->done(ret);
cond->put();
return ret;
@@ -1144,8 +1186,9 @@ void *RGWDataChangesLog::ChangesRenewThread::entry() {
if (log->going_down())
break;
+ int interval = cct->_conf->rgw_data_log_window * 3 / 4;
lock.Lock();
- cond.WaitInterval(cct, lock, utime_t(20, 0));
+ cond.WaitInterval(cct, lock, utime_t(interval, 0));
lock.Unlock();
} while (!log->going_down());
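Waking at three quarters of the window means an entry is always renewed before it can expire: with the default rgw_data_log_window of 30 seconds, the interval works out to 22 seconds by integer division, leaving an 8-second safety margin. A one-liner to confirm the arithmetic:

#include <cassert>

int main() {
  int window = 30;                // rgw_data_log_window default (seconds)
  int interval = window * 3 / 4;  // 90 / 4 == 22 with integer division
  assert(interval == 22 && interval < window);
  return 0;
}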
diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h
index ecc4cb693a5..8a849b32201 100644
--- a/src/rgw/rgw_bucket.h
+++ b/src/rgw/rgw_bucket.h
@@ -284,6 +284,10 @@ class RGWDataChangesLog {
map<string, rgw_bucket> cur_cycle;
+ void _get_change(string& bucket_name, ChangeStatusPtr& status);
+ void register_renew(rgw_bucket& bucket);
+ void update_renewed(string& bucket_name, utime_t& expiration);
+
class ChangesRenewThread : public Thread {
CephContext *cct;
RGWDataChangesLog *log;
@@ -301,15 +305,20 @@ class RGWDataChangesLog {
public:
RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog"),
- changes(1000) /* FIXME */ {
- num_shards = 128; /* FIXME */
+ changes(cct->_conf->rgw_data_log_changes_size) {
+ num_shards = cct->_conf->rgw_data_log_num_shards;
+
oids = new string[num_shards];
- const char *prefix = "bucket_log"; /* FIXME */
+ string prefix = cct->_conf->rgw_data_log_obj_prefix;
+
+ if (prefix.empty()) {
+ prefix = "data_log";
+ }
for (int i = 0; i < num_shards; i++) {
char buf[16];
- snprintf(buf, sizeof(buf), "%s.%d", prefix, i);
+ snprintf(buf, sizeof(buf), "%s.%d", prefix.c_str(), i);
oids[i] = buf;
}