summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-01-21 15:29:28 -0800
committerSage Weil <sage@inktank.com>2013-01-22 14:47:41 -0800
commit6e3363b20e590cd9df89f2caebe71867b94cc291 (patch)
tree0afcb24dfb262966d0024a2d1d168e87b206ce9f
parentc549a0cf6fae78c8418a3b4b0702fd8a1e4ce482 (diff)
downloadceph-6e3363b20e590cd9df89f2caebe71867b94cc291.tar.gz
common/PrioritizedQueue: add min cost, max tokens per bucket
Two problems. First, we need to cap the tokens per bucket. Otherwise, a stream of items at one priority over time will indefinitely inflate the tokens available at another priority. The cap should represent how "bursty" we allow a given bucket to be. Start with 4MB for now. Second, set a floor on the item cost. Otherwise, we can have an infinite queue of 0 cost items that starve other queues. More realistically, we need to balance the overhead of processing small items with the cost of large items. I.e., a 4KB item is not 1/1000th as expensive as a 4MB item. Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/common/PrioritizedQueue.h45
-rw-r--r--src/common/config_opts.h4
-rw-r--r--src/msg/DispatchQueue.h2
-rw-r--r--src/osd/OSD.h5
4 files changed, 50 insertions, 6 deletions
diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h
index 6ada0660124..072eaebf2ee 100644
--- a/src/common/PrioritizedQueue.h
+++ b/src/common/PrioritizedQueue.h
@@ -45,6 +45,8 @@
template <typename T, typename K>
class PrioritizedQueue {
int64_t total_priority;
+ int64_t max_tokens_per_subqueue;
+ int64_t min_cost;
template <class F>
static unsigned filter_list_pairs(
@@ -76,22 +78,39 @@ class PrioritizedQueue {
struct SubQueue {
private:
map<K, list<pair<unsigned, T> > > q;
- unsigned tokens;
+ unsigned tokens, max_tokens;
int64_t size;
typename map<K, list<pair<unsigned, T> > >::iterator cur;
public:
SubQueue(const SubQueue &other)
- : q(other.q), tokens(other.tokens), size(other.size),
+ : q(other.q),
+ tokens(other.tokens),
+ max_tokens(other.max_tokens),
+ size(other.size),
cur(q.begin()) {}
- SubQueue() : tokens(0), size(0), cur(q.begin()) {}
+ SubQueue()
+ : tokens(0),
+ max_tokens(0),
+ size(0) {}
+ void set_max_tokens(unsigned mt) {
+ max_tokens = mt;
+ }
+ unsigned get_max_tokens() const {
+ return max_tokens;
+ }
unsigned num_tokens() const {
return tokens;
}
void put_tokens(unsigned t) {
tokens += t;
+ if (tokens > max_tokens)
+ tokens = max_tokens;
}
void take_tokens(unsigned t) {
+ if (tokens > t)
tokens -= t;
+ else
+ tokens = 0;
}
void enqueue(K cl, unsigned cost, T item) {
q[cl].push_back(make_pair(cost, item));
@@ -175,7 +194,9 @@ class PrioritizedQueue {
if (p != queue.end())
return &p->second;
total_priority += priority;
- return &queue[priority];
+ SubQueue *sq = &queue[priority];
+ sq->set_max_tokens(max_tokens_per_subqueue);
+ return sq;
}
void remove_queue(unsigned priority) {
@@ -196,7 +217,11 @@ class PrioritizedQueue {
}
public:
- PrioritizedQueue() : total_priority(0) {}
+ PrioritizedQueue(unsigned max_per, unsigned min_c)
+ : total_priority(0),
+ max_tokens_per_subqueue(max_per),
+ min_cost(min_c)
+ {}
unsigned length() {
unsigned total = 0;
@@ -276,10 +301,14 @@ public:
}
void enqueue(K cl, unsigned priority, unsigned cost, T item) {
+ if (cost < min_cost)
+ cost = min_cost;
create_queue(priority)->enqueue(cl, cost, item);
}
void enqueue_front(K cl, unsigned priority, unsigned cost, T item) {
+ if (cost < min_cost)
+ cost = min_cost;
create_queue(priority)->enqueue_front(cl, cost, item);
}
@@ -300,6 +329,9 @@ public:
return ret;
}
+ // if there are multiple buckets/subqueues with sufficient tokens,
+ // we behave like a strict priority queue among all subqueues that
+ // are eligible to run.
for (typename map<unsigned, SubQueue>::iterator i = queue.begin();
i != queue.end();
++i) {
@@ -315,6 +347,9 @@ public:
return ret;
}
}
+
+ // if no subqueues have sufficient tokens, we behave like a strict
+ // priority queue.
T ret = queue.rbegin()->second.front().second;
unsigned cost = queue.rbegin()->second.front().first;
queue.rbegin()->second.pop_front();
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 32bd41c6ce3..7eb3e94fadc 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -105,6 +105,8 @@ OPTION(ms_bind_port_min, OPT_INT, 6800)
OPTION(ms_bind_port_max, OPT_INT, 7100)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
+OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 4194304)
+OPTION(ms_pq_min_cost, OPT_U64, 65536)
OPTION(ms_inject_socket_failures, OPT_U64, 0)
OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed
OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds
@@ -315,6 +317,8 @@ OPTION(osd_map_dedup, OPT_BOOL, true)
OPTION(osd_map_cache_size, OPT_INT, 500)
OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message
OPTION(osd_op_threads, OPT_INT, 2) // 0 == no threading
+OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304)
+OPTION(osd_op_pq_min_cost, OPT_U64, 65536)
OPTION(osd_disk_threads, OPT_INT, 1)
OPTION(osd_recovery_threads, OPT_INT, 1)
OPTION(osd_recover_clone_overlap, OPT_BOOL, true) // preserve clone_overlap during recovery/migration
diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h
index ea44c165d56..884e0269342 100644
--- a/src/msg/DispatchQueue.h
+++ b/src/msg/DispatchQueue.h
@@ -151,6 +151,8 @@ class DispatchQueue {
DispatchQueue(CephContext *cct, SimpleMessenger *msgr)
: cct(cct), msgr(msgr),
lock("SimpleMessenger::DispatchQeueu::lock"),
+ mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
+ cct->_conf->ms_pq_min_cost),
next_pipe_id(1),
dispatch_thread(this),
stop(false)
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 3d5e51523ea..1af32d73883 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -642,7 +642,10 @@ private:
: ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >(
"OSD::OpWQ", ti, ti*10, tp),
qlock("OpWQ::qlock"),
- osd(o) {}
+ osd(o),
+ pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority,
+ o->cct->_conf->osd_op_pq_min_cost)
+ {}
void _enqueue_front(pair<PGRef, OpRequestRef> item);
void _enqueue(pair<PGRef, OpRequestRef> item);