diff options
author | Samuel Just <sam.just@inktank.com> | 2013-09-09 15:41:10 -0700 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2013-09-19 12:50:46 -0700 |
commit | dec9ad6f78213debfb0dbd7aa5e8ab7dc09ec95f (patch) | |
tree | 1b664c01018336ffdfd368ef2750b7afa30000f9 | |
parent | 2beaa6421633fc5770bcebec66eb73c0e8a099fa (diff) | |
download | ceph-dec9ad6f78213debfb0dbd7aa5e8ab7dc09ec95f.tar.gz |
ReplicatedBackend: wire in start_pushes
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/osd/ReplicatedBackend.cc | 6 | ||||
-rw-r--r-- | src/osd/ReplicatedBackend.h | 13 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 62 |
3 files changed, 71 insertions, 10 deletions
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 6193f2e0e78..59ce9bbcceb 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -62,7 +62,11 @@ void ReplicatedBackend::recover_object( } else { assert(obc); assert(head); - // TODOSAM: handle recovering replicas + int started = start_pushes( + hoid, + obc, + h); + assert(started > 0); } } diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index e52c65fcbd0..ae33b3f5588 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -19,11 +19,13 @@ #include "PGBackend.h" #include "osd_types.h" +struct C_ReplicatedBackend_OnPullComplete; class ReplicatedBackend : public PGBackend { struct RPGHandle : public PGBackend::RecoveryHandle { map<int, vector<PushOp> > pushes; map<int, vector<PullOp> > pulls; }; + friend struct C_ReplicatedBackend_OnPullComplete; private: bool temp_created; const coll_t temp_coll; @@ -42,9 +44,12 @@ public: ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd); /// @see PGBackend::open_recovery_op - PGBackend::RecoveryHandle *open_recovery_op() { + RPGHandle *_open_recovery_op() { return new RPGHandle(); } + PGBackend::RecoveryHandle *open_recovery_op() { + return _open_recovery_op(); + } /// @see PGBackend::run_recovery_op void run_recovery_op( @@ -223,8 +228,10 @@ private: bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply); void handle_pull(int peer, PullOp &op, PushOp *reply); - bool handle_pull_response(int from, PushOp &op, PullOp *response, - ObjectStore::Transaction *t); + bool handle_pull_response( + int from, PushOp &op, PullOp *response, + list<pair<hobject_t, ObjectContextRef> > *to_continue, + ObjectStore::Transaction *t); void handle_push(int from, PushOp &op, PushReplyOp *response, ObjectStore::Transaction *t); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 40800f00e38..a92a5879894 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1603,6 +1603,28 @@ void ReplicatedBackend::_do_push(OpRequestRef op) get_parent()->queue_transaction(t); } +struct C_ReplicatedBackend_OnPullComplete : Context { + ReplicatedBackend *bc; + list<pair<hobject_t, ObjectContextRef> > to_continue; + int priority; + C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority) + : bc(bc), priority(priority) {} + + void finish(int) { + ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); + for (list<pair<hobject_t, ObjectContextRef> >::iterator i = + to_continue.begin(); + i != to_continue.end(); + ++i) { + if (!bc->start_pushes(i->first, i->second, h)) { + bc->get_parent()->on_global_recover( + i->first); + } + } + bc->run_recovery_op(h, priority); + } +}; + void ReplicatedBackend::_do_pull_response(OpRequestRef op) { MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request); @@ -1611,13 +1633,23 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op) vector<PullOp> replies(1); ObjectStore::Transaction *t = new ObjectStore::Transaction; + list<pair<hobject_t, ObjectContextRef> > to_continue; for (vector<PushOp>::iterator i = m->pushes.begin(); i != m->pushes.end(); ++i) { - bool more = handle_pull_response(from, *i, &(replies.back()), t); + bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t); if (more) replies.push_back(PullOp()); } + if (!to_continue.empty()) { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, + m->get_priority()); + c->to_continue.swap(to_continue); + t->register_on_complete( + get_parent()->bless_context(c)); + } replies.erase(replies.end() - 1); if (replies.size()) { @@ -6273,7 +6305,9 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove bool ReplicatedBackend::handle_pull_response( int from, PushOp &pop, PullOp *response, - ObjectStore::Transaction *t) + list<pair<hobject_t, ObjectContextRef> > *to_continue, + ObjectStore::Transaction *t + ) { interval_set<uint64_t> data_included = pop.data_included; bufferlist data; @@ -6346,11 +6380,14 @@ bool ReplicatedBackend::handle_pull_response( pi.stat.num_keys_recovered += pop.omap_entries.size(); if (complete) { - pulling.erase(hoid); - pull_from_peer[from].erase(hoid); + to_continue->push_back(make_pair(hoid, pi.obc)); pi.stat.num_objects_recovered++; get_parent()->on_local_recover( hoid, pi.stat, pi.recovery_info, pi.obc, t); + pulling.erase(hoid); + pull_from_peer[from].erase(hoid); + if (pull_from_peer[from].empty()) + pull_from_peer.erase(from); return false; } else { response->soid = pop.soid; @@ -6934,14 +6971,27 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op) if (is_primary()) { PullOp resp; - bool more = handle_pull_response(m->get_source().num(), pop, &resp, t); + RPGHandle *h = _open_recovery_op(); + list<pair<hobject_t, ObjectContextRef> > to_continue; + bool more = handle_pull_response( + m->get_source().num(), pop, &resp, + &to_continue, t); if (more) { send_pull_legacy( m->get_priority(), m->get_source().num(), resp.recovery_info, resp.recovery_progress); - } + } else { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, + op->request->get_priority()); + c->to_continue.swap(to_continue); + t->register_on_complete( + get_parent()->bless_context(c)); + } + run_recovery_op(h, op->request->get_priority()); } else { PushReplyOp resp; MOSDSubOpReply *reply = new MOSDSubOpReply( |