diff options
author | Samuel Just <sam.just@inktank.com> | 2013-10-01 17:18:17 -0700 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2013-10-03 16:19:00 -0700 |
commit | ffdcbc8f71d68e1ee8f4a4d5c4222fae539eb28a (patch) | |
tree | 316b1d1355c180550cb0fbfb7a750d165913846a | |
parent | 399f1d53f7f441992f48aa72139cd628c4ad4f29 (diff) | |
download | ceph-ffdcbc8f71d68e1ee8f4a4d5c4222fae539eb28a.tar.gz |
ReplicatedPG: block reads on an object until the write is committed (branch: wip-6059)
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/osd/ReplicatedPG.cc | 50 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 163 |
2 files changed, 191 insertions, 22 deletions
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index a661aa7f786..95a10eab361 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -981,21 +981,9 @@ void ReplicatedPG::do_op(OpRequestRef op) return; } - if ((op->may_read()) && (obc->obs.oi.lost)) { - // This object is lost. Reading from it returns an error. - dout(20) << __func__ << ": object " << obc->obs.oi.soid - << " is lost" << dendl; - osd->reply_op_error(op, -ENFILE); - return; - } dout(25) << __func__ << ": object " << obc->obs.oi.soid << " has oi of " << obc->obs.oi << dendl; - if (!op->may_write() && !obc->obs.exists) { - osd->reply_op_error(op, -ENOENT); - return; - } - // are writes blocked by another object? if (obc->blocked_by) { dout(10) << "do_op writes for " << obc->obs.oi.soid << " blocked by " @@ -1114,11 +1102,30 @@ void ReplicatedPG::do_op(OpRequestRef op) } } - op->mark_started(); - OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, &obc->obs, obc->ssc, this); + if (!get_rw_locks(ctx)) { + op->mark_delayed("waiting for rw locks"); + close_op_ctx(ctx); + return; + } + + if ((op->may_read()) && (obc->obs.oi.lost)) { + // This object is lost. Reading from it returns an error. 
+ dout(20) << __func__ << ": object " << obc->obs.oi.soid + << " is lost" << dendl; + close_op_ctx(ctx); + osd->reply_op_error(op, -ENFILE); + return; + } + if (!op->may_write() && !obc->obs.exists) { + close_op_ctx(ctx); + osd->reply_op_error(op, -ENOENT); + return; + } + + op->mark_started(); ctx->obc = obc; ctx->src_obc = src_obc; @@ -1195,7 +1202,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) if (already_complete(oldv)) { reply_ctx(ctx, 0, oldv, entry->user_version); } else { - delete ctx; + close_op_ctx(ctx); if (m->wants_ack()) { if (already_ack(oldv)) { @@ -1288,7 +1295,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) if (result == -EAGAIN) { // clean up after the ctx - delete ctx; + close_op_ctx(ctx); return; } @@ -1340,7 +1347,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); osd->send_message_osd_client(reply, m->get_connection()); - delete ctx; + close_op_ctx(ctx); return; } @@ -1388,13 +1395,13 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) void ReplicatedPG::reply_ctx(OpContext *ctx, int r) { osd->reply_op_error(ctx->op, r); - delete ctx; + close_op_ctx(ctx); } void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv) { osd->reply_op_error(ctx->op, r, v, uv); - delete ctx; + close_op_ctx(ctx); } void ReplicatedPG::log_op_stats(OpContext *ctx) @@ -4575,7 +4582,7 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop) kick_object_context_blocked(ctx->obc); - delete ctx; + close_op_ctx(ctx); } void ReplicatedPG::cancel_copy_ops() @@ -4731,6 +4738,8 @@ void ReplicatedPG::op_commit(RepGather *repop) eval_repop(repop); } + release_op_ctx_locks(repop->ctx); + repop->put(); unlock(); } @@ -4973,6 +4982,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe void ReplicatedPG::remove_repop(RepGather *repop) { + release_op_ctx_locks(repop->ctx); repop_map.erase(repop->rep_tid); repop->put(); diff --git a/src/osd/ReplicatedPG.h 
b/src/osd/ReplicatedPG.h index abee57ffe7d..15053217d01 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -300,6 +300,8 @@ public: hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking + enum { W_LOCK, R_LOCK, NONE } lock_to_release; + OpContext(const OpContext& other); const OpContext& operator=(const OpContext& other); @@ -313,7 +315,8 @@ public: current_osd_subop_num(0), data_off(0), reply(NULL), pg(_pg), num_read(0), - num_write(0) { + num_write(0), + lock_to_release(NONE) { if (_ssc) { new_snapset = _ssc->snapset; snapset = &_ssc->snapset; @@ -321,6 +324,7 @@ public: } ~OpContext() { assert(!clone_obc); + assert(lock_to_release == NONE); if (reply) reply->put(); } @@ -379,7 +383,7 @@ public: if (--nref == 0) { assert(!obc); assert(src_obc.empty()); - delete ctx; + delete ctx; // must already be unlocked delete this; //generic_dout(0) << "deleting " << this << dendl; } @@ -390,6 +394,161 @@ public: protected: + /// Tracks pending readers or writers on an object + class RWTracker { + struct ObjState { + enum State { + NONE, + READ, + WRITE + }; + State state; /// rw state + uint64_t count; /// number of readers or writers + list<OpRequestRef> waiters; /// ops waiting on state change + + ObjState() : state(NONE), count(0) {} + bool get_read(OpRequestRef op) { + switch (state) { + case NONE: + assert(count == 0); + state = READ; + // fall through + case READ: + count++; + return true; + case WRITE: + waiters.push_back(op); + return false; + default: + assert(0 == "unhandled case"); + return false; + } + } + bool get_write(OpRequestRef op) { + switch (state) { + case NONE: + assert(count == 0); + state = WRITE; + // fall through + case WRITE: + count++; + return true; + case READ: + waiters.push_back(op); + return false; + default: + assert(0 == "unhandled case"); + return false; + } + } + void dec(list<OpRequestRef> *requeue) { + assert(count > 0); + assert(requeue); + assert(requeue->empty()); + count--; + if 
(count == 0) { + state = NONE; + requeue->swap(waiters); + } + } + void put_read(list<OpRequestRef> *requeue) { + assert(state == READ); + dec(requeue); + } + void put_write(list<OpRequestRef> *requeue) { + assert(state == WRITE); + dec(requeue); + } + void clear(list<OpRequestRef> *requeue) { + state = NONE; + count = 0; + assert(requeue); + assert(requeue->empty()); + requeue->swap(waiters); + } + bool empty() const { return state == NONE; } + }; + map<hobject_t, ObjState > obj_state; + public: + bool get_read(const hobject_t &hoid, OpRequestRef op) { + return obj_state[hoid].get_read(op); + } + bool get_write(const hobject_t &hoid, OpRequestRef op) { + return obj_state[hoid].get_write(op); + } + void put_read(const hobject_t &hoid, list<OpRequestRef> *to_wake) { + obj_state[hoid].put_read(to_wake); + if (obj_state[hoid].empty()) { + obj_state.erase(hoid); + } + } + void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake) { + obj_state[hoid].put_write(to_wake); + if (obj_state[hoid].empty()) { + obj_state.erase(hoid); + } + } + } rw_manager; + + /** + * Grabs locks for OpContext, should be cleaned up in close_op_ctx + * + * @param ctx [in,out] ctx to get locks for + * @return true on success, false if we are queued + */ + bool get_rw_locks(OpContext *ctx) { + if (ctx->op->may_write()) { + if (rw_manager.get_write(ctx->obs->oi.soid, ctx->op)) { + ctx->lock_to_release = OpContext::W_LOCK; + return true; + } else { + assert(0 == "Currently there cannot be a read in flight here"); + return false; + } + } else { + assert(ctx->op->may_read()); + if (rw_manager.get_read(ctx->obs->oi.soid, ctx->op)) { + ctx->lock_to_release = OpContext::R_LOCK; + return true; + } else { + return false; + } + } + } + + /** + * Cleans up OpContext + * + * @param ctx [in] ctx to clean up + */ + void close_op_ctx(OpContext *ctx) { + release_op_ctx_locks(ctx); + delete ctx; + } + + /** + * Releases ctx locks + * + * @param ctx [in] ctx to clean up + */ + void 
release_op_ctx_locks(OpContext *ctx) { + list<OpRequestRef> to_req; + switch (ctx->lock_to_release) { + case OpContext::W_LOCK: + rw_manager.put_write(ctx->obs->oi.soid, &to_req); + break; + case OpContext::R_LOCK: + rw_manager.put_read(ctx->obs->oi.soid, &to_req); + break; + case OpContext::NONE: + break; + default: + assert(0); + }; + ctx->lock_to_release = OpContext::NONE; + requeue_ops(to_req); + } + // replica ops // [primary|tail] xlist<RepGather*> repop_queue; |