summary refs log tree commit diff
diff options
context:
space:
mode:
author Sage Weil <sage@inktank.com> 2013-10-22 13:45:42 -0700
committer Sage Weil <sage@inktank.com> 2013-10-22 13:45:42 -0700
commit 54e111c672e4f9a4e785268651099a08bf88e2d4 (patch)
tree 5d2665c61452b213e9aa3800d1a896a68f2c184a
parent 5a2142ee433c8a1dcda857863be78da6eb96a56a (diff)
parent 001833c7743b6ead24de9e0d218deacb55be2978 (diff)
download ceph-54e111c672e4f9a4e785268651099a08bf88e2d4.tar.gz
Merge remote-tracking branch 'gh/wip-promote-copies' into wip-tier
-rwxr-xr-x qa/workunits/rados/caching_redirects.sh 60
-rw-r--r-- src/common/ceph_strings.cc 1
-rw-r--r-- src/include/rados.h 3
-rw-r--r-- src/os/ObjectStore.h 10
-rw-r--r-- src/osd/ReplicatedPG.cc 307
-rw-r--r-- src/osd/ReplicatedPG.h 165
-rw-r--r-- src/osd/osd_types.cc 91
-rw-r--r-- src/osd/osd_types.h 37
-rw-r--r-- src/osdc/Objecter.h 44
-rw-r--r-- src/test/encoding/types.h 1
10 files changed, 529 insertions, 190 deletions
diff --git a/qa/workunits/rados/caching_redirects.sh b/qa/workunits/rados/caching_redirects.sh
index 19b940b5b4c..b6007bf0cab 100755
--- a/qa/workunits/rados/caching_redirects.sh
+++ b/qa/workunits/rados/caching_redirects.sh
@@ -11,22 +11,22 @@ expect_false()
#create pools, set up tier relationship
ceph osd pool create base_pool 2
-ceph osd pool create partial_cache 2
-ceph osd pool create data_cache 2
-ceph osd tier add base_pool partial_cache
-ceph osd tier add base_pool data_cache
+ceph osd pool create partial_wrong 2
+ceph osd pool create wrong_cache 2
+ceph osd tier add base_pool partial_wrong
+ceph osd tier add base_pool wrong_cache
-# populate base_pool and data_cache with some data
+# populate base_pool with some data
echo "foo" > foo.txt
echo "bar" > bar.txt
echo "baz" > baz.txt
rados -p base_pool put fooobj foo.txt
rados -p base_pool put barobj bar.txt
-# data_cache is backwards so we can tell we read from it
-rados -p data_cache put fooobj bar.txt
-rados -p data_cache put barobj foo.txt
-# partial_cache gets barobj backwards
-rados -p partial_cache put barobj foo.txt
+# fill in wrong_cache backwards so we can tell we read from it
+rados -p wrong_cache put fooobj bar.txt
+rados -p wrong_cache put barobj foo.txt
+# partial_wrong gets barobj backwards so we can check promote and non-promote
+rados -p partial_wrong put barobj foo.txt
# get the objects back before setting a caching pool
rados -p base_pool get fooobj tmp.txt
@@ -34,22 +34,25 @@ diff -q tmp.txt foo.txt
rados -p base_pool get barobj tmp.txt
diff -q tmp.txt bar.txt
-# set up redirect and make sure we get redirect-based results
-ceph osd tier set-overlay base_pool partial_cache
-ceph osd tier cache-mode partial_cache writeback
+# set up redirect and make sure we get backwards results
+ceph osd tier set-overlay base_pool wrong_cache
+ceph osd tier cache-mode wrong_cache writeback
rados -p base_pool get fooobj tmp.txt
-diff -q tmp.txt foo.txt
+diff -q tmp.txt bar.txt
rados -p base_pool get barobj tmp.txt
diff -q tmp.txt foo.txt
-# switch cache pools and make sure contents differ
+# switch cache pools and make sure we're doing promote
ceph osd tier remove-overlay base_pool
-ceph osd tier set-overlay base_pool data_cache
-ceph osd tier cache-mode data_cache writeback
+ceph osd tier set-overlay base_pool partial_wrong
+ceph osd tier cache-mode partial_wrong writeback
rados -p base_pool get fooobj tmp.txt
-diff -q tmp.txt bar.txt
+diff -q tmp.txt foo.txt # hurray, it promoted!
rados -p base_pool get barobj tmp.txt
-diff -q tmp.txt foo.txt
+diff -q tmp.txt foo.txt # yep, we read partial_wrong's local object!
+
+# try a nonexistent object and make sure we get an error
+expect_false rados -p base_pool get bazobj tmp.txt
# drop the cache entirely and make sure contents are still the same
ceph osd tier remove-overlay base_pool
@@ -57,3 +60,22 @@ rados -p base_pool get fooobj tmp.txt
diff -q tmp.txt foo.txt
rados -p base_pool get barobj tmp.txt
diff -q tmp.txt bar.txt
+
+# create an empty cache pool and make sure it has objects after reading
+ceph osd pool create empty_cache 2
+
+touch empty.txt
+rados -p empty_cache ls > tmp.txt
+diff -q tmp.txt empty.txt
+
+ceph osd tier add base_pool empty_cache
+ceph osd tier set-overlay base_pool empty_cache
+ceph osd tier cache-mode empty_cache writeback
+rados -p base_pool get fooobj tmp.txt
+rados -p base_pool get barobj tmp.txt
+expect_false rados -p base_pool get bazobj tmp.txt
+
+rados -p empty_cache ls > tmp.txt
+expect_false diff -q tmp.txt empty.txt
+rados -p base_pool ls > tmp2.txt
+diff -q tmp.txt tmp2.txt
\ No newline at end of file
diff --git a/src/common/ceph_strings.cc b/src/common/ceph_strings.cc
index 2749fbb0a3b..a693953711f 100644
--- a/src/common/ceph_strings.cc
+++ b/src/common/ceph_strings.cc
@@ -48,6 +48,7 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_TMAPPUT: return "tmapput";
case CEPH_OSD_OP_WATCH: return "watch";
+ case CEPH_OSD_OP_COPY_GET_CLASSIC: return "copy-get-classic";
case CEPH_OSD_OP_COPY_GET: return "copy-get";
case CEPH_OSD_OP_COPY_FROM: return "copy-from";
case CEPH_OSD_OP_UNDIRTY: return "undirty";
diff --git a/src/include/rados.h b/src/include/rados.h
index 70c04d4167d..bf4f5b5fce4 100644
--- a/src/include/rados.h
+++ b/src/include/rados.h
@@ -218,9 +218,10 @@ enum {
CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25,
CEPH_OSD_OP_COPY_FROM = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 26,
- CEPH_OSD_OP_COPY_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 27,
+ CEPH_OSD_OP_COPY_GET_CLASSIC = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 27,
CEPH_OSD_OP_UNDIRTY = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 28,
CEPH_OSD_OP_ISDIRTY = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 29,
+ CEPH_OSD_OP_COPY_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 30,
/** multi **/
CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1,
diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h
index 07473b344f5..69f3cb1fba7 100644
--- a/src/os/ObjectStore.h
+++ b/src/os/ObjectStore.h
@@ -905,6 +905,16 @@ public:
return r;
}
virtual int getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset, bool user_only = false) {return 0;};
+ int getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferlist>& aset, bool user_only = false) {
+ map<string,bufferptr> bmap;
+ int r = getattrs(cid, oid, bmap, user_only);
+ for (map<string,bufferptr>::iterator i = bmap.begin();
+ i != bmap.end();
+ ++i) {
+ aset[i->first].append(i->second);
+ }
+ return r;
+ }
// collections
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 3995ae0f320..753b66f336f 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -1206,15 +1206,19 @@ void ReplicatedPG::do_op(OpRequestRef op)
bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc,
int r)
{
+ if (obc.get() && obc->is_blocked()) {
+ // we're already doing something with this object
+ return false;
+ }
switch(pool.info.cache_mode) {
case pg_pool_t::CACHEMODE_NONE:
return false;
break;
case pg_pool_t::CACHEMODE_WRITEBACK:
- if (obc.get()) {
+ if (obc.get() && obc->obs.exists) { // we have the object already
return false;
- } else {
- do_cache_redirect(op, obc);
+ } else { // try and promote!
+ promote_object(op, obc);
return true;
}
break;
@@ -1222,12 +1226,17 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc,
do_cache_redirect(op, obc);
return true;
break;
- case pg_pool_t::CACHEMODE_READONLY:
- if (obc.get() && !r) {
+ case pg_pool_t::CACHEMODE_READONLY: // TODO: clean this case up
+ if (!obc.get() && r == -ENOENT) { // we don't have the object and op's a read
+ promote_object(op, obc);
+ return true;
+ } else if (obc.get() && obc->obs.exists) { // we have the object locally
return false;
- } else {
+ } else if (!r) { // it must be a write
do_cache_redirect(op, obc);
return true;
+ } else { // crap, there was a failure of some kind
+ return false;
}
break;
default:
@@ -1250,6 +1259,31 @@ void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc)
return;
}
+void ReplicatedPG::promote_object(OpRequestRef op, ObjectContextRef obc)
+{
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+ if (!obc.get()) { // we need to create an ObjectContext
+ int r = find_object_context(
+ hobject_t(m->get_oid(),
+ m->get_object_locator().key,
+ m->get_snapid(),
+ m->get_pg().ps(),
+ m->get_object_locator().get_pool(),
+ m->get_object_locator().nspace),
+ &obc, true, NULL);
+ assert(r == 0); // a lookup that allows creates can't fail now
+ }
+
+ hobject_t temp_target = generate_temp_object();
+ PromoteCallback *cb = new PromoteCallback(op, obc, temp_target, this);
+ object_locator_t oloc(m->get_object_locator());
+ oloc.pool = pool.info.tier_of;
+ start_copy(cb, obc, obc->obs.oi.soid, oloc, 0, temp_target);
+
+ assert(obc->is_blocked());
+ wait_for_blocked_object(obc->obs.oi.soid, op);
+}
+
void ReplicatedPG::execute_ctx(OpContext *ctx)
{
dout(10) << __func__ << " " << ctx << dendl;
@@ -3647,84 +3681,18 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
break;
- case CEPH_OSD_OP_COPY_GET:
+ case CEPH_OSD_OP_COPY_GET_CLASSIC:
++ctx->num_read;
- {
- object_copy_cursor_t cursor;
- uint64_t out_max;
- try {
- ::decode(cursor, bp);
- ::decode(out_max, bp);
- }
- catch (buffer::error& e) {
- result = -EINVAL;
- goto fail;
- }
-
- // size, mtime
- ::encode(oi.size, osd_op.outdata);
- ::encode(oi.mtime, osd_op.outdata);
-
- // attrs
- map<string,bufferptr> out_attrs;
- if (!cursor.attr_complete) {
- result = osd->store->getattrs(coll, soid, out_attrs, true);
- if (result < 0)
- break;
- cursor.attr_complete = true;
- dout(20) << " got attrs" << dendl;
- }
- ::encode(out_attrs, osd_op.outdata);
-
- int64_t left = out_max - osd_op.outdata.length();
-
- // data
- bufferlist bl;
- if (left > 0 && !cursor.data_complete) {
- if (cursor.data_offset < oi.size) {
- result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl);
- if (result < 0)
- return result;
- assert(result <= left);
- left -= result;
- cursor.data_offset += result;
- }
- if (cursor.data_offset == oi.size) {
- cursor.data_complete = true;
- dout(20) << " got data" << dendl;
- }
- }
- ::encode(bl, osd_op.outdata);
+ result = fill_in_copy_get(bp, osd_op, oi, true);
+ if (result == -EINVAL)
+ goto fail;
+ break;
- // omap
- std::map<std::string,bufferlist> out_omap;
- if (left > 0 && !cursor.omap_complete) {
- ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(coll, oi.soid);
- assert(iter);
- if (iter->valid()) {
- iter->upper_bound(cursor.omap_offset);
- for (; left > 0 && iter->valid(); iter->next()) {
- out_omap.insert(make_pair(iter->key(), iter->value()));
- left -= iter->key().length() + 4 + iter->value().length() + 4;
- }
- }
- if (iter->valid()) {
- cursor.omap_offset = iter->key();
- } else {
- cursor.omap_complete = true;
- dout(20) << " got omap" << dendl;
- }
- }
- ::encode(out_omap, osd_op.outdata);
-
- dout(20) << " cursor.is_complete=" << cursor.is_complete()
- << " " << out_attrs.size() << " attrs"
- << " " << bl.length() << " bytes"
- << " " << out_omap.size() << " keys"
- << dendl;
- ::encode(cursor, osd_op.outdata);
- result = 0;
- }
+ case CEPH_OSD_OP_COPY_GET:
+ ++ctx->num_read;
+ result = fill_in_copy_get(bp, osd_op, oi, false);
+ if (result == -EINVAL)
+ goto fail;
break;
case CEPH_OSD_OP_COPY_FROM:
@@ -3757,15 +3725,14 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
hobject_t temp_target = generate_temp_object();
CopyFromCallback *cb = new CopyFromCallback(ctx, temp_target);
ctx->copy_cb = cb;
- result = start_copy(cb, ctx->obc, src, src_oloc, src_version,
+ start_copy(cb, ctx->obc, src, src_oloc, src_version,
temp_target);
- if (result < 0)
- goto fail;
result = -EINPROGRESS;
} else {
// finish
assert(ctx->copy_cb->get_result() >= 0);
- result = finish_copyfrom(ctx);
+ finish_copyfrom(ctx);
+ result = 0;
}
}
break;
@@ -4362,7 +4329,93 @@ struct C_Copyfrom : public Context {
}
};
-int ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
+int ReplicatedPG::fill_in_copy_get(bufferlist::iterator& bp, OSDOp& osd_op,
+ object_info_t& oi, bool classic)
+{
+ hobject_t& soid = oi.soid;
+ int result = 0;
+ object_copy_cursor_t cursor;
+ uint64_t out_max;
+ try {
+ ::decode(cursor, bp);
+ ::decode(out_max, bp);
+ }
+ catch (buffer::error& e) {
+ result = -EINVAL;
+ return result;
+ }
+
+ object_copy_data_t reply_obj;
+ // size, mtime
+ reply_obj.size = oi.size;
+ reply_obj.mtime = oi.mtime;
+ reply_obj.category = oi.category;
+
+ // attrs
+ map<string,bufferlist>& out_attrs = reply_obj.attrs;
+ if (!cursor.attr_complete) {
+ result = osd->store->getattrs(coll, soid, out_attrs, true);
+ if (result < 0)
+ return result;
+ cursor.attr_complete = true;
+ dout(20) << " got attrs" << dendl;
+ }
+
+ int64_t left = out_max - osd_op.outdata.length();
+
+ // data
+ bufferlist& bl = reply_obj.data;
+ if (left > 0 && !cursor.data_complete) {
+ if (cursor.data_offset < oi.size) {
+ result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl);
+ if (result < 0)
+ return result;
+ assert(result <= left);
+ left -= result;
+ cursor.data_offset += result;
+ }
+ if (cursor.data_offset == oi.size) {
+ cursor.data_complete = true;
+ dout(20) << " got data" << dendl;
+ }
+ }
+
+ // omap
+ std::map<std::string,bufferlist>& out_omap = reply_obj.omap;
+ if (left > 0 && !cursor.omap_complete) {
+ ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(coll, oi.soid);
+ assert(iter);
+ if (iter->valid()) {
+ iter->upper_bound(cursor.omap_offset);
+ for (; left > 0 && iter->valid(); iter->next()) {
+ out_omap.insert(make_pair(iter->key(), iter->value()));
+ left -= iter->key().length() + 4 + iter->value().length() + 4;
+ }
+ }
+ if (iter->valid()) {
+ cursor.omap_offset = iter->key();
+ } else {
+ cursor.omap_complete = true;
+ dout(20) << " got omap" << dendl;
+ }
+ }
+
+ dout(20) << " cursor.is_complete=" << cursor.is_complete()
+ << " " << out_attrs.size() << " attrs"
+ << " " << bl.length() << " bytes"
+ << " " << out_omap.size() << " keys"
+ << dendl;
+ reply_obj.cursor = cursor;
+ if (classic) {
+ reply_obj.encode_classic(osd_op.outdata);
+ } else {
+ ::encode(reply_obj, osd_op.outdata);
+ }
+ result = 0;
+ return result;
+}
+
+void ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
hobject_t src, object_locator_t oloc, version_t version,
const hobject_t& temp_dest_oid)
{
@@ -4384,24 +4437,23 @@ int ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
++obc->copyfrom_readside;
_copy_some(obc, cop);
-
- return 0;
}
void ReplicatedPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
{
dout(10) << __func__ << " " << obc << " " << cop << dendl;
ObjectOperation op;
- if (cop->version) {
- op.assert_version(cop->version);
+ if (cop->results->user_version) {
+ op.assert_version(cop->results->user_version);
} else {
// we should learn the version after the first chunk, if we didn't know
// it already!
assert(cop->cursor.is_initial());
}
op.copy_get(&cop->cursor, cct->_conf->osd_copyfrom_max_chunk,
- &cop->size, &cop->mtime, &cop->attrs,
- &cop->data, &cop->omap,
+ &cop->results->object_size, &cop->results->mtime,
+ &cop->results->category,
+ &cop->attrs, &cop->data, &cop->omap,
&cop->rval);
C_Copyfrom *fin = new C_Copyfrom(this, obc->obs.oi.soid,
@@ -4412,7 +4464,7 @@ void ReplicatedPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
new C_OnFinisher(fin,
&osd->objecter_finisher),
// discover the object version if we don't know it yet
- cop->version ? NULL : &cop->version);
+ cop->results->user_version ? NULL : &cop->results->user_version);
fin->tid = tid;
cop->objecter_tid = tid;
osd->objecter_lock.Unlock();
@@ -4435,7 +4487,6 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
ObjectContextRef obc = cop->obc;
cop->objecter_tid = 0;
- CopyResults results;
if (r >= 0) {
assert(cop->rval >= 0);
@@ -4454,12 +4505,11 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
_copy_some(obc, cop);
return;
}
- _build_finish_copy_transaction(cop, results.get<3>());
- results.get<1>() = cop->temp_cursor.data_offset;
+ _build_finish_copy_transaction(cop, cop->results->final_tx);
}
dout(20) << __func__ << " complete; committing" << dendl;
- results.get<0>() = r;
+ CopyCallbackResults results(r, cop->results);
cop->cb->complete(results);
copy_ops.erase(obc->obs.oi.soid);
@@ -4513,7 +4563,7 @@ void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop,
}
}
-int ReplicatedPG::finish_copyfrom(OpContext *ctx)
+void ReplicatedPG::finish_copyfrom(OpContext *ctx)
{
dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl;
ObjectState& obs = ctx->new_obs;
@@ -4526,8 +4576,8 @@ int ReplicatedPG::finish_copyfrom(OpContext *ctx)
if (cb->is_temp_obj_used()) {
ctx->discard_temp_oid = cb->temp_obj;
}
- ctx->op_t.swap(cb->results.get<3>());
- ctx->op_t.append(cb->results.get<3>());
+ ctx->op_t.swap(cb->results->final_tx);
+ ctx->op_t.append(cb->results->final_tx);
interval_set<uint64_t> ch;
if (obs.oi.size > 0)
@@ -4541,15 +4591,53 @@ int ReplicatedPG::finish_copyfrom(OpContext *ctx)
}
ctx->delta_stats.num_wr++;
ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(obs.oi.size, 10);
+}
- return 0;
+void ReplicatedPG::finish_promote(CopyResults *results, ObjectContextRef obc,
+ hobject_t& temp_obj)
+{
+ vector<OSDOp> ops;
+ tid_t rep_tid = osd->get_tid();
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
+ OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this);
+ tctx->mtime = ceph_clock_now(g_ceph_context);
+ tctx->op_t.swap(results->final_tx);
+ if (results->started_temp_obj) {
+ tctx->discard_temp_oid = temp_obj;
+ }
+
+ RepGather *repop = new_repop(tctx, obc, rep_tid);
+ C_KickBlockedObject *blockedcb = new C_KickBlockedObject(obc, this);
+ repop->ondone = blockedcb;
+ object_stat_sum_t delta;
+ ++delta.num_objects;
+ obc->obs.exists = true;
+ delta.num_bytes += results->object_size;
+ obc->obs.oi.category = results->category;
+ info.stats.stats.add(delta, obc->obs.oi.category);
+ tctx->at_version.epoch = get_osdmap()->get_epoch();
+ tctx->at_version.version = pg_log.get_head().version + 1;
+ tctx->user_at_version = results->user_version;
+
+ tctx->log.push_back(pg_log_entry_t(
+ pg_log_entry_t::MODIFY,
+ obc->obs.oi.soid,
+ tctx->at_version,
+ tctx->obs->oi.version,
+ tctx->user_at_version,
+ osd_reqid_t(),
+ repop->ctx->mtime));
+ append_log(tctx->log, eversion_t(), tctx->local_t);
+ issue_repop(repop, repop->ctx->mtime);
+ eval_repop(repop);
+ repop->put();
}
void ReplicatedPG::cancel_copy(CopyOpRef cop)
{
dout(10) << __func__ << " " << cop->obc->obs.oi.soid
- << " from " << cop->src << " " << cop->oloc << " v" << cop->version
- << dendl;
+ << " from " << cop->src << " " << cop->oloc
+ << " v" << cop->results->user_version << dendl;
// cancel objecter op, if we can
if (cop->objecter_tid) {
@@ -4561,8 +4649,7 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop)
--cop->obc->copyfrom_readside;
kick_object_context_blocked(cop->obc);
- bool temp_obj_created = !cop->cursor.is_initial();
- CopyResults result(-ECANCELED, 0, temp_obj_created, ObjectStore::Transaction());
+ CopyCallbackResults result(-ECANCELED, cop->results);
cop->cb->complete(result);
}
@@ -4743,14 +4830,14 @@ void ReplicatedPG::eval_repop(RepGather *repop)
if (m)
dout(10) << "eval_repop " << *repop
<< " wants=" << (m->wants_ack() ? "a":"") << (m->wants_ondisk() ? "d":"")
- << (repop->done ? " DONE" : "")
+ << (repop->done() ? " DONE" : "")
<< dendl;
else
dout(10) << "eval_repop " << *repop << " (no op)"
- << (repop->done ? " DONE" : "")
+ << (repop->done() ? " DONE" : "")
<< dendl;
- if (repop->done)
+ if (repop->done())
return;
// apply?
@@ -4855,7 +4942,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
// done.
if (repop->waitfor_ack.empty() && repop->waitfor_disk.empty() &&
repop->applied) {
- repop->done = true;
+ repop->mark_done();
calc_min_last_complete_ondisk();
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index bbe3f56be5b..ebba20361a6 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -97,18 +97,35 @@ public:
struct OpContext;
class CopyCallback;
+ /**
+ * CopyResults stores the object metadata of interest to a copy initiator.
+ */
+ struct CopyResults {
+ utime_t mtime; ///< the copy source's mtime
+ size_t object_size; ///< the copied object's size
+ bool started_temp_obj; ///< true if the callback needs to delete temp object
+ /**
+ * Final transaction; if non-empty the callback must execute it before any
+ * other accesses to the object (in order to complete the copy).
+ */
+ ObjectStore::Transaction final_tx;
+ string category; ///< The copy source's category
+ version_t user_version; ///< The copy source's user version
+ CopyResults() : object_size(0), started_temp_obj(false),
+ user_version(0) {}
+ };
+
struct CopyOp {
CopyCallback *cb;
ObjectContextRef obc;
hobject_t src;
object_locator_t oloc;
- version_t version;
+
+ CopyResults *results;
tid_t objecter_tid;
object_copy_cursor_t cursor;
- uint64_t size;
- utime_t mtime;
map<string,bufferlist> attrs;
bufferlist data;
map<string,bufferlist> omap;
@@ -120,12 +137,15 @@ public:
CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s, object_locator_t l,
version_t v, const hobject_t& dest)
- : cb(cb_), obc(_obc), src(s), oloc(l), version(v),
+ : cb(cb_), obc(_obc), src(s), oloc(l),
+ results(NULL),
objecter_tid(0),
- size(0),
rval(-1),
temp_oid(dest)
- {}
+ {
+ results = new CopyResults();
+ results->user_version = v;
+ }
};
typedef boost::shared_ptr<CopyOp> CopyOpRef;
@@ -135,29 +155,19 @@ public:
* one and give an instance of the class to start_copy.
*
* The implementer is responsible for making sure that the CopyCallback
- * can associate itself with the correct copy operation. The presence
- * of the closing Transaction ensures that write operations can be performed
- * atomically with the copy being completed (which doing them in separate
- * transactions would not allow); if you are doing the copy for a read
- * op you will have to generate a separate op to finish the copy with.
+ * can associate itself with the correct copy operation.
*/
- /// return code, total object size, data in temp object?, final Transaction
- typedef boost::tuple<int, size_t, bool, ObjectStore::Transaction> CopyResults;
- class CopyCallback : public GenContext<CopyResults&> {
+ typedef boost::tuple<int, CopyResults*> CopyCallbackResults;
+ class CopyCallback : public GenContext<CopyCallbackResults> {
protected:
CopyCallback() {}
/**
* results.get<0>() is the return code: 0 for success; -ECANCELLED if
* the operation was cancelled by the local OSD; -errno for other issues.
- * results.get<1>() is the total size of the object (for updating pg stats)
- * results.get<2>() indicates whether we have already written data to
- * the temp object (so it needs to get cleaned up, if the return code
- * indicates a failure)
- * results.get<3>() is a Transaction; if non-empty you need to perform
- * its results before any other accesses to the object in order to
- * complete the copy.
+ * results.get<1>() is a pointer to a CopyResults object, which you are
+ * responsible for deleting.
*/
- virtual void finish(CopyResults& results_) = 0;
+ virtual void finish(CopyCallbackResults results_) = 0;
public:
/// Provide the final size of the copied object to the CopyCallback
@@ -166,16 +176,18 @@ public:
class CopyFromCallback: public CopyCallback {
public:
- CopyResults results;
+ CopyResults *results;
+ int retval;
OpContext *ctx;
hobject_t temp_obj;
CopyFromCallback(OpContext *ctx_, const hobject_t& temp_obj_) :
- ctx(ctx_), temp_obj(temp_obj_) {}
+ results(NULL), retval(0), ctx(ctx_), temp_obj(temp_obj_) {}
~CopyFromCallback() {}
- virtual void finish(CopyResults& results_) {
- results = results_;
- int r = results.get<0>();
+ virtual void finish(CopyCallbackResults results_) {
+ results = results_.get<1>();
+ int r = results_.get<0>();
+ retval = r;
if (r >= 0) {
ctx->pg->execute_ctx(ctx);
}
@@ -186,14 +198,50 @@ public:
}
ctx->pg->close_op_ctx(ctx);
}
+ delete results;
}
- bool is_temp_obj_used() { return results.get<2>(); }
- uint64_t get_data_size() { return results.get<1>(); }
- int get_result() { return results.get<0>(); }
+ bool is_temp_obj_used() { return results->started_temp_obj; }
+ uint64_t get_data_size() { return results->object_size; }
+ int get_result() { return retval; }
};
friend class CopyFromCallback;
+ class PromoteCallback: public CopyCallback {
+ OpRequestRef op;
+ ObjectContextRef obc;
+ hobject_t temp_obj;
+ ReplicatedPG *pg;
+ public:
+ PromoteCallback(OpRequestRef op_, ObjectContextRef obc_,
+ const hobject_t& temp_obj_,
+ ReplicatedPG *pg_) :
+ op(op_), obc(obc_), temp_obj(temp_obj_), pg(pg_) {}
+
+ virtual void finish(CopyCallbackResults results) {
+ CopyResults* results_data = results.get<1>();
+ int r = results.get<0>();
+ if (r >= 0) {
+ pg->finish_promote(results_data, obc, temp_obj);
+ } else {
+ // we need to get rid of the op in the blocked queue
+ map<hobject_t,list<OpRequestRef> >::iterator blocked_iter;
+ blocked_iter = pg->waiting_for_blocked_object.find(obc->obs.oi.soid);
+ assert(blocked_iter != pg->waiting_for_blocked_object.end());
+ assert(blocked_iter->second.begin()->get() == op.get());
+ blocked_iter->second.pop_front();
+ if (blocked_iter->second.empty()) {
+ pg->waiting_for_blocked_object.erase(blocked_iter);
+ }
+ if (r != -ECANCELED) { // on cancel the client will resend
+ pg->osd->reply_op_error(op, r);
+ }
+ }
+ delete results_data;
+ }
+ };
+ friend class PromoteCallback;
+
boost::scoped_ptr<PGBackend> pgbackend;
PGBackend *get_pgbackend() {
return pgbackend.get();
@@ -410,6 +458,7 @@ public:
* State on the PG primary associated with the replicated mutation
*/
class RepGather {
+ bool is_done;
public:
xlist<RepGather*>::item queue_item;
int nref;
@@ -422,7 +471,7 @@ public:
tid_t rep_tid;
- bool applying, applied, aborted, done;
+ bool applying, applied, aborted;
set<int> waitfor_ack;
//set<int> waitfor_nvram;
@@ -431,6 +480,8 @@ public:
//bool sent_nvram;
bool sent_disk;
+ Context *ondone; ///< if set, this Context will be activated when repop is done
+
utime_t start;
eversion_t pg_local_last_complete;
@@ -440,14 +491,16 @@ public:
RepGather(OpContext *c, ObjectContextRef pi, tid_t rt,
eversion_t lc) :
+ is_done(false),
queue_item(this),
nref(1),
ctx(c), obc(pi),
rep_tid(rt),
- applying(false), applied(false), aborted(false), done(false),
+ applying(false), applied(false), aborted(false),
sent_ack(false),
//sent_nvram(false),
sent_disk(false),
+ ondone(NULL),
pg_local_last_complete(lc),
queue_snap_trimmer(false) { }
@@ -465,6 +518,14 @@ public:
//generic_dout(0) << "deleting " << this << dendl;
}
}
+ void mark_done() {
+ is_done = true;
+ if (ondone)
+ ondone->complete(0);
+ }
+ bool done() {
+ return is_done;
+ }
};
@@ -846,8 +907,19 @@ protected:
uint64_t offset, uint64_t length, bool count_bytes);
void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
+ /**
+ * This helper function is called from do_op if the ObjectContext lookup fails.
+ * @returns true if the caching code is handling the Op, false otherwise.
+ */
inline bool maybe_handle_cache(OpRequestRef op, ObjectContextRef obc, int r);
+ /**
+ * This helper function tells the client to redirect their request elsewhere.
+ */
void do_cache_redirect(OpRequestRef op, ObjectContextRef obc);
+ /**
+ * This function starts up a copy of the object from the pool we are a tier of.
+ */
+ void promote_object(OpRequestRef op, ObjectContextRef obc);
int prepare_transaction(OpContext *ctx);
@@ -989,7 +1061,19 @@ protected:
// -- copyfrom --
map<hobject_t, CopyOpRef> copy_ops;
- int start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
+ int fill_in_copy_get(bufferlist::iterator& bp, OSDOp& op,
+ object_info_t& oi, bool classic);
+ /**
+ * To copy an object, call start_copy.
+ *
+ * @param cb: The CopyCallback to be activated when the copy is complete
+ * @param obc: The ObjectContext we are copying into
+ * @param src: The source object
+ * @param oloc: the source object locator
+ * @param version: the version of the source object to copy (0 for any)
+ * @param temp_dest_oid: the temporary object to use for large objects
+ */
+ void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
object_locator_t oloc, version_t version,
const hobject_t& temp_dest_oid);
void process_copy_chunk(hobject_t oid, tid_t tid, int r);
@@ -997,7 +1081,9 @@ protected:
void _copy_some(ObjectContextRef obc, CopyOpRef cop);
void _build_finish_copy_transaction(CopyOpRef cop,
ObjectStore::Transaction& t);
- int finish_copyfrom(OpContext *ctx);
+ void finish_copyfrom(OpContext *ctx);
+ void finish_promote(CopyResults *results, ObjectContextRef obc,
+ hobject_t& temp_obj);
void cancel_copy(CopyOpRef cop);
void cancel_copy_ops();
@@ -1145,6 +1231,17 @@ public:
void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
void kick_object_context_blocked(ObjectContextRef obc);
+ struct C_KickBlockedObject : public Context {
+ ObjectContextRef obc;
+ ReplicatedPG *pg;
+ C_KickBlockedObject(ObjectContextRef obc_, ReplicatedPG *pg_) :
+ obc(obc_), pg(pg_) {}
+ protected:
+ void finish(int r) {
+ pg->kick_object_context_blocked(obc);
+ }
+ };
+
void mark_all_unfound_lost(int what);
eversion_t pick_newest_available(const hobject_t& oid);
ObjectContextRef mark_object_lost(ObjectStore::Transaction *t,
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index b441d3fc8c7..4b853130ad4 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -2595,6 +2595,95 @@ void object_copy_cursor_t::generate_test_instances(list<object_copy_cursor_t*>&
o.back()->omap_complete = true;
}
+// -- object_copy_data_t --
+
+void object_copy_data_t::encode_classic(bufferlist& bl) const
+{
+ ::encode(size, bl);
+ ::encode(mtime, bl);
+ ::encode(attrs, bl);
+ ::encode(data, bl);
+ ::encode(omap, bl);
+ ::encode(cursor, bl);
+}
+
+void object_copy_data_t::decode_classic(bufferlist::iterator& bl)
+{
+ ::decode(size, bl);
+ ::decode(mtime, bl);
+ ::decode(attrs, bl);
+ ::decode(data, bl);
+ ::decode(omap, bl);
+ ::decode(cursor, bl);
+}
+
+void object_copy_data_t::encode(bufferlist& bl) const
+{
+ ENCODE_START(1, 1, bl);
+ ::encode(size, bl);
+ ::encode(mtime, bl);
+ ::encode(category, bl);
+ ::encode(attrs, bl);
+ ::encode(data, bl);
+ ::encode(omap, bl);
+ ::encode(cursor, bl);
+ ENCODE_FINISH(bl);
+}
+
+void object_copy_data_t::decode(bufferlist::iterator& bl)
+{
+ DECODE_START(1, bl);
+ ::decode(size, bl);
+ ::decode(mtime, bl);
+ ::decode(category, bl);
+ ::decode(attrs, bl);
+ ::decode(data, bl);
+ ::decode(omap, bl);
+ ::decode(cursor, bl);
+ DECODE_FINISH(bl);
+}
+
+void object_copy_data_t::generate_test_instances(list<object_copy_data_t*>& o)
+{
+ o.push_back(new object_copy_data_t());
+
+ list<object_copy_cursor_t*> cursors;
+ object_copy_cursor_t::generate_test_instances(cursors);
+ list<object_copy_cursor_t*>::iterator ci = cursors.begin();
+ o.back()->cursor = **(ci++);
+
+ o.push_back(new object_copy_data_t());
+ o.back()->cursor = **(ci++);
+
+ o.push_back(new object_copy_data_t());
+ o.back()->size = 1234;
+ o.back()->mtime.set_from_double(1234);
+ bufferptr bp("there", 5);
+ bufferlist bl;
+ bl.push_back(bp);
+ o.back()->attrs["hello"] = bl;
+ bufferptr bp2("not", 3);
+ bufferlist bl2;
+ bl2.push_back(bp2);
+ o.back()->omap["why"] = bl2;
+ bufferptr databp("iamsomedatatocontain", 20);
+ o.back()->data.push_back(databp);
+}
+
+void object_copy_data_t::dump(Formatter *f) const
+{
+ f->open_object_section("cursor");
+ cursor.dump(f);
+ f->close_section(); // cursor
+ f->dump_int("size", size);
+ f->dump_stream("mtime") << mtime;
+ /* we should really print out the attrs here, but bufferlist
+ const-correctness prevents that */
+ f->dump_int("attrs_size", attrs.size());
+ f->dump_int("omap_size", omap.size());
+ f->dump_int("data_length", data.length());
+}
+
// -- pg_create_t --
void pg_create_t::encode(bufferlist &bl) const
@@ -3709,7 +3798,7 @@ ostream& operator<<(ostream& out, const OSDOp& op)
out << (op.op.watch.flag ? " add":" remove")
<< " cookie " << op.op.watch.cookie << " ver " << op.op.watch.ver;
break;
- case CEPH_OSD_OP_COPY_GET:
+ case CEPH_OSD_OP_COPY_GET_CLASSIC:
out << " max " << op.op.copy_get.max;
case CEPH_OSD_OP_COPY_FROM:
out << " ver " << op.op.copy_from.src_version;
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 1b15877e747..59c5f175f86 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -1934,6 +1934,43 @@ struct object_copy_cursor_t {
};
WRITE_CLASS_ENCODER(object_copy_cursor_t)
+/**
+ * object_copy_data_t
+ *
+ * Return data from a copy request. The semantics are a little strange
+ * right now to accommodate the implicit encoding that was previously used
+ * in its place.
+ *
+ * In particular, the sender unconditionally fills in the cursor (from what
+ * it receives and sends), the size, and the mtime, but is responsible for
+ * figuring out whether it should put any data in the in_attrs, data, or
+ * omap members (corresponding to xattrs, object data, and the omap entries)
+ * based on external data (the client includes a max amount to return with
+ * the copy request). The client then looks into the attrs, data, and/or omap
+ * based on the contents of the cursor. Note the change from in_attrs to attrs --
+ * this is the result of some silly interface differences which were
+ * previously elided because bufferlists and bufferptrs encode on the wire the
+ * same way.
+ */
+struct object_copy_data_t {
+ object_copy_cursor_t cursor;  // always filled in by the sender
+ uint64_t size;  // always filled in; constructor default is (uint64_t)-1
+ utime_t mtime;  // always filled in
+ map<string, bufferlist> attrs;  // xattrs
+ bufferlist data;  // object data
+ map<string, bufferlist> omap;  // omap entries
+ string category;  // handled by encode()/decode() only; decode_classic() does not read it
+public:
+ object_copy_data_t() : size((uint64_t)-1) {}
+
+ static void generate_test_instances(list<object_copy_data_t*>& o);
+ void encode_classic(bufferlist& bl) const;  // presumably the writer for the layout decode_classic() reads -- definition not in this excerpt
+ void decode_classic(bufferlist::iterator& bl);  // legacy layout: no DECODE_START framing, no category
+ void encode(bufferlist& bl) const;  // versioned layout, includes category
+ void decode(bufferlist::iterator& bl);  // counterpart of encode()
+ void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(object_copy_data_t)
/**
* pg creation info
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index 40331f2afb4..6e85167a001 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -569,6 +569,7 @@ struct ObjectOperation {
object_copy_cursor_t *cursor;
uint64_t *out_size;
utime_t *out_mtime;
+ string *out_category;
std::map<std::string,bufferlist> *out_attrs;
bufferlist *out_data;
std::map<std::string,bufferlist> *out_omap;
@@ -576,43 +577,34 @@ struct ObjectOperation {
C_ObjectOperation_copyget(object_copy_cursor_t *c,
uint64_t *s,
utime_t *m,
+ string *cat,
std::map<std::string,bufferlist> *a,
bufferlist *d,
std::map<std::string,bufferlist> *o,
int *r)
: cursor(c),
- out_size(s), out_mtime(m), out_attrs(a),
- out_data(d), out_omap(o), prval(r) {}
+ out_size(s), out_mtime(m), out_category(cat),
+ out_attrs(a), out_data(d), out_omap(o), prval(r) {}
void finish(int r) {
if (r < 0)
return;
try {
bufferlist::iterator p = bl.begin();
- uint64_t size;
- ::decode(size, p);
+ object_copy_data_t copy_reply;
+ ::decode(copy_reply, p);
if (out_size)
- *out_size = size;
- utime_t mtime;
- ::decode(mtime, p);
+ *out_size = copy_reply.size;
if (out_mtime)
- *out_mtime = mtime;
- if (out_attrs) {
- ::decode_noclear(*out_attrs, p);
- } else {
- std::map<std::string,bufferlist> t;
- ::decode(t, p);
- }
- bufferlist bl;
- ::decode(bl, p);
+ *out_mtime = copy_reply.mtime;
+ if (out_category)
+ *out_category = copy_reply.category;
+ if (out_attrs)
+ *out_attrs = copy_reply.attrs;
if (out_data)
- out_data->claim_append(bl);
- if (out_omap) {
- ::decode_noclear(*out_omap, p);
- } else {
- std::map<std::string,bufferlist> t;
- ::decode(t, p);
- }
- ::decode(*cursor, p);
+ out_data->claim_append(copy_reply.data);
+ if (out_omap)
+ *out_omap = copy_reply.omap;
+ *cursor = copy_reply.cursor;
} catch (buffer::error& e) {
if (prval)
*prval = -EIO;
@@ -624,6 +616,7 @@ struct ObjectOperation {
uint64_t max,
uint64_t *out_size,
utime_t *out_mtime,
+ string *out_category,
std::map<std::string,bufferlist> *out_attrs,
bufferlist *out_data,
std::map<std::string,bufferlist> *out_omap,
@@ -635,7 +628,8 @@ struct ObjectOperation {
unsigned p = ops.size() - 1;
out_rval[p] = prval;
C_ObjectOperation_copyget *h =
- new C_ObjectOperation_copyget(cursor, out_size, out_mtime, out_attrs, out_data, out_omap, prval);
+ new C_ObjectOperation_copyget(cursor, out_size, out_mtime, out_category,
+ out_attrs, out_data, out_omap, prval);
out_bl[p] = &h->bl;
out_handler[p] = h;
}
diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h
index 33cb7950cb3..1ce2c72a58c 100644
--- a/src/test/encoding/types.h
+++ b/src/test/encoding/types.h
@@ -64,6 +64,7 @@ TYPE(pg_missing_t::item)
TYPE(pg_missing_t)
TYPE(pg_ls_response_t)
TYPE(object_copy_cursor_t)
+TYPE(object_copy_data_t)
TYPE(pg_create_t)
TYPE(watch_info_t)
TYPE(object_info_t)