diff options
author | athanatos <rexludorum@gmail.com> | 2013-08-22 10:24:52 -0700 |
---|---|---|
committer | athanatos <rexludorum@gmail.com> | 2013-08-22 10:24:52 -0700 |
commit | 67f160eb5ab81d5ad355633fe4fa1968961cdf27 (patch) | |
tree | b0bb5af0b4850e9840447ca99860d09b77eb0cfb | |
parent | a74247722ac1e573bee30c25ad8e7ace8bcd169b (diff) | |
parent | d980f581e3530d28f3f690b3f4c099995d8d8ec5 (diff) | |
download | ceph-67f160eb5ab81d5ad355633fe4fa1968961cdf27.tar.gz |
Merge pull request #414 from dachary/wip-5510
replace ObjectContext pointers with shared_ptr
Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/common/sharedptr_registry.hpp | 20 | ||||
-rw-r--r-- | src/osd/OSD.cc | 1 | ||||
-rw-r--r-- | src/osd/PG.cc | 3 | ||||
-rw-r--r-- | src/osd/PG.h | 9 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 312 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 105 | ||||
-rw-r--r-- | src/osd/Watch.cc | 18 | ||||
-rw-r--r-- | src/osd/Watch.h | 10 | ||||
-rw-r--r-- | src/osd/osd_types.h | 29 | ||||
-rw-r--r-- | src/test/test_osd_types.cc | 9 |
10 files changed, 225 insertions, 291 deletions
diff --git a/src/common/sharedptr_registry.hpp b/src/common/sharedptr_registry.hpp index a62aa0d9ce3..6579bd4ba71 100644 --- a/src/common/sharedptr_registry.hpp +++ b/src/common/sharedptr_registry.hpp @@ -58,6 +58,26 @@ public: lock("SharedPtrRegistry::lock") {} + bool empty() { + Mutex::Locker l(lock); + return contents.empty(); + } + + bool get_next(const K &key, pair<K, VPtr> *next) { + VPtr next_val; + Mutex::Locker l(lock); + typename map<K, WeakVPtr>::iterator i = contents.upper_bound(key); + while (i != contents.end() && + !(next_val = i->second.lock())) + ++i; + if (i == contents.end()) + return false; + if (next) + *next = make_pair(i->first, next_val); + return true; + } + + bool get_next(const K &key, pair<K, V> *next) { VPtr next_val; Mutex::Locker l(lock); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 55f11707189..9a58289eda4 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1522,7 +1522,6 @@ int OSD::shutdown() dout(20) << " kicking pg " << p->first << dendl; p->second->lock(); p->second->on_shutdown(); - p->second->kick(); p->second->unlock(); p->second->osr->flush(); } diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 49ea61a603a..cd5621cddf2 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -4527,9 +4527,6 @@ void PG::start_peering_interval(const OSDMapRef lastmap, { const OSDMapRef osdmap = get_osdmap(); - // -- there was a change! -- - kick(); - set_last_peering_reset(); vector<int> oldacting, oldup; diff --git a/src/osd/PG.h b/src/osd/PG.h index 14ac7c9fac5..720ce67bca3 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -232,7 +232,6 @@ protected: * put_unlock() when done with the current pointer (_most common_). */ Mutex _lock; - Cond _cond; atomic_t ref; #ifdef PG_DEBUG_REFS @@ -261,14 +260,6 @@ public: bool is_locked() const { return _lock.is_locked(); } - void wait() { - assert(_lock.is_locked()); - _cond.Wait(_lock); - } - void kick() { - assert(_lock.is_locked()); - _cond.Signal(); - } #ifdef PG_DEBUG_REFS uint64_t get_with_id(); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 6c7ec56af88..fcdbb182d52 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -4,6 +4,9 @@ * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com> + * + * Author: Loic Dachary <loic@dachary.org> * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -613,7 +616,9 @@ void ReplicatedPG::calc_trim_to() ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap, const PGPool &_pool, pg_t p, const hobject_t& oid, const hobject_t& ioid) : - PG(o, curmap, _pool, p, oid, ioid), temp_created(false), + PG(o, curmap, _pool, p, oid, ioid), + snapset_contexts_lock("ReplicatedPG::snapset_contexts"), + temp_created(false), temp_coll(coll_t::make_temp_coll(p)), snap_trimmer_machine(this) { snap_trimmer_machine.initiate(); @@ -693,7 +698,7 @@ void ReplicatedPG::do_op(OpRequestRef op) return; } - ObjectContext *obc; + ObjectContextRef obc; bool can_create = op->may_write(); snapid_t snapid; int r = find_object_context( @@ -747,7 +752,6 @@ void ReplicatedPG::do_op(OpRequestRef op) if (!op->may_write() && !obc->obs.exists) { osd->reply_op_error(op, -ENOENT); - put_object_context(obc); return; } @@ -756,7 +760,6 @@ void ReplicatedPG::do_op(OpRequestRef op) dout(10) << "do_op writes for " << obc->obs.oi.soid << " blocked by " << obc->blocked_by->obs.oi.soid << dendl; wait_for_degraded_object(obc->blocked_by->obs.oi.soid, op); - put_object_context(obc); return; } @@ -773,7 +776,7 @@ void ReplicatedPG::do_op(OpRequestRef op) } // src_oids - map<hobject_t,ObjectContext*> src_obc; + map<hobject_t,ObjectContextRef> src_obc; for (vector<OSDOp>::iterator p = m->ops.begin(); p != m->ops.end(); ++p) { OSDOp& osd_op = *p; @@ -781,7 +784,6 @@ void ReplicatedPG::do_op(OpRequestRef op) if (osd_op.op.op == CEPH_OSD_OP_LIST_SNAPS && m->get_snapid() != CEPH_SNAPDIR) { dout(10) << "LIST_SNAPS with incorrect context" << dendl; - put_object_context(obc); osd->reply_op_error(op, -EINVAL); return; } @@ -794,7 +796,7 @@ void ReplicatedPG::do_op(OpRequestRef op) hobject_t src_oid(osd_op.soid, src_oloc.key, m->get_pg().ps(), info.pgid.pool(), m->get_object_locator().nspace); if (!src_obc.count(src_oid)) { - ObjectContext *sobc; + ObjectContextRef sobc; snapid_t ssnapid; int r = find_object_context(src_oid, &sobc, false, &ssnapid); @@ -816,10 +818,8 @@ void ReplicatedPG::do_op(OpRequestRef op) (before_backfill && sobc->obs.oi.soid > backfill_target_info->last_backfill)) { wait_for_degraded_object(sobc->obs.oi.soid, op); dout(10) << " writes for " << obc->obs.oi.soid << " now blocked by " - << sobc->obs.oi.soid << dendl; - obc->get(); + << sobc->obs.oi.soid << dendl; obc->blocked_by = sobc; - sobc->get(); sobc->blocking.insert(obc); } else { dout(10) << " src_oid " << src_oid << " obc " << src_obc << dendl; @@ -836,8 +836,7 @@ void ReplicatedPG::do_op(OpRequestRef op) dout(10) << "no src oid specified for multi op " << osd_op << dendl; osd->reply_op_error(op, -EINVAL); } - put_object_contexts(src_obc); - put_object_context(obc); + src_obc.clear(); return; } @@ -852,7 +851,7 @@ void ReplicatedPG::do_op(OpRequestRef op) hobject_t clone_oid = obc->obs.oi.soid; clone_oid.snap = *p; if (!src_obc.count(clone_oid)) { - ObjectContext *sobc; + ObjectContextRef sobc; snapid_t ssnapid; int r = find_object_context(clone_oid, &sobc, false, &ssnapid); @@ -868,8 +867,7 @@ void ReplicatedPG::do_op(OpRequestRef op) src_obc[clone_oid] = sobc; continue; } - put_object_contexts(src_obc); - put_object_context(obc); + src_obc.clear(); return; } else { continue; @@ -902,8 +900,7 @@ void ReplicatedPG::do_op(OpRequestRef op) << " < snapset seq " << obc->ssc->snapset.seq << " on " << soid << dendl; delete ctx; - put_object_context(obc); - put_object_contexts(src_obc); + src_obc.clear(); osd->reply_op_error(op, -EOLDSNAPC); return; } @@ -912,8 +909,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (oldv != eversion_t()) { dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl; delete ctx; - put_object_context(obc); - put_object_contexts(src_obc); + src_obc.clear(); if (already_complete(oldv)) { osd->reply_op_error(op, 0, oldv); } else { @@ -970,7 +966,7 @@ void ReplicatedPG::do_op(OpRequestRef op) dout(10) << " taking ondisk_read_lock" << dendl; obc->ondisk_read_lock(); } - for (map<hobject_t,ObjectContext*>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) { + for (map<hobject_t,ObjectContextRef>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) { dout(10) << " taking ondisk_read_lock for src " << p->first << dendl; p->second->ondisk_read_lock(); } @@ -981,7 +977,7 @@ void ReplicatedPG::do_op(OpRequestRef op) dout(10) << " dropping ondisk_read_lock" << dendl; obc->ondisk_read_unlock(); } - for (map<hobject_t,ObjectContext*>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) { + for (map<hobject_t,ObjectContextRef>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) { dout(10) << " dropping ondisk_read_lock for src " << p->first << dendl; p->second->ondisk_read_unlock(); } @@ -989,8 +985,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (result == -EAGAIN) { // clean up after the ctx delete ctx; - put_object_context(obc); - put_object_contexts(src_obc); + src_obc.clear(); return; } @@ -998,8 +993,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (ctx->delta_stats.num_bytes > 0 && pool.info.get_flags() & pg_pool_t::FLAG_FULL) { delete ctx; - put_object_context(obc); - put_object_contexts(src_obc); + src_obc.clear(); osd->reply_op_error(op, -ENOSPC); return; } @@ -1044,8 +1038,7 @@ void ReplicatedPG::do_op(OpRequestRef op) reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); osd->send_message_osd_client(reply, m->get_connection()); delete ctx; - put_object_context(obc); - put_object_contexts(src_obc); + src_obc.clear(); return; } @@ -1056,9 +1049,6 @@ void ReplicatedPG::do_op(OpRequestRef op) append_log(ctx->log, pg_trim_to, ctx->local_t); - // continuing on to write path, make sure object context is registered - assert(obc->registered); - // verify that we are doing this in order? if (g_conf->osd_debug_op_order && m->get_source().is_client()) { map<client_t,tid_t>& cm = debug_op_order[obc->obs.oi.soid]; @@ -1466,14 +1456,13 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) { // load clone info bufferlist bl; - ObjectContext *obc = 0; + ObjectContextRef obc; int r = find_object_context(coid, &obc, false, NULL); if (r == -ENOENT || coid.snap != obc->obs.oi.soid.snap) { derr << __func__ << "could not find coid " << coid << dendl; assert(0); } assert(r == 0); - assert(obc->registered); object_info_t &coi = obc->obs.oi; set<snapid_t> old_snaps(coi.snaps.begin(), coi.snaps.end()); @@ -1615,7 +1604,6 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid) snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash, info.pgid.pool(), coid.get_namespace()); ctx->snapset_obc = get_object_context(snapoid, false); - assert(ctx->snapset_obc->registered); if (snapset.clones.empty() && !snapset.head_exists) { dout(10) << coid << " removing " << snapoid << dendl; @@ -2066,7 +2054,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ctx->user_modify = true; } - ObjectContext *src_obc = 0; + ObjectContextRef src_obc; if (ceph_osd_op_type_multi(op.op)) { MOSDOp *m = static_cast<MOSDOp *>(ctx->op->request); object_locator_t src_oloc; @@ -2484,7 +2472,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) hobject_t clone_oid = soid; clone_oid.snap = *clone_iter; - ObjectContext *clone_obc = ctx->src_obc[clone_oid]; + ObjectContextRef clone_obc = ctx->src_obc[clone_oid]; assert(clone_obc); for (vector<snapid_t>::reverse_iterator p = clone_obc->obs.oi.snaps.rbegin(); p != clone_obc->obs.oi.snaps.rend(); @@ -2825,9 +2813,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) uint64_t cookie = op.watch.cookie; bool do_watch = op.watch.flag & 1; entity_name_t entity = ctx->reqid.name; - ObjectContext *obc = ctx->obc; + ObjectContextRef obc = ctx->obc; - dout(10) << "watch: ctx->obc=" << (void *)obc << " cookie=" << cookie + dout(10) << "watch: ctx->obc=" << (void *)obc.get() << " cookie=" << cookie << " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl; dout(10) << "watch: oi.user_version=" << oi.user_version.version << dendl; dout(10) << "watch: peer_addr=" @@ -2845,7 +2833,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) t.nop(); // make sure update the object_info on disk! } ctx->watch_connects.push_back(w); - assert(obc->registered); } else { map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter = oi.watchers.find(make_pair(cookie, entity)); @@ -3428,7 +3415,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) dout(10) << "_rollback_to " << soid << " snapid " << snapid << dendl; - ObjectContext *rollback_to; + ObjectContextRef rollback_to; int ret = find_object_context( hobject_t(soid.oid, soid.get_key(), snapid, soid.hash, info.pgid.pool(), soid.get_namespace()), &rollback_to, false, &cloneid); @@ -3511,7 +3498,6 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) obs.oi.size = rollback_to->obs.oi.size; snapset.head_exists = true; } - put_object_context(rollback_to); } return ret; } @@ -3567,9 +3553,10 @@ void ReplicatedPG::make_writeable(OpContext *ctx) object_info_t static_snap_oi(coid); object_info_t *snap_oi; if (is_primary()) { - ctx->clone_obc = new ObjectContext(static_snap_oi, true, NULL); - ctx->clone_obc->get(); - register_object_context(ctx->clone_obc); + ctx->clone_obc = object_contexts.lookup_or_create(static_snap_oi.soid); + ctx->clone_obc->destructor_callback = new C_PG_ObjectContext(this, ctx->clone_obc.get()); + ctx->clone_obc->obs.oi = static_snap_oi; + ctx->clone_obc->obs.exists = true; snap_oi = &ctx->clone_obc->obs.oi; } else { snap_oi = &static_snap_oi; @@ -3819,7 +3806,6 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) ctx->at_version.version++; ctx->snapset_obc->obs.exists = false; - assert(ctx->snapset_obc->registered); } } } else if (ctx->new_snapset.clones.size()) { @@ -3836,7 +3822,6 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) ctx->snapset_obc->obs.oi.version = ctx->at_version; ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid; ctx->snapset_obc->obs.oi.mtime = ctx->mtime; - assert(ctx->snapset_obc->registered); bufferlist bv(sizeof(ctx->new_obs.oi)); ::encode(ctx->snapset_obc->obs.oi, bv); @@ -3978,17 +3963,14 @@ void ReplicatedPG::op_applied(RepGather *repop) int whoami = osd->get_nodeid(); if (repop->ctx->clone_obc) { - put_object_context(repop->ctx->clone_obc); - repop->ctx->clone_obc = 0; + repop->ctx->clone_obc = ObjectContextRef(); } if (repop->ctx->snapset_obc) { - put_object_context(repop->ctx->snapset_obc); - repop->ctx->snapset_obc = 0; + repop->ctx->snapset_obc = ObjectContextRef(); } - put_object_context(repop->obc); - put_object_contexts(repop->src_obc); - repop->obc = 0; + repop->src_obc.clear(); + repop->obc = ObjectContextRef(); if (!repop->aborted) { assert(repop->waitfor_ack.count(whoami) || @@ -4264,7 +4246,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now, } } -ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContext *obc, +ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRef obc, tid_t rep_tid) { if (ctx->op) @@ -4349,16 +4331,14 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type, void ReplicatedPG::get_watchers(list<obj_watch_item_t> &pg_watchers) { - for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin(); - i != object_contexts.end(); - ++i) { - i->second->get(); - get_obc_watchers(i->second, pg_watchers); - put_object_context(i->second); + pair<hobject_t, ObjectContextRef> i; + while (object_contexts.get_next(i.first, &i)) { + ObjectContextRef obc(i.second); + get_obc_watchers(obc, pg_watchers); } } -void ReplicatedPG::get_obc_watchers(ObjectContext *obc, list<obj_watch_item_t> &pg_watchers) +void ReplicatedPG::get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers) { for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j = obc->watchers.begin(); @@ -4382,16 +4362,12 @@ void ReplicatedPG::get_obc_watchers(ObjectContext *obc, list<obj_watch_item_t> & void ReplicatedPG::check_blacklisted_watchers() { dout(20) << "ReplicatedPG::check_blacklisted_watchers for pg " << get_pgid() << dendl; - for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin(); - i != object_contexts.end(); - ++i) { - i->second->get(); - check_blacklisted_obc_watchers(i->second); - put_object_context(i->second); - } + pair<hobject_t, ObjectContextRef> i; + while (object_contexts.get_next(i.first, &i)) + check_blacklisted_obc_watchers(i.second); } -void ReplicatedPG::check_blacklisted_obc_watchers(ObjectContext *obc) +void ReplicatedPG::check_blacklisted_obc_watchers(ObjectContextRef obc) { dout(20) << "ReplicatedPG::check_blacklisted_obc_watchers for obc " << obc->obs.oi.soid << dendl; for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator k = @@ -4411,7 +4387,7 @@ void ReplicatedPG::check_blacklisted_obc_watchers(ObjectContext *obc) } } -void ReplicatedPG::populate_obc_watchers(ObjectContext *obc) +void ReplicatedPG::populate_obc_watchers(ObjectContextRef obc) { assert(is_active()); assert(!is_missing_object(obc->obs.oi.soid) || @@ -4447,7 +4423,7 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc) void ReplicatedPG::handle_watch_timeout(WatchRef watch) { - ObjectContext *obc = watch->get_obc(); // handle_watch_timeout owns this ref + ObjectContextRef obc = watch->get_obc(); // handle_watch_timeout owns this ref dout(10) << "handle_watch_timeout obc " << obc << dendl; if (is_degraded_object(obc->obs.oi.soid)) { @@ -4457,7 +4433,6 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) dout(10) << "handle_watch_timeout waiting for degraded on obj " << obc->obs.oi.soid << dendl; - put_object_context(obc); // callback got its own ref return; } @@ -4468,7 +4443,6 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) scrubber.add_callback( watch->get_delayed_cb() // This callback! ); - put_object_context(obc); return; } @@ -4516,41 +4490,35 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) eval_repop(repop); } -ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid) +ObjectContextRef ReplicatedPG::create_object_context(const object_info_t& oi, + SnapSetContext *ssc) { - map<hobject_t, ObjectContext*>::iterator p = object_contexts.find(oid); - if (p != object_contexts.end()) - return p->second; - return NULL; -} - -ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi, - SnapSetContext *ssc) -{ - ObjectContext *obc = new ObjectContext(oi, false, ssc); - dout(10) << "create_object_context " << obc << " " << oi.soid << " " << obc->ref << dendl; - register_object_context(obc); + ObjectContextRef obc(object_contexts.lookup_or_create(oi.soid)); + assert(obc->destructor_callback == NULL); + obc->destructor_callback = new C_PG_ObjectContext(this, obc.get()); + obc->obs.oi = oi; + obc->obs.exists = false; + obc->ssc = ssc; + if (ssc) + register_snapset_context(ssc); + dout(10) << "create_object_context " << (void*)obc.get() << " " << oi.soid << " " << dendl; populate_obc_watchers(obc); - obc->ref++; return obc; } -ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid, - bool can_create) +ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid, + bool can_create) { - map<hobject_t, ObjectContext*>::iterator p = object_contexts.find(soid); - ObjectContext *obc; - if (p != object_contexts.end()) { - obc = p->second; - dout(10) << "get_object_context " << obc << " " << soid << " " << obc->ref - << " -> " << (obc->ref+1) << dendl; + ObjectContextRef obc = object_contexts.lookup(soid); + if (obc) { + dout(10) << "get_object_context " << obc << " " << soid << dendl; } else { // check disk bufferlist bv; int r = osd->store->getattr(coll, soid, OI_ATTR, bv); if (r < 0) { if (!can_create) - return NULL; // -ENOENT! + return ObjectContextRef(); // -ENOENT! // new object. object_info_t oi(soid); @@ -4562,49 +4530,41 @@ ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid, assert(oi.soid.pool == (int64_t)info.pgid.pool()); - SnapSetContext *ssc = NULL; - if (can_create) - ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace()); - obc = new ObjectContext(oi, true, ssc); + obc = object_contexts.lookup_or_create(oi.soid); + obc->destructor_callback = new C_PG_ObjectContext(this, obc.get()); + obc->obs.oi = oi; obc->obs.exists = true; - register_object_context(obc); - - if (can_create && !obc->ssc) + if (can_create) { obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace()); + register_snapset_context(obc->ssc); + } populate_obc_watchers(obc); dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl; } - obc->ref++; return obc; } void ReplicatedPG::context_registry_on_change() { - list<ObjectContext *> contexts; - for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin(); - i != object_contexts.end(); - ++i) { - i->second->get(); - contexts.push_back(i->second); - for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j = - i->second->watchers.begin(); - j != i->second->watchers.end(); - i->second->watchers.erase(j++)) { - j->second->discard(); + pair<hobject_t, ObjectContextRef> i; + while (object_contexts.get_next(i.first, &i)) { + ObjectContextRef obc(i.second); + if (obc) { + for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j = + obc->watchers.begin(); + j != obc->watchers.end(); + obc->watchers.erase(j++)) { + j->second->discard(); + } } } - for (list<ObjectContext *>::iterator i = contexts.begin(); - i != contexts.end(); - contexts.erase(i++)) { - put_object_context(*i); - } } int ReplicatedPG::find_object_context(const hobject_t& oid, - ObjectContext **pobc, + ObjectContextRef *pobc, bool can_create, snapid_t *psnapid) { @@ -4616,11 +4576,10 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, // want the snapdir? if (oid.snap == CEPH_SNAPDIR) { // return head or snapdir, whichever exists. - ObjectContext *obc = get_object_context(head, can_create); + ObjectContextRef obc = get_object_context(head, can_create); if (obc && !obc->obs.exists) { // ignore it if the obc exists but the object doesn't - put_object_context(obc); - obc = NULL; + obc = ObjectContextRef(); } if (!obc) { obc = get_object_context(snapdir, can_create); @@ -4638,7 +4597,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, // want the head? if (oid.snap == CEPH_NOSNAP) { - ObjectContext *obc = get_object_context(head, can_create); + ObjectContextRef obc = get_object_context(head, can_create); if (!obc) return -ENOENT; dout(10) << "find_object_context " << oid << " @" << oid.snap << dendl; @@ -4661,7 +4620,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, // head? if (oid.snap > ssc->snapset.seq) { if (ssc->snapset.head_exists) { - ObjectContext *obc = get_object_context(head, false); + ObjectContextRef obc = get_object_context(head, false); dout(10) << "find_object_context " << head << " want " << oid.snap << " > snapset seq " << ssc->snapset.seq << " -- HIT " << obc->obs @@ -4706,7 +4665,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, return -EAGAIN; } - ObjectContext *obc = get_object_context(soid, false); + ObjectContextRef obc = get_object_context(soid, false); assert(obc); // clone @@ -4721,41 +4680,20 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, } else { dout(20) << "find_object_context " << soid << " [" << first << "," << last << "] does not contain " << oid.snap << " -- DNE" << dendl; - put_object_context(obc); return -ENOENT; } } -void ReplicatedPG::put_object_context(ObjectContext *obc) +void ReplicatedPG::object_context_destructor_callback(ObjectContext *obc) { - dout(10) << "put_object_context " << obc << " " << obc->obs.oi.soid << " " - << obc->ref << " -> " << (obc->ref-1) << dendl; - - --obc->ref; - if (obc->ref == 0) { - if (obc->ssc) - put_snapset_context(obc->ssc); + dout(10) << "object_context_destructor_callback " << obc << " " + << obc->obs.oi.soid << dendl; - if (obc->registered) - object_contexts.erase(obc->obs.oi.soid); - delete obc; - - if (object_contexts.empty()) - kick(); - } + if (obc->ssc) + put_snapset_context(obc->ssc); } -void ReplicatedPG::put_object_contexts(map<hobject_t,ObjectContext*>& obcv) -{ - if (obcv.empty()) - return; - dout(10) << "put_object_contexts " << obcv << dendl; - for (map<hobject_t,ObjectContext*>::iterator p = obcv.begin(); p != obcv.end(); ++p) - put_object_context(p->second); - obcv.clear(); -} - -void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t *pgstat) +void ReplicatedPG::add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *pgstat) { object_info_t& oi = obc->obs.oi; @@ -4797,9 +4735,10 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t * SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid) { + Mutex::Locker l(snapset_contexts_lock); SnapSetContext *ssc = new SnapSetContext(oid); dout(10) << "create_snapset_context " << ssc << " " << ssc->oid << dendl; - register_snapset_context(ssc); + _register_snapset_context(ssc); ssc->ref++; return ssc; } @@ -4810,6 +4749,7 @@ SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, bool can_create, const string& nspace) { + Mutex::Locker l(snapset_contexts_lock); SnapSetContext *ssc; map<object_t, SnapSetContext*>::iterator p = snapset_contexts.find(oid); if (p != snapset_contexts.end()) { @@ -4828,7 +4768,7 @@ SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, return NULL; } ssc = new SnapSetContext(oid); - register_snapset_context(ssc); + _register_snapset_context(ssc); if (r >= 0) { bufferlist::iterator bvp = bv.begin(); ssc->snapset.decode(bvp); @@ -4843,9 +4783,9 @@ SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, void ReplicatedPG::put_snapset_context(SnapSetContext *ssc) { + Mutex::Locker l(snapset_contexts_lock); dout(10) << "put_snapset_context " << ssc->oid << " " << ssc->ref << " -> " << (ssc->ref-1) << dendl; - --ssc->ref; if (ssc->ref == 0) { if (ssc->registered) @@ -5098,7 +5038,7 @@ void ReplicatedPG::sub_op_modify_reply(OpRequestRef op) // =========================================================== -void ReplicatedPG::calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const hobject_t& head, +void ReplicatedPG::calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, pg_missing_t& missing, const hobject_t &last_backfill, interval_set<uint64_t>& data_subset, @@ -5383,7 +5323,7 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer) * clones/heads and dup data ranges where possible. */ void ReplicatedPG::prep_push_to_replica( - ObjectContext *obc, const hobject_t& soid, int peer, + ObjectContextRef obc, const hobject_t& soid, int peer, int prio, PushOp *pop) { @@ -5437,7 +5377,7 @@ void ReplicatedPG::prep_push_to_replica( } void ReplicatedPG::prep_push(int prio, - ObjectContext *obc, + ObjectContextRef obc, const hobject_t& soid, int peer, PushOp *pop) { @@ -5453,7 +5393,7 @@ void ReplicatedPG::prep_push(int prio, void ReplicatedPG::prep_push( int prio, - ObjectContext *obc, + ObjectContextRef obc, const hobject_t& soid, int peer, eversion_t version, interval_set<uint64_t> &data_subset, @@ -5744,7 +5684,7 @@ bool ReplicatedPG::handle_pull_response( hoid.get_namespace()); assert(ssc); } - ObjectContext *obc = create_object_context(pi.recovery_info.oi, ssc); + ObjectContextRef obc = create_object_context(pi.recovery_info.oi, ssc); obc->obs.exists = true; obc->ondisk_write_lock(); @@ -6142,18 +6082,14 @@ bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) void ReplicatedPG::finish_degraded_object(const hobject_t& oid) { dout(10) << "finish_degraded_object " << oid << dendl; - map<hobject_t, ObjectContext *>::iterator i = object_contexts.find(oid); - if (i != object_contexts.end()) { - i->second->get(); - for (set<ObjectContext*>::iterator j = i->second->blocking.begin(); - j != i->second->blocking.end(); - i->second->blocking.erase(j++)) { + ObjectContextRef obc(object_contexts.lookup(oid)); + if (obc) { + for (set<ObjectContextRef>::iterator j = obc->blocking.begin(); + j != obc->blocking.end(); + obc->blocking.erase(j++)) { dout(10) << " no longer blocking writes for " << (*j)->obs.oi.soid << dendl; - (*j)->blocked_by = NULL; - put_object_context(*j); - put_object_context(i->second); + (*j)->blocked_by = ObjectContextRef(); } - put_object_context(i->second); } if (callbacks_for_degraded_object.count(oid)) { list<Context*> contexts; @@ -6258,11 +6194,10 @@ void ReplicatedPG::_committed_pushed_object( unlock(); } -void ReplicatedPG::_applied_recovered_object(ObjectContext *obc) +void ReplicatedPG::_applied_recovered_object(ObjectContextRef obc) { lock(); dout(10) << "_applied_recovered_object " << *obc << dendl; - put_object_context(obc); assert(active_pushes >= 1); --active_pushes; @@ -6468,7 +6403,7 @@ eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid) /* Mark an object as lost */ -ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t, +ObjectContextRef ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t, const hobject_t &oid, eversion_t version, utime_t mtime, int what) { @@ -6485,7 +6420,7 @@ ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t, pg_log_entry_t e(what, oid, info.last_update, version, osd_reqid_t(), mtime); pg_log.add(e); - ObjectContext *obc = get_object_context(oid, true); + ObjectContextRef obc = get_object_context(oid, true); obc->ondisk_write_lock(); @@ -6502,7 +6437,7 @@ ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t, struct C_PG_MarkUnfoundLost : public Context { ReplicatedPGRef pg; - list<ObjectContext*> obcs; + list<ObjectContextRef> obcs; C_PG_MarkUnfoundLost(ReplicatedPG *p) : pg(p) {} void finish(int r) { pg->_finish_mark_all_unfound_lost(obcs); @@ -6535,7 +6470,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what) continue; } - ObjectContext *obc = NULL; + ObjectContextRef obc; eversion_t prev; switch (what) { @@ -6611,7 +6546,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what) osd->queue_for_recovery(this); } -void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs) +void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs) { lock(); dout(10) << "_finish_mark_all_unfound_lost " << dendl; @@ -6620,11 +6555,7 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs) requeue_ops(waiting_for_all_missing); waiting_for_all_missing.clear(); - while (!obcs.empty()) { - ObjectContext *obc = obcs.front(); - put_object_context(obc); - obcs.pop_front(); - } + obcs.clear(); unlock(); } @@ -7085,7 +7016,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle) case pg_log_entry_t::LOST_REVERT: { if (item.have == latest->reverting_to) { - ObjectContext *obc = get_object_context(soid, true); + ObjectContextRef obc = get_object_context(soid, true); if (obc->obs.oi.version == latest->version) { // I'm already reverting @@ -7185,7 +7116,7 @@ int ReplicatedPG::prep_object_replica_pushes( dout(10) << __func__ << ": on " << soid << dendl; // NOTE: we know we will get a valid oloc off of disk here. - ObjectContext *obc = get_object_context(soid, false); + ObjectContextRef obc = get_object_context(soid, false); if (!obc) { pg_log.missing_add(soid, v, eversion_t()); bool uhoh = true; @@ -7229,7 +7160,6 @@ int ReplicatedPG::prep_object_replica_pushes( dout(10) << " ondisk_read_unlock on " << soid << dendl; obc->ondisk_read_unlock(); - put_object_context(obc); return 1; } @@ -7432,11 +7362,10 @@ int ReplicatedPG::recover_backfill( for (set<hobject_t>::iterator i = add_to_stat.begin(); i != add_to_stat.end(); ++i) { - ObjectContext *obc = get_object_context(*i, false); + ObjectContextRef obc = get_object_context(*i, false); pg_stat_t stat; add_object_context_to_pg_stat(obc, &stat); pending_backfill_updates[*i] = stat; - put_object_context(obc); } for (map<hobject_t, eversion_t>::iterator i = to_remove.begin(); i != to_remove.end(); @@ -7507,13 +7436,12 @@ void ReplicatedPG::prep_backfill_object_push( if (!pushing.count(oid)) start_recovery_op(oid); - ObjectContext *obc = get_object_context(oid, false); + ObjectContextRef obc = get_object_context(oid, false); obc->ondisk_read_lock(); (*pushes)[peer].push_back(PushOp()); prep_push_to_replica(obc, oid, peer, g_conf->osd_recovery_op_priority, &((*pushes)[peer].back())); obc->ondisk_read_unlock(); - put_object_context(obc); } void ReplicatedPG::scan_range( @@ -7535,9 +7463,9 @@ void ReplicatedPG::scan_range( for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) { handle.reset_tp_timeout(); - ObjectContext *obc = NULL; + ObjectContextRef obc; if (is_primary()) - obc = _lookup_object_context(*p); + obc = object_contexts.lookup(*p); if (obc) { bi->objects[*p] = obc->obs.oi.version; dout(20) << " " << *p << " " << obc->obs.oi.version << dendl; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index af14871fe27..0fbe5afd9ca 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -3,6 +3,9 @@ * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com> + * + * Author: Loic Dachary <loic@dachary.org> * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -27,6 +30,9 @@ #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" #include "messages/MOSDSubOp.h" + +#include "common/sharedptr_registry.hpp" + class MOSDSubOpReply; class ReplicatedPG; @@ -124,10 +130,10 @@ public: vector<pg_log_entry_t> log; interval_set<uint64_t> modified_ranges; - ObjectContext *obc; // For ref counting purposes - map<hobject_t,ObjectContext*> src_obc; - ObjectContext *clone_obc; // if we created a clone - ObjectContext *snapset_obc; // if we created/deleted a snapdir + ObjectContextRef obc; + map<hobject_t,ObjectContextRef> src_obc; + ObjectContextRef clone_obc; // if we created a clone + ObjectContextRef snapset_obc; // if we created/deleted a snapdir int data_off; // FIXME: we may want to kill this msgr hint off at some point! @@ -150,7 +156,7 @@ public: modify(false), user_modify(false), bytes_written(0), bytes_read(0), current_osd_subop_num(0), - obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg), + data_off(0), reply(NULL), pg(_pg), num_read(0), num_write(0) { if (_ssc) { @@ -176,8 +182,8 @@ public: eversion_t v; OpContext *ctx; - ObjectContext *obc; - map<hobject_t,ObjectContext*> src_obc; + ObjectContextRef obc; + map<hobject_t,ObjectContextRef> src_obc; tid_t rep_tid; @@ -197,7 +203,7 @@ public: list<ObjectStore::Transaction*> tls; bool queue_snap_trimmer; - RepGather(OpContext *c, ObjectContext *pi, tid_t rt, + RepGather(OpContext *c, ObjectContextRef pi, tid_t rt, eversion_t lc) : queue_item(this), nref(1), @@ -240,7 +246,7 @@ protected: void eval_repop(RepGather*); void issue_repop(RepGather *repop, utime_t now, eversion_t old_last_update, bool old_exists, uint64_t old_size, eversion_t old_version); - RepGather *new_repop(OpContext *ctx, ObjectContext *obc, tid_t rep_tid); + RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, tid_t rep_tid); void remove_repop(RepGather *repop); void repop_ack(RepGather *repop, int result, int ack_type, @@ -276,50 +282,42 @@ protected: friend struct C_OnPushCommit; // projected object info - map<hobject_t, ObjectContext*> object_contexts; + SharedPtrRegistry<hobject_t, ObjectContext> object_contexts; map<object_t, SnapSetContext*> snapset_contexts; + Mutex snapset_contexts_lock; // debug order that client ops are applied map<hobject_t, map<client_t, tid_t> > debug_op_order; - void populate_obc_watchers(ObjectContext *obc); - void check_blacklisted_obc_watchers(ObjectContext *); + void populate_obc_watchers(ObjectContextRef obc); + void check_blacklisted_obc_watchers(ObjectContextRef obc); void check_blacklisted_watchers(); void get_watchers(list<obj_watch_item_t> &pg_watchers); - void get_obc_watchers(ObjectContext *obc, list<obj_watch_item_t> &pg_watchers); + void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers); public: void handle_watch_timeout(WatchRef watch); protected: - ObjectContext *lookup_object_context(const hobject_t& soid) { - if (object_contexts.count(soid)) { - ObjectContext *obc = object_contexts[soid]; - obc->ref++; - return obc; - } - return NULL; - } - ObjectContext *_lookup_object_context(const hobject_t& oid); - ObjectContext *create_object_context(const object_info_t& oi, SnapSetContext *ssc); - ObjectContext *get_object_context(const hobject_t& soid, bool can_create); - void register_object_context(ObjectContext *obc) { - if (!obc->registered) { - assert(object_contexts.count(obc->obs.oi.soid) == 0); - obc->registered = true; - object_contexts[obc->obs.oi.soid] = obc; - } - if (obc->ssc) - register_snapset_context(obc->ssc); - } + ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc); + ObjectContextRef get_object_context(const hobject_t& soid, bool can_create); void context_registry_on_change(); - void put_object_context(ObjectContext *obc); - void put_object_contexts(map<hobject_t,ObjectContext*>& obcv); + void object_context_destructor_callback(ObjectContext *obc); + struct C_PG_ObjectContext : public Context { + ReplicatedPGRef pg; + ObjectContext *obc; + C_PG_ObjectContext(ReplicatedPG *p, ObjectContext *o) : + pg(p), obc(o) {} + void finish(int r) { + pg->object_context_destructor_callback(obc); + } + }; + int find_object_context(const hobject_t& oid, - ObjectContext **pobc, + ObjectContextRef *pobc, bool can_create, snapid_t *psnapid=NULL); - void add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t *stat); + void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat); void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc); @@ -327,6 +325,11 @@ protected: SnapSetContext *get_snapset_context(const object_t& oid, const string &key, ps_t seed, bool can_create, const string &nspace); void register_snapset_context(SnapSetContext *ssc) { + Mutex::Locker l(snapset_contexts_lock); + _register_snapset_context(ssc); + } + void _register_snapset_context(SnapSetContext *ssc) { + assert(snapset_contexts_lock.is_locked()); if (!ssc->registered) { assert(snapset_contexts.count(ssc->oid) == 0); ssc->registered = true; @@ -525,7 +528,7 @@ protected: int prep_object_replica_pushes(const hobject_t& soid, eversion_t v, int priority, map<int, vector<PushOp> > *pushes); - void calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const hobject_t& head, + void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, pg_missing_t& missing, const hobject_t &last_backfill, interval_set<uint64_t>& data_subset, @@ -535,17 +538,17 @@ protected: interval_set<uint64_t>& data_subset, map<hobject_t, interval_set<uint64_t> >& clone_subsets); void prep_push_to_replica( - ObjectContext *obc, + ObjectContextRef obc, const hobject_t& oid, int dest, int priority, PushOp *push_op); void prep_push(int priority, - ObjectContext *obc, + ObjectContextRef obc, const hobject_t& oid, int dest, PushOp *op); void prep_push(int priority, - ObjectContext *obc, + ObjectContextRef obc, const hobject_t& soid, int peer, eversion_t version, interval_set<uint64_t> &data_subset, @@ -648,8 +651,8 @@ protected: } }; struct C_OSD_OndiskWriteUnlock : public Context { - ObjectContext *obc, *obc2; - C_OSD_OndiskWriteUnlock(ObjectContext *o, ObjectContext *o2=0) : obc(o), obc2(o2) {} + ObjectContextRef obc, obc2; + C_OSD_OndiskWriteUnlock(ObjectContextRef o, ObjectContextRef o2 = ObjectContextRef()) : obc(o), obc2(o2) {} void finish(int r) { obc->ondisk_write_unlock(); if (obc2) @@ -657,17 +660,17 @@ protected: } }; struct C_OSD_OndiskWriteUnlockList : public Context { - list<ObjectContext*> *pls; - C_OSD_OndiskWriteUnlockList(list<ObjectContext*> *l) : pls(l) {} + list<ObjectContextRef> *pls; + C_OSD_OndiskWriteUnlockList(list<ObjectContextRef> *l) : pls(l) {} void finish(int r) { - for (list<ObjectContext*>::iterator p = pls->begin(); p != pls->end(); ++p) + for (list<ObjectContextRef>::iterator p = pls->begin(); p != pls->end(); ++p) (*p)->ondisk_write_unlock(); } }; struct C_OSD_AppliedRecoveredObject : public Context { ReplicatedPGRef pg; - ObjectContext *obc; - C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectContext *o) : + ObjectContextRef obc; + C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectContextRef o) : pg(p), obc(o) {} void finish(int r) { pg->_applied_recovered_object(obc); @@ -730,7 +733,7 @@ protected: void sub_op_modify_commit(RepModify *rm); void sub_op_modify_reply(OpRequestRef op); - void _applied_recovered_object(ObjectContext *obc); + void _applied_recovered_object(ObjectContextRef obc); void _applied_recovered_object_replica(); void _committed_pushed_object(epoch_t epoch, eversion_t lc); void recover_got(hobject_t oid, eversion_t v); @@ -879,10 +882,10 @@ public: void mark_all_unfound_lost(int what); eversion_t pick_newest_available(const hobject_t& oid); - ObjectContext *mark_object_lost(ObjectStore::Transaction *t, + ObjectContextRef mark_object_lost(ObjectStore::Transaction *t, const hobject_t& oid, eversion_t version, utime_t mtime, int what); - void _finish_mark_all_unfound_lost(list<ObjectContext*>& obcs); + void _finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs); void on_role_change(); void on_change(ObjectStore::Transaction *t); diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc index 8a084ca9aa1..ffa3adced24 100644 --- a/src/osd/Watch.cc +++ b/src/osd/Watch.cc @@ -250,15 +250,14 @@ public: string Watch::gen_dbg_prefix() { stringstream ss; ss << pg->gen_prefix() << " -- Watch(" - << make_pair(cookie, entity) - << ", obc->ref=" << (obc ? obc->ref : -1) << ") "; + << make_pair(cookie, entity) << ") "; return ss.str(); } Watch::Watch( ReplicatedPG *pg, OSDService *osd, - ObjectContext *obc, + ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, @@ -272,7 +271,6 @@ Watch::Watch( addr(addr), entity(entity), discarded(false) { - obc->get(); dout(10) << "Watch()" << dendl; } @@ -292,13 +290,6 @@ Context *Watch::get_delayed_cb() return cb; } -ObjectContext *Watch::get_obc() -{ - assert(obc); - obc->get(); - return obc; -} - void Watch::register_cb() { Mutex::Locker l(osd->watch_lock); @@ -370,8 +361,7 @@ void Watch::discard_state() sessionref->put(); conn = ConnectionRef(); } - pg->put_object_context(obc); - obc = NULL; + obc = ObjectContextRef(); } bool Watch::is_discarded() @@ -428,7 +418,7 @@ void Watch::notify_ack(uint64_t notify_id) WatchRef Watch::makeWatchRef( ReplicatedPG *pg, OSDService *osd, - ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, entity_addr_t addr) + ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, entity_addr_t addr) { WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr)); ret->set_self(ret); diff --git a/src/osd/Watch.h b/src/osd/Watch.h index 1c9fa28cb65..ecb61ad8b72 100644 --- a/src/osd/Watch.h +++ b/src/osd/Watch.h @@ -151,7 +151,7 @@ class Watch { OSDService *osd; boost::intrusive_ptr<ReplicatedPG> pg; - ObjectContext *obc; + std::tr1::shared_ptr<ObjectContext> obc; std::map<uint64_t, NotifyRef> in_progress_notifies; @@ -165,7 +165,7 @@ class Watch { Watch( ReplicatedPG *pg, OSDService *osd, - ObjectContext *obc, uint32_t timeout, + std::tr1::shared_ptr<ObjectContext> obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, entity_addr_t addr); @@ -187,7 +187,7 @@ public: string gen_dbg_prefix(); static WatchRef makeWatchRef( ReplicatedPG *pg, OSDService *osd, - ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, entity_addr_t addr); + std::tr1::shared_ptr<ObjectContext> obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, entity_addr_t addr); void set_self(WatchRef _self) { self = _self; } @@ -195,8 +195,8 @@ public: /// Does not grant a ref count! boost::intrusive_ptr<ReplicatedPG> get_pg() { return pg; } - /// Grants a ref count! - ObjectContext *get_obc(); + std::tr1::shared_ptr<ObjectContext> get_obc() { return obc; } + uint64_t get_cookie() const { return cookie; } entity_name_t get_entity() const { return entity; } entity_addr_t get_peer_addr() const { return addr; } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 3cafdc2b035..6cdacc9902c 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2001,6 +2001,8 @@ struct ObjectState { object_info_t oi; bool exists; + ObjectState() : exists(false) {} + ObjectState(const object_info_t &oi_, bool exists_) : oi(oi_), exists(exists_) {} }; @@ -2022,13 +2024,18 @@ struct SnapSetContext { * etc., because we don't send writes down to disk until after * replicas ack. */ + +struct ObjectContext; + +typedef std::tr1::shared_ptr<ObjectContext> ObjectContextRef; + struct ObjectContext { - int ref; - bool registered; ObjectState obs; SnapSetContext *ssc; // may be null + Context *destructor_callback; + private: Mutex lock; public: @@ -2036,20 +2043,22 @@ public: int unstable_writes, readers, writers_waiting, readers_waiting; // set if writes for this object are blocked on another objects recovery - ObjectContext *blocked_by; // object blocking our writes - set<ObjectContext*> blocking; // objects whose writes we block + ObjectContextRef blocked_by; // object blocking our writes + set<ObjectContextRef> blocking; // objects whose writes we block // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers. map<pair<uint64_t, entity_name_t>, WatchRef> watchers; - ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_) - : ref(0), registered(false), obs(oi_, exists_), ssc(ssc_), + ObjectContext() + : ssc(NULL), + destructor_callback(0), lock("ReplicatedPG::ObjectContext::lock"), - unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0), - blocked_by(0) {} - - void get() { ++ref; } + unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {} + ~ObjectContext() { + if (destructor_callback) + destructor_callback->complete(0); + } // do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy. void ondisk_write_lock() { lock.Lock(); diff --git a/src/test/test_osd_types.cc b/src/test/test_osd_types.cc index 730a8ffdc5d..e07c9e06592 100644 --- a/src/test/test_osd_types.cc +++ b/src/test/test_osd_types.cc @@ -1008,8 +1008,7 @@ protected: TEST_F(ObjectContextTest, read_write_lock) { { - object_info_t oi; - ObjectContext obc(oi, false, NULL); + ObjectContext obc; // // write_lock @@ -1044,8 +1043,7 @@ TEST_F(ObjectContextTest, read_write_lock) useconds_t delay = 0; { - object_info_t oi; - ObjectContext obc(oi, false, NULL); + ObjectContext obc; // // write_lock @@ -1102,8 +1100,7 @@ TEST_F(ObjectContextTest, read_write_lock) } { - object_info_t oi; - ObjectContext obc(oi, false, NULL); + ObjectContext obc; // // read_lock |