diff options
author | Samuel Just <sam.just@inktank.com> | 2013-08-29 18:46:21 -0700 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2013-09-23 22:54:57 -0700 |
commit | b9f4c0644bae755d28d13529fa2dd0fa4aad6f68 (patch) | |
tree | dad5ca80fce99cbe45ce8315ded4ee540b3d20f6 | |
parent | e4fede70f90ce9e6d5c3fd671fe372b0d566e329 (diff) | |
download | ceph-b9f4c0644bae755d28d13529fa2dd0fa4aad6f68.tar.gz |
ReplicatedPG: extract PGBackend::Listener recovery callbacks
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/osd/PGBackend.h | 6 | ||||
-rw-r--r-- | src/osd/ReplicatedBackend.h | 1 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 246 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 15 |
4 files changed, 159 insertions, 109 deletions
diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index b17f0542d55..27dbd91b80e 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -53,6 +53,7 @@ const hobject_t &oid, const object_stat_sum_t &stat_diff, const ObjectRecoveryInfo &recovery_info, + ObjectContextRef obc, ObjectStore::Transaction *t ) = 0; @@ -71,6 +72,10 @@ const ObjectRecoveryInfo &recovery_info ) = 0; + virtual void begin_peer_recover( + int peer, + const hobject_t oid) = 0; + virtual void failed_push(int from, const hobject_t &soid) = 0; /** @@ -91,6 +96,7 @@ virtual const map<hobject_t, set<int> > &get_missing_loc() = 0; virtual const map<int, pg_missing_t> &get_peer_missing() = 0; + virtual const map<int, pg_info_t> &get_peer_info() = 0; virtual const pg_missing_t &get_local_missing() = 0; virtual const PGLog &get_log() = 0; virtual bool pgb_is_primary() const = 0; diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index bcd1239c626..4c20f248650 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -22,7 +22,6 @@ class ReplicatedBackend : public PGBackend { struct RPGHandle : public PGBackend::RecoveryHandle { map<int, vector<PushOp> > pushes; - map<int, vector<PushReplyOp> > push_replies; map<int, vector<PullOp> > pulls; }; private: diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 24dc8baf456..4dad42462c7 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -79,6 +79,136 @@ PGLSFilter::~PGLSFilter() { } +// ====================== +// PGBackend::Listener + + +void ReplicatedPG::on_local_recover_start( + const hobject_t &oid, + ObjectStore::Transaction *t) +{ + pg_log.revise_have(oid, eversion_t()); + remove_snap_mapped_object(*t, oid); + t->remove(coll, oid); +} + +void ReplicatedPG::on_local_recover( + const hobject_t &hoid, + const object_stat_sum_t &stat_diff, + const ObjectRecoveryInfo &_recovery_info, + ObjectContextRef obc, + ObjectStore::Transaction *t + ) +{ + ObjectRecoveryInfo recovery_info(_recovery_info); + if (recovery_info.soid.snap < CEPH_NOSNAP) { + assert(recovery_info.oi.snaps.size()); + OSDriver::OSTransaction _t(osdriver.get_transaction(t)); + set<snapid_t> snaps( + recovery_info.oi.snaps.begin(), + recovery_info.oi.snaps.end()); + snap_mapper.add_oid( + recovery_info.soid, + snaps, + &_t); + } + + if (pg_log.get_missing().is_missing(recovery_info.soid) && + pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) { + assert(is_primary()); + const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second; + if (latest->op == pg_log_entry_t::LOST_REVERT && + latest->reverting_to == recovery_info.version) { + dout(10) << " got old revert version " << recovery_info.version + << " for " << *latest << dendl; + recovery_info.version = latest->version; + // update the attr to the revert event version + recovery_info.oi.prior_version = recovery_info.oi.version; + recovery_info.oi.version = latest->version; + bufferlist bl; + ::encode(recovery_info.oi, bl); + t->setattr(coll, recovery_info.soid, OI_ATTR, bl); + } + } + + // keep track of active pushes for scrub + ++active_pushes; + + recover_got(recovery_info.soid, recovery_info.version); + + if (is_primary()) { + info.stats.stats.sum.add(stat_diff); + + assert(obc); + obc->obs.exists = true; + obc->ondisk_write_lock(); + + + t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc)); + t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc)); + t->register_on_complete( + new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch())); + + publish_stats_to_osd(); + if (waiting_for_missing_object.count(hoid)) { + dout(20) << " kicking waiters on " << hoid << dendl; + requeue_ops(waiting_for_missing_object[hoid]); + waiting_for_missing_object.erase(hoid); + if (pg_log.get_missing().missing.size() == 0) { + requeue_ops(waiting_for_all_missing); + waiting_for_all_missing.clear(); + } + } + } else { + t->register_on_applied( + new C_OSD_AppliedRecoveredObjectReplica(this)); + + } + + t->register_on_commit( + new C_OSD_CommittedPushedObject( + this, + get_osdmap()->get_epoch(), + info.last_complete)); + + // update pg + dirty_info = true; + write_if_dirty(*t); + +} + +void ReplicatedPG::on_global_recover( + const hobject_t &soid) +{ + publish_stats_to_osd(); + pushing.erase(soid); + dout(10) << "pushed " << soid << " to all replicas" << dendl; + finish_recovery_op(soid); + if (waiting_for_degraded_object.count(soid)) { + requeue_ops(waiting_for_degraded_object[soid]); + waiting_for_degraded_object.erase(soid); + } + finish_degraded_object(soid); +} + +void ReplicatedPG::on_peer_recover( + int peer, + const hobject_t &soid, + const ObjectRecoveryInfo &recovery_info) +{ + // done! + if (peer == backfill_target && backfills_in_flight.count(soid)) + backfills_in_flight.erase(soid); + else + peer_missing[peer].got(soid, recovery_info.version); +} + +void ReplicatedPG::begin_peer_recover( + int peer, + const hobject_t soid) +{ +} + // ======================= // pg changes @@ -6013,9 +6143,7 @@ void ReplicatedPG::submit_push_data( } if (first) { - pg_log.revise_have(recovery_info.soid, eversion_t()); - remove_snap_mapped_object(*t, recovery_info.soid); - t->remove(coll, recovery_info.soid); + on_local_recover_start(recovery_info.soid, t); t->remove(get_temp_coll(t), recovery_info.soid); t->touch(target_coll, recovery_info.soid); t->omap_setheader(target_coll, recovery_info.soid, omap_header); @@ -6065,41 +6193,6 @@ void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info, q.get_start(), q.get_len(), q.get_start()); } } - - if (recovery_info.soid.snap < CEPH_NOSNAP) { - assert(recovery_info.oi.snaps.size()); - OSDriver::OSTransaction _t(osdriver.get_transaction(t)); - set<snapid_t> snaps( - recovery_info.oi.snaps.begin(), - recovery_info.oi.snaps.end()); - snap_mapper.add_oid( - recovery_info.soid, - snaps, - &_t); - } - - if (pg_log.get_missing().is_missing(recovery_info.soid) && - pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) { - assert(is_primary()); - const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second; - if (latest->op == pg_log_entry_t::LOST_REVERT && - latest->reverting_to == recovery_info.version) { - dout(10) << " got old revert version " << recovery_info.version - << " for " << *latest << dendl; - recovery_info.version = latest->version; - // update the attr to the revert event version - recovery_info.oi.prior_version = recovery_info.oi.version; - recovery_info.oi.version = latest->version; - bufferlist bl; - ::encode(recovery_info.oi, bl); - t->setattr(coll, recovery_info.soid, OI_ATTR, bl); - } - } - recover_got(recovery_info.soid, recovery_info.version); - - // update pg - dirty_info = true; - write_if_dirty(*t); } ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recovery_info) @@ -6209,50 +6302,10 @@ bool ReplicatedPG::handle_pull_response( info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size(); if (complete) { - info.stats.stats.sum.num_objects_recovered++; - - SnapSetContext *ssc; - if (hoid.snap == CEPH_NOSNAP || hoid.snap == CEPH_SNAPDIR) { - ssc = create_snapset_context(hoid.oid); - ssc->snapset = pi.recovery_info.ss; - } else { - ssc = get_snapset_context(hoid.oid, hoid.get_key(), hoid.hash, false, - hoid.get_namespace()); - assert(ssc); - } - ObjectContextRef obc = create_object_context(pi.recovery_info.oi, ssc); - obc->obs.exists = true; - - obc->ondisk_write_lock(); - - // keep track of active pushes for scrub - ++active_pushes; - - t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc)); - t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc)); - t->register_on_complete( - new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch())); - } - - t->register_on_commit( - new C_OSD_CommittedPushedObject( - this, - get_osdmap()->get_epoch(), - info.last_complete)); - - if (complete) { pulling.erase(hoid); pull_from_peer[from].erase(hoid); - publish_stats_to_osd(); - if (waiting_for_missing_object.count(hoid)) { - dout(20) << " kicking waiters on " << hoid << dendl; - requeue_ops(waiting_for_missing_object[hoid]); - waiting_for_missing_object.erase(hoid); - if (pg_log.get_missing().missing.size() == 0) { - requeue_ops(waiting_for_all_missing); - waiting_for_all_missing.clear(); - } - } + info.stats.stats.sum.num_objects_recovered++; + on_local_recover(hoid, object_stat_sum_t(), pi.recovery_info, pi.obc, t); return false; } else { response->soid = pop.soid; @@ -6286,12 +6339,7 @@ void ReplicatedPG::handle_push( bool complete = pop.after_progress.data_complete && pop.after_progress.omap_complete; - // keep track of active pushes for scrub - ++active_pushes; - response->soid = pop.recovery_info.soid; - t->register_on_applied( - new C_OSD_AppliedRecoveredObjectReplica(this)); submit_push_data(pop.recovery_info, first, complete, @@ -6302,11 +6350,13 @@ void ReplicatedPG::handle_push( pop.omap_entries, t); - t->register_on_commit( - new C_OSD_CommittedPushedObject( - this, - get_osdmap()->get_epoch(), - info.last_complete)); + if (complete) + on_local_recover( + pop.recovery_info.soid, + object_stat_sum_t(), + pop.recovery_info, + ObjectContextRef(), // ok, is replica + t); } void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes) @@ -6575,25 +6625,14 @@ bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) return true; } else { // done! - if (peer == backfill_target && backfills_in_flight.count(soid)) - backfills_in_flight.erase(soid); - else - peer_missing[peer].got(soid, pi->recovery_info.version); + on_peer_recover(peer, soid, pi->recovery_info); pushing[soid].erase(peer); pi = NULL; - publish_stats_to_osd(); if (pushing[soid].empty()) { - pushing.erase(soid); - dout(10) << "pushed " << soid << " to all replicas" << dendl; - finish_recovery_op(soid); - if (waiting_for_degraded_object.count(soid)) { - requeue_ops(waiting_for_degraded_object[soid]); - waiting_for_degraded_object.erase(soid); - } - finish_degraded_object(soid); + on_global_recover(soid); } else { dout(10) << "pushed " << soid << ", still waiting for push ack from " << pushing[soid].size() << " others" << dendl; @@ -6860,7 +6899,6 @@ void ReplicatedPG::sub_op_push(OpRequestRef op) t->register_on_complete(new C_OSD_SendMessageOnConn( osd, reply, m->get_connection())); } - t->register_on_commit(new C_OnPushCommit(this, op)); osd->store->queue_transaction(osr.get(), t); return; } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index ea371af2ad1..0a670ae835e 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -130,19 +130,23 @@ public: /// Listener methods void on_local_recover_start( const hobject_t &oid, - ObjectStore::Transaction *t) {} + ObjectStore::Transaction *t); void on_local_recover( const hobject_t &oid, const object_stat_sum_t &stat_diff, const ObjectRecoveryInfo &recovery_info, + ObjectContextRef obc, ObjectStore::Transaction *t - ) {} + ); void on_peer_recover( int peer, const hobject_t &oid, - const ObjectRecoveryInfo &recovery_info) {} + const ObjectRecoveryInfo &recovery_info); + void begin_peer_recover( + int peer, + const hobject_t oid); void on_global_recover( - const hobject_t &oid) {} + const hobject_t &oid); void failed_push(int from, const hobject_t &soid); template <typename T> @@ -207,6 +211,9 @@ public: const map<int, pg_missing_t> &get_peer_missing() { return peer_missing; } + const map<int, pg_info_t> &get_peer_info() { + return peer_info; + } const pg_missing_t &get_local_missing() { return pg_log.get_missing(); } |