diff options
author | Samuel Just <sam.just@inktank.com> | 2013-08-29 18:16:55 -0700 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2013-09-23 22:54:57 -0700 |
commit | e4fede70f90ce9e6d5c3fd671fe372b0d566e329 (patch) | |
tree | a9342a2e1b107b42c40c24e7e8706ce0755843c1 | |
parent | 545b4982f859fa12529ce4b5d832b62652dac72e (diff) | |
download | ceph-e4fede70f90ce9e6d5c3fd671fe372b0d566e329.tar.gz |
PG,ReplicatedPG: handle do_request in ReplicatedPG,PGBackend
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/osd/PG.cc | 70 | ||||
-rw-r--r-- | src/osd/PG.h | 7 | ||||
-rw-r--r-- | src/osd/ReplicatedBackend.cc | 8 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 69 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 3 |
5 files changed, 68 insertions, 89 deletions
diff --git a/src/osd/PG.cc b/src/osd/PG.cc index f319d160a39..919d3e3913a 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1399,76 +1399,6 @@ void PG::queue_op(OpRequestRef op) osd->op_wq.queue(make_pair(PGRef(this), op)); } -void PG::do_request( - OpRequestRef op, - ThreadPool::TPHandle &handle) -{ - // do any pending flush - do_pending_flush(); - - if (!op_has_sufficient_caps(op)) { - osd->reply_op_error(op, -EPERM); - return; - } - assert(!op_must_wait_for_map(get_osdmap(), op)); - if (can_discard_request(op)) { - return; - } - if (!flushed) { - dout(20) << " !flushed, waiting for active on " << op << dendl; - waiting_for_active.push_back(op); - return; - } - - switch (op->request->get_type()) { - case CEPH_MSG_OSD_OP: - if (is_replay() || !is_active()) { - dout(20) << " replay, waiting for active on " << op << dendl; - waiting_for_active.push_back(op); - return; - } - do_op(op); // do it now - break; - - case MSG_OSD_SUBOP: - do_sub_op(op); - break; - - case MSG_OSD_SUBOPREPLY: - do_sub_op_reply(op); - break; - - case MSG_OSD_PG_SCAN: - do_scan(op, handle); - break; - - case MSG_OSD_PG_BACKFILL: - do_backfill(op); - break; - - case MSG_OSD_PG_PUSH: - if (!is_active()) { - waiting_for_active.push_back(op); - op->mark_delayed("waiting for active"); - return; - } - do_push(op); - break; - - case MSG_OSD_PG_PULL: - do_pull(op); - break; - - case MSG_OSD_PG_PUSH_REPLY: - do_push_reply(op); - break; - - default: - assert(0 == "bad message type in do_request"); - } -} - - void PG::replay_queued_ops() { assert(is_replay() && is_active()); diff --git a/src/osd/PG.h b/src/osd/PG.h index b869a0e5e23..74809eea268 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1793,10 +1793,10 @@ public: // abstract bits - void do_request( + virtual void do_request( OpRequestRef op, ThreadPool::TPHandle &handle - ); + ) = 0; virtual void do_op(OpRequestRef op) = 0; virtual void do_sub_op(OpRequestRef op) = 0; @@ -1806,9 +1806,6 @@ public: ThreadPool::TPHandle &handle ) = 0; virtual void do_backfill(OpRequestRef op) = 0; - virtual void do_push(OpRequestRef op) = 0; - virtual void do_pull(OpRequestRef op) = 0; - virtual void do_push_reply(OpRequestRef op) = 0; virtual void snap_trimmer() = 0; virtual int do_command(cmdmap_t cmdmap, ostream& ss, diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index d020b18d901..da57630e78b 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -92,12 +92,14 @@ bool ReplicatedBackend::handle_message( // TODOXXX: needs to be active possibly sub_op_push(op); return true; + default: + break; } } break; } - case MSG_OSD_SUBOPREPLY: + case MSG_OSD_SUBOPREPLY: { MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request); if (r->ops.size() >= 1) { OSDOp &first = r->ops[0]; @@ -110,6 +112,10 @@ bool ReplicatedBackend::handle_message( } break; } + + default: + break; + } return false; } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 85ed07845b0..24dc8baf456 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -643,6 +643,62 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo src_oloc.key = oid.name; } +void ReplicatedPG::do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle) +{ + // do any pending flush + do_pending_flush(); + + if (!op_has_sufficient_caps(op)) { + osd->reply_op_error(op, -EPERM); + return; + } + assert(!op_must_wait_for_map(get_osdmap(), op)); + if (can_discard_request(op)) { + return; + } + if (!flushed) { + dout(20) << " !flushed, waiting for active on " << op << dendl; + waiting_for_active.push_back(op); + return; + } + + if (pgbackend->handle_message(op)) + return; + + switch (op->request->get_type()) { + case CEPH_MSG_OSD_OP: + if (is_replay() || !is_active()) { + dout(20) << " replay, waiting for active on " << op << dendl; + waiting_for_active.push_back(op); + return; + } + do_op(op); // do it now + break; + + case MSG_OSD_SUBOP: + do_sub_op(op); + break; + + case MSG_OSD_SUBOPREPLY: + do_sub_op_reply(op); + break; + + case MSG_OSD_PG_SCAN: + do_scan(op, handle); + break; + + case MSG_OSD_PG_BACKFILL: + do_backfill(op); + break; + + default: + assert(0 == "bad message type in do_request"); + } +} + + /** do_op - do an op * pg lock will be held (if multithreaded) * osd_lock NOT held. @@ -1258,11 +1314,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op) OSDOp *first = NULL; if (m->ops.size() >= 1) { first = &m->ops[0]; - switch (first->op.op) { - case CEPH_OSD_OP_PULL: - sub_op_pull(op); - return; - } } if (!is_active()) { @@ -1273,9 +1324,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op) if (first) { switch (first->op.op) { - case CEPH_OSD_OP_PUSH: - sub_op_push(op); - return; case CEPH_OSD_OP_DELETE: sub_op_remove(op); return; @@ -1304,11 +1352,6 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op) if (r->ops.size() >= 1) { OSDOp& first = r->ops[0]; switch (first.op.op) { - case CEPH_OSD_OP_PUSH: - // continue peer recovery - sub_op_push_reply(op); - return; - case CEPH_OSD_OP_SCRUB_RESERVE: sub_op_scrub_reserve_reply(op); return; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 29fed8ef542..ea371af2ad1 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -903,6 +903,9 @@ public: int do_command(cmdmap_t cmdmap, ostream& ss, bufferlist& idata, bufferlist& odata); + void do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle); void do_op(OpRequestRef op); bool pg_op_must_wait(MOSDOp *op); void do_pg_op(OpRequestRef op); |