summaryrefslogtreecommitdiff
path: root/src/os/JournalingObjectStore.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/os/JournalingObjectStore.cc')
-rw-r--r--src/os/JournalingObjectStore.cc53
1 files changed, 10 insertions, 43 deletions
diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc
index b1aee62eca8..b91b6dc207c 100644
--- a/src/os/JournalingObjectStore.cc
+++ b/src/os/JournalingObjectStore.cc
@@ -109,28 +109,8 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
{
Mutex::Locker l(apply_lock);
- // if we ops are blocked, or there are already people (left) in
- // line, get in line.
- if (op > max_applying_seq &&
- (blocked || !ops_apply_blocked.empty())) {
- Cond cond;
- ops_apply_blocked.push_back(&cond);
- dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
- // sleep until we are not blocked AND we are at the front of line
- while (blocked || ops_apply_blocked.front() != &cond)
- cond.Wait(apply_lock);
- dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl;
- ops_apply_blocked.pop_front();
- if (!ops_apply_blocked.empty()) {
- dout(10) << "op_apply_start " << op << " ...and kicking next in line" << dendl;
- ops_apply_blocked.front()->Signal();
- }
- }
- dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1)
- << ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl;
-
- if (op > max_applying_seq)
- max_applying_seq = op;
+ dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
+ assert(!blocked);
assert(op > committed_seq);
open_ops++;
return op;
@@ -141,11 +121,10 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
Mutex::Locker l(apply_lock);
dout(10) << "op_apply_finish " << op << " open_ops " << open_ops
<< " -> " << (open_ops-1)
- << ", max_applying_seq " << max_applying_seq
<< ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq)
<< dendl;
- if (--open_ops == 0)
- open_ops_cond.Signal();
+ --open_ops;
+ assert(open_ops >= 0);
// there can be multiple applies in flight; track the max value we
// note. note that we can't _read_ this value and learn anything
@@ -159,20 +138,18 @@ uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
lock.Lock();
uint64_t op = ++op_seq;
dout(10) << "op_submit_start " << op << dendl;
- ops_submitting.push_back(op);
return op;
}
void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
{
dout(10) << "op_submit_finish " << op << dendl;
- if (op != ops_submitting.front()) {
- dout(0) << "op_submit_finish " << op << " expected "
- << ops_submitting.front()
+ if (op != op_submitted + 1) {
+ dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1)
<< ", OUT OF ORDER" << dendl;
assert(0 == "out of order op_submit_finish");
}
- ops_submitting.pop_front();
+ op_submitted = op;
lock.Unlock();
}
@@ -192,30 +169,22 @@ bool JournalingObjectStore::ApplyManager::commit_start()
{
Mutex::Locker l(apply_lock);
- dout(10) << "commit_start max_applying_seq " << max_applying_seq
- << ", max_applied_seq " << max_applied_seq
+ dout(10) << "commit_start max_applied_seq " << max_applied_seq
+ << ", open_ops " << open_ops
<< dendl;
blocked = true;
- while (open_ops > 0) {
- dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, "
- << " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl;
- open_ops_cond.Wait(apply_lock);
- }
assert(open_ops == 0);
- assert(max_applied_seq == max_applying_seq);
dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
{
Mutex::Locker l(com_lock);
if (max_applied_seq == committed_seq) {
dout(10) << "commit_start nothing to do" << dendl;
blocked = false;
- if (!ops_apply_blocked.empty())
- ops_apply_blocked.front()->Signal();
assert(commit_waiters.empty());
goto out;
}
- committing_seq = max_applying_seq;
+ committing_seq = max_applied_seq;
dout(10) << "commit_start committing " << committing_seq
<< ", still blocked" << dendl;
@@ -235,8 +204,6 @@ void JournalingObjectStore::ApplyManager::commit_started()
// allow new ops. (underlying fs should now be committing all prior ops)
dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
blocked = false;
- if (!ops_apply_blocked.empty())
- ops_apply_blocked.front()->Signal();
}
void JournalingObjectStore::ApplyManager::commit_finish()