summaryrefslogtreecommitdiff
path: root/src/osd/ReplicatedPG.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/osd/ReplicatedPG.h')
-rw-r--r--src/osd/ReplicatedPG.h163
1 files changed, 161 insertions, 2 deletions
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index abee57ffe7d..15053217d01 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -300,6 +300,8 @@ public:
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
+ enum { W_LOCK, R_LOCK, NONE } lock_to_release;
+
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
@@ -313,7 +315,8 @@ public:
current_osd_subop_num(0),
data_off(0), reply(NULL), pg(_pg),
num_read(0),
- num_write(0) {
+ num_write(0),
+ lock_to_release(NONE) {
if (_ssc) {
new_snapset = _ssc->snapset;
snapset = &_ssc->snapset;
@@ -321,6 +324,7 @@ public:
}
~OpContext() {
assert(!clone_obc);
+ assert(lock_to_release == NONE);
if (reply)
reply->put();
}
@@ -379,7 +383,7 @@ public:
if (--nref == 0) {
assert(!obc);
assert(src_obc.empty());
- delete ctx;
+ delete ctx; // must already be unlocked
delete this;
//generic_dout(0) << "deleting " << this << dendl;
}
@@ -390,6 +394,161 @@ public:
protected:
+ /// Tracks pending readers or writers on an object
+ class RWTracker {
+ struct ObjState {
+ enum State {
+ NONE,
+ READ,
+ WRITE
+ };
+ State state; /// rw state
+ uint64_t count; /// number of readers or writers
+ list<OpRequestRef> waiters; /// ops waiting on state change
+
+ ObjState() : state(NONE), count(0) {}
+ bool get_read(OpRequestRef op) {
+ switch (state) {
+ case NONE:
+ assert(count == 0);
+ state = READ;
+ // fall through
+ case READ:
+ count++;
+ return true;
+ case WRITE:
+ waiters.push_back(op);
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+ bool get_write(OpRequestRef op) {
+ switch (state) {
+ case NONE:
+ assert(count == 0);
+ state = WRITE;
+ // fall through
+ case WRITE:
+ count++;
+ return true;
+ case READ:
+ waiters.push_back(op);
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+ void dec(list<OpRequestRef> *requeue) {
+ assert(count > 0);
+ assert(requeue);
+ assert(requeue->empty());
+ count--;
+ if (count == 0) {
+ state = NONE;
+ requeue->swap(waiters);
+ }
+ }
+ void put_read(list<OpRequestRef> *requeue) {
+ assert(state == READ);
+ dec(requeue);
+ }
+ void put_write(list<OpRequestRef> *requeue) {
+ assert(state == WRITE);
+ dec(requeue);
+ }
+ void clear(list<OpRequestRef> *requeue) {
+ state = NONE;
+ count = 0;
+ assert(requeue);
+ assert(requeue->empty());
+ requeue->swap(waiters);
+ }
+ bool empty() const { return state == NONE; }
+ };
+ map<hobject_t, ObjState > obj_state;
+ public:
+ bool get_read(const hobject_t &hoid, OpRequestRef op) {
+ return obj_state[hoid].get_read(op);
+ }
+ bool get_write(const hobject_t &hoid, OpRequestRef op) {
+ return obj_state[hoid].get_write(op);
+ }
+ void put_read(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
+ obj_state[hoid].put_read(to_wake);
+ if (obj_state[hoid].empty()) {
+ obj_state.erase(hoid);
+ }
+ }
+ void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
+ obj_state[hoid].put_write(to_wake);
+ if (obj_state[hoid].empty()) {
+ obj_state.erase(hoid);
+ }
+ }
+ } rw_manager;
+
+ /**
+ * Grabs locks for OpContext, should be cleaned up in close_op_ctx
+ *
+ * @param ctx [in,out] ctx to get locks for
+ * @return true on success, false if we are queued
+ */
+ bool get_rw_locks(OpContext *ctx) {
+ if (ctx->op->may_write()) {
+ if (rw_manager.get_write(ctx->obs->oi.soid, ctx->op)) {
+ ctx->lock_to_release = OpContext::W_LOCK;
+ return true;
+ } else {
+ assert(0 == "Currently there cannot be a read in flight here");
+ return false;
+ }
+ } else {
+ assert(ctx->op->may_read());
+ if (rw_manager.get_read(ctx->obs->oi.soid, ctx->op)) {
+ ctx->lock_to_release = OpContext::R_LOCK;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Cleans up OpContext
+ *
+ * @param ctx [in] ctx to clean up
+ */
+ void close_op_ctx(OpContext *ctx) {
+ release_op_ctx_locks(ctx);
+ delete ctx;
+ }
+
+ /**
+ * Releases ctx locks
+ *
+ * @param ctx [in] ctx to clean up
+ */
+ void release_op_ctx_locks(OpContext *ctx) {
+ list<OpRequestRef> to_req;
+ switch (ctx->lock_to_release) {
+ case OpContext::W_LOCK:
+ rw_manager.put_write(ctx->obs->oi.soid, &to_req);
+ break;
+ case OpContext::R_LOCK:
+ rw_manager.put_read(ctx->obs->oi.soid, &to_req);
+ break;
+ case OpContext::NONE:
+ break;
+ default:
+ assert(0);
+ };
+ ctx->lock_to_release = OpContext::NONE;
+ requeue_ops(to_req);
+ }
+
// replica ops
// [primary|tail]
xlist<RepGather*> repop_queue;