summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Sage Weil <sage@inktank.com> 2012-12-04 15:05:18 -0800
committer: Sage Weil <sage@inktank.com> 2012-12-04 15:05:18 -0800
commit: 3ef741ac2dfb9ad8f3086051e452e9a2bf43fd4e (patch)
tree: 71365d27c2859759901d07500554355ae7c25f81
parent: f3bd3564fa26efa5c5d6664464d247fe974db902 (diff)
parent: 85574a36226611ccf0fb7591fd275a2bdcca2bad (diff)
download: ceph-3ef741ac2dfb9ad8f3086051e452e9a2bf43fd4e.tar.gz
Merge branch 'wip-filestore' into next
Reviewed-by: Sam Just <sam.just@inktank.com>
-rw-r--r-- src/os/FileStore.cc 13
-rw-r--r-- src/os/JournalingObjectStore.cc 33
-rw-r--r-- src/os/JournalingObjectStore.h 8
3 files changed, 30 insertions, 24 deletions
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 3feb92924a9..2c66a5ea7db 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -1886,10 +1886,9 @@ FileStore::Op *FileStore::build_op(list<Transaction*>& tls,
void FileStore::queue_op(OpSequencer *osr, Op *o)
{
- // mark apply start _now_, because we need to drain the entire apply
- // queue during commit in order to put the store in a consistent
- // state.
- apply_manager.op_apply_start(o->op);
+ // queue op on sequencer, then queue sequencer for the threadpool,
+ // so that regardless of which order the threads pick up the
+ // sequencer, the op order will be preserved.
osr->queue(o);
@@ -1953,16 +1952,12 @@ void FileStore::_do_op(OpSequencer *osr)
{
osr->apply_lock.Lock();
Op *o = osr->peek_queue();
-
+ apply_manager.op_apply_start(o->op);
dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
int r = do_transactions(o->tls, o->op);
apply_manager.op_apply_finish(o->op);
dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
<< ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
-
- /*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now "
- << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl;
- */
}
void FileStore::_finish_op(OpSequencer *osr)
diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc
index 42b95c96a58..b1aee62eca8 100644
--- a/src/os/JournalingObjectStore.cc
+++ b/src/os/JournalingObjectStore.cc
@@ -111,7 +111,8 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
Mutex::Locker l(apply_lock);
// if we ops are blocked, or there are already people (left) in
// line, get in line.
- if (blocked || !ops_apply_blocked.empty()) {
+ if (op > max_applying_seq &&
+ (blocked || !ops_apply_blocked.empty())) {
Cond cond;
ops_apply_blocked.push_back(&cond);
dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
@@ -125,9 +126,12 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
ops_apply_blocked.front()->Signal();
}
}
- dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
- assert(!blocked);
+ dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1)
+ << ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl;
+ if (op > max_applying_seq)
+ max_applying_seq = op;
+ assert(op > committed_seq);
open_ops++;
return op;
}
@@ -136,15 +140,18 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
{
Mutex::Locker l(apply_lock);
dout(10) << "op_apply_finish " << op << " open_ops " << open_ops
- << " -> " << (open_ops-1) << dendl;
+ << " -> " << (open_ops-1)
+ << ", max_applying_seq " << max_applying_seq
+ << ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq)
+ << dendl;
if (--open_ops == 0)
open_ops_cond.Signal();
// there can be multiple applies in flight; track the max value we
// note. note that we can't _read_ this value and learn anything
// meaningful unless/until we've quiesced all in-flight applies.
- if (op > applied_seq)
- applied_seq = op;
+ if (op > max_applied_seq)
+ max_applied_seq = op;
}
uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
@@ -185,19 +192,21 @@ bool JournalingObjectStore::ApplyManager::commit_start()
{
Mutex::Locker l(apply_lock);
- dout(10) << "commit_start "
- << ", applied_seq " << applied_seq << dendl;
+ dout(10) << "commit_start max_applying_seq " << max_applying_seq
+ << ", max_applied_seq " << max_applied_seq
+ << dendl;
blocked = true;
while (open_ops > 0) {
- dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
+ dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, "
+ << " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl;
open_ops_cond.Wait(apply_lock);
}
assert(open_ops == 0);
-
+ assert(max_applied_seq == max_applying_seq);
dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
{
Mutex::Locker l(com_lock);
- if (applied_seq == committed_seq) {
+ if (max_applied_seq == committed_seq) {
dout(10) << "commit_start nothing to do" << dendl;
blocked = false;
if (!ops_apply_blocked.empty())
@@ -206,7 +215,7 @@ bool JournalingObjectStore::ApplyManager::commit_start()
goto out;
}
- committing_seq = applied_seq;
+ committing_seq = max_applying_seq;
dout(10) << "commit_start committing " << committing_seq
<< ", still blocked" << dendl;
diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h
index dff49d43cbb..ae74c32cd25 100644
--- a/src/os/JournalingObjectStore.h
+++ b/src/os/JournalingObjectStore.h
@@ -54,7 +54,8 @@ protected:
Cond blocked_cond;
int open_ops;
Cond open_ops_cond;
- uint64_t applied_seq;
+ uint64_t max_applying_seq;
+ uint64_t max_applied_seq;
Mutex com_lock;
map<version_t, vector<Context*> > commit_waiters;
@@ -68,7 +69,8 @@ protected:
apply_lock("JOS::ApplyManager::apply_lock", false, true, false, g_ceph_context),
blocked(false),
open_ops(0),
- applied_seq(0),
+ max_applying_seq(0),
+ max_applied_seq(0),
com_lock("JOS::ApplyManager::com_lock", false, true, false, g_ceph_context),
committing_seq(0), committed_seq(0) {}
void add_waiter(uint64_t, Context*);
@@ -97,7 +99,7 @@ protected:
}
{
Mutex::Locker l(apply_lock);
- applied_seq = fs_op_seq;
+ max_applying_seq = max_applied_seq = fs_op_seq;
}
}
} apply_manager;