diff options
Diffstat (limited to 'src/os/JournalingObjectStore.cc')
-rw-r--r-- | src/os/JournalingObjectStore.cc | 53 |
1 files changed, 10 insertions, 43 deletions
diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc index b1aee62eca8..b91b6dc207c 100644 --- a/src/os/JournalingObjectStore.cc +++ b/src/os/JournalingObjectStore.cc @@ -109,28 +109,8 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) { Mutex::Locker l(apply_lock); - // if we ops are blocked, or there are already people (left) in - // line, get in line. - if (op > max_applying_seq && - (blocked || !ops_apply_blocked.empty())) { - Cond cond; - ops_apply_blocked.push_back(&cond); - dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl; - // sleep until we are not blocked AND we are at the front of line - while (blocked || ops_apply_blocked.front() != &cond) - cond.Wait(apply_lock); - dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl; - ops_apply_blocked.pop_front(); - if (!ops_apply_blocked.empty()) { - dout(10) << "op_apply_start " << op << " ...and kicking next in line" << dendl; - ops_apply_blocked.front()->Signal(); - } - } - dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) - << ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl; - - if (op > max_applying_seq) - max_applying_seq = op; + dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl; + assert(!blocked); assert(op > committed_seq); open_ops++; return op; @@ -141,11 +121,10 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op) Mutex::Locker l(apply_lock); dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> " << (open_ops-1) - << ", max_applying_seq " << max_applying_seq << ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq) << dendl; - if (--open_ops == 0) - open_ops_cond.Signal(); + --open_ops; + assert(open_ops >= 0); // there can be multiple applies in flight; track the max value we // note. note that we can't _read_ this value and learn anything @@ -159,20 +138,18 @@ uint64_t JournalingObjectStore::SubmitManager::op_submit_start() lock.Lock(); uint64_t op = ++op_seq; dout(10) << "op_submit_start " << op << dendl; - ops_submitting.push_back(op); return op; } void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op) { dout(10) << "op_submit_finish " << op << dendl; - if (op != ops_submitting.front()) { - dout(0) << "op_submit_finish " << op << " expected " - << ops_submitting.front() + if (op != op_submitted + 1) { + dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1) << ", OUT OF ORDER" << dendl; assert(0 == "out of order op_submit_finish"); } - ops_submitting.pop_front(); + op_submitted = op; lock.Unlock(); } @@ -192,30 +169,22 @@ bool JournalingObjectStore::ApplyManager::commit_start() { Mutex::Locker l(apply_lock); - dout(10) << "commit_start max_applying_seq " << max_applying_seq - << ", max_applied_seq " << max_applied_seq + dout(10) << "commit_start max_applied_seq " << max_applied_seq + << ", open_ops " << open_ops << dendl; blocked = true; - while (open_ops > 0) { - dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, " - << " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl; - open_ops_cond.Wait(apply_lock); - } assert(open_ops == 0); - assert(max_applied_seq == max_applying_seq); dout(10) << "commit_start blocked, all open_ops have completed" << dendl; { Mutex::Locker l(com_lock); if (max_applied_seq == committed_seq) { dout(10) << "commit_start nothing to do" << dendl; blocked = false; - if (!ops_apply_blocked.empty()) - ops_apply_blocked.front()->Signal(); assert(commit_waiters.empty()); goto out; } - committing_seq = max_applying_seq; + committing_seq = max_applied_seq; dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl; @@ -235,8 +204,6 @@ void JournalingObjectStore::ApplyManager::commit_started() // allow new ops. (underlying fs should now be committing all prior ops) dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl; blocked = false; - if (!ops_apply_blocked.empty()) - ops_apply_blocked.front()->Signal(); } void JournalingObjectStore::ApplyManager::commit_finish() |