diff options
-rw-r--r-- | doc/dev/osd_internals/erasure_coding/recovery.rst | 4 | ||||
-rw-r--r-- | src/common/WorkQueue.h | 37 | ||||
-rw-r--r-- | src/common/hobject.h | 24 | ||||
-rw-r--r-- | src/include/Context.h | 20 | ||||
-rw-r--r-- | src/os/FileStore.cc | 12 | ||||
-rw-r--r-- | src/os/FileStore.h | 4 | ||||
-rw-r--r-- | src/osd/Makefile.am | 3 | ||||
-rw-r--r-- | src/osd/OSD.cc | 39 | ||||
-rw-r--r-- | src/osd/OSD.h | 15 | ||||
-rw-r--r-- | src/osd/PG.cc | 70 | ||||
-rw-r--r-- | src/osd/PG.h | 15 | ||||
-rw-r--r-- | src/osd/PGBackend.h | 210 | ||||
-rw-r--r-- | src/osd/ReplicatedBackend.cc | 196 | ||||
-rw-r--r-- | src/osd/ReplicatedBackend.h | 309 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 1219 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 391 | ||||
-rw-r--r-- | src/osd/osd_types.h | 1 |
17 files changed, 1722 insertions, 847 deletions
diff --git a/doc/dev/osd_internals/erasure_coding/recovery.rst b/doc/dev/osd_internals/erasure_coding/recovery.rst new file mode 100644 index 00000000000..793a5b003dc --- /dev/null +++ b/doc/dev/osd_internals/erasure_coding/recovery.rst @@ -0,0 +1,4 @@ +=================== +PGBackend Recovery +=================== + diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index b2742accdce..794b577a71d 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -390,6 +390,43 @@ public: void drain(WorkQueue_* wq = 0); }; +class GenContextWQ : + public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> { + list<GenContext<ThreadPool::TPHandle&>*> _queue; +public: + GenContextWQ(const string &name, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueueVal< + GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {} + + void _enqueue(GenContext<ThreadPool::TPHandle&> *c) { + _queue.push_back(c); + }; + void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) { + _queue.push_front(c); + } + bool _empty() { + return _queue.empty(); + } + GenContext<ThreadPool::TPHandle&> *_dequeue() { + assert(!_queue.empty()); + GenContext<ThreadPool::TPHandle&> *c = _queue.front(); + _queue.pop_front(); + return c; + } + void _process(GenContext<ThreadPool::TPHandle&> *c, ThreadPool::TPHandle &tp) { + c->complete(tp); + } +}; +class C_QueueInWQ : public Context { + GenContextWQ *wq; + GenContext<ThreadPool::TPHandle&> *c; +public: + C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle &> *c) + : wq(wq), c(c) {} + void finish(int) { + wq->queue(c); + } +}; #endif diff --git a/src/common/hobject.h b/src/common/hobject.h index f8f58b4a245..e483b664347 100644 --- a/src/common/hobject.h +++ b/src/common/hobject.h @@ -79,6 +79,30 @@ public: return ret; } + /// @return head version of this hobject_t + hobject_t get_head() const { + hobject_t ret(*this); + ret.snap = CEPH_NOSNAP; + return ret; + } + + /// @return snapdir version of this hobject_t + hobject_t 
get_snapdir() const { + hobject_t ret(*this); + ret.snap = CEPH_SNAPDIR; + return ret; + } + + /// @return true if object is neither head nor snapdir + bool is_snap() const { + return (snap != CEPH_NOSNAP) && (snap != CEPH_SNAPDIR); + } + + /// @return true iff the object should have a snapset in it's attrs + bool has_snapset() const { + return !is_snap(); + } + /* Do not use when a particular hash function is needed */ explicit hobject_t(const sobject_t &o) : oid(o.oid), snap(o.snap), max(false), pool(-1) { diff --git a/src/include/Context.h b/src/include/Context.h index 9ec4414a047..663313ceec1 100644 --- a/src/include/Context.h +++ b/src/include/Context.h @@ -28,6 +28,26 @@ #define mydout(cct, v) lgeneric_subdout(cct, context, v) /* + * GenContext - abstract callback class + */ +template <typename T> +class GenContext { + GenContext(const GenContext& other); + const GenContext& operator=(const GenContext& other); + + protected: + virtual void finish(T t) = 0; + + public: + GenContext() {} + virtual ~GenContext() {} // we want a virtual destructor!!! 
+ virtual void complete(T t) { + finish(t); + delete this; + } +}; + +/* * Context - abstract callback class */ class Context { diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 5cd22e2b348..343fb25c0e4 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1812,7 +1812,7 @@ int FileStore::_do_transactions( for (list<Transaction*>::iterator p = tls.begin(); p != tls.end(); ++p, trans_num++) { - r = _do_transaction(**p, op_seq, trans_num); + r = _do_transaction(**p, op_seq, trans_num, handle); if (r < 0) break; if (handle) @@ -2074,7 +2074,9 @@ int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos) } } -unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_num) +unsigned FileStore::_do_transaction( + Transaction& t, uint64_t op_seq, int trans_num, + ThreadPool::TPHandle *handle) { dout(10) << "_do_transaction on " << &t << dendl; @@ -2082,6 +2084,9 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n SequencerPosition spos(op_seq, trans_num, 0); while (i.have_op()) { + if (handle) + handle->reset_tp_timeout(); + int op = i.get_op(); int r = 0; @@ -4019,6 +4024,7 @@ int FileStore::collection_list_partial(coll_t c, ghobject_t start, int min, int max, snapid_t seq, vector<ghobject_t> *ls, ghobject_t *next) { + dout(10) << "collection_list_partial: " << c << dendl; Index index; int r = get_index(c, &index); if (r < 0) @@ -4030,6 +4036,8 @@ int FileStore::collection_list_partial(coll_t c, ghobject_t start, assert(!m_filestore_fail_eio || r != -EIO); return r; } + if (ls) + dout(20) << "objects: " << *ls << dendl; return 0; } diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 8fd726767a1..b9017985a34 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -365,7 +365,9 @@ public: int do_transactions(list<Transaction*> &tls, uint64_t op_seq) { return _do_transactions(tls, op_seq, 0); } - unsigned _do_transaction(Transaction& t, uint64_t op_seq, int trans_num); + 
unsigned _do_transaction( + Transaction& t, uint64_t op_seq, int trans_num, + ThreadPool::TPHandle *handle); int queue_transactions(Sequencer *osr, list<Transaction*>& tls, TrackedOpRef op = TrackedOpRef()); diff --git a/src/osd/Makefile.am b/src/osd/Makefile.am index ea7c036f858..9d3bc1d5e47 100644 --- a/src/osd/Makefile.am +++ b/src/osd/Makefile.am @@ -9,6 +9,7 @@ libosd_la_SOURCES = \ osd/PG.cc \ osd/PGLog.cc \ osd/ReplicatedPG.cc \ + osd/ReplicatedBackend.cc \ osd/Ager.cc \ osd/OSD.cc \ osd/OSDCap.cc \ @@ -35,6 +36,8 @@ noinst_HEADERS += \ osd/PG.h \ osd/PGLog.h \ osd/ReplicatedPG.h \ + osd/PGBackend.h \ + osd/ReplicatedBackend.h \ osd/Watch.h \ osd/osd_types.h diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 529fb6ffb1b..9a2fbb5c576 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -180,6 +180,7 @@ OSDService::OSDService(OSD *osd) : scrub_wq(osd->scrub_wq), scrub_finalize_wq(osd->scrub_finalize_wq), rep_scrub_wq(osd->rep_scrub_wq), + push_wq("push_wq", cct->_conf->osd_recovery_thread_timeout, &osd->recovery_tp), class_handler(osd->class_handler), publish_lock("OSDService::publish_lock"), pre_publish_lock("OSDService::pre_publish_lock"), @@ -3418,16 +3419,16 @@ void OSD::RemoveWQ::_process(pair<PGRef, DeletingStateRef> item) if (!item.second->start_clearing()) return; - if (pg->have_temp_coll()) { + list<coll_t> colls_to_remove; + pg->get_colls(&colls_to_remove); + for (list<coll_t>::iterator i = colls_to_remove.begin(); + i != colls_to_remove.end(); + ++i) { bool cont = remove_dir( - pg->cct, store, &mapper, &driver, pg->osr.get(), pg->get_temp_coll(), item.second); + pg->cct, store, &mapper, &driver, pg->osr.get(), *i, item.second); if (!cont) return; } - bool cont = remove_dir( - pg->cct, store, &mapper, &driver, pg->osr.get(), coll, item.second); - if (!cont) - return; if (!item.second->start_deleting()) return; @@ -3438,9 +3439,12 @@ void OSD::RemoveWQ::_process(pair<PGRef, DeletingStateRef> item) OSD::make_infos_oid(), pg->log_oid, t); - if 
(pg->have_temp_coll()) - t->remove_collection(pg->get_temp_coll()); - t->remove_collection(coll); + + for (list<coll_t>::iterator i = colls_to_remove.begin(); + i != colls_to_remove.end(); + ++i) { + t->remove_collection(*i); + } // We need the sequencer to stick around until the op is complete store->queue_transaction( @@ -5895,22 +5899,11 @@ void OSD::split_pgs( dout(10) << "m_seed " << i->ps() << dendl; dout(10) << "split_bits is " << split_bits << dendl; - rctx->transaction->create_collection( - coll_t(*i)); - rctx->transaction->split_collection( - coll_t(parent->info.pgid), + parent->split_colls( + *i, split_bits, i->m_seed, - coll_t(*i)); - if (parent->have_temp_coll()) { - rctx->transaction->create_collection( - coll_t::make_temp_coll(*i)); - rctx->transaction->split_collection( - coll_t::make_temp_coll(parent->info.pgid), - split_bits, - i->m_seed, - coll_t::make_temp_coll(*i)); - } + rctx->transaction); parent->split_into( *i, child, diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 15dc0440352..5fe667344a9 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -307,6 +307,7 @@ public: ThreadPool::WorkQueue<PG> &scrub_wq; ThreadPool::WorkQueue<PG> &scrub_finalize_wq; ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq; + GenContextWQ push_wq; ClassHandler *&class_handler; void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued); @@ -635,6 +636,20 @@ public: OSDService(OSD *osd); ~OSDService(); }; + +struct C_OSD_SendMessageOnConn: public Context { + OSDService *osd; + Message *reply; + ConnectionRef conn; + C_OSD_SendMessageOnConn( + OSDService *osd, + Message *reply, + ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {} + void finish(int) { + osd->send_message_osd_cluster(reply, conn.get()); + } +}; + class OSD : public Dispatcher, public md_config_obs_t { /** OSD **/ diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 6afa5599376..f1985bf961b 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1399,76 +1399,6 @@ void PG::queue_op(OpRequestRef op) 
osd->op_wq.queue(make_pair(PGRef(this), op)); } -void PG::do_request( - OpRequestRef op, - ThreadPool::TPHandle &handle) -{ - // do any pending flush - do_pending_flush(); - - if (!op_has_sufficient_caps(op)) { - osd->reply_op_error(op, -EPERM); - return; - } - assert(!op_must_wait_for_map(get_osdmap(), op)); - if (can_discard_request(op)) { - return; - } - if (!flushed) { - dout(20) << " !flushed, waiting for active on " << op << dendl; - waiting_for_active.push_back(op); - return; - } - - switch (op->request->get_type()) { - case CEPH_MSG_OSD_OP: - if (is_replay() || !is_active()) { - dout(20) << " replay, waiting for active on " << op << dendl; - waiting_for_active.push_back(op); - return; - } - do_op(op); // do it now - break; - - case MSG_OSD_SUBOP: - do_sub_op(op); - break; - - case MSG_OSD_SUBOPREPLY: - do_sub_op_reply(op); - break; - - case MSG_OSD_PG_SCAN: - do_scan(op, handle); - break; - - case MSG_OSD_PG_BACKFILL: - do_backfill(op); - break; - - case MSG_OSD_PG_PUSH: - if (!is_active()) { - waiting_for_active.push_back(op); - op->mark_delayed("waiting for active"); - return; - } - do_push(op); - break; - - case MSG_OSD_PG_PULL: - do_pull(op); - break; - - case MSG_OSD_PG_PUSH_REPLY: - do_push_reply(op); - break; - - default: - assert(0 == "bad message type in do_request"); - } -} - - void PG::replay_queued_ops() { assert(is_replay() && is_active()); diff --git a/src/osd/PG.h b/src/osd/PG.h index cdbe827a4a9..74809eea268 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -870,8 +870,12 @@ public: virtual void _scrub(ScrubMap &map) { } virtual void _scrub_clear_state() { } virtual void _scrub_finish() { } - virtual coll_t get_temp_coll() = 0; - virtual bool have_temp_coll() = 0; + virtual void get_colls(list<coll_t> *out) = 0; + virtual void split_colls( + pg_t child, + int split_bits, + int seed, + ObjectStore::Transaction *t) = 0; virtual bool _report_snap_collection_errors( const hobject_t &hoid, const map<string, bufferptr> &attrs, @@ -1789,10 +1793,10 
@@ public: // abstract bits - void do_request( + virtual void do_request( OpRequestRef op, ThreadPool::TPHandle &handle - ); + ) = 0; virtual void do_op(OpRequestRef op) = 0; virtual void do_sub_op(OpRequestRef op) = 0; @@ -1802,9 +1806,6 @@ public: ThreadPool::TPHandle &handle ) = 0; virtual void do_backfill(OpRequestRef op) = 0; - virtual void do_push(OpRequestRef op) = 0; - virtual void do_pull(OpRequestRef op) = 0; - virtual void do_push_reply(OpRequestRef op) = 0; virtual void snap_trimmer() = 0; virtual int do_command(cmdmap_t cmdmap, ostream& ss, diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h new file mode 100644 index 00000000000..e3cc05bf345 --- /dev/null +++ b/src/osd/PGBackend.h @@ -0,0 +1,210 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef PGBACKEND_H +#define PGBACKEND_H + +#include "osd_types.h" +#include "include/Context.h" +#include <string> + + /** + * PGBackend + * + * PGBackend defines an interface for logic handling IO and + * replication on RADOS objects. The PGBackend implementation + * is responsible for: + * + * 1) Handling client operations + * 2) Handling object recovery + * 3) Handling object access + */ + class PGBackend { + public: + /** + * Provides interfaces for PGBackend callbacks + * + * The intention is that the parent calls into the PGBackend + * implementation holding a lock and that the callbacks are + * called under the same locks. 
+ */ + class Listener { + public: + /// Recovery + + virtual void on_local_recover_start( + const hobject_t &oid, + ObjectStore::Transaction *t) = 0; + /** + * Called with the transaction recovering oid + */ + virtual void on_local_recover( + const hobject_t &oid, + const object_stat_sum_t &stat_diff, + const ObjectRecoveryInfo &recovery_info, + ObjectContextRef obc, + ObjectStore::Transaction *t + ) = 0; + + /** + * Called when transaction recovering oid is durable and + * applied on all replicas + */ + virtual void on_global_recover(const hobject_t &oid) = 0; + + /** + * Called when peer is recovered + */ + virtual void on_peer_recover( + int peer, + const hobject_t &oid, + const ObjectRecoveryInfo &recovery_info, + const object_stat_sum_t &stat + ) = 0; + + virtual void begin_peer_recover( + int peer, + const hobject_t oid) = 0; + + virtual void failed_push(int from, const hobject_t &soid) = 0; + + + virtual void cancel_pull(const hobject_t &soid) = 0; + + /** + * Bless a context + * + * Wraps a context in whatever outer layers the parent usually + * uses to call into the PGBackend + */ + virtual Context *bless_context(Context *c) = 0; + virtual GenContext<ThreadPool::TPHandle&> *bless_gencontext( + GenContext<ThreadPool::TPHandle&> *c) = 0; + + virtual void send_message(int to_osd, Message *m) = 0; + virtual void queue_transaction(ObjectStore::Transaction *t) = 0; + virtual epoch_t get_epoch() = 0; + virtual const vector<int> &get_acting() = 0; + virtual std::string gen_dbg_prefix() const = 0; + + virtual const map<hobject_t, set<int> > &get_missing_loc() = 0; + virtual const map<int, pg_missing_t> &get_peer_missing() = 0; + virtual const map<int, pg_info_t> &get_peer_info() = 0; + virtual const pg_missing_t &get_local_missing() = 0; + virtual const PGLog &get_log() = 0; + virtual bool pgb_is_primary() const = 0; + virtual OSDMapRef pgb_get_osdmap() const = 0; + virtual const pg_info_t &get_info() const = 0; + + virtual ObjectContextRef get_obc( + const 
hobject_t &hoid, + map<string, bufferptr> &attrs) = 0; + + virtual ~Listener() {} + }; + Listener *parent; + Listener *get_parent() const { return parent; } + PGBackend(Listener *l) : parent(l) {} + bool is_primary() const { return get_parent()->pgb_is_primary(); } + OSDMapRef get_osdmap() const { return get_parent()->pgb_get_osdmap(); } + const pg_info_t &get_info() { return get_parent()->get_info(); } + + std::string gen_prefix() const { + return parent->gen_dbg_prefix(); + } + + /** + * RecoveryHandle + * + * We may want to recover multiple objects in the same set of + * messages. RecoveryHandle is an interface for the opaque + * object used by the implementation to store the details of + * the pending recovery operations. + */ + struct RecoveryHandle { + virtual ~RecoveryHandle() {} + }; + + /// Get a fresh recovery operation + virtual RecoveryHandle *open_recovery_op() = 0; + + /// run_recovery_op: finish the operation represented by h + virtual void run_recovery_op( + RecoveryHandle *h, ///< [in] op to finish + int priority ///< [in] msg priority + ) = 0; + + /** + * recover_object + * + * Triggers a recovery operation on the specified hobject_t + * onreadable must be called before onwriteable + * + * On each replica (primary included), get_parent()->on_not_missing() + * must be called when the transaction finalizing the recovery + * is queued. Similarly, get_parent()->on_readable() must be called + * when the transaction is applied in the backing store. + * + * get_parent()->on_not_degraded() should be called on the primary + * when writes can resume on the object. + * + * obc may be NULL if the primary lacks the object. 
+ * + * head may be NULL only if the head/snapdir is missing + * + * @param missing [in] set of info, missing pairs for queried nodes + * @param overlaps [in] mapping of object to file offset overlaps + */ + virtual void recover_object( + const hobject_t &hoid, ///< [in] object to recover + ObjectContextRef head, ///< [in] context of the head/snapdir object + ObjectContextRef obc, ///< [in] context of the object + RecoveryHandle *h ///< [in,out] handle to attach recovery op to + ) = 0; + + /// gives PGBackend a crack at an incoming message + virtual bool handle_message( + OpRequestRef op ///< [in] message received + ) = 0; ///< @return true if the message was handled + + virtual void check_recovery_sources(const OSDMapRef osdmap) = 0; + + /** + * implementation should clear itself, contexts blessed prior to on_change + * won't be called after on_change() + */ + virtual void on_change(ObjectStore::Transaction *t) = 0; + virtual void clear_state() = 0; + + virtual void on_flushed() = 0; + + + virtual void split_colls( + pg_t child, + int split_bits, + int seed, + ObjectStore::Transaction *t) = 0; + + virtual void temp_colls(list<coll_t> *out) = 0; + + virtual void dump_recovery_info(Formatter *f) const = 0; + + virtual coll_t get_temp_coll(ObjectStore::Transaction *t) = 0; + virtual void add_temp_obj(const hobject_t &oid) = 0; + virtual void clear_temp_obj(const hobject_t &oid) = 0; + + virtual ~PGBackend() {} + }; + +#endif diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc new file mode 100644 index 00000000000..9868e7af2c8 --- /dev/null +++ b/src/osd/ReplicatedBackend.cc @@ -0,0 +1,196 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. 
+ * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include "ReplicatedBackend.h" +#include "messages/MOSDSubOp.h" +#include "messages/MOSDSubOpReply.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPull.h" +#include "messages/MOSDPGPushReply.h" + +#define dout_subsys ceph_subsys_osd +#define DOUT_PREFIX_ARGS this +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) +static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) { + return *_dout << pgb->get_parent()->gen_dbg_prefix(); +} + +ReplicatedBackend::ReplicatedBackend( + PGBackend::Listener *pg, coll_t coll, OSDService *osd) : + PGBackend(pg), temp_created(false), + temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)), + coll(coll), osd(osd), cct(osd->cct) {} + +void ReplicatedBackend::run_recovery_op( + PGBackend::RecoveryHandle *_h, + int priority) +{ + RPGHandle *h = static_cast<RPGHandle *>(_h); + send_pushes(priority, h->pushes); + send_pulls(priority, h->pulls); + delete h; +} + +void ReplicatedBackend::recover_object( + const hobject_t &hoid, + ObjectContextRef head, + ObjectContextRef obc, + RecoveryHandle *_h + ) +{ + dout(10) << __func__ << ": " << hoid << dendl; + RPGHandle *h = static_cast<RPGHandle *>(_h); + if (get_parent()->get_local_missing().is_missing(hoid)) { + assert(!obc); + // pull + prepare_pull( + hoid, + head, + h); + return; + } else { + assert(obc); + int started = start_pushes( + hoid, + obc, + h); + assert(started > 0); + } +} + +void ReplicatedBackend::check_recovery_sources(const OSDMapRef osdmap) +{ + for(map<int, set<hobject_t> >::iterator i = pull_from_peer.begin(); + i != pull_from_peer.end(); + ) { + if (osdmap->is_down(i->first)) { + dout(10) << "check_recovery_sources resetting pulls from osd." 
<< i->first + << ", osdmap has it marked down" << dendl; + for (set<hobject_t>::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + assert(pulling.count(*j) == 1); + get_parent()->cancel_pull(*j); + pulling.erase(*j); + } + pull_from_peer.erase(i++); + } else { + ++i; + } + } +} + +bool ReplicatedBackend::handle_message( + OpRequestRef op + ) +{ + dout(10) << __func__ << ": " << op << dendl; + switch (op->request->get_type()) { + case MSG_OSD_PG_PUSH: + // TODOXXX: needs to be active possibly + do_push(op); + return true; + + case MSG_OSD_PG_PULL: + do_pull(op); + return true; + + case MSG_OSD_PG_PUSH_REPLY: + do_push_reply(op); + return true; + + case MSG_OSD_SUBOP: { + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request); + if (m->ops.size() >= 1) { + OSDOp *first = &m->ops[0]; + switch (first->op.op) { + case CEPH_OSD_OP_PULL: + sub_op_pull(op); + return true; + case CEPH_OSD_OP_PUSH: + // TODOXXX: needs to be active possibly + sub_op_push(op); + return true; + default: + break; + } + } + break; + } + + case MSG_OSD_SUBOPREPLY: { + MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request); + if (r->ops.size() >= 1) { + OSDOp &first = r->ops[0]; + switch (first.op.op) { + case CEPH_OSD_OP_PUSH: + // continue peer recovery + sub_op_push_reply(op); + return true; + } + } + break; + } + + default: + break; + } + return false; +} + +void ReplicatedBackend::clear_state() +{ + // clear pushing/pulling maps + pushing.clear(); + pulling.clear(); + pull_from_peer.clear(); +} + +void ReplicatedBackend::on_change(ObjectStore::Transaction *t) +{ + dout(10) << __func__ << dendl; + // clear temp + for (set<hobject_t>::iterator i = temp_contents.begin(); + i != temp_contents.end(); + ++i) { + dout(10) << __func__ << ": Removing oid " + << *i << " from the temp collection" << dendl; + t->remove(get_temp_coll(t), *i); + } + temp_contents.clear(); + clear_state(); +} + +coll_t ReplicatedBackend::get_temp_coll(ObjectStore::Transaction *t) +{ + if (temp_created) + 
return temp_coll; + if (!osd->store->collection_exists(temp_coll)) + t->create_collection(temp_coll); + temp_created = true; + return temp_coll; +} + +void ReplicatedBackend::on_flushed() +{ + if (have_temp_coll() && + !osd->store->collection_empty(get_temp_coll())) { + vector<hobject_t> objects; + osd->store->collection_list(get_temp_coll(), objects); + derr << __func__ << ": found objects in the temp collection: " + << objects << ", crashing now" + << dendl; + assert(0 == "found garbage in the temp collection"); + } +} diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h new file mode 100644 index 00000000000..e34e55a618e --- /dev/null +++ b/src/osd/ReplicatedBackend.h @@ -0,0 +1,309 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef REPBACKEND_H +#define REPBACKEND_H + +#include "OSD.h" +#include "PGBackend.h" +#include "osd_types.h" + +struct C_ReplicatedBackend_OnPullComplete; +class ReplicatedBackend : public PGBackend { + struct RPGHandle : public PGBackend::RecoveryHandle { + map<int, vector<PushOp> > pushes; + map<int, vector<PullOp> > pulls; + }; + friend struct C_ReplicatedBackend_OnPullComplete; +private: + bool temp_created; + const coll_t temp_coll; + coll_t get_temp_coll() const { + return temp_coll; + } + bool have_temp_coll() const { return temp_created; } + + // Track contents of temp collection, clear on reset + set<hobject_t> temp_contents; +public: + coll_t coll; + OSDService *osd; + CephContext *cct; + + ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd); + + /// @see PGBackend::open_recovery_op + RPGHandle *_open_recovery_op() { + return new RPGHandle(); + } + PGBackend::RecoveryHandle *open_recovery_op() { + return _open_recovery_op(); + } + + /// @see PGBackend::run_recovery_op + void run_recovery_op( + PGBackend::RecoveryHandle *h, + int priority); + + /// @see PGBackend::recover_object + void recover_object( + const hobject_t &hoid, + ObjectContextRef head, + ObjectContextRef obc, + RecoveryHandle *h + ); + + void check_recovery_sources(const OSDMapRef osdmap); + + /// @see PGBackend::handle_message + bool handle_message( + OpRequestRef op + ); + + void on_change(ObjectStore::Transaction *t); + void clear_state(); + void on_flushed(); + + void temp_colls(list<coll_t> *out) { + if (temp_created) + out->push_back(temp_coll); + } + void split_colls( + pg_t child, + int split_bits, + int seed, + ObjectStore::Transaction *t) { + coll_t target = coll_t::make_temp_coll(child); + if (!temp_created) + return; + t->create_collection(target); + t->split_collection( + temp_coll, + split_bits, + seed, + target); + } + + virtual void dump_recovery_info(Formatter *f) const { + { + f->open_array_section("pull_from_peer"); + for (map<int, 
set<hobject_t> >::const_iterator i = pull_from_peer.begin(); + i != pull_from_peer.end(); + ++i) { + f->open_object_section("pulling_from"); + f->dump_int("pull_from", i->first); + { + f->open_array_section("pulls"); + for (set<hobject_t>::const_iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + f->open_object_section("pull_info"); + assert(pulling.count(*j)); + pulling.find(*j)->second.dump(f); + f->close_section(); + } + f->close_section(); + } + f->close_section(); + } + f->close_section(); + } + { + f->open_array_section("pushing"); + for (map<hobject_t, map<int, PushInfo> >::const_iterator i = + pushing.begin(); + i != pushing.end(); + ++i) { + f->open_object_section("object"); + f->dump_stream("pushing") << i->first; + { + f->open_array_section("pushing_to"); + for (map<int, PushInfo>::const_iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + f->open_object_section("push_progress"); + f->dump_stream("object_pushing") << j->first; + { + f->open_object_section("push_info"); + j->second.dump(f); + f->close_section(); + } + f->close_section(); + } + f->close_section(); + } + f->close_section(); + } + f->close_section(); + } + } +private: + // push + struct PushInfo { + ObjectRecoveryProgress recovery_progress; + ObjectRecoveryInfo recovery_info; + ObjectContextRef obc; + object_stat_sum_t stat; + + void dump(Formatter *f) const { + { + f->open_object_section("recovery_progress"); + recovery_progress.dump(f); + f->close_section(); + } + { + f->open_object_section("recovery_info"); + recovery_info.dump(f); + f->close_section(); + } + } + }; + map<hobject_t, map<int, PushInfo> > pushing; + + // pull + struct PullInfo { + ObjectRecoveryProgress recovery_progress; + ObjectRecoveryInfo recovery_info; + ObjectContextRef head_ctx; + ObjectContextRef obc; + object_stat_sum_t stat; + + void dump(Formatter *f) const { + { + f->open_object_section("recovery_progress"); + recovery_progress.dump(f); + f->close_section(); + } + { + 
f->open_object_section("recovery_info"); + recovery_info.dump(f); + f->close_section(); + } + } + + bool is_complete() const { + return recovery_progress.is_complete(recovery_info); + } + }; + + coll_t get_temp_coll(ObjectStore::Transaction *t); + void add_temp_obj(const hobject_t &oid) { + temp_contents.insert(oid); + } + void clear_temp_obj(const hobject_t &oid) { + temp_contents.erase(oid); + } + + map<hobject_t, PullInfo> pulling; + + // Reverse mapping from osd peer to objects beging pulled from that peer + map<int, set<hobject_t> > pull_from_peer; + + void sub_op_push(OpRequestRef op); + void sub_op_push_reply(OpRequestRef op); + void sub_op_pull(OpRequestRef op); + + void _do_push(OpRequestRef op); + void _do_pull_response(OpRequestRef op); + void do_push(OpRequestRef op) { + if (is_primary()) { + _do_pull_response(op); + } else { + _do_push(op); + } + } + void do_pull(OpRequestRef op); + void do_push_reply(OpRequestRef op); + + bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply); + void handle_pull(int peer, PullOp &op, PushOp *reply); + bool handle_pull_response( + int from, PushOp &op, PullOp *response, + list<ObjectContextRef> *to_continue, + ObjectStore::Transaction *t); + void handle_push(int from, PushOp &op, PushReplyOp *response, + ObjectStore::Transaction *t); + + static void trim_pushed_data(const interval_set<uint64_t> &copy_subset, + const interval_set<uint64_t> &intervals_received, + bufferlist data_received, + interval_set<uint64_t> *intervals_usable, + bufferlist *data_usable); + void _failed_push(int from, const hobject_t &soid); + + void send_pushes(int prio, map<int, vector<PushOp> > &pushes); + void prep_push_op_blank(const hobject_t& soid, PushOp *op); + int send_push_op_legacy(int priority, int peer, + PushOp &pop); + int send_pull_legacy(int priority, int peer, + const ObjectRecoveryInfo& recovery_info, + ObjectRecoveryProgress progress); + void send_pulls( + int priority, + map<int, vector<PullOp> > &pulls); + + int 
build_push_op(const ObjectRecoveryInfo &recovery_info, + const ObjectRecoveryProgress &progress, + ObjectRecoveryProgress *out_progress, + PushOp *out_op, + object_stat_sum_t *stat = 0); + void submit_push_data(ObjectRecoveryInfo &recovery_info, + bool first, + bool complete, + const interval_set<uint64_t> &intervals_included, + bufferlist data_included, + bufferlist omap_header, + map<string, bufferptr> &attrs, + map<string, bufferlist> &omap_entries, + ObjectStore::Transaction *t); + void submit_push_complete(ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t); + + void calc_clone_subsets( + SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing, + const hobject_t &last_backfill, + interval_set<uint64_t>& data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets); + void prepare_pull( + const hobject_t& soid, + ObjectContextRef headctx, + RPGHandle *h); + int start_pushes( + const hobject_t &soid, + ObjectContextRef obj, + RPGHandle *h); + void prep_push_to_replica( + ObjectContextRef obc, const hobject_t& soid, int peer, + PushOp *pop); + void prep_push(ObjectContextRef obc, + const hobject_t& oid, int dest, + PushOp *op); + void prep_push(ObjectContextRef obc, + const hobject_t& soid, int peer, + eversion_t version, + interval_set<uint64_t> &data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets, + PushOp *op); + void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, + const pg_missing_t& missing, + const hobject_t &last_backfill, + interval_set<uint64_t>& data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets); + ObjectRecoveryInfo recalc_subsets( + const ObjectRecoveryInfo& recovery_info, + SnapSetContext *ssc + ); +}; + +#endif diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index d3201c91046..7831f95818d 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -60,8 +60,9 @@ #define dout_subsys ceph_subsys_osd #define 
DOUT_PREFIX_ARGS this, osd->whoami, get_osdmap() #undef dout_prefix -#define dout_prefix _prefix(_dout, this, osd->whoami, get_osdmap()) -static ostream& _prefix(std::ostream *_dout, PG *pg, int whoami, OSDMapRef osdmap) { +#define dout_prefix _prefix(_dout, this) +template <typename T> +static ostream& _prefix(std::ostream *_dout, T *pg) { return *_dout << pg->gen_prefix(); } @@ -79,6 +80,159 @@ PGLSFilter::~PGLSFilter() { } +static void log_subop_stats( + OSDService *osd, + OpRequestRef op, int tag_inb, int tag_lat) +{ + utime_t now = ceph_clock_now(g_ceph_context); + utime_t latency = now; + latency -= op->request->get_recv_stamp(); + + uint64_t inb = op->request->get_data().length(); + + osd->logger->inc(l_osd_sop); + + osd->logger->inc(l_osd_sop_inb, inb); + osd->logger->tinc(l_osd_sop_lat, latency); + + if (tag_inb) + osd->logger->inc(tag_inb, inb); + osd->logger->tinc(tag_lat, latency); +} + +// ====================== +// PGBackend::Listener + + +void ReplicatedPG::on_local_recover_start( + const hobject_t &oid, + ObjectStore::Transaction *t) +{ + pg_log.revise_have(oid, eversion_t()); + remove_snap_mapped_object(*t, oid); + t->remove(coll, oid); +} + +void ReplicatedPG::on_local_recover( + const hobject_t &hoid, + const object_stat_sum_t &stat_diff, + const ObjectRecoveryInfo &_recovery_info, + ObjectContextRef obc, + ObjectStore::Transaction *t + ) +{ + ObjectRecoveryInfo recovery_info(_recovery_info); + if (recovery_info.soid.snap < CEPH_NOSNAP) { + assert(recovery_info.oi.snaps.size()); + OSDriver::OSTransaction _t(osdriver.get_transaction(t)); + set<snapid_t> snaps( + recovery_info.oi.snaps.begin(), + recovery_info.oi.snaps.end()); + snap_mapper.add_oid( + recovery_info.soid, + snaps, + &_t); + } + + if (pg_log.get_missing().is_missing(recovery_info.soid) && + pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) { + assert(is_primary()); + const pg_log_entry_t *latest = 
pg_log.get_log().objects.find(recovery_info.soid)->second; + if (latest->op == pg_log_entry_t::LOST_REVERT && + latest->reverting_to == recovery_info.version) { + dout(10) << " got old revert version " << recovery_info.version + << " for " << *latest << dendl; + recovery_info.version = latest->version; + // update the attr to the revert event version + recovery_info.oi.prior_version = recovery_info.oi.version; + recovery_info.oi.version = latest->version; + bufferlist bl; + ::encode(recovery_info.oi, bl); + t->setattr(coll, recovery_info.soid, OI_ATTR, bl); + } + } + + // keep track of active pushes for scrub + ++active_pushes; + + recover_got(recovery_info.soid, recovery_info.version); + + if (is_primary()) { + info.stats.stats.sum.add(stat_diff); + + assert(obc); + obc->obs.exists = true; + obc->ondisk_write_lock(); + obc->obs.oi = recovery_info.oi; // may have been updated above + + + t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc)); + t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc)); + + publish_stats_to_osd(); + if (waiting_for_missing_object.count(hoid)) { + dout(20) << " kicking waiters on " << hoid << dendl; + requeue_ops(waiting_for_missing_object[hoid]); + waiting_for_missing_object.erase(hoid); + if (pg_log.get_missing().missing.size() == 0) { + requeue_ops(waiting_for_all_missing); + waiting_for_all_missing.clear(); + } + } + } else { + t->register_on_applied( + new C_OSD_AppliedRecoveredObjectReplica(this)); + + } + + t->register_on_commit( + new C_OSD_CommittedPushedObject( + this, + get_osdmap()->get_epoch(), + info.last_complete)); + + // update pg + dirty_info = true; + write_if_dirty(*t); + +} + +void ReplicatedPG::on_global_recover( + const hobject_t &soid) +{ + publish_stats_to_osd(); + dout(10) << "pushed " << soid << " to all replicas" << dendl; + assert(recovering.count(soid)); + recovering.erase(soid); + finish_recovery_op(soid); + if (waiting_for_degraded_object.count(soid)) { + 
requeue_ops(waiting_for_degraded_object[soid]); + waiting_for_degraded_object.erase(soid); + } + finish_degraded_object(soid); +} + +void ReplicatedPG::on_peer_recover( + int peer, + const hobject_t &soid, + const ObjectRecoveryInfo &recovery_info, + const object_stat_sum_t &stat) +{ + info.stats.stats.sum.add(stat); + publish_stats_to_osd(); + // done! + peer_missing[peer].got(soid, recovery_info.version); + if (peer == backfill_target && backfills_in_flight.count(soid)) + backfills_in_flight.erase(soid); +} + +void ReplicatedPG::begin_peer_recover( + int peer, + const hobject_t soid) +{ + peer_missing[peer].revise_have(soid, eversion_t()); +} + // ======================= // pg changes @@ -117,18 +271,18 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef o assert(g != missing.missing.end()); const eversion_t &v(g->second.need); - map<hobject_t, PullInfo>::const_iterator p = pulling.find(soid); - if (p != pulling.end()) { - dout(7) << "missing " << soid << " v " << v << ", already pulling." << dendl; + set<hobject_t>::const_iterator p = recovering.find(soid); + if (p != recovering.end()) { + dout(7) << "missing " << soid << " v " << v << ", already recovering." << dendl; } else if (missing_loc.find(soid) == missing_loc.end()) { dout(7) << "missing " << soid << " v " << v << ", is unfound." << dendl; } else { - dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl; - map<int, vector<PullOp> > pulls; - prepare_pull(soid, v, cct->_conf->osd_client_op_priority, &pulls); - send_pulls(cct->_conf->osd_client_op_priority, pulls); + dout(7) << "missing " << soid << " v " << v << ", recovering." 
<< dendl; + PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op(); + recover_missing(soid, v, cct->_conf->osd_client_op_priority, h); + pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority); } waiting_for_missing_object[soid].push_back(op); op->mark_delayed("waiting for missing object"); @@ -165,15 +319,15 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef assert(is_degraded_object(soid)); // we don't have it (yet). - if (pushing.count(soid)) { + if (recovering.count(soid)) { dout(7) << "degraded " << soid - << ", already pushing" + << ", already recovering" << dendl; } else { dout(7) << "degraded " << soid - << ", pushing" + << ", recovering" << dendl; eversion_t v; for (unsigned i = 1; i < acting.size(); i++) { @@ -184,9 +338,9 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef break; } } - map<int, vector<PushOp> > pushes; - prep_object_replica_pushes(soid, v, cct->_conf->osd_client_op_priority, &pushes); - send_pushes(cct->_conf->osd_client_op_priority, pushes); + PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op(); + prep_object_replica_pushes(soid, v, h); + pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority); } waiting_for_degraded_object[soid].push_back(op); op->mark_delayed("waiting for degraded object"); @@ -628,9 +782,8 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap, const PGPool &_pool, pg_t p, const hobject_t& oid, const hobject_t& ioid) : PG(o, curmap, _pool, p, oid, ioid), + pgbackend(new ReplicatedBackend(this, coll_t(p), o)), snapset_contexts_lock("ReplicatedPG::snapset_contexts"), - temp_created(false), - temp_coll(coll_t::make_temp_coll(p)), temp_seq(0), snap_trimmer_machine(this) { @@ -644,6 +797,62 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo src_oloc.key = oid.name; } +void ReplicatedPG::do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle) +{ + // do any pending flush + 
do_pending_flush(); + + if (!op_has_sufficient_caps(op)) { + osd->reply_op_error(op, -EPERM); + return; + } + assert(!op_must_wait_for_map(get_osdmap(), op)); + if (can_discard_request(op)) { + return; + } + if (!flushed) { + dout(20) << " !flushed, waiting for active on " << op << dendl; + waiting_for_active.push_back(op); + return; + } + + if (pgbackend->handle_message(op)) + return; + + switch (op->request->get_type()) { + case CEPH_MSG_OSD_OP: + if (is_replay() || !is_active()) { + dout(20) << " replay, waiting for active on " << op << dendl; + waiting_for_active.push_back(op); + return; + } + do_op(op); // do it now + break; + + case MSG_OSD_SUBOP: + do_sub_op(op); + break; + + case MSG_OSD_SUBOPREPLY: + do_sub_op_reply(op); + break; + + case MSG_OSD_PG_SCAN: + do_scan(op, handle); + break; + + case MSG_OSD_PG_BACKFILL: + do_backfill(op); + break; + + default: + assert(0 == "bad message type in do_request"); + } +} + + /** do_op - do an op * pg lock will be held (if multithreaded) * osd_lock NOT held. 
@@ -1237,26 +1446,6 @@ void ReplicatedPG::log_op_stats(OpContext *ctx) << " lat " << latency << dendl; } -void ReplicatedPG::log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat) -{ - utime_t now = ceph_clock_now(cct); - utime_t latency = now; - latency -= op->request->get_recv_stamp(); - - uint64_t inb = op->request->get_data().length(); - - osd->logger->inc(l_osd_sop); - - osd->logger->inc(l_osd_sop_inb, inb); - osd->logger->tinc(l_osd_sop_lat, latency); - - if (tag_inb) - osd->logger->inc(tag_inb, inb); - osd->logger->tinc(tag_lat, latency); -} - - - void ReplicatedPG::do_sub_op(OpRequestRef op) { MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request); @@ -1267,11 +1456,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op) OSDOp *first = NULL; if (m->ops.size() >= 1) { first = &m->ops[0]; - switch (first->op.op) { - case CEPH_OSD_OP_PULL: - sub_op_pull(op); - return; - } } if (!is_active()) { @@ -1282,9 +1466,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op) if (first) { switch (first->op.op) { - case CEPH_OSD_OP_PUSH: - sub_op_push(op); - return; case CEPH_OSD_OP_DELETE: sub_op_remove(op); return; @@ -1313,11 +1494,6 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op) if (r->ops.size() >= 1) { OSDOp& first = r->ops[0]; switch (first.op.op) { - case CEPH_OSD_OP_PUSH: - // continue peer recovery - sub_op_push_reply(op); - return; - case CEPH_OSD_OP_SCRUB_RESERVE: sub_op_scrub_reserve_reply(op); return; @@ -1403,7 +1579,7 @@ void ReplicatedPG::do_scan( } } -void ReplicatedPG::_do_push(OpRequestRef op) +void ReplicatedBackend::_do_push(OpRequestRef op) { MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request); assert(m->get_header().type == MSG_OSD_PG_PUSH); @@ -1420,18 +1596,43 @@ void ReplicatedPG::_do_push(OpRequestRef op) MOSDPGPushReply *reply = new MOSDPGPushReply; reply->set_priority(m->get_priority()); - reply->pgid = info.pgid; + reply->pgid = get_info().pgid; reply->map_epoch = m->map_epoch; reply->replies.swap(replies); reply->compute_cost(cct); - 
t->register_on_complete(new C_OSD_SendMessageOnConn( - osd, reply, m->get_connection())); + t->register_on_complete( + get_parent()->bless_context( + new C_OSD_SendMessageOnConn( + osd, reply, m->get_connection()))); - osd->store->queue_transaction(osr.get(), t); + get_parent()->queue_transaction(t); } -void ReplicatedPG::_do_pull_response(OpRequestRef op) +struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> { + ReplicatedBackend *bc; + list<ObjectContextRef> to_continue; + int priority; + C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority) + : bc(bc), priority(priority) {} + + void finish(ThreadPool::TPHandle &handle) { + ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); + for (list<ObjectContextRef>::iterator i = + to_continue.begin(); + i != to_continue.end(); + ++i) { + if (!bc->start_pushes((*i)->obs.oi.soid, *i, h)) { + bc->get_parent()->on_global_recover( + (*i)->obs.oi.soid); + } + handle.reset_tp_timeout(); + } + bc->run_recovery_op(h, priority); + } +}; + +void ReplicatedBackend::_do_pull_response(OpRequestRef op) { MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request); assert(m->get_header().type == MSG_OSD_PG_PUSH); @@ -1439,31 +1640,45 @@ void ReplicatedPG::_do_pull_response(OpRequestRef op) vector<PullOp> replies(1); ObjectStore::Transaction *t = new ObjectStore::Transaction; + list<ObjectContextRef> to_continue; for (vector<PushOp>::iterator i = m->pushes.begin(); i != m->pushes.end(); ++i) { - bool more = handle_pull_response(from, *i, &(replies.back()), t); + bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t); if (more) replies.push_back(PullOp()); } + if (!to_continue.empty()) { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, + m->get_priority()); + c->to_continue.swap(to_continue); + t->register_on_complete( + new C_QueueInWQ( + &osd->push_wq, + get_parent()->bless_gencontext(c))); + } replies.erase(replies.end() - 1); 
if (replies.size()) { MOSDPGPull *reply = new MOSDPGPull; reply->set_priority(m->get_priority()); - reply->pgid = info.pgid; + reply->pgid = get_info().pgid; reply->map_epoch = m->map_epoch; reply->pulls.swap(replies); reply->compute_cost(cct); - t->register_on_complete(new C_OSD_SendMessageOnConn( - osd, reply, m->get_connection())); + t->register_on_complete( + get_parent()->bless_context( + new C_OSD_SendMessageOnConn( + osd, reply, m->get_connection()))); } - osd->store->queue_transaction(osr.get(), t); + get_parent()->queue_transaction(t); } -void ReplicatedPG::do_pull(OpRequestRef op) +void ReplicatedBackend::do_pull(OpRequestRef op) { MOSDPGPull *m = static_cast<MOSDPGPull *>(op->request); assert(m->get_header().type == MSG_OSD_PG_PULL); @@ -1479,7 +1694,7 @@ void ReplicatedPG::do_pull(OpRequestRef op) send_pushes(m->get_priority(), replies); } -void ReplicatedPG::do_push_reply(OpRequestRef op) +void ReplicatedBackend::do_push_reply(OpRequestRef op) { MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->request); assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY); @@ -3976,19 +4191,9 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) } } -bool ReplicatedPG::have_temp_coll() -{ - return temp_created || osd->store->collection_exists(temp_coll); -} - coll_t ReplicatedPG::get_temp_coll(ObjectStore::Transaction *t) { - if (temp_created) - return temp_coll; - if (!osd->store->collection_exists(temp_coll)) - t->create_collection(temp_coll); - temp_created = true; - return temp_coll; + return pgbackend->get_temp_coll(t); } hobject_t ReplicatedPG::generate_temp_object() @@ -3996,6 +4201,7 @@ hobject_t ReplicatedPG::generate_temp_object() ostringstream ss; ss << "temp_" << info.pgid << "_" << get_role() << "_" << osd->monc->get_global_id() << "_" << (++temp_seq); hobject_t hoid(object_t(ss.str()), "", CEPH_NOSNAP, 0, -1, ""); + pgbackend->add_temp_obj(hoid); dout(20) << __func__ << " " << hoid << dendl; return hoid; } @@ -4261,7 +4467,6 @@ void 
ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) if (cop->temp_cursor.is_initial()) { cop->temp_coll = get_temp_coll(&tctx->local_t); cop->temp_oid = generate_temp_object(); - temp_contents.insert(cop->temp_oid); repop->ctx->new_temp_oid = cop->temp_oid; } @@ -4331,7 +4536,7 @@ int ReplicatedPG::finish_copy(OpContext *ctx) // finish writing to temp object, then move into place _write_copy_chunk(cop, &t); t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, obs.oi.soid); - temp_contents.erase(cop->temp_oid); + pgbackend->clear_temp_obj(cop->temp_oid); ctx->discard_temp_oid = cop->temp_oid; } @@ -4896,7 +5101,8 @@ void ReplicatedPG::check_blacklisted_obc_watchers(ObjectContextRef obc) void ReplicatedPG::populate_obc_watchers(ObjectContextRef obc) { assert(is_active()); - assert(!is_missing_object(obc->obs.oi.soid) || + assert((recovering.count(obc->obs.oi.soid) || + !is_missing_object(obc->obs.oi.soid)) || (pg_log.get_log().objects.count(obc->obs.oi.soid) && // or this is a revert... see recover_primary() pg_log.get_log().objects.find(obc->obs.oi.soid)->second->op == pg_log_entry_t::LOST_REVERT && @@ -5009,23 +5215,37 @@ ObjectContextRef ReplicatedPG::create_object_context(const object_info_t& oi, } ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid, - bool can_create) -{ + bool can_create, + map<string, bufferptr> *attrs) +{ + assert( + attrs || !pg_log.get_missing().is_missing(soid) || + // or this is a revert... see recover_primary() + (pg_log.get_log().objects.count(soid) && + pg_log.get_log().objects.find(soid)->second->op == + pg_log_entry_t::LOST_REVERT)); ObjectContextRef obc = object_contexts.lookup(soid); if (obc) { dout(10) << "get_object_context " << obc << " " << soid << dendl; } else { // check disk bufferlist bv; - int r = osd->store->getattr(coll, soid, OI_ATTR, bv); - if (r < 0) { - if (!can_create) - return ObjectContextRef(); // -ENOENT! - - // new object. 
- object_info_t oi(soid); - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace()); - return create_object_context(oi, ssc); + if (attrs) { + assert(attrs->count(OI_ATTR)); + bv.push_back(attrs->find(OI_ATTR)->second); + } else { + int r = osd->store->getattr(coll, soid, OI_ATTR, bv); + if (r < 0) { + if (!can_create) + return ObjectContextRef(); // -ENOENT! + + // new object. + object_info_t oi(soid); + SnapSetContext *ssc = get_snapset_context( + soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace(), + soid.has_snapset() ? attrs : 0); + return create_object_context(oi, ssc); + } } object_info_t oi(bv); @@ -5037,10 +5257,11 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid, obc->obs.oi = oi; obc->obs.exists = true; - if (can_create) { - obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace()); - register_snapset_context(obc->ssc); - } + obc->ssc = get_snapset_context( + soid.oid, soid.get_key(), soid.hash, + true, soid.get_namespace(), + soid.has_snapset() ? 
attrs : 0); + register_snapset_context(obc->ssc); populate_obc_watchers(obc); dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl; @@ -5259,11 +5480,13 @@ SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid) return ssc; } -SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, - const string& key, - ps_t seed, - bool can_create, - const string& nspace) +SnapSetContext *ReplicatedPG::get_snapset_context( + const object_t& oid, + const string& key, + ps_t seed, + bool can_create, + const string& nspace, + map<string, bufferptr> *attrs) { Mutex::Locker l(snapset_contexts_lock); SnapSetContext *ssc; @@ -5272,20 +5495,25 @@ SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, ssc = p->second; } else { bufferlist bv; - hobject_t head(oid, key, CEPH_NOSNAP, seed, - info.pgid.pool(), nspace); - int r = osd->store->getattr(coll, head, SS_ATTR, bv); - if (r < 0) { - // try _snapset - hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed, - info.pgid.pool(), nspace); - r = osd->store->getattr(coll, snapdir, SS_ATTR, bv); - if (r < 0 && !can_create) - return NULL; + if (!attrs) { + hobject_t head(oid, key, CEPH_NOSNAP, seed, + info.pgid.pool(), nspace); + int r = osd->store->getattr(coll, head, SS_ATTR, bv); + if (r < 0) { + // try _snapset + hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed, + info.pgid.pool(), nspace); + r = osd->store->getattr(coll, snapdir, SS_ATTR, bv); + if (r < 0 && !can_create) + return NULL; + } + } else { + assert(attrs->count(SS_ATTR)); + bv.push_back(attrs->find(SS_ATTR)->second); } ssc = new SnapSetContext(oid); _register_snapset_context(ssc); - if (r >= 0) { + if (bv.length()) { bufferlist::iterator bvp = bv.begin(); ssc->snapset.decode(bvp); } @@ -5361,12 +5589,12 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) if (m->new_temp_oid != hobject_t()) { dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl; - 
temp_contents.insert(m->new_temp_oid); + pgbackend->add_temp_obj(m->new_temp_oid); get_temp_coll(&rm->localt); } if (m->discard_temp_oid != hobject_t()) { dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl; - temp_contents.erase(m->discard_temp_oid); + pgbackend->clear_temp_obj(m->discard_temp_oid); } ::decode(rm->opt, p); @@ -5491,7 +5719,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) << last_peering_reset << dendl; } - log_subop_stats(rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat); + log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat); bool done = rm->applied && rm->committed; unlock(); if (done) { @@ -5532,11 +5760,12 @@ void ReplicatedPG::sub_op_modify_reply(OpRequestRef op) // =========================================================== -void ReplicatedPG::calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, - pg_missing_t& missing, - const hobject_t &last_backfill, - interval_set<uint64_t>& data_subset, - map<hobject_t, interval_set<uint64_t> >& clone_subsets) +void ReplicatedBackend::calc_head_subsets( + ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, + const pg_missing_t& missing, + const hobject_t &last_backfill, + interval_set<uint64_t>& data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets) { dout(10) << "calc_head_subsets " << head << " clone_overlap " << snapset.clone_overlap << dendl; @@ -5586,11 +5815,12 @@ void ReplicatedPG::calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, con << " clone_subsets " << clone_subsets << dendl; } -void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid, - const pg_missing_t& missing, - const hobject_t &last_backfill, - interval_set<uint64_t>& data_subset, - map<hobject_t, interval_set<uint64_t> >& clone_subsets) +void ReplicatedBackend::calc_clone_subsets( + SnapSet& snapset, const hobject_t& soid, + const pg_missing_t& missing, + const hobject_t &last_backfill, + 
interval_set<uint64_t>& data_subset, + map<hobject_t, interval_set<uint64_t> >& clone_subsets) { dout(10) << "calc_clone_subsets " << soid << " clone_overlap " << snapset.clone_overlap << dendl; @@ -5675,95 +5905,69 @@ void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid, */ enum { PULL_NONE, PULL_OTHER, PULL_YES }; -int ReplicatedPG::prepare_pull( - const hobject_t& soid, eversion_t v, - int priority, - map<int, vector<PullOp> > *pulls) -{ +void ReplicatedBackend::prepare_pull( + const hobject_t& soid, + ObjectContextRef headctx, + RPGHandle *h) +{ + assert(get_parent()->get_local_missing().missing.count(soid)); + eversion_t v = get_parent()->get_local_missing().missing.find( + soid)->second.need; + const map<hobject_t, set<int> > &missing_loc( + get_parent()->get_missing_loc()); + const map<int, pg_missing_t > &peer_missing( + get_parent()->get_peer_missing()); int fromosd = -1; - map<hobject_t,set<int> >::iterator q = missing_loc.find(soid); - if (q != missing_loc.end()) { - // randomize the list of possible sources - // should we take weights into account? - vector<int> shuffle(q->second.begin(), q->second.end()); - random_shuffle(shuffle.begin(), shuffle.end()); - for (vector<int>::iterator p = shuffle.begin(); - p != shuffle.end(); - ++p) { - if (get_osdmap()->is_up(*p)) { - fromosd = *p; - break; - } - } - } - if (fromosd < 0) { - dout(7) << "pull " << soid - << " v " << v - << " but it is unfound" << dendl; - return PULL_NONE; - } + map<hobject_t,set<int> >::const_iterator q = missing_loc.find(soid); + assert(q != missing_loc.end()); + assert(!q->second.empty()); + + // pick a pullee + vector<int> shuffle(q->second.begin(), q->second.end()); + random_shuffle(shuffle.begin(), shuffle.end()); + vector<int>::iterator p = shuffle.begin(); + assert(get_osdmap()->is_up(*p)); + fromosd = *p; + assert(fromosd >= 0); + + dout(7) << "pull " << soid + << "v " << v + << " on osds " << *p + << " from osd." 
<< fromosd + << dendl; assert(peer_missing.count(fromosd)); - if (peer_missing[fromosd].is_missing(soid, v)) { - assert(peer_missing[fromosd].missing[soid].have != v); + const pg_missing_t &pmissing = peer_missing.find(fromosd)->second; + if (pmissing.is_missing(soid, v)) { + assert(pmissing.missing.find(soid)->second.have != v); dout(10) << "pulling soid " << soid << " from osd " << fromosd - << " at version " << peer_missing[fromosd].missing[soid].have + << " at version " << pmissing.missing.find(soid)->second.have << " rather than at version " << v << dendl; - v = peer_missing[fromosd].missing[soid].have; - assert(pg_log.get_log().objects.count(soid) && - pg_log.get_log().objects.find(soid)->second->op == pg_log_entry_t::LOST_REVERT && - pg_log.get_log().objects.find(soid)->second->reverting_to == v); + v = pmissing.missing.find(soid)->second.have; + assert(get_parent()->get_log().get_log().objects.count(soid) && + (get_parent()->get_log().get_log().objects.find(soid)->second->op == + pg_log_entry_t::LOST_REVERT) && + (get_parent()->get_log().get_log().objects.find( + soid)->second->reverting_to == + v)); } - dout(7) << "pull " << soid - << " v " << v - << " on osds " << missing_loc[soid] - << " from osd." << fromosd - << dendl; - ObjectRecoveryInfo recovery_info; - // is this a snapped object? if so, consult the snapset.. we may not need the entire object! - if (soid.snap && soid.snap < CEPH_NOSNAP) { - // do we have the head and/or snapdir? 
- hobject_t head = soid; - head.snap = CEPH_NOSNAP; - if (pg_log.get_missing().is_missing(head)) { - if (pulling.count(head)) { - dout(10) << " missing but already pulling head " << head << dendl; - return PULL_NONE; - } else { - int r = prepare_pull( - head, pg_log.get_missing().missing.find(head)->second.need, priority, - pulls); - if (r != PULL_NONE) - return PULL_OTHER; - return PULL_NONE; - } - } - head.snap = CEPH_SNAPDIR; - if (pg_log.get_missing().is_missing(head)) { - if (pulling.count(head)) { - dout(10) << " missing but already pulling snapdir " << head << dendl; - return PULL_NONE; - } else { - int r = prepare_pull( - head, pg_log.get_missing().missing.find(head)->second.need, priority, - pulls); - if (r != PULL_NONE) - return PULL_OTHER; - return PULL_NONE; - } - } - + if (soid.is_snap()) { + assert(!get_parent()->get_local_missing().is_missing( + soid.get_head()) || + !get_parent()->get_local_missing().is_missing( + soid.get_snapdir())); + assert(headctx); // check snapset - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace()); + SnapSetContext *ssc = headctx->ssc; assert(ssc); dout(10) << " snapset " << ssc->snapset << dendl; - calc_clone_subsets(ssc->snapset, soid, pg_log.get_missing(), info.last_backfill, + calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(), + get_info().last_backfill, recovery_info.copy_subset, recovery_info.clone_subset); - put_snapset_context(ssc); // FIXME: this may overestimate if we are pulling multiple clones in parallel... 
dout(10) << " pulling " << recovery_info << dendl; } else { @@ -5773,8 +5977,8 @@ int ReplicatedPG::prepare_pull( recovery_info.size = ((uint64_t)-1); } - (*pulls)[fromosd].push_back(PullOp()); - PullOp &op = (*pulls)[fromosd].back(); + h->pulls[fromosd].push_back(PullOp()); + PullOp &op = h->pulls[fromosd].back(); op.soid = soid; op.recovery_info = recovery_info; @@ -5788,11 +5992,78 @@ int ReplicatedPG::prepare_pull( assert(!pulling.count(soid)); pull_from_peer[fromosd].insert(soid); PullInfo &pi = pulling[soid]; + pi.head_ctx = headctx; pi.recovery_info = op.recovery_info; pi.recovery_progress = op.recovery_progress; - pi.priority = priority; +} +int ReplicatedPG::recover_missing( + const hobject_t &soid, eversion_t v, + int priority, + PGBackend::RecoveryHandle *h) +{ + map<hobject_t,set<int> >::iterator q = missing_loc.find(soid); + if (q == missing_loc.end()) { + dout(7) << "pull " << soid + << " v " << v + << " but it is unfound" << dendl; + return PULL_NONE; + } + + // is this a snapped object? if so, consult the snapset.. we may not need the entire object! + ObjectContextRef obc; + ObjectContextRef head_obc; + if (soid.snap && soid.snap < CEPH_NOSNAP) { + // do we have the head and/or snapdir? 
+ hobject_t head = soid.get_head(); + if (pg_log.get_missing().is_missing(head)) { + if (recovering.count(head)) { + dout(10) << " missing but already recovering head " << head << dendl; + return PULL_NONE; + } else { + int r = recover_missing( + head, pg_log.get_missing().missing.find(head)->second.need, priority, + h); + if (r != PULL_NONE) + return PULL_OTHER; + return PULL_NONE; + } + } + head = soid.get_snapdir(); + if (pg_log.get_missing().is_missing(head)) { + if (recovering.count(head)) { + dout(10) << " missing but already recovering snapdir " << head << dendl; + return PULL_NONE; + } else { + int r = recover_missing( + head, pg_log.get_missing().missing.find(head)->second.need, priority, + h); + if (r != PULL_NONE) + return PULL_OTHER; + return PULL_NONE; + } + } + + // we must have one or the other + head_obc = get_object_context( + soid.get_head(), + false, + 0); + if (!head_obc) + head_obc = get_object_context( + soid.get_snapdir(), + false, + 0); + assert(head_obc); + } start_recovery_op(soid); + assert(!recovering.count(soid)); + recovering.insert(soid); + pgbackend->recover_object( + soid, + head_obc, + obc, + h); return PULL_YES; } @@ -5816,15 +6087,14 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer) * intelligently push an object to a replica. make use of existing * clones/heads and dup data ranges where possible. */ -void ReplicatedPG::prep_push_to_replica( +void ReplicatedBackend::prep_push_to_replica( ObjectContextRef obc, const hobject_t& soid, int peer, - int prio, PushOp *pop) { const object_info_t& oi = obc->obs.oi; uint64_t size = obc->obs.oi.size; - dout(10) << __func__ << soid << " v" << oi.version + dout(10) << __func__ << ": " << soid << " v" << oi.version << " size " << size << " to osd." 
<< peer << dendl; map<hobject_t, interval_set<uint64_t> > clone_subsets; @@ -5837,41 +6107,48 @@ void ReplicatedPG::prep_push_to_replica( // try to base push off of clones that succeed/preceed poid // we need the head (and current SnapSet) locally to do that. - if (pg_log.get_missing().is_missing(head)) { + if (get_parent()->get_local_missing().is_missing(head)) { dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl; - return prep_push(prio, obc, soid, peer, pop); + return prep_push(obc, soid, peer, pop); } hobject_t snapdir = head; snapdir.snap = CEPH_SNAPDIR; - if (pg_log.get_missing().is_missing(snapdir)) { - dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl; - return prep_push(prio, obc, soid, peer, pop); + if (get_parent()->get_local_missing().is_missing(snapdir)) { + dout(15) << "push_to_replica missing snapdir " << snapdir + << ", pushing raw clone" << dendl; + return prep_push(obc, soid, peer, pop); } - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace()); + SnapSetContext *ssc = obc->ssc; assert(ssc); dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; - calc_clone_subsets(ssc->snapset, soid, peer_missing[peer], - peer_info[peer].last_backfill, + map<int, pg_missing_t>::const_iterator pm = + get_parent()->get_peer_missing().find(peer); + assert(pm != get_parent()->get_peer_missing().end()); + map<int, pg_info_t>::const_iterator pi = + get_parent()->get_peer_info().find(peer); + assert(pi != get_parent()->get_peer_info().end()); + calc_clone_subsets(ssc->snapset, soid, + pm->second, + pi->second.last_backfill, data_subset, clone_subsets); - put_snapset_context(ssc); } else if (soid.snap == CEPH_NOSNAP) { // pushing head or unversioned object. // base this on partially on replica's clones? 
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace()); + SnapSetContext *ssc = obc->ssc; assert(ssc); dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; - calc_head_subsets(obc, ssc->snapset, soid, peer_missing[peer], - peer_info[peer].last_backfill, - data_subset, clone_subsets); - put_snapset_context(ssc); + calc_head_subsets( + obc, + ssc->snapset, soid, get_parent()->get_peer_missing().find(peer)->second, + get_parent()->get_peer_info().find(peer)->second.last_backfill, + data_subset, clone_subsets); } - prep_push(prio, obc, soid, peer, oi.version, data_subset, clone_subsets, pop); + prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop); } -void ReplicatedPG::prep_push(int prio, - ObjectContextRef obc, +void ReplicatedBackend::prep_push(ObjectContextRef obc, const hobject_t& soid, int peer, PushOp *pop) { @@ -5880,13 +6157,12 @@ void ReplicatedPG::prep_push(int prio, data_subset.insert(0, obc->obs.oi.size); map<hobject_t, interval_set<uint64_t> > clone_subsets; - prep_push(prio, obc, soid, peer, + prep_push(obc, soid, peer, obc->obs.oi.version, data_subset, clone_subsets, pop); } -void ReplicatedPG::prep_push( - int prio, +void ReplicatedBackend::prep_push( ObjectContextRef obc, const hobject_t& soid, int peer, eversion_t version, @@ -5894,9 +6170,10 @@ void ReplicatedPG::prep_push( map<hobject_t, interval_set<uint64_t> >& clone_subsets, PushOp *pop) { - peer_missing[peer].revise_have(soid, eversion_t()); + get_parent()->begin_peer_recover(peer, soid); // take note. 
PushInfo &pi = pushing[soid][peer]; + pi.obc = obc; pi.recovery_info.size = obc->obs.oi.size; pi.recovery_info.copy_subset = data_subset; pi.recovery_info.clone_subset = clone_subsets; @@ -5907,19 +6184,20 @@ void ReplicatedPG::prep_push( pi.recovery_progress.data_recovered_to = 0; pi.recovery_progress.data_complete = 0; pi.recovery_progress.omap_complete = 0; - pi.priority = prio; ObjectRecoveryProgress new_progress; - build_push_op(pi.recovery_info, - pi.recovery_progress, - &new_progress, - pop); + int r = build_push_op(pi.recovery_info, + pi.recovery_progress, + &new_progress, + pop, + &(pi.stat)); + assert(r == 0); pi.recovery_progress = new_progress; } -int ReplicatedPG::send_pull_legacy(int prio, int peer, - const ObjectRecoveryInfo &recovery_info, - ObjectRecoveryProgress progress) +int ReplicatedBackend::send_pull_legacy(int prio, int peer, + const ObjectRecoveryInfo &recovery_info, + ObjectRecoveryProgress progress) { // send op tid_t tid = osd->get_tid(); @@ -5932,7 +6210,7 @@ int ReplicatedPG::send_pull_legacy(int prio, int peer, << " from osd." 
<< peer << " tid " << tid << dendl; - MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid, + MOSDSubOp *subop = new MOSDSubOp(rid, get_info().pgid, recovery_info.soid, false, CEPH_OSD_FLAG_ACK, get_osdmap()->get_epoch(), tid, recovery_info.version); @@ -5949,7 +6227,7 @@ int ReplicatedPG::send_pull_legacy(int prio, int peer, return 0; } -void ReplicatedPG::submit_push_data( +void ReplicatedBackend::submit_push_data( ObjectRecoveryInfo &recovery_info, bool first, bool complete, @@ -5971,9 +6249,7 @@ void ReplicatedPG::submit_push_data( } if (first) { - pg_log.revise_have(recovery_info.soid, eversion_t()); - remove_snap_mapped_object(*t, recovery_info.soid); - t->remove(coll, recovery_info.soid); + get_parent()->on_local_recover_start(recovery_info.soid, t); t->remove(get_temp_coll(t), recovery_info.soid); t->touch(target_coll, recovery_info.soid); t->omap_setheader(target_coll, recovery_info.soid, omap_header); @@ -6007,8 +6283,8 @@ void ReplicatedPG::submit_push_data( } } -void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info, - ObjectStore::Transaction *t) +void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t) { for (map<hobject_t, interval_set<uint64_t> >::const_iterator p = recovery_info.clone_subset.begin(); @@ -6023,67 +6299,29 @@ void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info, q.get_start(), q.get_len(), q.get_start()); } } - - if (recovery_info.soid.snap < CEPH_NOSNAP) { - assert(recovery_info.oi.snaps.size()); - OSDriver::OSTransaction _t(osdriver.get_transaction(t)); - set<snapid_t> snaps( - recovery_info.oi.snaps.begin(), - recovery_info.oi.snaps.end()); - snap_mapper.add_oid( - recovery_info.soid, - snaps, - &_t); - } - - if (pg_log.get_missing().is_missing(recovery_info.soid) && - pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) { - assert(is_primary()); - const pg_log_entry_t *latest = 
pg_log.get_log().objects.find(recovery_info.soid)->second; - if (latest->op == pg_log_entry_t::LOST_REVERT && - latest->reverting_to == recovery_info.version) { - dout(10) << " got old revert version " << recovery_info.version - << " for " << *latest << dendl; - recovery_info.version = latest->version; - // update the attr to the revert event version - recovery_info.oi.prior_version = recovery_info.oi.version; - recovery_info.oi.version = latest->version; - bufferlist bl; - ::encode(recovery_info.oi, bl); - t->setattr(coll, recovery_info.soid, OI_ATTR, bl); - } - } - recover_got(recovery_info.soid, recovery_info.version); - - // update pg - dirty_info = true; - write_if_dirty(*t); } -ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recovery_info) +ObjectRecoveryInfo ReplicatedBackend::recalc_subsets( + const ObjectRecoveryInfo& recovery_info, + SnapSetContext *ssc) { if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP) return recovery_info; - - SnapSetContext *ssc = get_snapset_context(recovery_info.soid.oid, - recovery_info.soid.get_key(), - recovery_info.soid.hash, - false, - recovery_info.soid.get_namespace()); - assert(ssc); ObjectRecoveryInfo new_info = recovery_info; new_info.copy_subset.clear(); new_info.clone_subset.clear(); assert(ssc); - calc_clone_subsets(ssc->snapset, new_info.soid, pg_log.get_missing(), info.last_backfill, + calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(), + get_info().last_backfill, new_info.copy_subset, new_info.clone_subset); - put_snapset_context(ssc); return new_info; } -bool ReplicatedPG::handle_pull_response( +bool ReplicatedBackend::handle_pull_response( int from, PushOp &pop, PullOp *response, - ObjectStore::Transaction *t) + list<ObjectContextRef> *to_continue, + ObjectStore::Transaction *t + ) { interval_set<uint64_t> data_included = pop.data_included; bufferlist data; @@ -6115,7 +6353,13 @@ bool ReplicatedPG::handle_pull_response( 
pop.recovery_info.copy_subset); } - pi.recovery_info = recalc_subsets(pi.recovery_info); + bool first = pi.recovery_progress.first; + if (first) { + pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset); + pi.recovery_info.oi = pi.obc->obs.oi; + pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc); + } + interval_set<uint64_t> usable_intervals; bufferlist usable_data; @@ -6127,33 +6371,15 @@ bool ReplicatedPG::handle_pull_response( data_included = usable_intervals; data.claim(usable_data); - info.stats.stats.sum.num_bytes_recovered += data.length(); - bool first = pi.recovery_progress.first; pi.recovery_progress = pop.after_progress; + pi.stat.num_bytes_recovered += data.length(); + dout(10) << "new recovery_info " << pi.recovery_info << ", new progress " << pi.recovery_progress << dendl; - if (first) { - bufferlist oibl; - if (pop.attrset.count(OI_ATTR)) { - oibl.push_back(pop.attrset[OI_ATTR]); - ::decode(pi.recovery_info.oi, oibl); - } else { - assert(0); - } - bufferlist ssbl; - if (pop.attrset.count(SS_ATTR)) { - ssbl.push_back(pop.attrset[SS_ATTR]); - ::decode(pi.recovery_info.ss, ssbl); - } else { - assert(pi.recovery_info.soid.snap != CEPH_NOSNAP && - pi.recovery_info.soid.snap != CEPH_SNAPDIR); - } - } - bool complete = pi.is_complete(); submit_push_data(pi.recovery_info, first, @@ -6164,53 +6390,17 @@ bool ReplicatedPG::handle_pull_response( pop.omap_entries, t); - info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size(); - - if (complete) { - info.stats.stats.sum.num_objects_recovered++; - - SnapSetContext *ssc; - if (hoid.snap == CEPH_NOSNAP || hoid.snap == CEPH_SNAPDIR) { - ssc = create_snapset_context(hoid.oid); - ssc->snapset = pi.recovery_info.ss; - } else { - ssc = get_snapset_context(hoid.oid, hoid.get_key(), hoid.hash, false, - hoid.get_namespace()); - assert(ssc); - } - ObjectContextRef obc = create_object_context(pi.recovery_info.oi, ssc); - obc->obs.exists = true; - - obc->ondisk_write_lock(); - - // keep 
track of active pushes for scrub - ++active_pushes; - - t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc)); - t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc)); - t->register_on_complete( - new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch())); - } - - t->register_on_commit( - new C_OSD_CommittedPushedObject( - this, - get_osdmap()->get_epoch(), - info.last_complete)); + pi.stat.num_keys_recovered += pop.omap_entries.size(); if (complete) { + to_continue->push_back(pi.obc); + pi.stat.num_objects_recovered++; + get_parent()->on_local_recover( + hoid, pi.stat, pi.recovery_info, pi.obc, t); pulling.erase(hoid); pull_from_peer[from].erase(hoid); - publish_stats_to_osd(); - if (waiting_for_missing_object.count(hoid)) { - dout(20) << " kicking waiters on " << hoid << dendl; - requeue_ops(waiting_for_missing_object[hoid]); - waiting_for_missing_object.erase(hoid); - if (pg_log.get_missing().missing.size() == 0) { - requeue_ops(waiting_for_all_missing); - waiting_for_all_missing.clear(); - } - } + if (pull_from_peer[from].empty()) + pull_from_peer.erase(from); return false; } else { response->soid = pop.soid; @@ -6226,11 +6416,11 @@ struct C_OnPushCommit : public Context { C_OnPushCommit(ReplicatedPG *pg, OpRequestRef op) : pg(pg), op(op) {} void finish(int) { op->mark_event("committed"); - pg->log_subop_stats(op, l_osd_push_inb, l_osd_sop_push_lat); + log_subop_stats(pg->osd, op, l_osd_push_inb, l_osd_sop_push_lat); } }; -void ReplicatedPG::handle_push( +void ReplicatedBackend::handle_push( int from, PushOp &pop, PushReplyOp *response, ObjectStore::Transaction *t) { @@ -6244,12 +6434,7 @@ void ReplicatedPG::handle_push( bool complete = pop.after_progress.data_complete && pop.after_progress.omap_complete; - // keep track of active pushes for scrub - ++active_pushes; - response->soid = pop.recovery_info.soid; - t->register_on_applied( - new C_OSD_AppliedRecoveredObjectReplica(this)); submit_push_data(pop.recovery_info, first, complete, 
@@ -6260,14 +6445,16 @@ void ReplicatedPG::handle_push( pop.omap_entries, t); - t->register_on_commit( - new C_OSD_CommittedPushedObject( - this, - get_osdmap()->get_epoch(), - info.last_complete)); + if (complete) + get_parent()->on_local_recover( + pop.recovery_info.soid, + object_stat_sum_t(), + pop.recovery_info, + ObjectContextRef(), // ok, is replica + t); } -void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes) +void ReplicatedBackend::send_pushes(int prio, map<int, vector<PushOp> > &pushes) { for (map<int, vector<PushOp> >::iterator i = pushes.begin(); i != pushes.end(); @@ -6291,7 +6478,7 @@ void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes) uint64_t cost = 0; uint64_t pushes = 0; MOSDPGPush *msg = new MOSDPGPush(); - msg->pgid = info.pgid; + msg->pgid = get_info().pgid; msg->map_epoch = get_osdmap()->get_epoch(); msg->set_priority(prio); for (; @@ -6312,7 +6499,7 @@ void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes) } } -void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls) +void ReplicatedBackend::send_pulls(int prio, map<int, vector<PullOp> > &pulls) { for (map<int, vector<PullOp> >::iterator i = pulls.begin(); i != pulls.end(); @@ -6339,7 +6526,7 @@ void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls) << " to osd." 
<< i->first << dendl; MOSDPGPull *msg = new MOSDPGPull(); msg->set_priority(prio); - msg->pgid = info.pgid; + msg->pgid = get_info().pgid; msg->map_epoch = get_osdmap()->get_epoch(); msg->pulls.swap(i->second); msg->compute_cost(cct); @@ -6348,22 +6535,11 @@ void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls) } } -int ReplicatedPG::send_push(int prio, int peer, - const ObjectRecoveryInfo &recovery_info, - const ObjectRecoveryProgress &progress, - ObjectRecoveryProgress *out_progress) -{ - PushOp op; - int r = build_push_op(recovery_info, progress, out_progress, &op); - if (r < 0) - return r; - return send_push_op_legacy(prio, peer, op); -} - -int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info, - const ObjectRecoveryProgress &progress, - ObjectRecoveryProgress *out_progress, - PushOp *out_op) +int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, + const ObjectRecoveryProgress &progress, + ObjectRecoveryProgress *out_progress, + PushOp *out_op, + object_stat_sum_t *stat) { ObjectRecoveryProgress _new_progress; if (!out_progress) @@ -6387,7 +6563,7 @@ int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info, object_info_t oi(bv); if (oi.version != recovery_info.version) { - osd->clog.error() << info.pgid << " push " + osd->clog.error() << get_info().pgid << " push " << recovery_info.soid << " v " << " failed because local copy is " << oi.version << "\n"; @@ -6450,11 +6626,14 @@ int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info, if (new_progress.is_complete(recovery_info)) { new_progress.data_complete = true; - info.stats.stats.sum.num_objects_recovered++; + if (stat) + stat->num_objects_recovered++; } - info.stats.stats.sum.num_keys_recovered += out_op->omap_entries.size(); - info.stats.stats.sum.num_bytes_recovered += out_op->data.length(); + if (stat) { + stat->num_keys_recovered += out_op->omap_entries.size(); + stat->num_bytes_recovered += 
out_op->data.length(); + } osd->logger->inc(l_osd_push); osd->logger->inc(l_osd_push_outb, out_op->data.length()); @@ -6468,11 +6647,11 @@ int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info, return 0; } -int ReplicatedPG::send_push_op_legacy(int prio, int peer, PushOp &pop) +int ReplicatedBackend::send_push_op_legacy(int prio, int peer, PushOp &pop) { tid_t tid = osd->get_tid(); osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); - MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, pop.soid, + MOSDSubOp *subop = new MOSDSubOp(rid, get_info().pgid, pop.soid, false, 0, get_osdmap()->get_epoch(), tid, pop.recovery_info.version); subop->ops = vector<OSDOp>(1); @@ -6493,14 +6672,14 @@ int ReplicatedPG::send_push_op_legacy(int prio, int peer, PushOp &pop) return 0; } -void ReplicatedPG::prep_push_op_blank(const hobject_t& soid, PushOp *op) +void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op) { op->recovery_info.version = eversion_t(); op->version = eversion_t(); op->soid = soid; } -void ReplicatedPG::sub_op_push_reply(OpRequestRef op) +void ReplicatedBackend::sub_op_push_reply(OpRequestRef op) { MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request); const hobject_t& soid = reply->get_poid(); @@ -6515,10 +6694,10 @@ void ReplicatedPG::sub_op_push_reply(OpRequestRef op) PushOp pop; bool more = handle_push_reply(peer, rop, &pop); if (more) - send_push_op_legacy(pushing[soid][peer].priority, peer, pop); + send_push_op_legacy(op->request->get_priority(), peer, pop); } -bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) +bool ReplicatedBackend::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) { const hobject_t &soid = op.soid; if (pushing.count(soid) == 0) { @@ -6538,32 +6717,25 @@ bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) << pi->recovery_progress.data_recovered_to << " of " << pi->recovery_info.copy_subset << dendl; ObjectRecoveryProgress 
new_progress; - build_push_op( + int r = build_push_op( pi->recovery_info, - pi->recovery_progress, &new_progress, reply); + pi->recovery_progress, &new_progress, reply, + &(pi->stat)); + assert(r == 0); pi->recovery_progress = new_progress; return true; } else { // done! - if (peer == backfill_target && backfills_in_flight.count(soid)) - backfills_in_flight.erase(soid); - else - peer_missing[peer].got(soid, pi->recovery_info.version); + get_parent()->on_peer_recover( + peer, soid, pi->recovery_info, + pi->stat); pushing[soid].erase(peer); pi = NULL; - publish_stats_to_osd(); if (pushing[soid].empty()) { - pushing.erase(soid); - dout(10) << "pushed " << soid << " to all replicas" << dendl; - finish_recovery_op(soid); - if (waiting_for_degraded_object.count(soid)) { - requeue_ops(waiting_for_degraded_object[soid]); - waiting_for_degraded_object.erase(soid); - } - finish_degraded_object(soid); + get_parent()->on_global_recover(soid); } else { dout(10) << "pushed " << soid << ", still waiting for push ack from " << pushing[soid].size() << " others" << dendl; @@ -6601,7 +6773,7 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid) * process request to pull an entire object. * NOTE: called from opqueue. 
*/ -void ReplicatedPG::sub_op_pull(OpRequestRef op) +void ReplicatedBackend::sub_op_pull(OpRequestRef op) { MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request); assert(m->get_header().type == MSG_OSD_SUBOP); @@ -6628,16 +6800,17 @@ void ReplicatedPG::sub_op_pull(OpRequestRef op) m->get_source().num(), reply); - log_subop_stats(op, 0, l_osd_sop_pull_lat); + log_subop_stats(osd, op, 0, l_osd_sop_pull_lat); } -void ReplicatedPG::handle_pull(int peer, PullOp &op, PushOp *reply) +void ReplicatedBackend::handle_pull(int peer, PullOp &op, PushOp *reply) { const hobject_t &soid = op.soid; struct stat st; int r = osd->store->stat(coll, soid, &st); if (r != 0) { - osd->clog.error() << info.pgid << " " << peer << " tried to pull " << soid + osd->clog.error() << get_info().pgid << " " + << peer << " tried to pull " << soid << " but got " << cpp_strerror(-r) << "\n"; prep_push_op_blank(soid, reply); } else { @@ -6754,7 +6927,7 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v) * @param intervals_usable intervals we want to keep * @param data_usable matching data we want to keep */ -void ReplicatedPG::trim_pushed_data( +void ReplicatedBackend::trim_pushed_data( const interval_set<uint64_t> &copy_subset, const interval_set<uint64_t> &intervals_received, bufferlist data_received, @@ -6792,7 +6965,7 @@ void ReplicatedPG::trim_pushed_data( /** op_push * NOTE: called from opqueue.
*/ -void ReplicatedPG::sub_op_push(OpRequestRef op) +void ReplicatedBackend::sub_op_push(OpRequestRef op) { op->mark_started(); MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request); @@ -6812,14 +6985,29 @@ void ReplicatedPG::sub_op_push(OpRequestRef op) if (is_primary()) { PullOp resp; - bool more = handle_pull_response(m->get_source().num(), pop, &resp, t); + RPGHandle *h = _open_recovery_op(); + list<ObjectContextRef> to_continue; + bool more = handle_pull_response( + m->get_source().num(), pop, &resp, + &to_continue, t); if (more) { send_pull_legacy( m->get_priority(), m->get_source().num(), resp.recovery_info, resp.recovery_progress); - } + } else { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, + op->request->get_priority()); + c->to_continue.swap(to_continue); + t->register_on_complete( + new C_QueueInWQ( + &osd->push_wq, + get_parent()->bless_gencontext(c))); + } + run_recovery_op(h, op->request->get_priority()); } else { PushReplyOp resp; MOSDSubOpReply *reply = new MOSDSubOpReply( @@ -6828,15 +7016,16 @@ void ReplicatedPG::sub_op_push(OpRequestRef op) assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); handle_push(m->get_source().num(), pop, &resp, t); t->register_on_complete(new C_OSD_SendMessageOnConn( - osd, reply, m->get_connection())); + osd, reply, m->get_connection())); } - t->register_on_commit(new C_OnPushCommit(this, op)); - osd->store->queue_transaction(osr.get(), t); + get_parent()->queue_transaction(t); return; } -void ReplicatedPG::_failed_push(int from, const hobject_t &soid) +void ReplicatedPG::failed_push(int from, const hobject_t &soid) { + assert(recovering.count(soid)); + recovering.erase(soid); map<hobject_t,set<int> >::iterator p = missing_loc.find(soid); if (p != missing_loc.end()) { dout(0) << "_failed_push " << soid << " from osd." 
<< from @@ -6849,9 +7038,15 @@ void ReplicatedPG::_failed_push(int from, const hobject_t &soid) dout(0) << "_failed_push " << soid << " from osd." << from << " but not in missing_loc ???" << dendl; } - finish_recovery_op(soid); // close out this attempt, +} + +void ReplicatedBackend::_failed_push(int from, const hobject_t &soid) +{ + get_parent()->failed_push(from, soid); pull_from_peer[from].erase(soid); + if (pull_from_peer[from].empty()) + pull_from_peer.erase(from); pulling.erase(soid); } @@ -7147,20 +7342,6 @@ void ReplicatedPG::on_shutdown() cancel_recovery(); } -void ReplicatedPG::on_flushed() -{ - assert(object_contexts.empty()); - if (have_temp_coll() && - !osd->store->collection_empty(get_temp_coll())) { - vector<hobject_t> objects; - osd->store->collection_list(get_temp_coll(), objects); - derr << __func__ << ": found objects in the temp collection: " - << objects << ", crashing now" - << dendl; - assert(0 == "found garbage in the temp collection"); - } -} - void ReplicatedPG::on_activate() { for (unsigned i = 1; i<acting.size(); i++) { @@ -7223,20 +7404,7 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t) // any dups apply_and_flush_repops(is_primary()); - // clear pushing/pulling maps - pushing.clear(); - pulling.clear(); - pull_from_peer.clear(); - - // clear temp - for (set<hobject_t>::iterator i = temp_contents.begin(); - i != temp_contents.end(); - ++i) { - dout(10) << __func__ << ": Removing oid " - << *i << " from the temp collection" << dendl; - t->remove(get_temp_coll(t), *i); - } - temp_contents.clear(); + pgbackend->on_change(t); // clear snap_trimmer state snap_trimmer_machine.process_event(Reset()); @@ -7262,9 +7430,16 @@ void ReplicatedPG::_clear_recovery_state() backfill_pos = hobject_t(); backfills_in_flight.clear(); pending_backfill_updates.clear(); - pulling.clear(); - pushing.clear(); - pull_from_peer.clear(); + recovering.clear(); + pgbackend->clear_state(); +} + +void ReplicatedPG::cancel_pull(const hobject_t &soid) +{ + 
assert(recovering.count(soid)); + recovering.erase(soid); + finish_recovery_op(soid); + pg_log.set_last_requested(0); // get recover_primary to start over } void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap) @@ -7283,26 +7458,10 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap) } dout(10) << "check_recovery_sources source osd." << *p << " now down" << dendl; now_down.insert(*p); - - // reset pulls? - map<int, set<hobject_t> >::iterator j = pull_from_peer.find(*p); - if (j != pull_from_peer.end()) { - dout(10) << "check_recovery_sources resetting pulls from osd." << *p - << ", osdmap has it marked down" << dendl; - for (set<hobject_t>::iterator i = j->second.begin(); - i != j->second.end(); - ++i) { - assert(pulling.count(*i) == 1); - pulling.erase(*i); - finish_recovery_op(*i); - } - pg_log.set_last_requested(0); - pull_from_peer.erase(j++); - } - - // remove from missing_loc_sources missing_loc_sources.erase(p++); } + pgbackend->check_recovery_sources(osdmap); + if (now_down.empty()) { dout(10) << "check_recovery_sources no source osds (" << missing_loc_sources << ") went down" << dendl; } else { @@ -7388,7 +7547,8 @@ int ReplicatedPG::start_recovery_ops( } bool deferred_backfill = false; - if (state_test(PG_STATE_BACKFILL) && + if (recovering.empty() && + state_test(PG_STATE_BACKFILL) && backfill_target >= 0 && started < max && missing.num_missing() == 0 && !waiting_on_backfill) { @@ -7416,9 +7576,11 @@ int ReplicatedPG::start_recovery_ops( dout(10) << " started " << started << dendl; osd->logger->inc(l_osd_rop, started); - if (started || recovery_ops_active > 0 || deferred_backfill) + if (!recovering.empty() || + started || recovery_ops_active > 0 || deferred_backfill) return started; + assert(recovering.empty()); assert(recovery_ops_active == 0); int unfound = get_num_unfound(); @@ -7484,7 +7646,8 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle) const pg_missing_t &missing = pg_log.get_missing(); - 
dout(10) << "recover_primary pulling " << pulling.size() << " in pg" << dendl; + dout(10) << "recover_primary recovering " << recovering.size() + << " in pg" << dendl; dout(10) << "recover_primary " << missing << dendl; dout(25) << "recover_primary " << missing.missing << dendl; @@ -7493,7 +7656,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle) int started = 0; int skipped = 0; - map<int, vector<PullOp> > pulls; + PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op(); map<version_t, hobject_t>::const_iterator p = missing.rmissing.lower_bound(pg_log.get_log().last_requested); while (p != missing.rmissing.end()) { @@ -7524,8 +7687,8 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle) << (unfound ? " (unfound)":"") << (missing.is_missing(soid) ? " (missing)":"") << (missing.is_missing(head) ? " (missing head)":"") - << (pulling.count(soid) ? " (pulling)":"") - << (pulling.count(head) ? " (pulling head)":"") + << (recovering.count(soid) ? " (recovering)":"") + << (recovering.count(head) ? 
" (recovering head)":"") << dendl; if (latest) { @@ -7600,14 +7763,14 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle) } } - if (!pulling.count(soid)) { - if (pulling.count(head)) { + if (!recovering.count(soid)) { + if (recovering.count(head)) { ++skipped; } else if (unfound) { ++skipped; } else { - int r = prepare_pull( - soid, need, cct->_conf->osd_recovery_op_priority, &pulls); + int r = recover_missing( + soid, need, cct->_conf->osd_recovery_op_priority, h); switch (r) { case PULL_YES: ++started; @@ -7629,14 +7792,14 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle) if (!skipped) pg_log.set_last_requested(v); } - - send_pulls(cct->_conf->osd_recovery_op_priority, pulls); + + pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority); return started; } int ReplicatedPG::prep_object_replica_pushes( - const hobject_t& soid, eversion_t v, int prio, - map<int, vector<PushOp> > *pushes) + const hobject_t& soid, eversion_t v, + PGBackend::RecoveryHandle *h) { dout(10) << __func__ << ": on " << soid << dendl; @@ -7663,30 +7826,46 @@ int ReplicatedPG::prep_object_replica_pushes( return 0; } - dout(10) << " ondisk_read_lock for " << soid << dendl; + start_recovery_op(soid); + assert(!recovering.count(soid)); + recovering.insert(soid); + + /* We need this in case there is an in progress write on the object. In fact, + * the only possible write is an update to the xattr due to a lost_revert -- + * a client write would be blocked since the object is degraded. + * In almost all cases, therefore, this lock should be uncontended. + */ obc->ondisk_read_lock(); - + pgbackend->recover_object( + soid, + ObjectContextRef(), + obc, // has snapset context + h); + obc->ondisk_read_unlock(); + return 1; +} + +int ReplicatedBackend::start_pushes( + const hobject_t &soid, + ObjectContextRef obc, + RPGHandle *h) +{ + int pushes = 0; // who needs it? 
- bool started = false; - for (unsigned i=1; i<acting.size(); i++) { - int peer = acting[i]; - if (peer_missing.count(peer) && - peer_missing[peer].is_missing(soid)) { - if (!started) { - start_recovery_op(soid); - started = true; - } - (*pushes)[peer].push_back(PushOp()); - prep_push_to_replica(obc, soid, peer, prio, - &((*pushes)[peer].back()) + for (unsigned i=1; i<get_parent()->get_acting().size(); i++) { + int peer = get_parent()->get_acting()[i]; + map<int, pg_missing_t>::const_iterator j = + get_parent()->get_peer_missing().find(peer); + assert(j != get_parent()->get_peer_missing().end()); + if (j->second.is_missing(soid)) { + ++pushes; + h->pushes[peer].push_back(PushOp()); + prep_push_to_replica(obc, soid, peer, + &(h->pushes[peer].back()) ); } } - - dout(10) << " ondisk_read_unlock on " << soid << dendl; - obc->ondisk_read_unlock(); - - return 1; + return pushes; } int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle) @@ -7694,7 +7873,7 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle) dout(10) << __func__ << "(" << max << ")" << dendl; int started = 0; - map<int, vector<PushOp> > pushes; + PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op(); // this is FAR from an optimal recovery order. pretty lame, really. 
for (unsigned i=1; i<acting.size(); i++) { @@ -7714,8 +7893,8 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle) handle.reset_tp_timeout(); const hobject_t soid(p->second); - if (pushing.count(soid)) { - dout(10) << __func__ << ": already pushing " << soid << dendl; + if (recovering.count(soid)) { + dout(10) << __func__ << ": already recovering" << soid << dendl; continue; } @@ -7730,13 +7909,11 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle) dout(10) << __func__ << ": recover_object_replicas(" << soid << ")" << dendl; map<hobject_t,pg_missing_t::item>::const_iterator r = m.missing.find(soid); started += prep_object_replica_pushes(soid, r->second.need, - cct->_conf->osd_recovery_op_priority, - &pushes); + h); } } - send_pushes(cct->_conf->osd_recovery_op_priority, pushes); - + pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority); return started; } @@ -7899,15 +8076,16 @@ int ReplicatedPG::recover_backfill( send_remove_op(i->first, i->second, backfill_target); } + PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op(); map<int, vector<PushOp> > pushes; for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin(); i != to_push.end(); ++i) { handle.reset_tp_timeout(); prep_backfill_object_push( - i->first, i->second.first, i->second.second, backfill_target, &pushes); + i->first, i->second.first, i->second.second, backfill_target, h); } - send_pushes(cct->_conf->osd_recovery_op_priority, pushes); + pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority); release_waiting_for_backfill_pos(); dout(5) << "backfill_pos is " << backfill_pos << " and pinfo.last_backfill is " @@ -7953,20 +8131,25 @@ int ReplicatedPG::recover_backfill( void ReplicatedPG::prep_backfill_object_push( hobject_t oid, eversion_t v, eversion_t have, int peer, - map<int, vector<PushOp> > *pushes) + PGBackend::RecoveryHandle *h) { dout(10) << "push_backfill_object " << oid << " v " << v << " to 
osd." << peer << dendl; backfills_in_flight.insert(oid); + map<int, pg_missing_t>::iterator bpm = peer_missing.find(backfill_target); + assert(bpm != peer_missing.end()); + bpm->second.add(oid, eversion_t(), eversion_t()); - if (!pushing.count(oid)) - start_recovery_op(oid); + assert(!recovering.count(oid)); + + start_recovery_op(oid); + recovering.insert(oid); ObjectContextRef obc = get_object_context(oid, false); - obc->ondisk_read_lock(); - (*pushes)[peer].push_back(PushOp()); - prep_push_to_replica(obc, oid, peer, cct->_conf->osd_recovery_op_priority, - &((*pushes)[peer].back())); - obc->ondisk_read_unlock(); + pgbackend->recover_object( + oid, + ObjectContextRef(), + obc, + h); } void ReplicatedPG::scan_range( diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index e880bdecade..e24592e932f 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -33,6 +33,9 @@ #include "common/sharedptr_registry.hpp" +#include "PGBackend.h" +#include "ReplicatedBackend.h" + class MOSDSubOpReply; class ReplicatedPG; @@ -80,7 +83,7 @@ public: virtual bool filter(bufferlist& xattr_data, bufferlist& outdata); }; -class ReplicatedPG : public PG { +class ReplicatedPG : public PG, public PGBackend::Listener { friend class OSD; friend class Watch; @@ -122,6 +125,119 @@ public: }; typedef boost::shared_ptr<CopyOp> CopyOpRef; + boost::scoped_ptr<PGBackend> pgbackend; + + /// Listener methods + void on_local_recover_start( + const hobject_t &oid, + ObjectStore::Transaction *t); + void on_local_recover( + const hobject_t &oid, + const object_stat_sum_t &stat_diff, + const ObjectRecoveryInfo &recovery_info, + ObjectContextRef obc, + ObjectStore::Transaction *t + ); + void on_peer_recover( + int peer, + const hobject_t &oid, + const ObjectRecoveryInfo &recovery_info, + const object_stat_sum_t &stat + ); + void begin_peer_recover( + int peer, + const hobject_t oid); + void on_global_recover( + const hobject_t &oid); + void failed_push(int from, const hobject_t &soid); + 
void cancel_pull(const hobject_t &soid); + + template <typename T> + class BlessedGenContext : public GenContext<T> { + ReplicatedPG *pg; + GenContext<T> *c; + epoch_t e; + public: + BlessedGenContext(ReplicatedPG *pg, GenContext<T> *c, epoch_t e) + : pg(pg), c(c), e(e) {} + void finish(T t) { + pg->lock(); + if (pg->pg_has_reset_since(e)) + delete c; + else + c->complete(t); + pg->unlock(); + } + }; + class BlessedContext : public Context { + ReplicatedPG *pg; + Context *c; + epoch_t e; + public: + BlessedContext(ReplicatedPG *pg, Context *c, epoch_t e) + : pg(pg), c(c), e(e) {} + void finish(int r) { + pg->lock(); + if (pg->pg_has_reset_since(e)) + delete c; + else + c->complete(r); + pg->unlock(); + } + }; + Context *bless_context(Context *c) { + return new BlessedContext(this, c, get_osdmap()->get_epoch()); + } + GenContext<ThreadPool::TPHandle&> *bless_gencontext( + GenContext<ThreadPool::TPHandle&> *c) { + return new BlessedGenContext<ThreadPool::TPHandle&>( + this, c, get_osdmap()->get_epoch()); + } + + void send_message(int to_osd, Message *m) { + osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch()); + } + void queue_transaction(ObjectStore::Transaction *t) { + osd->store->queue_transaction(osr.get(), t); + } + epoch_t get_epoch() { + return get_osdmap()->get_epoch(); + } + const vector<int> &get_acting() { + return acting; + } + std::string gen_dbg_prefix() const { return gen_prefix(); } + + const map<hobject_t, set<int> > &get_missing_loc() { + return missing_loc; + } + const map<int, pg_missing_t> &get_peer_missing() { + return peer_missing; + } + const map<int, pg_info_t> &get_peer_info() { + return peer_info; + } + const pg_missing_t &get_local_missing() { + return pg_log.get_missing(); + } + const PGLog &get_log() { + return pg_log; + } + bool pgb_is_primary() const { + return is_primary(); + } + OSDMapRef pgb_get_osdmap() const { + return get_osdmap(); + } + const pg_info_t &get_info() const { + return info; + } + ObjectContextRef 
get_obc( + const hobject_t &hoid, + map<string, bufferptr> &attrs) { + return get_object_context(hoid, true, &attrs); + } + /* * Capture all object state associated with an in-progress read or write. */ @@ -339,7 +455,11 @@ public: protected: ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc); - ObjectContextRef get_object_context(const hobject_t& soid, bool can_create); + ObjectContextRef get_object_context( + const hobject_t& soid, + bool can_create, + map<string, bufferptr> *attrs = 0 + ); void context_registry_on_change(); void object_context_destructor_callback(ObjectContext *obc); @@ -362,8 +482,11 @@ protected: void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc); SnapSetContext *create_snapset_context(const object_t& oid); - SnapSetContext *get_snapset_context(const object_t& oid, const string &key, - ps_t seed, bool can_create, const string &nspace); + SnapSetContext *get_snapset_context( + const object_t& oid, const string &key, + ps_t seed, bool can_create, const string &nspace, + map<string, bufferptr> *attrs = 0 + ); void register_snapset_context(SnapSetContext *ssc) { Mutex::Locker l(snapset_contexts_lock); _register_snapset_context(ssc); @@ -378,90 +501,7 @@ protected: } void put_snapset_context(SnapSetContext *ssc); - // push - struct PushInfo { - ObjectRecoveryProgress recovery_progress; - ObjectRecoveryInfo recovery_info; - int priority; - - void dump(Formatter *f) const { - { - f->open_object_section("recovery_progress"); - recovery_progress.dump(f); - f->close_section(); - } - { - f->open_object_section("recovery_info"); - recovery_info.dump(f); - f->close_section(); - } - } - }; - map<hobject_t, map<int, PushInfo> > pushing; - - // pull - struct PullInfo { - ObjectRecoveryProgress recovery_progress; - ObjectRecoveryInfo recovery_info; - int priority; - - void dump(Formatter *f) const { - { - f->open_object_section("recovery_progress"); - recovery_progress.dump(f); - 
f->close_section(); - } - { - f->open_object_section("recovery_info"); - recovery_info.dump(f); - f->close_section(); - } - } - - bool is_complete() const { - return recovery_progress.is_complete(recovery_info); - } - }; - map<hobject_t, PullInfo> pulling; - - ObjectRecoveryInfo recalc_subsets(const ObjectRecoveryInfo& recovery_info); - static void trim_pushed_data(const interval_set<uint64_t> &copy_subset, - const interval_set<uint64_t> &intervals_received, - bufferlist data_received, - interval_set<uint64_t> *intervals_usable, - bufferlist *data_usable); - bool handle_pull_response( - int from, PushOp &op, PullOp *response, - ObjectStore::Transaction *t); - void handle_push( - int from, PushOp &op, PushReplyOp *response, - ObjectStore::Transaction *t); - void send_pushes(int prio, map<int, vector<PushOp> > &pushes); - int send_push(int priority, int peer, - const ObjectRecoveryInfo& recovery_info, - const ObjectRecoveryProgress &progress, - ObjectRecoveryProgress *out_progress = 0); - int build_push_op(const ObjectRecoveryInfo &recovery_info, - const ObjectRecoveryProgress &progress, - ObjectRecoveryProgress *out_progress, - PushOp *out_op); - int send_push_op_legacy(int priority, int peer, - PushOp &pop); - - int send_pull_legacy(int priority, int peer, - const ObjectRecoveryInfo& recovery_info, - ObjectRecoveryProgress progress); - void submit_push_data(ObjectRecoveryInfo &recovery_info, - bool first, - bool complete, - const interval_set<uint64_t> &intervals_included, - bufferlist data_included, - bufferlist omap_header, - map<string, bufferptr> &attrs, - map<string, bufferlist> &omap_entries, - ObjectStore::Transaction *t); - void submit_push_complete(ObjectRecoveryInfo &recovery_info, - ObjectStore::Transaction *t); + set<hobject_t> recovering; /* * Backfill @@ -504,54 +544,17 @@ protected: f->close_section(); } { - f->open_array_section("pull_from_peer"); - for (map<int, set<hobject_t> >::const_iterator i = pull_from_peer.begin(); - i != pull_from_peer.end(); + 
f->open_array_section("recovering"); + for (set<hobject_t>::const_iterator i = recovering.begin(); + i != recovering.end(); ++i) { - f->open_object_section("pulling_from"); - f->dump_int("pull_from", i->first); - { - f->open_array_section("pulls"); - for (set<hobject_t>::const_iterator j = i->second.begin(); - j != i->second.end(); - ++j) { - f->open_object_section("pull_info"); - assert(pulling.count(*j)); - pulling.find(*j)->second.dump(f); - f->close_section(); - } - f->close_section(); - } - f->close_section(); + f->dump_stream("object") << *i; } f->close_section(); } { - f->open_array_section("pushing"); - for (map<hobject_t, map<int, PushInfo> >::const_iterator i = - pushing.begin(); - i != pushing.end(); - ++i) { - f->open_object_section("object"); - f->dump_stream("pushing") << i->first; - { - f->open_array_section("pushing_to"); - for (map<int, PushInfo>::const_iterator j = i->second.begin(); - j != i->second.end(); - ++j) { - f->open_object_section("push_progress"); - f->dump_stream("object_pushing") << j->first; - { - f->open_object_section("push_info"); - j->second.dump(f); - f->close_section(); - } - f->close_section(); - } - f->close_section(); - } - f->close_section(); - } + f->open_object_section("pg_backend"); + pgbackend->dump_recovery_info(f); f->close_section(); } } @@ -559,53 +562,19 @@ protected: /// leading edge of backfill hobject_t backfill_pos; - // Reverse mapping from osd peer to objects beging pulled from that peer - map<int, set<hobject_t> > pull_from_peer; - int prep_object_replica_pushes(const hobject_t& soid, eversion_t v, - int priority, - map<int, vector<PushOp> > *pushes); - void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, - pg_missing_t& missing, - const hobject_t &last_backfill, - interval_set<uint64_t>& data_subset, - map<hobject_t, interval_set<uint64_t> >& clone_subsets); - void calc_clone_subsets(SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing, - const hobject_t 
&last_backfill, - interval_set<uint64_t>& data_subset, - map<hobject_t, interval_set<uint64_t> >& clone_subsets); - void prep_push_to_replica( - ObjectContextRef obc, - const hobject_t& oid, - int dest, - int priority, - PushOp *push_op); - void prep_push(int priority, - ObjectContextRef obc, - const hobject_t& oid, int dest, - PushOp *op); - void prep_push(int priority, - ObjectContextRef obc, - const hobject_t& soid, int peer, - eversion_t version, - interval_set<uint64_t> &data_subset, - map<hobject_t, interval_set<uint64_t> >& clone_subsets, - PushOp *op); - void prep_push_op_blank(const hobject_t& soid, PushOp *op); + PGBackend::RecoveryHandle *h); void finish_degraded_object(const hobject_t& oid); // Cancels/resets pulls from peer void check_recovery_sources(const OSDMapRef map); - void send_pulls( - int priority, - map<int, vector<PullOp> > &pulls); - int prepare_pull( - const hobject_t& oid, eversion_t v, + int recover_missing( + const hobject_t& oid, + eversion_t v, int priority, - map<int, vector<PullOp> > *pulls - ); + PGBackend::RecoveryHandle *h); // low level ops @@ -657,7 +626,7 @@ protected: void prep_backfill_object_push( hobject_t oid, eversion_t v, eversion_t have, int peer, - map<int, vector<PushOp> > *pushes); + PGBackend::RecoveryHandle *h); void send_remove_op(const hobject_t& oid, eversion_t v, int peer); @@ -731,35 +700,6 @@ protected: pg->_committed_pushed_object(epoch, last_complete); } }; - struct C_OSD_SendMessageOnConn: public Context { - OSDService *osd; - Message *reply; - ConnectionRef conn; - C_OSD_SendMessageOnConn( - OSDService *osd, - Message *reply, - ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {} - void finish(int) { - osd->send_message_osd_cluster(reply, conn.get()); - } - }; - struct C_OSD_CompletedPull : public Context { - ReplicatedPGRef pg; - hobject_t hoid; - epoch_t epoch; - C_OSD_CompletedPull( - ReplicatedPG *pg, - const hobject_t &hoid, - epoch_t epoch) : pg(pg), hoid(hoid), epoch(epoch) {} - void 
finish(int) { - pg->lock(); - if (!pg->pg_has_reset_since(epoch)) { - pg->finish_recovery_op(hoid); - } - pg->unlock(); - } - }; - friend struct C_OSD_CompletedPull; struct C_OSD_AppliedRecoveredObjectReplica : public Context { ReplicatedPGRef pg; C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p) : @@ -780,14 +720,6 @@ protected: void _applied_recovered_object_replica(); void _committed_pushed_object(epoch_t epoch, eversion_t lc); void recover_got(hobject_t oid, eversion_t v); - void sub_op_push(OpRequestRef op); - void _failed_push(int from, const hobject_t &soid); - void sub_op_push_reply(OpRequestRef op); - bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply); - void sub_op_pull(OpRequestRef op); - void handle_pull(int peer, PullOp &op, PushOp *reply); - - void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat); // -- copyfrom -- map<hobject_t, CopyOpRef> copy_ops; @@ -828,6 +760,9 @@ public: int do_command(cmdmap_t cmdmap, ostream& ss, bufferlist& idata, bufferlist& odata); + void do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle); void do_op(OpRequestRef op); bool pg_op_must_wait(MOSDOp *op); void do_pg_op(OpRequestRef op); @@ -837,17 +772,7 @@ public: OpRequestRef op, ThreadPool::TPHandle &handle); void do_backfill(OpRequestRef op); - void _do_push(OpRequestRef op); - void _do_pull_response(OpRequestRef op); - void do_push(OpRequestRef op) { - if (is_primary()) { - _do_pull_response(op); - } else { - _do_push(op); - } - } - void do_pull(OpRequestRef op); - void do_push_reply(OpRequestRef op); + RepGather *trim_object(const hobject_t &coid); void snap_trimmer(); int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops); @@ -857,16 +782,27 @@ public: void do_osd_op_effects(OpContext *ctx); private: - bool temp_created; - coll_t temp_coll; - set<hobject_t> temp_contents; ///< contents of temp collection, clear on reset uint64_t temp_seq; ///< last id for naming temp objects coll_t get_temp_coll(ObjectStore::Transaction *t); hobject_t 
generate_temp_object(); ///< generate a new temp object name public: - bool have_temp_coll(); - coll_t get_temp_coll() { - return temp_coll; + void get_colls(list<coll_t> *out) { + out->push_back(coll); + return pgbackend->temp_colls(out); + } + void split_colls( + pg_t child, + int split_bits, + int seed, + ObjectStore::Transaction *t) { + coll_t target = coll_t(child); + t->create_collection(target); + t->split_collection( + coll, + split_bits, + seed, + target); + pgbackend->split_colls(child, split_bits, seed, t); } private: struct NotTrimming; @@ -952,7 +888,10 @@ public: void on_role_change(); void on_change(ObjectStore::Transaction *t); void on_activate(); - void on_flushed(); + void on_flushed() { + assert(object_contexts.empty()); + pgbackend->on_flushed(); + } void on_removal(ObjectStore::Transaction *t); void on_shutdown(); }; diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 66cce34b264..884b8ada8cc 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -46,6 +46,7 @@ typedef hobject_t collection_list_handle_t; +typedef uint8_t shard_id_t; /** * osd request identifier |