summaryrefslogtreecommitdiff
path: root/src/osd/ReplicatedBackend.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/osd/ReplicatedBackend.cc')
-rw-r--r--src/osd/ReplicatedBackend.cc196
1 files changed, 196 insertions, 0 deletions
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
new file mode 100644
index 00000000000..b39207e14f8
--- /dev/null
+++ b/src/osd/ReplicatedBackend.cc
@@ -0,0 +1,196 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "ReplicatedBackend.h"
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPushReply.h"
+
+#define dout_subsys ceph_subsys_osd
+#define DOUT_PREFIX_ARGS this
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
+ return *_dout << pgb->get_parent()->gen_dbg_prefix();
+}
+
+ReplicatedBackend::ReplicatedBackend(
+ PGBackend::Listener *pg, coll_t coll, OSDService *osd) :
+ PGBackend(pg), temp_created(false),
+ temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)),
+ coll(coll), osd(osd), cct(osd->cct) {}
+
+void ReplicatedBackend::run_recovery_op(
+ PGBackend::RecoveryHandle *_h,
+ int priority)
+{
+ RPGHandle *h = static_cast<RPGHandle *>(_h);
+ send_pushes(priority, h->pushes);
+ send_pulls(priority, h->pulls);
+ delete h;
+}
+
+void ReplicatedBackend::recover_object(
+ const hobject_t &hoid,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *_h
+ )
+{
+ dout(10) << __func__ << ": " << hoid << dendl;
+ RPGHandle *h = static_cast<RPGHandle *>(_h);
+ if (get_parent()->get_local_missing().is_missing(hoid)) {
+ assert(!obc);
+ // pull
+ prepare_pull(
+ hoid,
+ head,
+ h);
+ return;
+ } else {
+ assert(obc);
+ int started = start_pushes(
+ hoid,
+ obc,
+ h);
+ assert(started > 0);
+ }
+}
+
+void ReplicatedBackend::check_recovery_sources(const OSDMapRef osdmap)
+{
+ for(map<int, set<hobject_t> >::iterator i = pull_from_peer.begin();
+ i != pull_from_peer.end();
+ ) {
+ if (osdmap->is_down(i->first)) {
+ dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
+ << ", osdmap has it marked down" << dendl;
+ for (set<hobject_t>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ assert(pulling.count(*j) == 1);
+ get_parent()->cancel_pull(*j);
+ pulling.erase(*j);
+ }
+ pull_from_peer.erase(i++);
+ } else {
+ ++i;
+ }
+ }
+}
+
+bool ReplicatedBackend::handle_message(
+ OpRequestRef op
+ )
+{
+ dout(10) << __func__ << ": " << op << dendl;
+ switch (op->get_req()->get_type()) {
+ case MSG_OSD_PG_PUSH:
+ // TODOXXX: needs to be active possibly
+ do_push(op);
+ return true;
+
+ case MSG_OSD_PG_PULL:
+ do_pull(op);
+ return true;
+
+ case MSG_OSD_PG_PUSH_REPLY:
+ do_push_reply(op);
+ return true;
+
+ case MSG_OSD_SUBOP: {
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
+ if (m->ops.size() >= 1) {
+ OSDOp *first = &m->ops[0];
+ switch (first->op.op) {
+ case CEPH_OSD_OP_PULL:
+ sub_op_pull(op);
+ return true;
+ case CEPH_OSD_OP_PUSH:
+ // TODOXXX: needs to be active possibly
+ sub_op_push(op);
+ return true;
+ default:
+ break;
+ }
+ }
+ break;
+ }
+
+ case MSG_OSD_SUBOPREPLY: {
+ MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
+ if (r->ops.size() >= 1) {
+ OSDOp &first = r->ops[0];
+ switch (first.op.op) {
+ case CEPH_OSD_OP_PUSH:
+ // continue peer recovery
+ sub_op_push_reply(op);
+ return true;
+ }
+ }
+ break;
+ }
+
+ default:
+ break;
+ }
+ return false;
+}
+
+void ReplicatedBackend::clear_state()
+{
+ // clear pushing/pulling maps
+ pushing.clear();
+ pulling.clear();
+ pull_from_peer.clear();
+}
+
+void ReplicatedBackend::on_change(ObjectStore::Transaction *t)
+{
+ dout(10) << __func__ << dendl;
+ // clear temp
+ for (set<hobject_t>::iterator i = temp_contents.begin();
+ i != temp_contents.end();
+ ++i) {
+ dout(10) << __func__ << ": Removing oid "
+ << *i << " from the temp collection" << dendl;
+ t->remove(get_temp_coll(t), *i);
+ }
+ temp_contents.clear();
+ clear_state();
+}
+
+coll_t ReplicatedBackend::get_temp_coll(ObjectStore::Transaction *t)
+{
+ if (temp_created)
+ return temp_coll;
+ if (!osd->store->collection_exists(temp_coll))
+ t->create_collection(temp_coll);
+ temp_created = true;
+ return temp_coll;
+}
+
+void ReplicatedBackend::on_flushed()
+{
+ if (have_temp_coll() &&
+ !osd->store->collection_empty(get_temp_coll())) {
+ vector<hobject_t> objects;
+ osd->store->collection_list(get_temp_coll(), objects);
+ derr << __func__ << ": found objects in the temp collection: "
+ << objects << ", crashing now"
+ << dendl;
+ assert(0 == "found garbage in the temp collection");
+ }
+}