summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2011-09-01 16:00:39 -0700
committerYehuda Sadeh <yehuda@hq.newdream.net>2011-09-01 16:00:39 -0700
commitd0eed624187bf7a0c6da6e5fd7635d3de6ad25f1 (patch)
treec5ce6a1130fcb92f2a66ca5238b9353f254f55fb
parent7f640a9256b8258c78c917ac684e85901cf63eea (diff)
downloadceph-d0eed624187bf7a0c6da6e5fd7635d3de6ad25f1.tar.gz
rgw: poll allocation thread
-rw-r--r--src/common/config.cc3
-rw-r--r--src/common/config.h3
-rw-r--r--src/rgw/rgw_bucket.cc71
-rw-r--r--src/rgw/rgw_bucket.h1
-rw-r--r--src/rgw/rgw_main.cc24
-rw-r--r--src/rgw/rgw_rados.h19
6 files changed, 104 insertions, 17 deletions
diff --git a/src/common/config.cc b/src/common/config.cc
index c1d99d1412d..8bfcc21423a 100644
--- a/src/common/config.cc
+++ b/src/common/config.cc
@@ -434,6 +434,9 @@ struct config_option config_optionsp[] = {
OPTION(rgw_op_thread_timeout, OPT_INT, 10*60),
OPTION(rgw_op_thread_suicide_timeout, OPT_INT, 60*60),
OPTION(rgw_thread_pool_size, OPT_INT, 100),
+ OPTION(rgw_maintenance_tick_interval, OPT_DOUBLE, 10.0),
+ OPTION(rgw_pools_preallocate_max, OPT_INT, 100),
+ OPTION(rgw_pools_preallocate_threshold, OPT_INT, 70),
// see config.h
OPTION(internal_safe_to_start_threads, OPT_BOOL, false),
diff --git a/src/common/config.h b/src/common/config.h
index 644cc1d6be1..467789c7994 100644
--- a/src/common/config.h
+++ b/src/common/config.h
@@ -568,6 +568,9 @@ public:
int rgw_op_thread_timeout;
int rgw_op_thread_suicide_timeout;
int rgw_thread_pool_size;
+ double rgw_maintenance_tick_interval;
+ int rgw_pools_preallocate_max;
+ int rgw_pools_preallocate_threshold;
// This will be set to true when it is safe to start threads.
// Once it is true, it will never change.
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index c8ceccde9b0..8069dbd9027 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -15,8 +15,6 @@ static rgw_bucket pi_buckets(BUCKETS_POOL_NAME);
static string avail_pools = ".pools.avail";
static string pool_name_prefix = "p";
-#define POOLS_PREALLOCATE_NUM 100
-
int rgw_store_bucket_info(string& bucket_name, RGWBucketInfo& info)
{
@@ -64,11 +62,11 @@ int rgw_remove_bucket_info(string& bucket_name)
return ret;
}
-static int generate_preallocated_pools(vector<string>& pools)
+static int generate_preallocated_pools(vector<string>& pools, int num)
{
vector<string> names;
- for (int i = 0; i < POOLS_PREALLOCATE_NUM; i++) {
+ for (int i = 0; i < num; i++) {
string name = pool_name_prefix;
append_rand_alpha(pool_name_prefix, name, 8);
names.push_back(name);
@@ -98,18 +96,8 @@ static int generate_preallocated_pools(vector<string>& pools)
return 0;
}
-static int generate_pool(string& bucket_name, rgw_bucket& bucket)
+static int register_available_pools(vector<string>& pools)
{
- vector<string> pools;
- int ret = generate_preallocated_pools(pools);
- if (ret < 0) {
- RGW_LOG(0) << "generate_preallocad_pools returned " << ret << dendl;
- return ret;
- }
- bucket.pool = pools.back();
- pools.pop_back();
- bucket.name = bucket_name;
-
map<string, bufferlist> m;
vector<string>::iterator iter;
@@ -119,7 +107,7 @@ static int generate_pool(string& bucket_name, rgw_bucket& bucket)
m[name] = bl;
}
rgw_obj obj(pi_buckets, avail_pools);
- ret = rgwstore->tmap_set(obj, m);
+ int ret = rgwstore->tmap_set(obj, m);
if (ret == -ENOENT) {
rgw_bucket new_bucket;
map<string,bufferlist> attrs;
@@ -136,6 +124,26 @@ static int generate_pool(string& bucket_name, rgw_bucket& bucket)
return 0;
}
+static int generate_pool(string& bucket_name, rgw_bucket& bucket)
+{
+ vector<string> pools;
+ int ret = generate_preallocated_pools(pools, g_conf->rgw_pools_preallocate_max);
+ if (ret < 0) {
+ RGW_LOG(0) << "generate_preallocad_pools returned " << ret << dendl;
+ return ret;
+ }
+ bucket.pool = pools.back();
+ pools.pop_back();
+ bucket.name = bucket_name;
+
+ ret = register_available_pools(pools);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return 0;
+}
+
static int withdraw_pool(string& pool_name)
{
rgw_obj obj(pi_buckets, avail_pools);
@@ -143,6 +151,37 @@ static int withdraw_pool(string& pool_name)
return rgwstore->tmap_set(obj, pool_name, bl);
}
+int rgw_bucket_maintain_pools()
+{
+ bufferlist header;
+ map<string, bufferlist> m;
+ string pool_name;
+
+ rgw_obj obj(pi_buckets, avail_pools);
+ int ret = rgwstore->tmap_get(obj, header, m);
+ if (ret < 0 && ret != -ENOENT) {
+ return ret;
+ }
+
+ if ((int)m.size() < g_conf->rgw_pools_preallocate_threshold) {
+ RGW_LOG(0) << "rgw_bucket_maintain_pools allocating pools (m.size()=" << m.size() << " threshold="
+ << g_conf->rgw_pools_preallocate_threshold << ")" << dendl;
+ vector<string> pools;
+ ret = generate_preallocated_pools(pools, g_conf->rgw_pools_preallocate_max - m.size());
+ if (ret < 0) {
+ RGW_LOG(0) << "failed to preallocate pools" << dendl;
+ return ret;
+ }
+ ret = register_available_pools(pools);
+ if (ret < 0) {
+ RGW_LOG(0) << "failed to register available pools" << dendl;
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
int rgw_bucket_allocate_pool(string& bucket_name, rgw_bucket& bucket)
{
bufferlist header;
diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h
index 63c339c1402..91706f56455 100644
--- a/src/rgw/rgw_bucket.h
+++ b/src/rgw/rgw_bucket.h
@@ -17,6 +17,7 @@ extern int rgw_bucket_allocate_pool(string& bucket_name, rgw_bucket& bucket);
extern int rgw_create_bucket(std::string& id, string& bucket_name, rgw_bucket& bucket,
map<std::string, bufferlist>& attrs, bool exclusive = true, uint64_t auid = 0);
+extern int rgw_bucket_maintain_pools(void);
#endif
diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc
index 1ab2219e556..a67d8dd9151 100644
--- a/src/rgw/rgw_main.cc
+++ b/src/rgw/rgw_main.cc
@@ -17,6 +17,7 @@
#include "common/config.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
+#include "common/Timer.h"
#include "rgw_common.h"
#include "rgw_access.h"
#include "rgw_acl.h"
@@ -25,6 +26,7 @@
#include "rgw_rest.h"
#include "rgw_os.h"
#include "rgw_log.h"
+#include "rgw_bucket.h"
#include <map>
#include <string>
@@ -218,6 +220,16 @@ done:
RGW_LOG(0) << "====== req done fcgx=" << hex << fcgx << dec << " http_status=" << http_ret << " ======" << dendl;
}
+class C_RGWMaintenanceTick : public Context {
+ SafeTimer *timer;
+public:
+ C_RGWMaintenanceTick(SafeTimer *t) : timer(t) {}
+ void finish(int r) {
+ rgw_bucket_maintain_pools();
+ RGW_LOG(20) << "C_RGWMaintenanceTick::finish()" << dendl;
+ timer->add_event_after(g_conf->rgw_maintenance_tick_interval, new C_RGWMaintenanceTick(timer));
+ }
+};
/*
* start up the RADOS connection and then handle HTTP messages as they come in
*/
@@ -255,8 +267,20 @@ int main(int argc, const char **argv)
RGWProcess process(g_ceph_context, g_conf->rgw_thread_pool_size);
+ Mutex lock("rgw_timer_lock");
+ SafeTimer timer(g_ceph_context, lock);
+
+ lock.Lock();
+ timer.init();
+ timer.add_event_after(g_conf->rgw_maintenance_tick_interval, new C_RGWMaintenanceTick(&timer));
+ lock.Unlock();
+
process.run();
+ lock.Lock();
+ timer.shutdown();
+ lock.Unlock();
+
return 0;
}
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 2cbaaa70340..bcc96835e29 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -2,10 +2,12 @@
#define CEPH_RGWRADOS_H
#include "include/rados/librados.hpp"
+#include "include/Context.h"
#include "rgw_access.h"
#include "rgw_common.h"
class RGWWatcher;
+class SafeTimer;
struct RGWObjState {
bool is_atomic;
@@ -77,6 +79,19 @@ class RGWRados : public RGWAccess
int set_buckets_auid(vector<rgw_bucket>& buckets, uint64_t auid);
+ Mutex lock;
+ SafeTimer *timer;
+
+ class C_Tick : public Context {
+ RGWRados *rados;
+ public:
+ C_Tick(RGWRados *_r) : rados(_r) {}
+ void finish(int r) {
+ rados->tick();
+ }
+ };
+
+
RGWWatcher *watcher;
uint64_t watch_handle;
librados::IoCtx root_pool_ctx;
@@ -106,7 +121,9 @@ class RGWRados : public RGWAccess
pair<string, bufferlist> *cmp_xattr);
int delete_obj_impl(void *ctx, std::string& id, rgw_obj& src_obj, bool sync);
public:
- RGWRados() : watcher(NULL), watch_handle(0) {}
+ RGWRados() : lock("rados_timer_lock"), timer(NULL), watcher(NULL), watch_handle(0) {}
+
+ void tick();
/** Initialize the RADOS instance and prepare to do other ops */
virtual int initialize(CephContext *cct);