diff options
-rw-r--r-- | src/rgw/rgw_bucket.cc | 64 | ||||
-rw-r--r-- | src/rgw/rgw_bucket.h | 22 |
2 files changed, 84 insertions, 2 deletions
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 00f52550aae..13920e9e87f 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -1095,6 +1095,51 @@ int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) { } int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { + lock.Lock(); + ChangeStatus *& status = changes[bucket.name]; + if (!status) { + status = new ChangeStatus; + } + + status->get(); + lock.Unlock(); + + utime_t now = ceph_clock_now(cct); + + status->lock->Lock(); + +#warning FIXME delta config + if (now < status->cur_expiration) { + /* no need to send, recently completed */ + status->lock->Unlock(); + status->put(); + return 0; + } + + RefCountedCond *cond; + + if (status->pending) { + cond = status->cond; + + assert(cond); + + status->cond->get(); + status->lock->Unlock(); + status->put(); + + cond->wait(); + cond->put(); +#warning FIXME need to return actual status + return 0; + } + + status->cond = new RefCountedCond; + status->pending = true; + + status->cur_sent = now; + + status->lock->Unlock(); + string& oid = oids[choose_oid(bucket)]; utime_t ut = ceph_clock_now(cct); @@ -1104,7 +1149,24 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { change.key = bucket.name; ::encode(change, bl); string section; - return store->time_log_add(oid, ut, section, change.key, bl); + int ret = store->time_log_add(oid, ut, section, change.key, bl); + + status->lock->Lock(); + + cond = status->cond; + + status->pending = false; + status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */ + status->cur_expiration += utime_t(5, 0); + status->cond = NULL; + status->lock->Unlock(); + + status->put(); + + cond->done(); + cond->put(); + + return ret; } int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index d3e9f968c6c..2ac0144409c 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -259,9 +259,29 @@ class RGWDataChangesLog { int num_shards; string *oids; + Mutex lock; + + struct ChangeStatus : public RefCountedObject { + utime_t cur_expiration; + utime_t cur_sent; + bool pending; + RefCountedCond *cond; + Mutex *lock; + + ChangeStatus() : pending(false), cond(NULL) { + lock = new Mutex("RGWDataChangesLog::ChangeStatus"); + } + + ~ChangeStatus() { + delete lock; + } + }; + + map<string, ChangeStatus *> changes; + public: - RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store) { + RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog") { num_shards = 128; /* FIXME */ oids = new string[num_shards]; |