summaryrefslogtreecommitdiff
path: root/src/osd/ReplicatedBackend.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/osd/ReplicatedBackend.h')
-rw-r--r--src/osd/ReplicatedBackend.h309
1 files changed, 309 insertions, 0 deletions
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
new file mode 100644
index 00000000000..e34e55a618e
--- /dev/null
+++ b/src/osd/ReplicatedBackend.h
@@ -0,0 +1,309 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef REPBACKEND_H
+#define REPBACKEND_H
+
+#include "OSD.h"
+#include "PGBackend.h"
+#include "osd_types.h"
+
+struct C_ReplicatedBackend_OnPullComplete;
+class ReplicatedBackend : public PGBackend {
+ struct RPGHandle : public PGBackend::RecoveryHandle {
+ map<int, vector<PushOp> > pushes;
+ map<int, vector<PullOp> > pulls;
+ };
+ friend struct C_ReplicatedBackend_OnPullComplete;
+private:
+ bool temp_created;
+ const coll_t temp_coll;
+ coll_t get_temp_coll() const {
+ return temp_coll;
+ }
+ bool have_temp_coll() const { return temp_created; }
+
+ // Track contents of temp collection, clear on reset
+ set<hobject_t> temp_contents;
+public:
+ coll_t coll;
+ OSDService *osd;
+ CephContext *cct;
+
+ ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd);
+
+ /// @see PGBackend::open_recovery_op
+ RPGHandle *_open_recovery_op() {
+ return new RPGHandle();
+ }
+ PGBackend::RecoveryHandle *open_recovery_op() {
+ return _open_recovery_op();
+ }
+
+ /// @see PGBackend::run_recovery_op
+ void run_recovery_op(
+ PGBackend::RecoveryHandle *h,
+ int priority);
+
+ /// @see PGBackend::recover_object
+ void recover_object(
+ const hobject_t &hoid,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *h
+ );
+
+ void check_recovery_sources(const OSDMapRef osdmap);
+
+ /// @see PGBackend::handle_message
+ bool handle_message(
+ OpRequestRef op
+ );
+
+ void on_change(ObjectStore::Transaction *t);
+ void clear_state();
+ void on_flushed();
+
+ void temp_colls(list<coll_t> *out) {
+ if (temp_created)
+ out->push_back(temp_coll);
+ }
+ void split_colls(
+ pg_t child,
+ int split_bits,
+ int seed,
+ ObjectStore::Transaction *t) {
+ coll_t target = coll_t::make_temp_coll(child);
+ if (!temp_created)
+ return;
+ t->create_collection(target);
+ t->split_collection(
+ temp_coll,
+ split_bits,
+ seed,
+ target);
+ }
+
+ virtual void dump_recovery_info(Formatter *f) const {
+ {
+ f->open_array_section("pull_from_peer");
+ for (map<int, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
+ i != pull_from_peer.end();
+ ++i) {
+ f->open_object_section("pulling_from");
+ f->dump_int("pull_from", i->first);
+ {
+ f->open_array_section("pulls");
+ for (set<hobject_t>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ f->open_object_section("pull_info");
+ assert(pulling.count(*j));
+ pulling.find(*j)->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ {
+ f->open_array_section("pushing");
+ for (map<hobject_t, map<int, PushInfo> >::const_iterator i =
+ pushing.begin();
+ i != pushing.end();
+ ++i) {
+ f->open_object_section("object");
+ f->dump_stream("pushing") << i->first;
+ {
+ f->open_array_section("pushing_to");
+ for (map<int, PushInfo>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ f->open_object_section("push_progress");
+ f->dump_stream("object_pushing") << j->first;
+ {
+ f->open_object_section("push_info");
+ j->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ }
+private:
+ // push
+ struct PushInfo {
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ ObjectContextRef obc;
+ object_stat_sum_t stat;
+
+ void dump(Formatter *f) const {
+ {
+ f->open_object_section("recovery_progress");
+ recovery_progress.dump(f);
+ f->close_section();
+ }
+ {
+ f->open_object_section("recovery_info");
+ recovery_info.dump(f);
+ f->close_section();
+ }
+ }
+ };
+ map<hobject_t, map<int, PushInfo> > pushing;
+
+ // pull
+ struct PullInfo {
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ ObjectContextRef head_ctx;
+ ObjectContextRef obc;
+ object_stat_sum_t stat;
+
+ void dump(Formatter *f) const {
+ {
+ f->open_object_section("recovery_progress");
+ recovery_progress.dump(f);
+ f->close_section();
+ }
+ {
+ f->open_object_section("recovery_info");
+ recovery_info.dump(f);
+ f->close_section();
+ }
+ }
+
+ bool is_complete() const {
+ return recovery_progress.is_complete(recovery_info);
+ }
+ };
+
+ coll_t get_temp_coll(ObjectStore::Transaction *t);
+ void add_temp_obj(const hobject_t &oid) {
+ temp_contents.insert(oid);
+ }
+ void clear_temp_obj(const hobject_t &oid) {
+ temp_contents.erase(oid);
+ }
+
+ map<hobject_t, PullInfo> pulling;
+
+ // Reverse mapping from osd peer to objects beging pulled from that peer
+ map<int, set<hobject_t> > pull_from_peer;
+
+ void sub_op_push(OpRequestRef op);
+ void sub_op_push_reply(OpRequestRef op);
+ void sub_op_pull(OpRequestRef op);
+
+ void _do_push(OpRequestRef op);
+ void _do_pull_response(OpRequestRef op);
+ void do_push(OpRequestRef op) {
+ if (is_primary()) {
+ _do_pull_response(op);
+ } else {
+ _do_push(op);
+ }
+ }
+ void do_pull(OpRequestRef op);
+ void do_push_reply(OpRequestRef op);
+
+ bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply);
+ void handle_pull(int peer, PullOp &op, PushOp *reply);
+ bool handle_pull_response(
+ int from, PushOp &op, PullOp *response,
+ list<ObjectContextRef> *to_continue,
+ ObjectStore::Transaction *t);
+ void handle_push(int from, PushOp &op, PushReplyOp *response,
+ ObjectStore::Transaction *t);
+
+ static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
+ const interval_set<uint64_t> &intervals_received,
+ bufferlist data_received,
+ interval_set<uint64_t> *intervals_usable,
+ bufferlist *data_usable);
+ void _failed_push(int from, const hobject_t &soid);
+
+ void send_pushes(int prio, map<int, vector<PushOp> > &pushes);
+ void prep_push_op_blank(const hobject_t& soid, PushOp *op);
+ int send_push_op_legacy(int priority, int peer,
+ PushOp &pop);
+ int send_pull_legacy(int priority, int peer,
+ const ObjectRecoveryInfo& recovery_info,
+ ObjectRecoveryProgress progress);
+ void send_pulls(
+ int priority,
+ map<int, vector<PullOp> > &pulls);
+
+ int build_push_op(const ObjectRecoveryInfo &recovery_info,
+ const ObjectRecoveryProgress &progress,
+ ObjectRecoveryProgress *out_progress,
+ PushOp *out_op,
+ object_stat_sum_t *stat = 0);
+ void submit_push_data(ObjectRecoveryInfo &recovery_info,
+ bool first,
+ bool complete,
+ const interval_set<uint64_t> &intervals_included,
+ bufferlist data_included,
+ bufferlist omap_header,
+ map<string, bufferptr> &attrs,
+ map<string, bufferlist> &omap_entries,
+ ObjectStore::Transaction *t);
+ void submit_push_complete(ObjectRecoveryInfo &recovery_info,
+ ObjectStore::Transaction *t);
+
+ void calc_clone_subsets(
+ SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets);
+ void prepare_pull(
+ const hobject_t& soid,
+ ObjectContextRef headctx,
+ RPGHandle *h);
+ int start_pushes(
+ const hobject_t &soid,
+ ObjectContextRef obj,
+ RPGHandle *h);
+ void prep_push_to_replica(
+ ObjectContextRef obc, const hobject_t& soid, int peer,
+ PushOp *pop);
+ void prep_push(ObjectContextRef obc,
+ const hobject_t& oid, int dest,
+ PushOp *op);
+ void prep_push(ObjectContextRef obc,
+ const hobject_t& soid, int peer,
+ eversion_t version,
+ interval_set<uint64_t> &data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets,
+ PushOp *op);
+ void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+ const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets);
+ ObjectRecoveryInfo recalc_subsets(
+ const ObjectRecoveryInfo& recovery_info,
+ SnapSetContext *ssc
+ );
+};
+
+#endif