diff options
author | Sage Weil <sage@inktank.com> | 2013-10-22 13:45:42 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-10-22 13:45:42 -0700 |
commit | 54e111c672e4f9a4e785268651099a08bf88e2d4 (patch) | |
tree | 5d2665c61452b213e9aa3800d1a896a68f2c184a | |
parent | 5a2142ee433c8a1dcda857863be78da6eb96a56a (diff) | |
parent | 001833c7743b6ead24de9e0d218deacb55be2978 (diff) | |
download | ceph-54e111c672e4f9a4e785268651099a08bf88e2d4.tar.gz |
Merge remote-tracking branch 'gh/wip-promote-copies' into wip-tier
-rwxr-xr-x | qa/workunits/rados/caching_redirects.sh | 60 | ||||
-rw-r--r-- | src/common/ceph_strings.cc | 1 | ||||
-rw-r--r-- | src/include/rados.h | 3 | ||||
-rw-r--r-- | src/os/ObjectStore.h | 10 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 307 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 165 | ||||
-rw-r--r-- | src/osd/osd_types.cc | 91 | ||||
-rw-r--r-- | src/osd/osd_types.h | 37 | ||||
-rw-r--r-- | src/osdc/Objecter.h | 44 | ||||
-rw-r--r-- | src/test/encoding/types.h | 1 |
10 files changed, 529 insertions, 190 deletions
diff --git a/qa/workunits/rados/caching_redirects.sh b/qa/workunits/rados/caching_redirects.sh index 19b940b5b4c..b6007bf0cab 100755 --- a/qa/workunits/rados/caching_redirects.sh +++ b/qa/workunits/rados/caching_redirects.sh @@ -11,22 +11,22 @@ expect_false() #create pools, set up tier relationship ceph osd pool create base_pool 2 -ceph osd pool create partial_cache 2 -ceph osd pool create data_cache 2 -ceph osd tier add base_pool partial_cache -ceph osd tier add base_pool data_cache +ceph osd pool create partial_wrong 2 +ceph osd pool create wrong_cache 2 +ceph osd tier add base_pool partial_wrong +ceph osd tier add base_pool wrong_cache -# populate base_pool and data_cache with some data +# populate base_pool with some data echo "foo" > foo.txt echo "bar" > bar.txt echo "baz" > baz.txt rados -p base_pool put fooobj foo.txt rados -p base_pool put barobj bar.txt -# data_cache is backwards so we can tell we read from it -rados -p data_cache put fooobj bar.txt -rados -p data_cache put barobj foo.txt -# partial_cache gets barobj backwards -rados -p partial_cache put barobj foo.txt +# fill in wrong_cache backwards so we can tell we read from it +rados -p wrong_cache put fooobj bar.txt +rados -p wrong_cache put barobj foo.txt +# partial_wrong gets barobj backwards so we can check promote and non-promote +rados -p partial_wrong put barobj foo.txt # get the objects back before setting a caching pool rados -p base_pool get fooobj tmp.txt @@ -34,22 +34,25 @@ diff -q tmp.txt foo.txt rados -p base_pool get barobj tmp.txt diff -q tmp.txt bar.txt -# set up redirect and make sure we get redirect-based results -ceph osd tier set-overlay base_pool partial_cache -ceph osd tier cache-mode partial_cache writeback +# set up redirect and make sure we get backwards results +ceph osd tier set-overlay base_pool wrong_cache +ceph osd tier cache-mode wrong_cache writeback rados -p base_pool get fooobj tmp.txt -diff -q tmp.txt foo.txt +diff -q tmp.txt bar.txt rados -p base_pool get barobj tmp.txt diff -q tmp.txt foo.txt -# switch cache pools and make sure contents differ +# switch cache pools and make sure we're doing promote ceph osd tier remove-overlay base_pool -ceph osd tier set-overlay base_pool data_cache -ceph osd tier cache-mode data_cache writeback +ceph osd tier set-overlay base_pool partial_wrong +ceph osd tier cache-mode partial_wrong writeback rados -p base_pool get fooobj tmp.txt -diff -q tmp.txt bar.txt +diff -q tmp.txt foo.txt # hurray, it promoted! rados -p base_pool get barobj tmp.txt -diff -q tmp.txt foo.txt +diff -q tmp.txt foo.txt # yep, we read partial_wrong's local object! + +# try a nonexistent object and make sure we get an error +expect_false rados -p base_pool get bazobj tmp.txt # drop the cache entirely and make sure contents are still the same ceph osd tier remove-overlay base_pool @@ -57,3 +60,22 @@ rados -p base_pool get fooobj tmp.txt diff -q tmp.txt foo.txt rados -p base_pool get barobj tmp.txt diff -q tmp.txt bar.txt + +# create an empty cache pool and make sure it has objects after reading +ceph osd pool create empty_cache 2 + +touch empty.txt +rados -p empty_cache ls > tmp.txt +diff -q tmp.txt empty.txt + +ceph osd tier add base_pool empty_cache +ceph osd tier set-overlay base_pool empty_cache +ceph osd tier cache-mode empty_cache writeback +rados -p base_pool get fooobj tmp.txt +rados -p base_pool get barobj tmp.txt +expect_false rados -p base_pool get bazobj tmp.txt + +rados -p empty_cache ls > tmp.txt +expect_false diff -q tmp.txt empty.txt +rados -p base_pool ls > tmp2.txt +diff -q tmp.txt tmp2.txt
\ No newline at end of file diff --git a/src/common/ceph_strings.cc b/src/common/ceph_strings.cc index 2749fbb0a3b..a693953711f 100644 --- a/src/common/ceph_strings.cc +++ b/src/common/ceph_strings.cc @@ -48,6 +48,7 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_TMAPPUT: return "tmapput"; case CEPH_OSD_OP_WATCH: return "watch"; + case CEPH_OSD_OP_COPY_GET_CLASSIC: return "copy-get-classic"; case CEPH_OSD_OP_COPY_GET: return "copy-get"; case CEPH_OSD_OP_COPY_FROM: return "copy-from"; case CEPH_OSD_OP_UNDIRTY: return "undirty"; diff --git a/src/include/rados.h b/src/include/rados.h index 70c04d4167d..bf4f5b5fce4 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -218,9 +218,10 @@ enum { CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25, CEPH_OSD_OP_COPY_FROM = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 26, - CEPH_OSD_OP_COPY_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 27, + CEPH_OSD_OP_COPY_GET_CLASSIC = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 27, CEPH_OSD_OP_UNDIRTY = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 28, CEPH_OSD_OP_ISDIRTY = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 29, + CEPH_OSD_OP_COPY_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 30, /** multi **/ CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1, diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 07473b344f5..69f3cb1fba7 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -905,6 +905,16 @@ public: return r; } virtual int getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset, bool user_only = false) {return 0;}; + int getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferlist>& aset, bool user_only = false) { + map<string,bufferptr> bmap; + int r = getattrs(cid, oid, bmap, user_only); + for (map<string,bufferptr>::iterator i = bmap.begin(); + i != bmap.end(); + ++i) { + aset[i->first].append(i->second); + } + return r; + } // collections diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 3995ae0f320..753b66f336f 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1206,15 +1206,19 @@ void ReplicatedPG::do_op(OpRequestRef op) bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc, int r) { + if (obc.get() && obc->is_blocked()) { + // we're already doing something with this object + return false; + } switch(pool.info.cache_mode) { case pg_pool_t::CACHEMODE_NONE: return false; break; case pg_pool_t::CACHEMODE_WRITEBACK: - if (obc.get()) { + if (obc.get() && obc->obs.exists) { // we have the object already return false; - } else { - do_cache_redirect(op, obc); + } else { // try and promote! + promote_object(op, obc); return true; } break; @@ -1222,12 +1226,17 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc, do_cache_redirect(op, obc); return true; break; - case pg_pool_t::CACHEMODE_READONLY: - if (obc.get() && !r) { + case pg_pool_t::CACHEMODE_READONLY: // TODO: clean this case up + if (!obc.get() && r == -ENOENT) { // we don't have the object and op's a read + promote_object(op, obc); + return true; + } else if (obc.get() && obc->obs.exists) { // we have the object locally return false; - } else { + } else if (!r) { // it must be a write do_cache_redirect(op, obc); return true; + } else { // crap, there was a failure of some kind + return false; } break; default: @@ -1250,6 +1259,31 @@ void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc) return; } +void ReplicatedPG::promote_object(OpRequestRef op, ObjectContextRef obc) +{ + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); + if (!obc.get()) { // we need to create an ObjectContext + int r = find_object_context( + hobject_t(m->get_oid(), + m->get_object_locator().key, + m->get_snapid(), + m->get_pg().ps(), + m->get_object_locator().get_pool(), + m->get_object_locator().nspace), + &obc, true, NULL); + assert(r == 0); // a lookup that allows creates can't fail now + } + + hobject_t temp_target = generate_temp_object(); + PromoteCallback *cb = new PromoteCallback(op, obc, temp_target, this); + object_locator_t oloc(m->get_object_locator()); + oloc.pool = pool.info.tier_of; + start_copy(cb, obc, obc->obs.oi.soid, oloc, 0, temp_target); + + assert(obc->is_blocked()); + wait_for_blocked_object(obc->obs.oi.soid, op); +} + void ReplicatedPG::execute_ctx(OpContext *ctx) { dout(10) << __func__ << " " << ctx << dendl; @@ -3647,84 +3681,18 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } break; - case CEPH_OSD_OP_COPY_GET: + case CEPH_OSD_OP_COPY_GET_CLASSIC: ++ctx->num_read; - { - object_copy_cursor_t cursor; - uint64_t out_max; - try { - ::decode(cursor, bp); - ::decode(out_max, bp); - } - catch (buffer::error& e) { - result = -EINVAL; - goto fail; - } - - // size, mtime - ::encode(oi.size, osd_op.outdata); - ::encode(oi.mtime, osd_op.outdata); - - // attrs - map<string,bufferptr> out_attrs; - if (!cursor.attr_complete) { - result = osd->store->getattrs(coll, soid, out_attrs, true); - if (result < 0) - break; - cursor.attr_complete = true; - dout(20) << " got attrs" << dendl; - } - ::encode(out_attrs, osd_op.outdata); - - int64_t left = out_max - osd_op.outdata.length(); - - // data - bufferlist bl; - if (left > 0 && !cursor.data_complete) { - if (cursor.data_offset < oi.size) { - result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl); - if (result < 0) - return result; - assert(result <= left); - left -= result; - cursor.data_offset += result; - } - if (cursor.data_offset == oi.size) { - cursor.data_complete = true; - dout(20) << " got data" << dendl; - } - } - ::encode(bl, osd_op.outdata); + result = fill_in_copy_get(bp, osd_op, oi, true); + if (result == -EINVAL) + goto fail; + break; - // omap - std::map<std::string,bufferlist> out_omap; - if (left > 0 && !cursor.omap_complete) { - ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(coll, oi.soid); - assert(iter); - if (iter->valid()) { - iter->upper_bound(cursor.omap_offset); - for (; left > 0 && iter->valid(); iter->next()) { - out_omap.insert(make_pair(iter->key(), iter->value())); - left -= iter->key().length() + 4 + iter->value().length() + 4; - } - } - if (iter->valid()) { - cursor.omap_offset = iter->key(); - } else { - cursor.omap_complete = true; - dout(20) << " got omap" << dendl; - } - } - ::encode(out_omap, osd_op.outdata); - - dout(20) << " cursor.is_complete=" << cursor.is_complete() - << " " << out_attrs.size() << " attrs" - << " " << bl.length() << " bytes" - << " " << out_omap.size() << " keys" - << dendl; - ::encode(cursor, osd_op.outdata); - result = 0; - } + case CEPH_OSD_OP_COPY_GET: + ++ctx->num_read; + result = fill_in_copy_get(bp, osd_op, oi, false); + if (result == -EINVAL) + goto fail; break; case CEPH_OSD_OP_COPY_FROM: @@ -3757,15 +3725,14 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) hobject_t temp_target = generate_temp_object(); CopyFromCallback *cb = new CopyFromCallback(ctx, temp_target); ctx->copy_cb = cb; - result = start_copy(cb, ctx->obc, src, src_oloc, src_version, + start_copy(cb, ctx->obc, src, src_oloc, src_version, temp_target); - if (result < 0) - goto fail; result = -EINPROGRESS; } else { // finish assert(ctx->copy_cb->get_result() >= 0); - result = finish_copyfrom(ctx); + finish_copyfrom(ctx); + result = 0; } } break; @@ -4362,7 +4329,93 @@ struct C_Copyfrom : public Context { } }; -int ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc, +int ReplicatedPG::fill_in_copy_get(bufferlist::iterator& bp, OSDOp& osd_op, + object_info_t& oi, bool classic) +{ + hobject_t& soid = oi.soid; + int result = 0; + object_copy_cursor_t cursor; + uint64_t out_max; + try { + ::decode(cursor, bp); + ::decode(out_max, bp); + } + catch (buffer::error& e) { + result = -EINVAL; + return result; + } + + object_copy_data_t reply_obj; + // size, mtime + reply_obj.size = oi.size; + reply_obj.mtime = oi.mtime; + reply_obj.category = oi.category; + + // attrs + map<string,bufferlist>& out_attrs = reply_obj.attrs; + if (!cursor.attr_complete) { + result = osd->store->getattrs(coll, soid, out_attrs, true); + if (result < 0) + return result; + cursor.attr_complete = true; + dout(20) << " got attrs" << dendl; + } + + int64_t left = out_max - osd_op.outdata.length(); + + // data + bufferlist& bl = reply_obj.data; + if (left > 0 && !cursor.data_complete) { + if (cursor.data_offset < oi.size) { + result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl); + if (result < 0) + return result; + assert(result <= left); + left -= result; + cursor.data_offset += result; + } + if (cursor.data_offset == oi.size) { + cursor.data_complete = true; + dout(20) << " got data" << dendl; + } + } + + // omap + std::map<std::string,bufferlist>& out_omap = reply_obj.omap; + if (left > 0 && !cursor.omap_complete) { + ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(coll, oi.soid); + assert(iter); + if (iter->valid()) { + iter->upper_bound(cursor.omap_offset); + for (; left > 0 && iter->valid(); iter->next()) { + out_omap.insert(make_pair(iter->key(), iter->value())); + left -= iter->key().length() + 4 + iter->value().length() + 4; + } + } + if (iter->valid()) { + cursor.omap_offset = iter->key(); + } else { + cursor.omap_complete = true; + dout(20) << " got omap" << dendl; + } + } + + dout(20) << " cursor.is_complete=" << cursor.is_complete() + << " " << out_attrs.size() << " attrs" + << " " << bl.length() << " bytes" + << " " << out_omap.size() << " keys" + << dendl; + reply_obj.cursor = cursor; + if (classic) { + reply_obj.encode_classic(osd_op.outdata); + } else { + ::encode(reply_obj, osd_op.outdata); + } + result = 0; + return result; +} + +void ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src, object_locator_t oloc, version_t version, const hobject_t& temp_dest_oid) { @@ -4384,24 +4437,23 @@ int ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc, ++obc->copyfrom_readside; _copy_some(obc, cop); - - return 0; } void ReplicatedPG::_copy_some(ObjectContextRef obc, CopyOpRef cop) { dout(10) << __func__ << " " << obc << " " << cop << dendl; ObjectOperation op; - if (cop->version) { - op.assert_version(cop->version); + if (cop->results->user_version) { + op.assert_version(cop->results->user_version); } else { // we should learn the version after the first chunk, if we didn't know // it already! assert(cop->cursor.is_initial()); } op.copy_get(&cop->cursor, cct->_conf->osd_copyfrom_max_chunk, - &cop->size, &cop->mtime, &cop->attrs, - &cop->data, &cop->omap, + &cop->results->object_size, &cop->results->mtime, + &cop->results->category, + &cop->attrs, &cop->data, &cop->omap, &cop->rval); C_Copyfrom *fin = new C_Copyfrom(this, obc->obs.oi.soid, @@ -4412,7 +4464,7 @@ void ReplicatedPG::_copy_some(ObjectContextRef obc, CopyOpRef cop) new C_OnFinisher(fin, &osd->objecter_finisher), // discover the object version if we don't know it yet - cop->version ? NULL : &cop->version); + cop->results->user_version ? NULL : &cop->results->user_version); fin->tid = tid; cop->objecter_tid = tid; osd->objecter_lock.Unlock(); @@ -4435,7 +4487,6 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) ObjectContextRef obc = cop->obc; cop->objecter_tid = 0; - CopyResults results; if (r >= 0) { assert(cop->rval >= 0); @@ -4454,12 +4505,11 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) _copy_some(obc, cop); return; } - _build_finish_copy_transaction(cop, results.get<3>()); - results.get<1>() = cop->temp_cursor.data_offset; + _build_finish_copy_transaction(cop, cop->results->final_tx); } dout(20) << __func__ << " complete; committing" << dendl; - results.get<0>() = r; + CopyCallbackResults results(r, cop->results); cop->cb->complete(results); copy_ops.erase(obc->obs.oi.soid); @@ -4513,7 +4563,7 @@ void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop, } } -int ReplicatedPG::finish_copyfrom(OpContext *ctx) +void ReplicatedPG::finish_copyfrom(OpContext *ctx) { dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl; ObjectState& obs = ctx->new_obs; @@ -4526,8 +4576,8 @@ int ReplicatedPG::finish_copyfrom(OpContext *ctx) if (cb->is_temp_obj_used()) { ctx->discard_temp_oid = cb->temp_obj; } - ctx->op_t.swap(cb->results.get<3>()); - ctx->op_t.append(cb->results.get<3>()); + ctx->op_t.swap(cb->results->final_tx); + ctx->op_t.append(cb->results->final_tx); interval_set<uint64_t> ch; if (obs.oi.size > 0) @@ -4541,15 +4591,53 @@ int ReplicatedPG::finish_copyfrom(OpContext *ctx) } ctx->delta_stats.num_wr++; ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(obs.oi.size, 10); +} - return 0; +void ReplicatedPG::finish_promote(CopyResults *results, ObjectContextRef obc, + hobject_t& temp_obj) +{ + vector<OSDOp> ops; + tid_t rep_tid = osd->get_tid(); + osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); + OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this); + tctx->mtime = ceph_clock_now(g_ceph_context); + tctx->op_t.swap(results->final_tx); + if (results->started_temp_obj) { + tctx->discard_temp_oid = temp_obj; + } + + RepGather *repop = new_repop(tctx, obc, rep_tid); + C_KickBlockedObject *blockedcb = new C_KickBlockedObject(obc, this); + repop->ondone = blockedcb; + object_stat_sum_t delta; + ++delta.num_objects; + obc->obs.exists = true; + delta.num_bytes += results->object_size; + obc->obs.oi.category = results->category; + info.stats.stats.add(delta, obc->obs.oi.category); + tctx->at_version.epoch = get_osdmap()->get_epoch(); + tctx->at_version.version = pg_log.get_head().version + 1; + tctx->user_at_version = results->user_version; + + tctx->log.push_back(pg_log_entry_t( + pg_log_entry_t::MODIFY, + obc->obs.oi.soid, + tctx->at_version, + tctx->obs->oi.version, + tctx->user_at_version, + osd_reqid_t(), + repop->ctx->mtime)); + append_log(tctx->log, eversion_t(), tctx->local_t); + issue_repop(repop, repop->ctx->mtime); + eval_repop(repop); + repop->put(); } void ReplicatedPG::cancel_copy(CopyOpRef cop) { dout(10) << __func__ << " " << cop->obc->obs.oi.soid - << " from " << cop->src << " " << cop->oloc << " v" << cop->version - << dendl; + << " from " << cop->src << " " << cop->oloc + << " v" << cop->results->user_version << dendl; // cancel objecter op, if we can if (cop->objecter_tid) { @@ -4561,8 +4649,7 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop) --cop->obc->copyfrom_readside; kick_object_context_blocked(cop->obc); - bool temp_obj_created = !cop->cursor.is_initial(); - CopyResults result(-ECANCELED, 0, temp_obj_created, ObjectStore::Transaction()); + CopyCallbackResults result(-ECANCELED, cop->results); cop->cb->complete(result); } @@ -4743,14 +4830,14 @@ void ReplicatedPG::eval_repop(RepGather *repop) if (m) dout(10) << "eval_repop " << *repop << " wants=" << (m->wants_ack() ? "a":"") << (m->wants_ondisk() ? "d":"") - << (repop->done ? " DONE" : "") + << (repop->done() ? " DONE" : "") << dendl; else dout(10) << "eval_repop " << *repop << " (no op)" - << (repop->done ? " DONE" : "") + << (repop->done() ? " DONE" : "") << dendl; - if (repop->done) + if (repop->done()) return; // apply? @@ -4855,7 +4942,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) // done. if (repop->waitfor_ack.empty() && repop->waitfor_disk.empty() && repop->applied) { - repop->done = true; + repop->mark_done(); calc_min_last_complete_ondisk(); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index bbe3f56be5b..ebba20361a6 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -97,18 +97,35 @@ public: struct OpContext; class CopyCallback; + /** + * CopyResults stores the object metadata of interest to a copy initiator. + */ + struct CopyResults { + utime_t mtime; ///< the copy source's mtime + size_t object_size; ///< the copied object's size + bool started_temp_obj; ///< true if the callback needs to delete temp object + /** + * Final transaction; if non-empty the callback must execute it before any + * other accesses to the object (in order to complete the copy). + */ + ObjectStore::Transaction final_tx; + string category; ///< The copy source's category + version_t user_version; ///< The copy source's user version + CopyResults() : object_size(0), started_temp_obj(false), + user_version(0) {} + }; + struct CopyOp { CopyCallback *cb; ObjectContextRef obc; hobject_t src; object_locator_t oloc; - version_t version; + + CopyResults *results; tid_t objecter_tid; object_copy_cursor_t cursor; - uint64_t size; - utime_t mtime; map<string,bufferlist> attrs; bufferlist data; map<string,bufferlist> omap; @@ -120,12 +137,15 @@ public: CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s, object_locator_t l, version_t v, const hobject_t& dest) - : cb(cb_), obc(_obc), src(s), oloc(l), version(v), + : cb(cb_), obc(_obc), src(s), oloc(l), + results(NULL), objecter_tid(0), - size(0), rval(-1), temp_oid(dest) - {} + { + results = new CopyResults(); + results->user_version = v; + } }; typedef boost::shared_ptr<CopyOp> CopyOpRef; @@ -135,29 +155,19 @@ public: * one and give an instance of the class to start_copy. * * The implementer is responsible for making sure that the CopyCallback - * can associate itself with the correct copy operation. The presence - * of the closing Transaction ensures that write operations can be performed - * atomically with the copy being completed (which doing them in separate - * transactions would not allow); if you are doing the copy for a read - * op you will have to generate a separate op to finish the copy with. + * can associate itself with the correct copy operation. */ - /// return code, total object size, data in temp object?, final Transaction - typedef boost::tuple<int, size_t, bool, ObjectStore::Transaction> CopyResults; - class CopyCallback : public GenContext<CopyResults&> { + typedef boost::tuple<int, CopyResults*> CopyCallbackResults; + class CopyCallback : public GenContext<CopyCallbackResults> { protected: CopyCallback() {} /** * results.get<0>() is the return code: 0 for success; -ECANCELLED if * the operation was cancelled by the local OSD; -errno for other issues. - * results.get<1>() is the total size of the object (for updating pg stats) - * results.get<2>() indicates whether we have already written data to - * the temp object (so it needs to get cleaned up, if the return code - * indicates a failure) - * results.get<3>() is a Transaction; if non-empty you need to perform - * its results before any other accesses to the object in order to - * complete the copy. + * results.get<1>() is a pointer to a CopyResults object, which you are + * responsible for deleting. */ - virtual void finish(CopyResults& results_) = 0; + virtual void finish(CopyCallbackResults results_) = 0; public: /// Provide the final size of the copied object to the CopyCallback @@ -166,16 +176,18 @@ public: class CopyFromCallback: public CopyCallback { public: - CopyResults results; + CopyResults *results; + int retval; OpContext *ctx; hobject_t temp_obj; CopyFromCallback(OpContext *ctx_, const hobject_t& temp_obj_) : - ctx(ctx_), temp_obj(temp_obj_) {} + results(NULL), retval(0), ctx(ctx_), temp_obj(temp_obj_) {} ~CopyFromCallback() {} - virtual void finish(CopyResults& results_) { - results = results_; - int r = results.get<0>(); + virtual void finish(CopyCallbackResults results_) { + results = results_.get<1>(); + int r = results_.get<0>(); + retval = r; if (r >= 0) { ctx->pg->execute_ctx(ctx); } @@ -186,14 +198,50 @@ public: } ctx->pg->close_op_ctx(ctx); } + delete results; } - bool is_temp_obj_used() { return results.get<2>(); } - uint64_t get_data_size() { return results.get<1>(); } - int get_result() { return results.get<0>(); } + bool is_temp_obj_used() { return results->started_temp_obj; } + uint64_t get_data_size() { return results->object_size; } + int get_result() { return retval; } }; friend class CopyFromCallback; + class PromoteCallback: public CopyCallback { + OpRequestRef op; + ObjectContextRef obc; + hobject_t temp_obj; + ReplicatedPG *pg; + public: + PromoteCallback(OpRequestRef op_, ObjectContextRef obc_, + const hobject_t& temp_obj_, + ReplicatedPG *pg_) : + op(op_), obc(obc_), temp_obj(temp_obj_), pg(pg_) {} + + virtual void finish(CopyCallbackResults results) { + CopyResults* results_data = results.get<1>(); + int r = results.get<0>(); + if (r >= 0) { + pg->finish_promote(results_data, obc, temp_obj); + } else { + // we need to get rid of the op in the blocked queue + map<hobject_t,list<OpRequestRef> >::iterator blocked_iter; + blocked_iter = pg->waiting_for_blocked_object.find(obc->obs.oi.soid); + assert(blocked_iter != pg->waiting_for_blocked_object.end()); + assert(blocked_iter->second.begin()->get() == op.get()); + blocked_iter->second.pop_front(); + if (blocked_iter->second.empty()) { + pg->waiting_for_blocked_object.erase(blocked_iter); + } + if (r != -ECANCELED) { // on cancel the client will resend + pg->osd->reply_op_error(op, r); + } + } + delete results_data; + } + }; + friend class PromoteCallback; + boost::scoped_ptr<PGBackend> pgbackend; PGBackend *get_pgbackend() { return pgbackend.get(); @@ -410,6 +458,7 @@ public: * State on the PG primary associated with the replicated mutation */ class RepGather { + bool is_done; public: xlist<RepGather*>::item queue_item; int nref; @@ -422,7 +471,7 @@ public: tid_t rep_tid; - bool applying, applied, aborted, done; + bool applying, applied, aborted; set<int> waitfor_ack; //set<int> waitfor_nvram; @@ -431,6 +480,8 @@ public: //bool sent_nvram; bool sent_disk; + Context *ondone; ///< if set, this Context will be activated when repop is done + utime_t start; eversion_t pg_local_last_complete; @@ -440,14 +491,16 @@ public: RepGather(OpContext *c, ObjectContextRef pi, tid_t rt, eversion_t lc) : + is_done(false), queue_item(this), nref(1), ctx(c), obc(pi), rep_tid(rt), - applying(false), applied(false), aborted(false), done(false), + applying(false), applied(false), aborted(false), sent_ack(false), //sent_nvram(false), sent_disk(false), + ondone(NULL), pg_local_last_complete(lc), queue_snap_trimmer(false) { } @@ -465,6 +518,14 @@ public: //generic_dout(0) << "deleting " << this << dendl; } } + void mark_done() { + is_done = true; + if (ondone) + ondone->complete(0); + } + bool done() { + return is_done; + } }; @@ -846,8 +907,19 @@ protected: uint64_t offset, uint64_t length, bool count_bytes); void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st); + /** + * This helper function is called from do_op if the ObjectContext lookup fails. + * @returns true if the caching code is handling the Op, false otherwise. + */ inline bool maybe_handle_cache(OpRequestRef op, ObjectContextRef obc, int r); + /** + * This helper function tells the client to redirect their request elsewhere. + */ void do_cache_redirect(OpRequestRef op, ObjectContextRef obc); + /** + * This function starts up a copy from + */ + void promote_object(OpRequestRef op, ObjectContextRef obc); int prepare_transaction(OpContext *ctx); @@ -989,7 +1061,19 @@ protected: // -- copyfrom -- map<hobject_t, CopyOpRef> copy_ops; - int start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src, + int fill_in_copy_get(bufferlist::iterator& bp, OSDOp& op, + object_info_t& oi, bool classic); + /** + * To copy an object, call start_copy. + * + * @param cb: The CopyCallback to be activated when the copy is complete + * @param obc: The ObjectContext we are copying into + * @param src: The source object + * @param oloc: the source object locator + * @param version: the version of the source object to copy (0 for any) + * @param temp_dest_oid: the temporary object to use for large objects + */ + void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src, object_locator_t oloc, version_t version, const hobject_t& temp_dest_oid); void process_copy_chunk(hobject_t oid, tid_t tid, int r); @@ -997,7 +1081,9 @@ protected: void _copy_some(ObjectContextRef obc, CopyOpRef cop); void _build_finish_copy_transaction(CopyOpRef cop, ObjectStore::Transaction& t); - int finish_copyfrom(OpContext *ctx); + void finish_copyfrom(OpContext *ctx); + void finish_promote(CopyResults *results, ObjectContextRef obc, + hobject_t& temp_obj); void cancel_copy(CopyOpRef cop); void cancel_copy_ops(); @@ -1145,6 +1231,17 @@ public: void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op); void kick_object_context_blocked(ObjectContextRef obc); + struct C_KickBlockedObject : public Context { + ObjectContextRef obc; + ReplicatedPG *pg; + C_KickBlockedObject(ObjectContextRef obc_, ReplicatedPG *pg_) : + obc(obc_), pg(pg_) {} + protected: + void finish(int r) { + pg->kick_object_context_blocked(obc); + } + }; + void mark_all_unfound_lost(int what); eversion_t pick_newest_available(const hobject_t& oid); ObjectContextRef mark_object_lost(ObjectStore::Transaction *t, diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index b441d3fc8c7..4b853130ad4 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -2595,6 +2595,95 @@ void object_copy_cursor_t::generate_test_instances(list<object_copy_cursor_t*>& o.back()->omap_complete = true; } +// -- object_copy_data_t -- + +void object_copy_data_t::encode_classic(bufferlist& bl) const +{ + ::encode(size, bl); + ::encode(mtime, bl); + ::encode(attrs, bl); + ::encode(data, bl); + ::encode(omap, bl); + ::encode(cursor, bl); +} + +void object_copy_data_t::decode_classic(bufferlist::iterator& bl) +{ + ::decode(size, bl); + ::decode(mtime, bl); + ::decode(attrs, bl); + ::decode(data, bl); + ::decode(omap, bl); + ::decode(cursor, bl); +} + +void object_copy_data_t::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(size, bl); + ::encode(mtime, bl); + ::encode(category, bl); + ::encode(attrs, bl); + ::encode(data, bl); + ::encode(omap, bl); + ::encode(cursor, bl); + ENCODE_FINISH(bl); +} + +void object_copy_data_t::decode(bufferlist::iterator& bl) +{ + DECODE_START(1, bl); + ::decode(size, bl); + ::decode(mtime, bl); + ::decode(category, bl); + ::decode(attrs, bl); + ::decode(data, bl); + ::decode(omap, bl); + ::decode(cursor, bl); + DECODE_FINISH(bl); +} + +void object_copy_data_t::generate_test_instances(list<object_copy_data_t*>& o) +{ + o.push_back(new object_copy_data_t()); + + list<object_copy_cursor_t*> cursors; + object_copy_cursor_t::generate_test_instances(cursors); + list<object_copy_cursor_t*>::iterator ci = cursors.begin(); + o.back()->cursor = **(ci++); + + o.push_back(new object_copy_data_t()); + o.back()->cursor = **(ci++); + + o.push_back(new object_copy_data_t()); + o.back()->size = 1234; + o.back()->mtime.set_from_double(1234); + bufferptr bp("there", 5); + bufferlist bl; + bl.push_back(bp); + o.back()->attrs["hello"] = bl; + bufferptr bp2("not", 3); + bufferlist bl2; + bl2.push_back(bp2); + o.back()->omap["why"] = bl2; + bufferptr databp("iamsomedatatocontain", 20); + o.back()->data.push_back(databp); +} + +void object_copy_data_t::dump(Formatter *f) const +{ + f->open_object_section("cursor"); + cursor.dump(f); + f->close_section(); // cursor + f->dump_int("size", size); + f->dump_stream("mtime") << mtime; + /* we should really print out the attrs here, but bufferlist + const-correctness prents that */ + f->dump_int("attrs_size", attrs.size()); + f->dump_int("omap_size", omap.size()); + f->dump_int("data_length", data.length()); +} + // -- pg_create_t -- void pg_create_t::encode(bufferlist &bl) const @@ -3709,7 +3798,7 @@ ostream& operator<<(ostream& out, const OSDOp& op) out << (op.op.watch.flag ? " add":" remove") << " cookie " << op.op.watch.cookie << " ver " << op.op.watch.ver; break; - case CEPH_OSD_OP_COPY_GET: + case CEPH_OSD_OP_COPY_GET_CLASSIC: out << " max " << op.op.copy_get.max; case CEPH_OSD_OP_COPY_FROM: out << " ver " << op.op.copy_from.src_version; diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 1b15877e747..59c5f175f86 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1934,6 +1934,43 @@ struct object_copy_cursor_t { }; WRITE_CLASS_ENCODER(object_copy_cursor_t) +/** + * object_copy_data_t + * + * Return data from a copy request. The semantics are a litte strange + * right now to accommodate the implicit encoding that was previously used + * in its place. + * + * In particular, the sender unconditionally fills in the cursor (from what + * it receives and sends), the size, and the mtime, but is responsible for + * figuring out whether it should put any data in the in_attrs, data, or + * omap members (corresponding to xattrs, object data, and the omap entries) + * based on external data (the client includes a max amount to return with + * the copy request). The client then looks into the attrs, data, and/or omap + * based on the contents of the cursor. Note the change from in_attrs to attrs -- + * this is the result of some silly interface differences which were + * previously elided because bufferlists and bufferptrs encode on the wire the + * same way. + */ +struct object_copy_data_t { + object_copy_cursor_t cursor; + uint64_t size; + utime_t mtime; + map<string, bufferlist> attrs; + bufferlist data; + map<string, bufferlist> omap; + string category; +public: + object_copy_data_t() : size((uint64_t)-1) {} + + static void generate_test_instances(list<object_copy_data_t*>& o); + void encode_classic(bufferlist& bl) const; + void decode_classic(bufferlist::iterator& bl); + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& bl); + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(object_copy_data_t) /** * pg creation info diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 40331f2afb4..6e85167a001 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -569,6 +569,7 @@ struct ObjectOperation { object_copy_cursor_t *cursor; uint64_t *out_size; utime_t *out_mtime; + string *out_category; std::map<std::string,bufferlist> *out_attrs; bufferlist *out_data; std::map<std::string,bufferlist> *out_omap; @@ -576,43 +577,34 @@ struct ObjectOperation { C_ObjectOperation_copyget(object_copy_cursor_t *c, uint64_t *s, utime_t *m, + string *cat, std::map<std::string,bufferlist> *a, bufferlist *d, std::map<std::string,bufferlist> *o, int *r) : cursor(c), - out_size(s), out_mtime(m), out_attrs(a), - out_data(d), out_omap(o), prval(r) {} + out_size(s), out_mtime(m), out_category(cat), + out_attrs(a), out_data(d), out_omap(o), prval(r) {} void finish(int r) { if (r < 0) return; try { bufferlist::iterator p = bl.begin(); - uint64_t size; - ::decode(size, p); + object_copy_data_t copy_reply; + ::decode(copy_reply, p); if (out_size) - *out_size = size; - utime_t mtime; - ::decode(mtime, p); + *out_size = copy_reply.size; if (out_mtime) - *out_mtime = mtime; - if (out_attrs) { - ::decode_noclear(*out_attrs, p); - } else { - std::map<std::string,bufferlist> t; - ::decode(t, p); - } - bufferlist bl; - ::decode(bl, p); + *out_mtime = copy_reply.mtime; + if (out_category) + *out_category = copy_reply.category; + if (out_attrs) + *out_attrs = copy_reply.attrs; if (out_data) - out_data->claim_append(bl); - if (out_omap) { - ::decode_noclear(*out_omap, p); - } else { - std::map<std::string,bufferlist> t; - ::decode(t, p); - } - ::decode(*cursor, p); + out_data->claim_append(copy_reply.data); + if (out_omap) + *out_omap = copy_reply.omap; + *cursor = copy_reply.cursor; } catch (buffer::error& e) { if (prval) *prval = -EIO; @@ -624,6 +616,7 @@ struct ObjectOperation { uint64_t max, uint64_t *out_size, utime_t *out_mtime, + string *out_category, std::map<std::string,bufferlist> *out_attrs, bufferlist *out_data, std::map<std::string,bufferlist> *out_omap, @@ -635,7 +628,8 @@ struct ObjectOperation { unsigned p = ops.size() - 1; out_rval[p] = prval; C_ObjectOperation_copyget *h = - new C_ObjectOperation_copyget(cursor, out_size, out_mtime, out_attrs, out_data, out_omap, prval); + new C_ObjectOperation_copyget(cursor, out_size, out_mtime, out_category, + out_attrs, out_data, out_omap, prval); out_bl[p] = &h->bl; out_handler[p] = h; } diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index 33cb7950cb3..1ce2c72a58c 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -64,6 +64,7 @@ TYPE(pg_missing_t::item) TYPE(pg_missing_t) TYPE(pg_ls_response_t) TYPE(object_copy_cursor_t) +TYPE(object_copy_data_t) TYPE(pg_create_t) TYPE(watch_info_t) TYPE(object_info_t) |