diff options
author | Sage Weil <sage@inktank.com> | 2013-01-21 15:29:28 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-01-22 14:47:41 -0800 |
commit | 6e3363b20e590cd9df89f2caebe71867b94cc291 (patch) | |
tree | 0afcb24dfb262966d0024a2d1d168e87b206ce9f | |
parent | c549a0cf6fae78c8418a3b4b0702fd8a1e4ce482 (diff) | |
download | ceph-6e3363b20e590cd9df89f2caebe71867b94cc291.tar.gz |
common/PrioritizedQueue: add min cost, max tokens per bucket
Two problems.
First, we need to cap the tokens per bucket. Otherwise, a stream of
items at one priority over time will indefinitely inflate the tokens
available at another priority. The cap should represent how "bursty"
we allow a given bucket to be. Start with 4MB for now.
Second, set a floor on the item cost. Otherwise, we can have an
infinite queue of 0 cost items that starve other queues. More
realistically, we need to balance the overhead of processing small items
with the cost of large items. I.e., a 4 KB item is not 1/1000th as
expensive as a 4MB item.
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/common/PrioritizedQueue.h | 45 | ||||
-rw-r--r-- | src/common/config_opts.h | 4 | ||||
-rw-r--r-- | src/msg/DispatchQueue.h | 2 | ||||
-rw-r--r-- | src/osd/OSD.h | 5 |
4 files changed, 50 insertions, 6 deletions
diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h index 6ada0660124..072eaebf2ee 100644 --- a/src/common/PrioritizedQueue.h +++ b/src/common/PrioritizedQueue.h @@ -45,6 +45,8 @@ template <typename T, typename K> class PrioritizedQueue { int64_t total_priority; + int64_t max_tokens_per_subqueue; + int64_t min_cost; template <class F> static unsigned filter_list_pairs( @@ -76,22 +78,39 @@ class PrioritizedQueue { struct SubQueue { private: map<K, list<pair<unsigned, T> > > q; - unsigned tokens; + unsigned tokens, max_tokens; int64_t size; typename map<K, list<pair<unsigned, T> > >::iterator cur; public: SubQueue(const SubQueue &other) - : q(other.q), tokens(other.tokens), size(other.size), + : q(other.q), + tokens(other.tokens), + max_tokens(other.max_tokens), + size(other.size), cur(q.begin()) {} - SubQueue() : tokens(0), size(0), cur(q.begin()) {} + SubQueue() + : tokens(0), + max_tokens(0), + size(0) {} + void set_max_tokens(unsigned mt) { + max_tokens = mt; + } + unsigned get_max_tokens() const { + return max_tokens; + } unsigned num_tokens() const { return tokens; } void put_tokens(unsigned t) { tokens += t; + if (tokens > max_tokens) + tokens = max_tokens; } void take_tokens(unsigned t) { + if (tokens > t) tokens -= t; + else + tokens = 0; } void enqueue(K cl, unsigned cost, T item) { q[cl].push_back(make_pair(cost, item)); @@ -175,7 +194,9 @@ class PrioritizedQueue { if (p != queue.end()) return &p->second; total_priority += priority; - return &queue[priority]; + SubQueue *sq = &queue[priority]; + sq->set_max_tokens(max_tokens_per_subqueue); + return sq; } void remove_queue(unsigned priority) { @@ -196,7 +217,11 @@ class PrioritizedQueue { } public: - PrioritizedQueue() : total_priority(0) {} + PrioritizedQueue(unsigned max_per, unsigned min_c) + : total_priority(0), + max_tokens_per_subqueue(max_per), + min_cost(min_c) + {} unsigned length() { unsigned total = 0; @@ -276,10 +301,14 @@ public: } void enqueue(K cl, unsigned priority, 
unsigned cost, T item) { + if (cost < min_cost) + cost = min_cost; create_queue(priority)->enqueue(cl, cost, item); } void enqueue_front(K cl, unsigned priority, unsigned cost, T item) { + if (cost < min_cost) + cost = min_cost; create_queue(priority)->enqueue_front(cl, cost, item); } @@ -300,6 +329,9 @@ public: return ret; } + // if there are multiple buckets/subqueues with sufficient tokens, + // we behave like a strict priority queue among all subqueues that + // are eligible to run. for (typename map<unsigned, SubQueue>::iterator i = queue.begin(); i != queue.end(); ++i) { @@ -315,6 +347,9 @@ public: return ret; } } + + // if no subqueues have sufficient tokens, we behave like a strict + // priority queue. T ret = queue.rbegin()->second.front().second; unsigned cost = queue.rbegin()->second.front().first; queue.rbegin()->second.pop_front(); diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 32bd41c6ce3..7eb3e94fadc 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -105,6 +105,8 @@ OPTION(ms_bind_port_min, OPT_INT, 6800) OPTION(ms_bind_port_max, OPT_INT, 7100) OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10) OPTION(ms_tcp_read_timeout, OPT_U64, 900) +OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 4194304) +OPTION(ms_pq_min_cost, OPT_U64, 65536) OPTION(ms_inject_socket_failures, OPT_U64, 0) OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds @@ -315,6 +317,8 @@ OPTION(osd_map_dedup, OPT_BOOL, true) OPTION(osd_map_cache_size, OPT_INT, 500) OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message OPTION(osd_op_threads, OPT_INT, 2) // 0 == no threading +OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304) +OPTION(osd_op_pq_min_cost, OPT_U64, 65536) OPTION(osd_disk_threads, OPT_INT, 1) OPTION(osd_recovery_threads, OPT_INT, 1) OPTION(osd_recover_clone_overlap, OPT_BOOL, true) // preserve clone_overlap during 
recovery/migration diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index ea44c165d56..884e0269342 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -151,6 +151,8 @@ class DispatchQueue { DispatchQueue(CephContext *cct, SimpleMessenger *msgr) : cct(cct), msgr(msgr), lock("SimpleMessenger::DispatchQeueu::lock"), + mqueue(cct->_conf->ms_pq_max_tokens_per_priority, + cct->_conf->ms_pq_min_cost), next_pipe_id(1), dispatch_thread(this), stop(false) diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 3d5e51523ea..1af32d73883 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -642,7 +642,10 @@ private: : ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >( "OSD::OpWQ", ti, ti*10, tp), qlock("OpWQ::qlock"), - osd(o) {} + osd(o), + pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority, + o->cct->_conf->osd_op_pq_min_cost) + {} void _enqueue_front(pair<PGRef, OpRequestRef> item); void _enqueue(pair<PGRef, OpRequestRef> item); |