diff options
-rw-r--r-- | src/osd/ReplicatedPG.cc | 151 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 93 |
2 files changed, 167 insertions, 77 deletions
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 9df0495271b..d02a9c9cc48 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -3692,7 +3692,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) result = -EINVAL; goto fail; } - if (!ctx->copy_op) { + if (!ctx->copy_cb) { // start pg_t raw_pg; get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg); @@ -3704,13 +3704,18 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) result = -EINVAL; break; } - result = start_copy(ctx, src, src_oloc, src_version); + hobject_t temp_target = generate_temp_object(); + CopyFromCallback *cb = new CopyFromCallback(ctx, temp_target); + ctx->copy_cb = cb; + result = start_copy(cb, ctx->obc, src, src_oloc, src_version, + temp_target); if (result < 0) goto fail; result = -EINPROGRESS; } else { // finish - result = finish_copy(ctx); + assert(ctx->copy_cb->get_result() >= 0); + result = finish_copyfrom(ctx); } } break; @@ -4307,11 +4312,12 @@ struct C_Copyfrom : public Context { } }; -int ReplicatedPG::start_copy(OpContext *ctx, - hobject_t src, object_locator_t oloc, version_t version) +int ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc, + hobject_t src, object_locator_t oloc, version_t version, + const hobject_t& temp_dest_oid) { - const hobject_t& dest = ctx->obs->oi.soid; - dout(10) << __func__ << " " << dest << " ctx " << ctx + const hobject_t& dest = obc->obs.oi.soid; + dout(10) << __func__ << " " << dest << " from " << src << " " << oloc << " v" << version << dendl; @@ -4323,19 +4329,18 @@ int ReplicatedPG::start_copy(OpContext *ctx, cancel_copy(cop); } - CopyOpRef cop(new CopyOp(ctx, src, oloc, version)); + CopyOpRef cop(new CopyOp(cb, obc, src, oloc, version, temp_dest_oid)); copy_ops[dest] = cop; - ctx->copy_op = cop; - ++ctx->obc->copyfrom_readside; + ++obc->copyfrom_readside; - _copy_some(ctx, cop); + _copy_some(obc, cop); return 0; } -void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop) +void ReplicatedPG::_copy_some(ObjectContextRef obc, CopyOpRef cop) { - dout(10) << __func__ << " " << ctx << " " << cop << dendl; + dout(10) << __func__ << " " << obc << " " << cop << dendl; ObjectOperation op; if (cop->version) { op.assert_version(cop->version); @@ -4349,7 +4354,7 @@ void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop) &cop->data, &cop->omap, &cop->rval); - C_Copyfrom *fin = new C_Copyfrom(this, ctx->obs->oi.soid, + C_Copyfrom *fin = new C_Copyfrom(this, obc->obs.oi.soid, get_last_peering_reset()); osd->objecter_lock.Lock(); tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op, @@ -4377,50 +4382,49 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) << " tid " << cop->objecter_tid << dendl; return; } - OpContext *ctx = cop->ctx; + ObjectContextRef obc = cop->obc; cop->objecter_tid = 0; - if (r < 0) { - copy_ops.erase(ctx->obc->obs.oi.soid); - --ctx->obc->copyfrom_readside; - kick_object_context_blocked(ctx->obc); - reply_ctx(ctx, r); - return; - } - assert(cop->rval >= 0); - if (!cop->cursor.is_complete()) { - // write out what we have so far - vector<OSDOp> ops; - tid_t rep_tid = osd->get_tid(); - osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); - OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &ctx->obc->obs, ctx->obc->ssc, this); - tctx->mtime = ceph_clock_now(g_ceph_context); - RepGather *repop = new_repop(tctx, ctx->obc, rep_tid); - - if (cop->temp_cursor.is_initial()) { - cop->temp_coll = get_temp_coll(&tctx->local_t); - cop->temp_oid = generate_temp_object(); - repop->ctx->new_temp_oid = cop->temp_oid; - } + CopyResults results; + if (r >= 0) { + assert(cop->rval >= 0); + + if (!cop->cursor.is_complete()) { + // write out what we have so far + vector<OSDOp> ops; + tid_t rep_tid = osd->get_tid(); + osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); + OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this); + tctx->mtime = ceph_clock_now(g_ceph_context); + RepGather *repop = new_repop(tctx, obc, rep_tid); + + if (cop->temp_cursor.is_initial()) { + cop->temp_coll = get_temp_coll(&tctx->local_t); + repop->ctx->new_temp_oid = cop->temp_oid; + } - _write_copy_chunk(cop, &tctx->op_t); + _write_copy_chunk(cop, &tctx->op_t); - issue_repop(repop, repop->ctx->mtime); - eval_repop(repop); - repop->put(); + issue_repop(repop, repop->ctx->mtime); + eval_repop(repop); + repop->put(); - dout(10) << __func__ << " fetching more" << dendl; - _copy_some(ctx, cop); - return; + dout(10) << __func__ << " fetching more" << dendl; + _copy_some(obc, cop); + return; + } else { + _build_finish_copy_transaction(cop, results.get<3>()); + results.get<1>() = cop->temp_cursor.data_offset; + } } dout(20) << __func__ << " complete; committing" << dendl; - execute_ctx(ctx); + results.get<0>() = cop->rval; + cop->cb->complete(results); - copy_ops.erase(ctx->obc->obs.oi.soid); - --ctx->obc->copyfrom_readside; - ctx->copy_op.reset(); - kick_object_context_blocked(ctx->obc); + copy_ops.erase(obc->obs.oi.soid); + --obc->copyfrom_readside; + kick_object_context_blocked(obc); } void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t) @@ -4447,16 +4451,12 @@ void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t) cop->temp_cursor = cop->cursor; } -int ReplicatedPG::finish_copy(OpContext *ctx) +void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop, + ObjectStore::Transaction& t) { - CopyOpRef cop = ctx->copy_op; - ObjectState& obs = ctx->new_obs; - ObjectStore::Transaction& t = ctx->op_t; + ObjectState& obs = cop->obc->obs; - if (!obs.exists) { - ctx->delta_stats.num_objects++; - obs.exists = true; - } else { + if (obs.exists) { t.remove(coll, obs.oi.soid); } @@ -4470,18 +4470,34 @@ int ReplicatedPG::finish_copy(OpContext *ctx) _write_copy_chunk(cop, &t); t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, obs.oi.soid); pgbackend->clear_temp_obj(cop->temp_oid); - ctx->discard_temp_oid = cop->temp_oid; } +} + +int ReplicatedPG::finish_copyfrom(OpContext *ctx) +{ + dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl; + ObjectState& obs = ctx->new_obs; + CopyFromCallback *cb = static_cast<CopyFromCallback*>(ctx->copy_cb); + + if (!ctx->obs->exists) { + ctx->delta_stats.num_objects++; + obs.exists = true; + } + if (cb->is_temp_obj_used()) { + ctx->discard_temp_oid = cb->temp_obj; + } + ctx->op_t.swap(cb->results.get<3>()); + ctx->op_t.append(cb->results.get<3>()); interval_set<uint64_t> ch; if (obs.oi.size > 0) ch.insert(0, obs.oi.size); ctx->modified_ranges.union_of(ch); - if (cop->cursor.data_offset != obs.oi.size) { + if (cb->get_data_size() != obs.oi.size) { ctx->delta_stats.num_bytes -= obs.oi.size; + obs.oi.size = cb->get_data_size(); ctx->delta_stats.num_bytes += obs.oi.size; - obs.oi.size = cop->cursor.data_offset; } ctx->delta_stats.num_wr++; ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(obs.oi.size, 10); @@ -4491,8 +4507,7 @@ int ReplicatedPG::finish_copy(OpContext *ctx) void ReplicatedPG::cancel_copy(CopyOpRef cop) { - OpContext *ctx = cop->ctx; - dout(10) << __func__ << " " << ctx->obc->obs.oi.soid << " ctx " << ctx + dout(10) << __func__ << " " << cop->obc->obs.oi.soid << " from " << cop->src << " " << cop->oloc << " v" << cop->version << dendl; @@ -4502,13 +4517,13 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop) osd->objecter->op_cancel(cop->objecter_tid); } - copy_ops.erase(ctx->obc->obs.oi.soid); - --ctx->obc->copyfrom_readside; - ctx->copy_op.reset(); - - kick_object_context_blocked(ctx->obc); + copy_ops.erase(cop->obc->obs.oi.soid); + --cop->obc->copyfrom_readside; - delete ctx; + kick_object_context_blocked(cop->obc); + bool temp_obj_created = !cop->cursor.is_initial(); + CopyResults result(-ECANCELED, 0, temp_obj_created, ObjectStore::Transaction()); + cop->cb->complete(result); } void ReplicatedPG::cancel_copy_ops() diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 5abfc4cea56..c277c0d3f86 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -18,6 +18,7 @@ #define CEPH_REPLICATEDPG_H #include <boost/optional.hpp> +#include <boost/tuple/tuple.hpp> #include "include/assert.h" #include "common/cmdparse.h" @@ -93,9 +94,11 @@ public: * state associated with a copy operation */ struct OpContext; + class CopyCallback; struct CopyOp { - OpContext *ctx; + CopyCallback *cb; + ObjectContextRef obc; hobject_t src; object_locator_t oloc; version_t version; @@ -114,15 +117,82 @@ public: hobject_t temp_oid; object_copy_cursor_t temp_cursor; - CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v) - : ctx(c), src(s), oloc(l), version(v), + CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s, object_locator_t l, + version_t v, const hobject_t& dest) + : cb(cb_), obc(_obc), src(s), oloc(l), version(v), objecter_tid(0), size(0), - rval(-1) + rval(-1), + temp_oid(dest) {} }; typedef boost::shared_ptr<CopyOp> CopyOpRef; + /** + * The CopyCallback class defines an interface for completions to the + * copy_start code. Users of the copy infrastructure must implement + * one and give an instance of the class to start_copy. + * + * The implementer is responsible for making sure that the CopyCallback + * can associate itself with the correct copy operation. The presence + * of the closing Transaction ensures that write operations can be performed + * atomically with the copy being completed (which doing them in separate + * transactions would not allow); if you are doing the copy for a read + * op you will have to generate a separate op to finish the copy with. + */ + /// return code, total object size, data in temp object?, final Transaction + typedef boost::tuple<int, size_t, bool, ObjectStore::Transaction> CopyResults; + class CopyCallback : public GenContext<CopyResults&> { + protected: + CopyCallback() {} + /** + * results.get<0>() is the return code: 0 for success; -ECANCELLED if + * the operation was cancelled by the local OSD; -errno for other issues. + * results.get<1>() is the total size of the object (for updating pg stats) + * results.get<2>() indicates whether we have already written data to + * the temp object (so it needs to get cleaned up, if the return code + * indicates a failure) + * results.get<3>() is a Transaction; if non-empty you need to perform + * its results before any other accesses to the object in order to + * complete the copy. + */ + virtual void finish(CopyResults& results_) = 0; + + public: + /// Provide the final size of the copied object to the CopyCallback + virtual ~CopyCallback() {}; + }; + + class CopyFromCallback: public CopyCallback { + public: + CopyResults results; + OpContext *ctx; + hobject_t temp_obj; + CopyFromCallback(OpContext *ctx_, const hobject_t& temp_obj_) : + ctx(ctx_), temp_obj(temp_obj_) {} + ~CopyFromCallback() {} + + virtual void finish(CopyResults& results_) { + results = results_; + int r = results.get<0>(); + if (r >= 0) { + ctx->pg->execute_ctx(ctx); + } + ctx->copy_cb = NULL; + if (r < 0) { + if (r != -ECANCELED) { // on cancel just toss it out; client resends + ctx->pg->osd->reply_op_error(ctx->op, r); + } + delete ctx; + } + } + + bool is_temp_obj_used() { return results.get<2>(); } + uint64_t get_data_size() { return results.get<1>(); } + int get_result() { return results.get<0>(); } + }; + friend class CopyFromCallback; + boost::scoped_ptr<PGBackend> pgbackend; PGBackend *get_pgbackend() { return pgbackend.get(); @@ -300,7 +370,7 @@ public: int num_read; ///< count read ops int num_write; ///< count update ops - CopyOpRef copy_op; + CopyFromCallback *copy_cb; hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking @@ -317,7 +387,8 @@ public: current_osd_subop_num(0), data_off(0), reply(NULL), pg(_pg), num_read(0), - num_write(0) { + num_write(0), + copy_cb(NULL) { if (_ssc) { new_snapset = _ssc->snapset; snapset = &_ssc->snapset; @@ -737,11 +808,15 @@ protected: // -- copyfrom -- map<hobject_t, CopyOpRef> copy_ops; - int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version); + int start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src, + object_locator_t oloc, version_t version, + const hobject_t& temp_dest_oid); void process_copy_chunk(hobject_t oid, tid_t tid, int r); void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t); - void _copy_some(OpContext *ctx, CopyOpRef cop); - int finish_copy(OpContext *ctx); + void _copy_some(ObjectContextRef obc, CopyOpRef cop); + void _build_finish_copy_transaction(CopyOpRef cop, + ObjectStore::Transaction& t); + int finish_copyfrom(OpContext *ctx); void cancel_copy(CopyOpRef cop); void cancel_copy_ops(); |