diff options
author | Sage Weil <sage@inktank.com> | 2012-12-02 07:29:46 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2012-12-02 14:30:32 -0800 |
commit | 528108485be7912069087822e5b7a1a2f1dd515e (patch) | |
tree | 2f070a1ba08272905bf98c2866e969167135119a | |
parent | c10958e4fb32de49166c5ab09b59e83521672927 (diff) | |
download | ceph-528108485be7912069087822e5b7a1a2f1dd515e.tar.gz |
os/FileStore: only wait for applying ops to complete before commit
We can have a large number of operations in the op_wq waiting to be applied
to the fs. Currently, when we want to commit, we wait for them *all* to
apply. This can take a very long time (the default queue length is 500
operations!).
Instead, mark an Op as started ("applying") when the thread pool actually
starts to apply it. At that point, only wait for applying ops to complete.
We let any threads with an op seq < max_applying_seq begin as well so that
we have a proper ordering/barrier. When those flush, applied_seq will ==
max_applying_seq, and that becomes the committing_seq value.
Note that 'applied_seq' is still maintained, but serves no real purpose
except to populate our asserts with sanity checks. max_applying_seq serves
the purpose applied_seq used to.
This removes one unnecessary source of latency associated with fs
commits.
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/os/FileStore.cc | 13 | ||||
-rw-r--r-- | src/os/JournalingObjectStore.cc | 27 | ||||
-rw-r--r-- | src/os/JournalingObjectStore.h | 4 |
3 files changed, 25 insertions, 19 deletions
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 3feb92924a9..2c66a5ea7db 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1886,10 +1886,9 @@ FileStore::Op *FileStore::build_op(list<Transaction*>& tls, void FileStore::queue_op(OpSequencer *osr, Op *o) { - // mark apply start _now_, because we need to drain the entire apply - // queue during commit in order to put the store in a consistent - // state. - apply_manager.op_apply_start(o->op); + // queue op on sequencer, then queue sequencer for the threadpool, + // so that regardless of which order the threads pick up the + // sequencer, the op order will be preserved. osr->queue(o); @@ -1953,16 +1952,12 @@ void FileStore::_do_op(OpSequencer *osr) { osr->apply_lock.Lock(); Op *o = osr->peek_queue(); - + apply_manager.op_apply_start(o->op); dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl; int r = do_transactions(o->tls, o->op); apply_manager.op_apply_finish(o->op); dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl; - - /*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now " - << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl; - */ } void FileStore::_finish_op(OpSequencer *osr) diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc index 42b95c96a58..3f05fa0fb16 100644 --- a/src/os/JournalingObjectStore.cc +++ b/src/os/JournalingObjectStore.cc @@ -111,7 +111,8 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) Mutex::Locker l(apply_lock); // if we ops are blocked, or there are already people (left) in // line, get in line. 
- if (blocked || !ops_apply_blocked.empty()) { + if (op > max_applying_seq && + (blocked || !ops_apply_blocked.empty())) { Cond cond; ops_apply_blocked.push_back(&cond); dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl; @@ -125,9 +126,12 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) ops_apply_blocked.front()->Signal(); } } - dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl; - assert(!blocked); + dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) + << ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl; + if (op > max_applying_seq) + max_applying_seq = op; + assert(op > committed_seq); open_ops++; return op; } @@ -136,7 +140,10 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op) { Mutex::Locker l(apply_lock); dout(10) << "op_apply_finish " << op << " open_ops " << open_ops - << " -> " << (open_ops-1) << dendl; + << " -> " << (open_ops-1) + << ", max_applying_seq " << max_applying_seq + << ", applied_seq " << applied_seq << " -> " << MAX(op, applied_seq) + << dendl; if (--open_ops == 0) open_ops_cond.Signal(); @@ -185,15 +192,17 @@ bool JournalingObjectStore::ApplyManager::commit_start() { Mutex::Locker l(apply_lock); - dout(10) << "commit_start " - << ", applied_seq " << applied_seq << dendl; + dout(10) << "commit_start max_applying_seq " << max_applying_seq + << ", applied_seq " << applied_seq + << dendl; blocked = true; while (open_ops > 0) { - dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl; + dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, " + << " max_applying_seq " << max_applying_seq << " applied_seq " << applied_seq << dendl; open_ops_cond.Wait(apply_lock); } assert(open_ops == 0); - + assert(applied_seq == max_applying_seq); dout(10) << "commit_start blocked, all open_ops have 
completed" << dendl; { Mutex::Locker l(com_lock); @@ -206,7 +215,7 @@ bool JournalingObjectStore::ApplyManager::commit_start() goto out; } - committing_seq = applied_seq; + committing_seq = max_applying_seq; dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl; diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h index dff49d43cbb..28665799a8c 100644 --- a/src/os/JournalingObjectStore.h +++ b/src/os/JournalingObjectStore.h @@ -54,6 +54,7 @@ protected: Cond blocked_cond; int open_ops; Cond open_ops_cond; + uint64_t max_applying_seq; uint64_t applied_seq; Mutex com_lock; @@ -68,6 +69,7 @@ protected: apply_lock("JOS::ApplyManager::apply_lock", false, true, false, g_ceph_context), blocked(false), open_ops(0), + max_applying_seq(0), applied_seq(0), com_lock("JOS::ApplyManager::com_lock", false, true, false, g_ceph_context), committing_seq(0), committed_seq(0) {} @@ -97,7 +99,7 @@ protected: } { Mutex::Locker l(apply_lock); - applied_seq = fs_op_seq; + max_applying_seq = applied_seq = fs_op_seq; } } } apply_manager; |