diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-10-07 20:22:51 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-10-07 20:22:51 -0700 |
commit | 0f3b794d188850800fd41fa2ef29045175b451a2 (patch) | |
tree | 04ac9f69eab4b944fa18527a1ae2ff2a95d674a8 | |
parent | 10a1e9b371a02d69e617677b80ed34c264c2d2dc (diff) | |
download | ceph-0f3b794d188850800fd41fa2ef29045175b451a2.tar.gz |
rgw: async quota updatewip-rgw-quota
Not done yet.
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/cls/rgw/cls_rgw_client.cc | 38 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw_client.h | 8 | ||||
-rw-r--r-- | src/rgw/rgw_quota.cc | 107 | ||||
-rw-r--r-- | src/rgw/rgw_quota.h | 7 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 37 | ||||
-rw-r--r-- | src/rgw/rgw_rados.h | 25 |
6 files changed, 214 insertions, 8 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 165ca437987..36caf19b4fe 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -157,6 +157,44 @@ int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *he return r; } +class GetDirHeaderCompletion : public ObjectOperationCompletion { + RGWGetDirHeader_CB *ret_ctx; +public: + GetDirHeaderCompletion(RGWGetDirHeader_CB *_ctx) : ret_ctx(_ctx) {} + ~GetDirHeaderCompletion() { + ret_ctx->put(); + } + void handle_completion(int r, bufferlist& outbl) { + struct rgw_cls_list_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + r = -EIO; + } + + ret_ctx->handle_response(r, ret.dir.header); + }; +}; + +int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx) +{ + bufferlist in, out; + struct rgw_cls_list_op call; + call.num_entries = 0; + ::encode(call, in); + ObjectWriteOperation op; + GetDirHeaderCompletion *cb = new GetDirHeaderCompletion(ctx); + op.exec("rgw", "bucket_list", in, cb); + AioCompletion *c = IoCtx::create_completion(NULL, NULL, NULL); + int r = io_ctx.aio_operate(oid, c, &op); + c->release(); + if (r < 0) + return r; + + return 0; +} + int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, list<rgw_bi_log_entry>& entries, bool *truncated) { diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 2ea5d9ca771..39bb3c9fc4a 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -4,6 +4,13 @@ #include "include/types.h" #include "include/rados/librados.hpp" #include "cls_rgw_types.h" +#include "common/RefCountedObj.h" + +class RGWGetDirHeader_CB : public RefCountedObject { +public: + virtual ~RGWGetDirHeader_CB() {} + virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0; +}; /* bucket index */ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); @@ -27,6 +34,7 @@ int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid, int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid); int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header); +int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx); void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates); diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc index 70ab0762ce6..1f406a2c82c 100644 --- a/src/rgw/rgw_quota.cc +++ b/src/rgw/rgw_quota.cc @@ -26,6 +26,9 @@ public: int get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& stats); void adjust_bucket_stats(rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes); + + int async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs); + void async_refresh_response(rgw_bucket& bucket, RGWBucketStats& stats); }; int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& stats) @@ -42,6 +45,8 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& return r; } + stats = RGWBucketStats(); + map<RGWObjCategory, RGWBucketStats>::iterator iter; for (iter = bucket_stats.begin(); iter != bucket_stats.end(); ++iter) { RGWBucketStats& s = iter->second; @@ -53,23 +58,115 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& return 0; } +class AsyncRefreshHandler : public RGWGetBucketStats_CB { + RGWRados *store; + RGWBucketStatsCache *cache; +public: + AsyncRefreshHandler(RGWRados *_store, RGWBucketStatsCache *_cache, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), cache(_cache) {} + + int init_fetch(); + + void handle_response(int r); +}; + + +int AsyncRefreshHandler::init_fetch() +{ + map<RGWObjCategory, RGWBucketStats> bucket_stats; + int r = store->get_bucket_stats_async(bucket, this); + if (r < 0) { + ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl; + + /* get_bucket_stats_async() dropped our reference already */ + return r; + } + + return 0; +} + +void AsyncRefreshHandler::handle_response(int r) +{ + if (r < 0) + return; /* nothing to do here */ + + RGWBucketStats bs; + + map<RGWObjCategory, RGWBucketStats>::iterator iter; + for (iter = stats->begin(); iter != stats->end(); ++iter) { + RGWBucketStats& s = iter->second; + bs.num_kb += s.num_kb; + bs.num_kb_rounded += s.num_kb_rounded; + bs.num_objects += s.num_objects; + } + + cache->async_refresh_response(bucket, bs); +} + +int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs) +{ + if (qs.async_update_flag.inc() != 1) { /* are we the first one here? */ + qs.async_update_flag.dec(); + return 0; + } + + AsyncRefreshHandler *handler = new AsyncRefreshHandler(store, this, bucket); + + async_refcount.get(); + + int ret = handler->init_fetch(); + if (ret < 0) { + async_refcount.put(); + handler->put(); + return ret; + } + + return 0; +} + +void RGWBucketStatsCache::async_refresh_response(rgw_bucket& bucket, RGWBucketStats& stats) +{ + RGWQuotaBucketStats qs; + + stats_map.find(bucket, qs); + + set_stats(bucket, qs, stats); + async_refcount.put(); +} + +void RGWBucketStatsCache::set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWBucketStats& stats) +{ + qs.stats = stats; + qs.expiration = ceph_clock_now(store->ctx()); + qs.async_refresh_time = qs.expiration; + qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; + qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2; + + stats_map.add(bucket, qs); +} + int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& stats) { RGWQuotaBucketStats qs; + utime_t now = ceph_clock_now(store->ctx()); if (stats_map.find(bucket, qs)) { + if (now >= qs.async_refresh_time) { + int r = async_refresh(bucket); + if (r < 0) { + ldout(s->ctx(), 0) << "ERROR: quota async refresh returned ret=" << ret << dendl; + + /* continue processing, might be a transient error, async refresh is just optimization */ + } + } if (qs.expiration > ceph_clock_now(store->ctx())) { stats = qs.stats; return 0; } } - int ret = fetch_bucket_totals(bucket, qs.stats); + int ret = fetch_bucket_totals(bucket, stats); if (ret < 0 && ret != -ENOENT) return ret; - qs.expiration = ceph_clock_now(store->ctx()); - qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; - - stats_map.add(bucket, qs); + set_stats(bucket, qs, stats); return 0; } diff --git a/src/rgw/rgw_quota.h b/src/rgw/rgw_quota.h index 39cfc62de55..6cdc21c52a5 100644 --- a/src/rgw/rgw_quota.h +++ b/src/rgw/rgw_quota.h @@ -3,7 +3,9 @@ #include "include/utime.h" +#include "include/atomic.h" #include "common/lru_map.h" +#include "common/RefCountedObj.h" class RGWRados; class JSONObj; @@ -40,9 +42,12 @@ WRITE_CLASS_ENCODER(RGWQuotaInfo) class rgw_bucket; class RGWQuotaHandler { + RefCountedWaitObject async_refcount; public: RGWQuotaHandler() {} - virtual ~RGWQuotaHandler() {} + virtual ~RGWQuotaHandler() { + async_refcount.put_wait(); /* wait for all pending async requests to complete */ + } virtual int check_quota(rgw_bucket& bucket, RGWQuotaInfo& bucket_quota, uint64_t num_objs, uint64_t size) = 0; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 8035e0589de..06a8113af44 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -4614,6 +4614,39 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_ return 0; } +class RGWGetBucketStatsContext : public RGWGetDirHeader_CB { + RGWGetBucketStats_CB *cb; + +public: + RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {} + + void handle_response(int r, rgw_bucket_dir_header& header) { + map<RGWObjCategory, RGWBucketStats> stats; + + if (r >= 0) { + translate_raw_stats(header, stats); + cb->set_response(header.ver, header.master_ver, &stats, header.max_marker); + } + + cb->handle_response(r); + + cb->put(); + } +}; + +int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx) +{ + RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx); + int r = cls_bucket_head_async(bucket, get_ctx); + if (r < 0) { + ctx->put(); + delete get_ctx; + return r; + } + + return 0; +} + void RGWRados::get_bucket_instance_entry(rgw_bucket& bucket, string& entry) { entry = bucket.name + ":" + bucket.bucket_id; @@ -5481,7 +5514,7 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx, return 0; } -int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header) +int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx) { librados::IoCtx index_ctx; string oid; @@ -5489,7 +5522,7 @@ int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& if (r < 0) return r; - r = cls_rgw_get_dir_header(index_ctx, oid, &header); + r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx); if (r < 0) return r; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index a23f90f1f23..52b898123d4 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -761,6 +761,29 @@ public: int renew_state(); }; +class RGWGetBucketStats_CB : public RefCountedObject { +protected: + rgw_bucket bucket; + uint64_t bucket_ver; + uint64_t master_ver; + map<RGWObjCategory, RGWBucketStats> *stats; + string max_marker; +public: + RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {} + virtual ~RGWGetBucketStats_CB() {} + virtual void handle_response(int r) = 0; + virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver, + map<RGWObjCategory, RGWBucketStats> *_stats, + const string &_max_marker) { + bucket_ver = _bucket_ver; + master_ver = _master_ver; + stats = _stats; + max_marker = _max_marker; + } +}; + +class RGWGetDirHeader_CB; + class RGWRados { @@ -1295,6 +1318,7 @@ public: int decode_policy(bufferlist& bl, ACLOwner *owner); int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWBucketStats>& stats, string *max_marker); + int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb); void get_bucket_instance_obj(rgw_bucket& bucket, rgw_obj& obj); void get_bucket_instance_entry(rgw_bucket& bucket, string& entry); void get_bucket_meta_oid(rgw_bucket& bucket, string& oid); @@ -1326,6 +1350,7 @@ public: map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry, bool (*force_check_filter)(const string& name) = NULL); int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header); + int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx); int prepare_update_index(RGWObjState *state, rgw_bucket& bucket, RGWModifyOp op, rgw_obj& oid, string& tag); int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, |