diff options
-rw-r--r-- | src/cls/rgw/cls_rgw_client.cc | 39 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw_client.h | 8 | ||||
-rw-r--r-- | src/rgw/rgw_quota.cc | 127 | ||||
-rw-r--r-- | src/rgw/rgw_quota.h | 4 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 47 | ||||
-rw-r--r-- | src/rgw/rgw_rados.h | 25 |
6 files changed, 243 insertions, 7 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 165ca437987..2851f2bd702 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -2,6 +2,7 @@ #include "include/types.h" #include "cls/rgw/cls_rgw_ops.h" +#include "cls/rgw/cls_rgw_client.h" #include "include/rados/librados.hpp" #include "common/debug.h" @@ -157,6 +158,44 @@ int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *he return r; } +class GetDirHeaderCompletion : public ObjectOperationCompletion { + RGWGetDirHeader_CB *ret_ctx; +public: + GetDirHeaderCompletion(RGWGetDirHeader_CB *_ctx) : ret_ctx(_ctx) {} + ~GetDirHeaderCompletion() { + ret_ctx->put(); + } + void handle_completion(int r, bufferlist& outbl) { + struct rgw_cls_list_ret ret; + try { + bufferlist::iterator iter = outbl.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + r = -EIO; + } + + ret_ctx->handle_response(r, ret.dir.header); + }; +}; + +int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx) +{ + bufferlist in, out; + struct rgw_cls_list_op call; + call.num_entries = 0; + ::encode(call, in); + ObjectReadOperation op; + GetDirHeaderCompletion *cb = new GetDirHeaderCompletion(ctx); + op.exec("rgw", "bucket_list", in, cb); + AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); + int r = io_ctx.aio_operate(oid, c, &op, NULL); + c->release(); + if (r < 0) + return r; + + return 0; +} + int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, list<rgw_bi_log_entry>& entries, bool *truncated) { diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 2ea5d9ca771..39bb3c9fc4a 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -4,6 +4,13 @@ #include "include/types.h" #include "include/rados/librados.hpp" #include "cls_rgw_types.h" +#include "common/RefCountedObj.h" + +class RGWGetDirHeader_CB : public RefCountedObject { +public: + virtual ~RGWGetDirHeader_CB() {} + virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0; +}; /* bucket index */ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); @@ -27,6 +34,7 @@ int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid, int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid); int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header); +int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx); void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates); diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc index 70ab0762ce6..984b3b57dea 100644 --- a/src/rgw/rgw_quota.cc +++ b/src/rgw/rgw_quota.cc @@ -1,6 +1,7 @@ #include "include/utime.h" #include "common/lru_map.h" +#include "common/RefCountedObj.h" #include "rgw_common.h" #include "rgw_rados.h" @@ -12,20 +13,31 @@ struct RGWQuotaBucketStats { RGWBucketStats stats; utime_t expiration; + utime_t async_refresh_time; }; class RGWBucketStatsCache { RGWRados *store; lru_map<rgw_bucket, RGWQuotaBucketStats> stats_map; + RefCountedWaitObject *async_refcount; int fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& stats); public: #warning FIXME configurable stats_map size - RGWBucketStatsCache(RGWRados *_store) : store(_store), stats_map(10000) {} + RGWBucketStatsCache(RGWRados *_store) : store(_store), stats_map(10000) { + async_refcount = new RefCountedWaitObject; + } + ~RGWBucketStatsCache() { + async_refcount->put_wait(); /* wait for all pending async requests to complete */ + } int get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& stats); void adjust_bucket_stats(rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes); + + void set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWBucketStats& stats); + int async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs); + void async_refresh_response(rgw_bucket& bucket, RGWBucketStats& stats); }; int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& stats) @@ -42,6 +54,8 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& return r; } + stats = RGWBucketStats(); + map<RGWObjCategory, RGWBucketStats>::iterator iter; for (iter = bucket_stats.begin(); iter != bucket_stats.end(); ++iter) { RGWBucketStats& s = iter->second; @@ -53,23 +67,124 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& return 0; } +class AsyncRefreshHandler : public RGWGetBucketStats_CB { + RGWRados *store; + RGWBucketStatsCache *cache; +public: + AsyncRefreshHandler(RGWRados *_store, RGWBucketStatsCache *_cache, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), store(_store), cache(_cache) {} + + int init_fetch(); + + void handle_response(int r); +}; + + +int AsyncRefreshHandler::init_fetch() +{ + ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl; + map<RGWObjCategory, RGWBucketStats> bucket_stats; + int r = store->get_bucket_stats_async(bucket, this); + if (r < 0) { + ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl; + + /* get_bucket_stats_async() dropped our reference already */ + return r; + } + + return 0; +} + +void AsyncRefreshHandler::handle_response(int r) +{ + if (r < 0) { + ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; + return; /* nothing to do here */ + } + + RGWBucketStats bs; + + map<RGWObjCategory, RGWBucketStats>::iterator iter; + for (iter = stats->begin(); iter != stats->end(); ++iter) { + RGWBucketStats& s = iter->second; + bs.num_kb += s.num_kb; + bs.num_kb_rounded += s.num_kb_rounded; + bs.num_objects += s.num_objects; + } + + cache->async_refresh_response(bucket, bs); +} + +int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs) +{ +#if 0 + if (qs.async_update_flag.inc() != 1) { /* are we the first one here? */ + qs.async_update_flag.dec(); + return 0; + } +#endif +#warning protect against multiple updates + + async_refcount->get(); + + AsyncRefreshHandler *handler = new AsyncRefreshHandler(store, this, bucket); + + int ret = handler->init_fetch(); + if (ret < 0) { + async_refcount->put(); + handler->put(); + return ret; + } + + return 0; +} + +void RGWBucketStatsCache::async_refresh_response(rgw_bucket& bucket, RGWBucketStats& stats) +{ + ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; + + RGWQuotaBucketStats qs; + + stats_map.find(bucket, qs); + + set_stats(bucket, qs, stats); + + async_refcount->put(); +} + +void RGWBucketStatsCache::set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWBucketStats& stats) +{ + qs.stats = stats; + qs.expiration = ceph_clock_now(store->ctx()); + qs.async_refresh_time = qs.expiration; + qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; + qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2; + + stats_map.add(bucket, qs); +} + int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& stats) { RGWQuotaBucketStats qs; + utime_t now = ceph_clock_now(store->ctx()); if (stats_map.find(bucket, qs)) { + if (now >= qs.async_refresh_time) { + int r = async_refresh(bucket, qs); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl; + + /* continue processing, might be a transient error, async refresh is just optimization */ + } + } if (qs.expiration > ceph_clock_now(store->ctx())) { stats = qs.stats; return 0; } } - int ret = fetch_bucket_totals(bucket, qs.stats); + int ret = fetch_bucket_totals(bucket, stats); if (ret < 0 && ret != -ENOENT) return ret; - qs.expiration = ceph_clock_now(store->ctx()); - qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; - - stats_map.add(bucket, qs); + set_stats(bucket, qs, stats); return 0; } diff --git a/src/rgw/rgw_quota.h b/src/rgw/rgw_quota.h index 39cfc62de55..9af91e3986d 100644 --- a/src/rgw/rgw_quota.h +++ b/src/rgw/rgw_quota.h @@ -3,6 +3,7 @@ #include "include/utime.h" +#include "include/atomic.h" #include "common/lru_map.h" class RGWRados; @@ -42,7 +43,8 @@ class rgw_bucket; class RGWQuotaHandler { public: RGWQuotaHandler() {} - virtual ~RGWQuotaHandler() {} + virtual ~RGWQuotaHandler() { + } virtual int check_quota(rgw_bucket& bucket, RGWQuotaInfo& bucket_quota, uint64_t num_objs, uint64_t size) = 0; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 8035e0589de..811c7ee57cc 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -4614,6 +4614,38 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_ return 0; } +class RGWGetBucketStatsContext : public RGWGetDirHeader_CB { + RGWGetBucketStats_CB *cb; + +public: + RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {} + void handle_response(int r, rgw_bucket_dir_header& header) { + map<RGWObjCategory, RGWBucketStats> stats; + + if (r >= 0) { + translate_raw_stats(header, stats); + cb->set_response(header.ver, header.master_ver, &stats, header.max_marker); + } + + cb->handle_response(r); + + cb->put(); + } +}; + +int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx) +{ + RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx); + int r = cls_bucket_head_async(bucket, get_ctx); + if (r < 0) { + ctx->put(); + delete get_ctx; + return r; + } + + return 0; +} + void RGWRados::get_bucket_instance_entry(rgw_bucket& bucket, string& entry) { entry = bucket.name + ":" + bucket.bucket_id; @@ -5496,6 +5528,21 @@ int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& return 0; } +int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx) +{ + librados::IoCtx index_ctx; + string oid; + int r = open_bucket_index(bucket, index_ctx, oid); + if (r < 0) + return r; + + r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx); + if (r < 0) + return r; + + return 0; +} + int RGWRados::check_quota(rgw_bucket& bucket, RGWQuotaInfo& quota_info, uint64_t obj_size) { return quota_handler->check_quota(bucket, quota_info, 1, obj_size); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index a23f90f1f23..52b898123d4 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -761,6 +761,29 @@ public: int renew_state(); }; +class RGWGetBucketStats_CB : public RefCountedObject { +protected: + rgw_bucket bucket; + uint64_t bucket_ver; + uint64_t master_ver; + map<RGWObjCategory, RGWBucketStats> *stats; + string max_marker; +public: + RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {} + virtual ~RGWGetBucketStats_CB() {} + virtual void handle_response(int r) = 0; + virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver, + map<RGWObjCategory, RGWBucketStats> *_stats, + const string &_max_marker) { + bucket_ver = _bucket_ver; + master_ver = _master_ver; + stats = _stats; + max_marker = _max_marker; + } +}; + +class RGWGetDirHeader_CB; + class RGWRados { @@ -1295,6 +1318,7 @@ public: int decode_policy(bufferlist& bl, ACLOwner *owner); int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWBucketStats>& stats, string *max_marker); + int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb); void get_bucket_instance_obj(rgw_bucket& bucket, rgw_obj& obj); void get_bucket_instance_entry(rgw_bucket& bucket, string& entry); void get_bucket_meta_oid(rgw_bucket& bucket, string& oid); @@ -1326,6 +1350,7 @@ public: map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry, bool (*force_check_filter)(const string& name) = NULL); int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header); + int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx); int prepare_update_index(RGWObjState *state, rgw_bucket& bucket, RGWModifyOp op, rgw_obj& oid, string& tag); int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, |