summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorathanatos <rexludorum@gmail.com>2013-08-22 10:24:52 -0700
committerathanatos <rexludorum@gmail.com>2013-08-22 10:24:52 -0700
commit67f160eb5ab81d5ad355633fe4fa1968961cdf27 (patch)
treeb0bb5af0b4850e9840447ca99860d09b77eb0cfb
parenta74247722ac1e573bee30c25ad8e7ace8bcd169b (diff)
parentd980f581e3530d28f3f690b3f4c099995d8d8ec5 (diff)
downloadceph-67f160eb5ab81d5ad355633fe4fa1968961cdf27.tar.gz
Merge pull request #414 from dachary/wip-5510
replace ObjectContext pointers with shared_ptr Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/common/sharedptr_registry.hpp20
-rw-r--r--src/osd/OSD.cc1
-rw-r--r--src/osd/PG.cc3
-rw-r--r--src/osd/PG.h9
-rw-r--r--src/osd/ReplicatedPG.cc312
-rw-r--r--src/osd/ReplicatedPG.h105
-rw-r--r--src/osd/Watch.cc18
-rw-r--r--src/osd/Watch.h10
-rw-r--r--src/osd/osd_types.h29
-rw-r--r--src/test/test_osd_types.cc9
10 files changed, 225 insertions, 291 deletions
diff --git a/src/common/sharedptr_registry.hpp b/src/common/sharedptr_registry.hpp
index a62aa0d9ce3..6579bd4ba71 100644
--- a/src/common/sharedptr_registry.hpp
+++ b/src/common/sharedptr_registry.hpp
@@ -58,6 +58,26 @@ public:
lock("SharedPtrRegistry::lock")
{}
+ bool empty() {
+ Mutex::Locker l(lock);
+ return contents.empty();
+ }
+
+ bool get_next(const K &key, pair<K, VPtr> *next) {
+ VPtr next_val;
+ Mutex::Locker l(lock);
+ typename map<K, WeakVPtr>::iterator i = contents.upper_bound(key);
+ while (i != contents.end() &&
+ !(next_val = i->second.lock()))
+ ++i;
+ if (i == contents.end())
+ return false;
+ if (next)
+ *next = make_pair(i->first, next_val);
+ return true;
+ }
+
+
bool get_next(const K &key, pair<K, V> *next) {
VPtr next_val;
Mutex::Locker l(lock);
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 55f11707189..9a58289eda4 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1522,7 +1522,6 @@ int OSD::shutdown()
dout(20) << " kicking pg " << p->first << dendl;
p->second->lock();
p->second->on_shutdown();
- p->second->kick();
p->second->unlock();
p->second->osr->flush();
}
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 49ea61a603a..cd5621cddf2 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -4527,9 +4527,6 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
{
const OSDMapRef osdmap = get_osdmap();
- // -- there was a change! --
- kick();
-
set_last_peering_reset();
vector<int> oldacting, oldup;
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 14ac7c9fac5..720ce67bca3 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -232,7 +232,6 @@ protected:
* put_unlock() when done with the current pointer (_most common_).
*/
Mutex _lock;
- Cond _cond;
atomic_t ref;
#ifdef PG_DEBUG_REFS
@@ -261,14 +260,6 @@ public:
bool is_locked() const {
return _lock.is_locked();
}
- void wait() {
- assert(_lock.is_locked());
- _cond.Wait(_lock);
- }
- void kick() {
- assert(_lock.is_locked());
- _cond.Signal();
- }
#ifdef PG_DEBUG_REFS
uint64_t get_with_id();
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 6c7ec56af88..fcdbb182d52 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -4,6 +4,9 @@
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@@ -613,7 +616,9 @@ void ReplicatedPG::calc_trim_to()
ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
const PGPool &_pool, pg_t p, const hobject_t& oid,
const hobject_t& ioid) :
- PG(o, curmap, _pool, p, oid, ioid), temp_created(false),
+ PG(o, curmap, _pool, p, oid, ioid),
+ snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
+ temp_created(false),
temp_coll(coll_t::make_temp_coll(p)), snap_trimmer_machine(this)
{
snap_trimmer_machine.initiate();
@@ -693,7 +698,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
return;
}
- ObjectContext *obc;
+ ObjectContextRef obc;
bool can_create = op->may_write();
snapid_t snapid;
int r = find_object_context(
@@ -747,7 +752,6 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (!op->may_write() && !obc->obs.exists) {
osd->reply_op_error(op, -ENOENT);
- put_object_context(obc);
return;
}
@@ -756,7 +760,6 @@ void ReplicatedPG::do_op(OpRequestRef op)
dout(10) << "do_op writes for " << obc->obs.oi.soid << " blocked by "
<< obc->blocked_by->obs.oi.soid << dendl;
wait_for_degraded_object(obc->blocked_by->obs.oi.soid, op);
- put_object_context(obc);
return;
}
@@ -773,7 +776,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
}
// src_oids
- map<hobject_t,ObjectContext*> src_obc;
+ map<hobject_t,ObjectContextRef> src_obc;
for (vector<OSDOp>::iterator p = m->ops.begin(); p != m->ops.end(); ++p) {
OSDOp& osd_op = *p;
@@ -781,7 +784,6 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (osd_op.op.op == CEPH_OSD_OP_LIST_SNAPS &&
m->get_snapid() != CEPH_SNAPDIR) {
dout(10) << "LIST_SNAPS with incorrect context" << dendl;
- put_object_context(obc);
osd->reply_op_error(op, -EINVAL);
return;
}
@@ -794,7 +796,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
hobject_t src_oid(osd_op.soid, src_oloc.key, m->get_pg().ps(),
info.pgid.pool(), m->get_object_locator().nspace);
if (!src_obc.count(src_oid)) {
- ObjectContext *sobc;
+ ObjectContextRef sobc;
snapid_t ssnapid;
int r = find_object_context(src_oid, &sobc, false, &ssnapid);
@@ -816,10 +818,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
(before_backfill && sobc->obs.oi.soid > backfill_target_info->last_backfill)) {
wait_for_degraded_object(sobc->obs.oi.soid, op);
dout(10) << " writes for " << obc->obs.oi.soid << " now blocked by "
- << sobc->obs.oi.soid << dendl;
- obc->get();
+ << sobc->obs.oi.soid << dendl;
obc->blocked_by = sobc;
- sobc->get();
sobc->blocking.insert(obc);
} else {
dout(10) << " src_oid " << src_oid << " obc " << src_obc << dendl;
@@ -836,8 +836,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
dout(10) << "no src oid specified for multi op " << osd_op << dendl;
osd->reply_op_error(op, -EINVAL);
}
- put_object_contexts(src_obc);
- put_object_context(obc);
+ src_obc.clear();
return;
}
@@ -852,7 +851,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
hobject_t clone_oid = obc->obs.oi.soid;
clone_oid.snap = *p;
if (!src_obc.count(clone_oid)) {
- ObjectContext *sobc;
+ ObjectContextRef sobc;
snapid_t ssnapid;
int r = find_object_context(clone_oid, &sobc, false, &ssnapid);
@@ -868,8 +867,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
src_obc[clone_oid] = sobc;
continue;
}
- put_object_contexts(src_obc);
- put_object_context(obc);
+ src_obc.clear();
return;
} else {
continue;
@@ -902,8 +900,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
<< " < snapset seq " << obc->ssc->snapset.seq
<< " on " << soid << dendl;
delete ctx;
- put_object_context(obc);
- put_object_contexts(src_obc);
+ src_obc.clear();
osd->reply_op_error(op, -EOLDSNAPC);
return;
}
@@ -912,8 +909,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (oldv != eversion_t()) {
dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
delete ctx;
- put_object_context(obc);
- put_object_contexts(src_obc);
+ src_obc.clear();
if (already_complete(oldv)) {
osd->reply_op_error(op, 0, oldv);
} else {
@@ -970,7 +966,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
dout(10) << " taking ondisk_read_lock" << dendl;
obc->ondisk_read_lock();
}
- for (map<hobject_t,ObjectContext*>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) {
+ for (map<hobject_t,ObjectContextRef>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) {
dout(10) << " taking ondisk_read_lock for src " << p->first << dendl;
p->second->ondisk_read_lock();
}
@@ -981,7 +977,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
dout(10) << " dropping ondisk_read_lock" << dendl;
obc->ondisk_read_unlock();
}
- for (map<hobject_t,ObjectContext*>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) {
+ for (map<hobject_t,ObjectContextRef>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) {
dout(10) << " dropping ondisk_read_lock for src " << p->first << dendl;
p->second->ondisk_read_unlock();
}
@@ -989,8 +985,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (result == -EAGAIN) {
// clean up after the ctx
delete ctx;
- put_object_context(obc);
- put_object_contexts(src_obc);
+ src_obc.clear();
return;
}
@@ -998,8 +993,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (ctx->delta_stats.num_bytes > 0 &&
pool.info.get_flags() & pg_pool_t::FLAG_FULL) {
delete ctx;
- put_object_context(obc);
- put_object_contexts(src_obc);
+ src_obc.clear();
osd->reply_op_error(op, -ENOSPC);
return;
}
@@ -1044,8 +1038,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
osd->send_message_osd_client(reply, m->get_connection());
delete ctx;
- put_object_context(obc);
- put_object_contexts(src_obc);
+ src_obc.clear();
return;
}
@@ -1056,9 +1049,6 @@ void ReplicatedPG::do_op(OpRequestRef op)
append_log(ctx->log, pg_trim_to, ctx->local_t);
- // continuing on to write path, make sure object context is registered
- assert(obc->registered);
-
// verify that we are doing this in order?
if (g_conf->osd_debug_op_order && m->get_source().is_client()) {
map<client_t,tid_t>& cm = debug_op_order[obc->obs.oi.soid];
@@ -1466,14 +1456,13 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
{
// load clone info
bufferlist bl;
- ObjectContext *obc = 0;
+ ObjectContextRef obc;
int r = find_object_context(coid, &obc, false, NULL);
if (r == -ENOENT || coid.snap != obc->obs.oi.soid.snap) {
derr << __func__ << "could not find coid " << coid << dendl;
assert(0);
}
assert(r == 0);
- assert(obc->registered);
object_info_t &coi = obc->obs.oi;
set<snapid_t> old_snaps(coi.snaps.begin(), coi.snaps.end());
@@ -1615,7 +1604,6 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash,
info.pgid.pool(), coid.get_namespace());
ctx->snapset_obc = get_object_context(snapoid, false);
- assert(ctx->snapset_obc->registered);
if (snapset.clones.empty() && !snapset.head_exists) {
dout(10) << coid << " removing " << snapoid << dendl;
@@ -2066,7 +2054,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ctx->user_modify = true;
}
- ObjectContext *src_obc = 0;
+ ObjectContextRef src_obc;
if (ceph_osd_op_type_multi(op.op)) {
MOSDOp *m = static_cast<MOSDOp *>(ctx->op->request);
object_locator_t src_oloc;
@@ -2484,7 +2472,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
hobject_t clone_oid = soid;
clone_oid.snap = *clone_iter;
- ObjectContext *clone_obc = ctx->src_obc[clone_oid];
+ ObjectContextRef clone_obc = ctx->src_obc[clone_oid];
assert(clone_obc);
for (vector<snapid_t>::reverse_iterator p = clone_obc->obs.oi.snaps.rbegin();
p != clone_obc->obs.oi.snaps.rend();
@@ -2825,9 +2813,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
uint64_t cookie = op.watch.cookie;
bool do_watch = op.watch.flag & 1;
entity_name_t entity = ctx->reqid.name;
- ObjectContext *obc = ctx->obc;
+ ObjectContextRef obc = ctx->obc;
- dout(10) << "watch: ctx->obc=" << (void *)obc << " cookie=" << cookie
+ dout(10) << "watch: ctx->obc=" << (void *)obc.get() << " cookie=" << cookie
<< " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl;
dout(10) << "watch: oi.user_version=" << oi.user_version.version << dendl;
dout(10) << "watch: peer_addr="
@@ -2845,7 +2833,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
t.nop(); // make sure update the object_info on disk!
}
ctx->watch_connects.push_back(w);
- assert(obc->registered);
} else {
map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
oi.watchers.find(make_pair(cookie, entity));
@@ -3428,7 +3415,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
dout(10) << "_rollback_to " << soid << " snapid " << snapid << dendl;
- ObjectContext *rollback_to;
+ ObjectContextRef rollback_to;
int ret = find_object_context(
hobject_t(soid.oid, soid.get_key(), snapid, soid.hash, info.pgid.pool(), soid.get_namespace()),
&rollback_to, false, &cloneid);
@@ -3511,7 +3498,6 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
obs.oi.size = rollback_to->obs.oi.size;
snapset.head_exists = true;
}
- put_object_context(rollback_to);
}
return ret;
}
@@ -3567,9 +3553,10 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
object_info_t static_snap_oi(coid);
object_info_t *snap_oi;
if (is_primary()) {
- ctx->clone_obc = new ObjectContext(static_snap_oi, true, NULL);
- ctx->clone_obc->get();
- register_object_context(ctx->clone_obc);
+ ctx->clone_obc = object_contexts.lookup_or_create(static_snap_oi.soid);
+ ctx->clone_obc->destructor_callback = new C_PG_ObjectContext(this, ctx->clone_obc.get());
+ ctx->clone_obc->obs.oi = static_snap_oi;
+ ctx->clone_obc->obs.exists = true;
snap_oi = &ctx->clone_obc->obs.oi;
} else {
snap_oi = &static_snap_oi;
@@ -3819,7 +3806,6 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
ctx->at_version.version++;
ctx->snapset_obc->obs.exists = false;
- assert(ctx->snapset_obc->registered);
}
}
} else if (ctx->new_snapset.clones.size()) {
@@ -3836,7 +3822,6 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
ctx->snapset_obc->obs.oi.version = ctx->at_version;
ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid;
ctx->snapset_obc->obs.oi.mtime = ctx->mtime;
- assert(ctx->snapset_obc->registered);
bufferlist bv(sizeof(ctx->new_obs.oi));
::encode(ctx->snapset_obc->obs.oi, bv);
@@ -3978,17 +3963,14 @@ void ReplicatedPG::op_applied(RepGather *repop)
int whoami = osd->get_nodeid();
if (repop->ctx->clone_obc) {
- put_object_context(repop->ctx->clone_obc);
- repop->ctx->clone_obc = 0;
+ repop->ctx->clone_obc = ObjectContextRef();
}
if (repop->ctx->snapset_obc) {
- put_object_context(repop->ctx->snapset_obc);
- repop->ctx->snapset_obc = 0;
+ repop->ctx->snapset_obc = ObjectContextRef();
}
- put_object_context(repop->obc);
- put_object_contexts(repop->src_obc);
- repop->obc = 0;
+ repop->src_obc.clear();
+ repop->obc = ObjectContextRef();
if (!repop->aborted) {
assert(repop->waitfor_ack.count(whoami) ||
@@ -4264,7 +4246,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
}
}
-ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContext *obc,
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRef obc,
tid_t rep_tid)
{
if (ctx->op)
@@ -4349,16 +4331,14 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
void ReplicatedPG::get_watchers(list<obj_watch_item_t> &pg_watchers)
{
- for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin();
- i != object_contexts.end();
- ++i) {
- i->second->get();
- get_obc_watchers(i->second, pg_watchers);
- put_object_context(i->second);
+ pair<hobject_t, ObjectContextRef> i;
+ while (object_contexts.get_next(i.first, &i)) {
+ ObjectContextRef obc(i.second);
+ get_obc_watchers(obc, pg_watchers);
}
}
-void ReplicatedPG::get_obc_watchers(ObjectContext *obc, list<obj_watch_item_t> &pg_watchers)
+void ReplicatedPG::get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers)
{
for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j =
obc->watchers.begin();
@@ -4382,16 +4362,12 @@ void ReplicatedPG::get_obc_watchers(ObjectContext *obc, list<obj_watch_item_t> &
void ReplicatedPG::check_blacklisted_watchers()
{
dout(20) << "ReplicatedPG::check_blacklisted_watchers for pg " << get_pgid() << dendl;
- for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin();
- i != object_contexts.end();
- ++i) {
- i->second->get();
- check_blacklisted_obc_watchers(i->second);
- put_object_context(i->second);
- }
+ pair<hobject_t, ObjectContextRef> i;
+ while (object_contexts.get_next(i.first, &i))
+ check_blacklisted_obc_watchers(i.second);
}
-void ReplicatedPG::check_blacklisted_obc_watchers(ObjectContext *obc)
+void ReplicatedPG::check_blacklisted_obc_watchers(ObjectContextRef obc)
{
dout(20) << "ReplicatedPG::check_blacklisted_obc_watchers for obc " << obc->obs.oi.soid << dendl;
for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator k =
@@ -4411,7 +4387,7 @@ void ReplicatedPG::check_blacklisted_obc_watchers(ObjectContext *obc)
}
}
-void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
+void ReplicatedPG::populate_obc_watchers(ObjectContextRef obc)
{
assert(is_active());
assert(!is_missing_object(obc->obs.oi.soid) ||
@@ -4447,7 +4423,7 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
void ReplicatedPG::handle_watch_timeout(WatchRef watch)
{
- ObjectContext *obc = watch->get_obc(); // handle_watch_timeout owns this ref
+ ObjectContextRef obc = watch->get_obc(); // handle_watch_timeout owns this ref
dout(10) << "handle_watch_timeout obc " << obc << dendl;
if (is_degraded_object(obc->obs.oi.soid)) {
@@ -4457,7 +4433,6 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
dout(10) << "handle_watch_timeout waiting for degraded on obj "
<< obc->obs.oi.soid
<< dendl;
- put_object_context(obc); // callback got its own ref
return;
}
@@ -4468,7 +4443,6 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
scrubber.add_callback(
watch->get_delayed_cb() // This callback!
);
- put_object_context(obc);
return;
}
@@ -4516,41 +4490,35 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
eval_repop(repop);
}
-ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid)
+ObjectContextRef ReplicatedPG::create_object_context(const object_info_t& oi,
+ SnapSetContext *ssc)
{
- map<hobject_t, ObjectContext*>::iterator p = object_contexts.find(oid);
- if (p != object_contexts.end())
- return p->second;
- return NULL;
-}
-
-ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi,
- SnapSetContext *ssc)
-{
- ObjectContext *obc = new ObjectContext(oi, false, ssc);
- dout(10) << "create_object_context " << obc << " " << oi.soid << " " << obc->ref << dendl;
- register_object_context(obc);
+ ObjectContextRef obc(object_contexts.lookup_or_create(oi.soid));
+ assert(obc->destructor_callback == NULL);
+ obc->destructor_callback = new C_PG_ObjectContext(this, obc.get());
+ obc->obs.oi = oi;
+ obc->obs.exists = false;
+ obc->ssc = ssc;
+ if (ssc)
+ register_snapset_context(ssc);
+ dout(10) << "create_object_context " << (void*)obc.get() << " " << oi.soid << " " << dendl;
populate_obc_watchers(obc);
- obc->ref++;
return obc;
}
-ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
- bool can_create)
+ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
+ bool can_create)
{
- map<hobject_t, ObjectContext*>::iterator p = object_contexts.find(soid);
- ObjectContext *obc;
- if (p != object_contexts.end()) {
- obc = p->second;
- dout(10) << "get_object_context " << obc << " " << soid << " " << obc->ref
- << " -> " << (obc->ref+1) << dendl;
+ ObjectContextRef obc = object_contexts.lookup(soid);
+ if (obc) {
+ dout(10) << "get_object_context " << obc << " " << soid << dendl;
} else {
// check disk
bufferlist bv;
int r = osd->store->getattr(coll, soid, OI_ATTR, bv);
if (r < 0) {
if (!can_create)
- return NULL; // -ENOENT!
+ return ObjectContextRef(); // -ENOENT!
// new object.
object_info_t oi(soid);
@@ -4562,49 +4530,41 @@ ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
assert(oi.soid.pool == (int64_t)info.pgid.pool());
- SnapSetContext *ssc = NULL;
- if (can_create)
- ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace());
- obc = new ObjectContext(oi, true, ssc);
+ obc = object_contexts.lookup_or_create(oi.soid);
+ obc->destructor_callback = new C_PG_ObjectContext(this, obc.get());
+ obc->obs.oi = oi;
obc->obs.exists = true;
- register_object_context(obc);
-
- if (can_create && !obc->ssc)
+ if (can_create) {
obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace());
+ register_snapset_context(obc->ssc);
+ }
populate_obc_watchers(obc);
dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl;
}
- obc->ref++;
return obc;
}
void ReplicatedPG::context_registry_on_change()
{
- list<ObjectContext *> contexts;
- for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin();
- i != object_contexts.end();
- ++i) {
- i->second->get();
- contexts.push_back(i->second);
- for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j =
- i->second->watchers.begin();
- j != i->second->watchers.end();
- i->second->watchers.erase(j++)) {
- j->second->discard();
+ pair<hobject_t, ObjectContextRef> i;
+ while (object_contexts.get_next(i.first, &i)) {
+ ObjectContextRef obc(i.second);
+ if (obc) {
+ for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j =
+ obc->watchers.begin();
+ j != obc->watchers.end();
+ obc->watchers.erase(j++)) {
+ j->second->discard();
+ }
}
}
- for (list<ObjectContext *>::iterator i = contexts.begin();
- i != contexts.end();
- contexts.erase(i++)) {
- put_object_context(*i);
- }
}
int ReplicatedPG::find_object_context(const hobject_t& oid,
- ObjectContext **pobc,
+ ObjectContextRef *pobc,
bool can_create,
snapid_t *psnapid)
{
@@ -4616,11 +4576,10 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
// want the snapdir?
if (oid.snap == CEPH_SNAPDIR) {
// return head or snapdir, whichever exists.
- ObjectContext *obc = get_object_context(head, can_create);
+ ObjectContextRef obc = get_object_context(head, can_create);
if (obc && !obc->obs.exists) {
// ignore it if the obc exists but the object doesn't
- put_object_context(obc);
- obc = NULL;
+ obc = ObjectContextRef();
}
if (!obc) {
obc = get_object_context(snapdir, can_create);
@@ -4638,7 +4597,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
// want the head?
if (oid.snap == CEPH_NOSNAP) {
- ObjectContext *obc = get_object_context(head, can_create);
+ ObjectContextRef obc = get_object_context(head, can_create);
if (!obc)
return -ENOENT;
dout(10) << "find_object_context " << oid << " @" << oid.snap << dendl;
@@ -4661,7 +4620,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
// head?
if (oid.snap > ssc->snapset.seq) {
if (ssc->snapset.head_exists) {
- ObjectContext *obc = get_object_context(head, false);
+ ObjectContextRef obc = get_object_context(head, false);
dout(10) << "find_object_context " << head
<< " want " << oid.snap << " > snapset seq " << ssc->snapset.seq
<< " -- HIT " << obc->obs
@@ -4706,7 +4665,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
return -EAGAIN;
}
- ObjectContext *obc = get_object_context(soid, false);
+ ObjectContextRef obc = get_object_context(soid, false);
assert(obc);
// clone
@@ -4721,41 +4680,20 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
} else {
dout(20) << "find_object_context " << soid << " [" << first << "," << last
<< "] does not contain " << oid.snap << " -- DNE" << dendl;
- put_object_context(obc);
return -ENOENT;
}
}
-void ReplicatedPG::put_object_context(ObjectContext *obc)
+void ReplicatedPG::object_context_destructor_callback(ObjectContext *obc)
{
- dout(10) << "put_object_context " << obc << " " << obc->obs.oi.soid << " "
- << obc->ref << " -> " << (obc->ref-1) << dendl;
-
- --obc->ref;
- if (obc->ref == 0) {
- if (obc->ssc)
- put_snapset_context(obc->ssc);
+ dout(10) << "object_context_destructor_callback " << obc << " "
+ << obc->obs.oi.soid << dendl;
- if (obc->registered)
- object_contexts.erase(obc->obs.oi.soid);
- delete obc;
-
- if (object_contexts.empty())
- kick();
- }
+ if (obc->ssc)
+ put_snapset_context(obc->ssc);
}
-void ReplicatedPG::put_object_contexts(map<hobject_t,ObjectContext*>& obcv)
-{
- if (obcv.empty())
- return;
- dout(10) << "put_object_contexts " << obcv << dendl;
- for (map<hobject_t,ObjectContext*>::iterator p = obcv.begin(); p != obcv.end(); ++p)
- put_object_context(p->second);
- obcv.clear();
-}
-
-void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t *pgstat)
+void ReplicatedPG::add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *pgstat)
{
object_info_t& oi = obc->obs.oi;
@@ -4797,9 +4735,10 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t *
SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
{
+ Mutex::Locker l(snapset_contexts_lock);
SnapSetContext *ssc = new SnapSetContext(oid);
dout(10) << "create_snapset_context " << ssc << " " << ssc->oid << dendl;
- register_snapset_context(ssc);
+ _register_snapset_context(ssc);
ssc->ref++;
return ssc;
}
@@ -4810,6 +4749,7 @@ SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
bool can_create,
const string& nspace)
{
+ Mutex::Locker l(snapset_contexts_lock);
SnapSetContext *ssc;
map<object_t, SnapSetContext*>::iterator p = snapset_contexts.find(oid);
if (p != snapset_contexts.end()) {
@@ -4828,7 +4768,7 @@ SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
return NULL;
}
ssc = new SnapSetContext(oid);
- register_snapset_context(ssc);
+ _register_snapset_context(ssc);
if (r >= 0) {
bufferlist::iterator bvp = bv.begin();
ssc->snapset.decode(bvp);
@@ -4843,9 +4783,9 @@ SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
{
+ Mutex::Locker l(snapset_contexts_lock);
dout(10) << "put_snapset_context " << ssc->oid << " "
<< ssc->ref << " -> " << (ssc->ref-1) << dendl;
-
--ssc->ref;
if (ssc->ref == 0) {
if (ssc->registered)
@@ -5098,7 +5038,7 @@ void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
// ===========================================================
-void ReplicatedPG::calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const hobject_t& head,
+void ReplicatedPG::calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
@@ -5383,7 +5323,7 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
* clones/heads and dup data ranges where possible.
*/
void ReplicatedPG::prep_push_to_replica(
- ObjectContext *obc, const hobject_t& soid, int peer,
+ ObjectContextRef obc, const hobject_t& soid, int peer,
int prio,
PushOp *pop)
{
@@ -5437,7 +5377,7 @@ void ReplicatedPG::prep_push_to_replica(
}
void ReplicatedPG::prep_push(int prio,
- ObjectContext *obc,
+ ObjectContextRef obc,
const hobject_t& soid, int peer,
PushOp *pop)
{
@@ -5453,7 +5393,7 @@ void ReplicatedPG::prep_push(int prio,
void ReplicatedPG::prep_push(
int prio,
- ObjectContext *obc,
+ ObjectContextRef obc,
const hobject_t& soid, int peer,
eversion_t version,
interval_set<uint64_t> &data_subset,
@@ -5744,7 +5684,7 @@ bool ReplicatedPG::handle_pull_response(
hoid.get_namespace());
assert(ssc);
}
- ObjectContext *obc = create_object_context(pi.recovery_info.oi, ssc);
+ ObjectContextRef obc = create_object_context(pi.recovery_info.oi, ssc);
obc->obs.exists = true;
obc->ondisk_write_lock();
@@ -6142,18 +6082,14 @@ bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
{
dout(10) << "finish_degraded_object " << oid << dendl;
- map<hobject_t, ObjectContext *>::iterator i = object_contexts.find(oid);
- if (i != object_contexts.end()) {
- i->second->get();
- for (set<ObjectContext*>::iterator j = i->second->blocking.begin();
- j != i->second->blocking.end();
- i->second->blocking.erase(j++)) {
+ ObjectContextRef obc(object_contexts.lookup(oid));
+ if (obc) {
+ for (set<ObjectContextRef>::iterator j = obc->blocking.begin();
+ j != obc->blocking.end();
+ obc->blocking.erase(j++)) {
dout(10) << " no longer blocking writes for " << (*j)->obs.oi.soid << dendl;
- (*j)->blocked_by = NULL;
- put_object_context(*j);
- put_object_context(i->second);
+ (*j)->blocked_by = ObjectContextRef();
}
- put_object_context(i->second);
}
if (callbacks_for_degraded_object.count(oid)) {
list<Context*> contexts;
@@ -6258,11 +6194,10 @@ void ReplicatedPG::_committed_pushed_object(
unlock();
}
-void ReplicatedPG::_applied_recovered_object(ObjectContext *obc)
+void ReplicatedPG::_applied_recovered_object(ObjectContextRef obc)
{
lock();
dout(10) << "_applied_recovered_object " << *obc << dendl;
- put_object_context(obc);
assert(active_pushes >= 1);
--active_pushes;
@@ -6468,7 +6403,7 @@ eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid)
/* Mark an object as lost
*/
-ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
+ObjectContextRef ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
const hobject_t &oid, eversion_t version,
utime_t mtime, int what)
{
@@ -6485,7 +6420,7 @@ ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
pg_log_entry_t e(what, oid, info.last_update, version, osd_reqid_t(), mtime);
pg_log.add(e);
- ObjectContext *obc = get_object_context(oid, true);
+ ObjectContextRef obc = get_object_context(oid, true);
obc->ondisk_write_lock();
@@ -6502,7 +6437,7 @@ ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
struct C_PG_MarkUnfoundLost : public Context {
ReplicatedPGRef pg;
- list<ObjectContext*> obcs;
+ list<ObjectContextRef> obcs;
C_PG_MarkUnfoundLost(ReplicatedPG *p) : pg(p) {}
void finish(int r) {
pg->_finish_mark_all_unfound_lost(obcs);
@@ -6535,7 +6470,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
continue;
}
- ObjectContext *obc = NULL;
+ ObjectContextRef obc;
eversion_t prev;
switch (what) {
@@ -6611,7 +6546,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
osd->queue_for_recovery(this);
}
-void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
+void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs)
{
lock();
dout(10) << "_finish_mark_all_unfound_lost " << dendl;
@@ -6620,11 +6555,7 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
requeue_ops(waiting_for_all_missing);
waiting_for_all_missing.clear();
- while (!obcs.empty()) {
- ObjectContext *obc = obcs.front();
- put_object_context(obc);
- obcs.pop_front();
- }
+ obcs.clear();
unlock();
}
@@ -7085,7 +7016,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
case pg_log_entry_t::LOST_REVERT:
{
if (item.have == latest->reverting_to) {
- ObjectContext *obc = get_object_context(soid, true);
+ ObjectContextRef obc = get_object_context(soid, true);
if (obc->obs.oi.version == latest->version) {
// I'm already reverting
@@ -7185,7 +7116,7 @@ int ReplicatedPG::prep_object_replica_pushes(
dout(10) << __func__ << ": on " << soid << dendl;
// NOTE: we know we will get a valid oloc off of disk here.
- ObjectContext *obc = get_object_context(soid, false);
+ ObjectContextRef obc = get_object_context(soid, false);
if (!obc) {
pg_log.missing_add(soid, v, eversion_t());
bool uhoh = true;
@@ -7229,7 +7160,6 @@ int ReplicatedPG::prep_object_replica_pushes(
dout(10) << " ondisk_read_unlock on " << soid << dendl;
obc->ondisk_read_unlock();
- put_object_context(obc);
return 1;
}
@@ -7432,11 +7362,10 @@ int ReplicatedPG::recover_backfill(
for (set<hobject_t>::iterator i = add_to_stat.begin();
i != add_to_stat.end();
++i) {
- ObjectContext *obc = get_object_context(*i, false);
+ ObjectContextRef obc = get_object_context(*i, false);
pg_stat_t stat;
add_object_context_to_pg_stat(obc, &stat);
pending_backfill_updates[*i] = stat;
- put_object_context(obc);
}
for (map<hobject_t, eversion_t>::iterator i = to_remove.begin();
i != to_remove.end();
@@ -7507,13 +7436,12 @@ void ReplicatedPG::prep_backfill_object_push(
if (!pushing.count(oid))
start_recovery_op(oid);
- ObjectContext *obc = get_object_context(oid, false);
+ ObjectContextRef obc = get_object_context(oid, false);
obc->ondisk_read_lock();
(*pushes)[peer].push_back(PushOp());
prep_push_to_replica(obc, oid, peer, g_conf->osd_recovery_op_priority,
&((*pushes)[peer].back()));
obc->ondisk_read_unlock();
- put_object_context(obc);
}
void ReplicatedPG::scan_range(
@@ -7535,9 +7463,9 @@ void ReplicatedPG::scan_range(
for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
handle.reset_tp_timeout();
- ObjectContext *obc = NULL;
+ ObjectContextRef obc;
if (is_primary())
- obc = _lookup_object_context(*p);
+ obc = object_contexts.lookup(*p);
if (obc) {
bi->objects[*p] = obc->obs.oi.version;
dout(20) << " " << *p << " " << obc->obs.oi.version << dendl;
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index af14871fe27..0fbe5afd9ca 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -3,6 +3,9 @@
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@@ -27,6 +30,9 @@
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDSubOp.h"
+
+#include "common/sharedptr_registry.hpp"
+
class MOSDSubOpReply;
class ReplicatedPG;
@@ -124,10 +130,10 @@ public:
vector<pg_log_entry_t> log;
interval_set<uint64_t> modified_ranges;
- ObjectContext *obc; // For ref counting purposes
- map<hobject_t,ObjectContext*> src_obc;
- ObjectContext *clone_obc; // if we created a clone
- ObjectContext *snapset_obc; // if we created/deleted a snapdir
+ ObjectContextRef obc;
+ map<hobject_t,ObjectContextRef> src_obc;
+ ObjectContextRef clone_obc; // if we created a clone
+ ObjectContextRef snapset_obc; // if we created/deleted a snapdir
int data_off; // FIXME: we may want to kill this msgr hint off at some point!
@@ -150,7 +156,7 @@ public:
modify(false), user_modify(false),
bytes_written(0), bytes_read(0),
current_osd_subop_num(0),
- obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg),
+ data_off(0), reply(NULL), pg(_pg),
num_read(0),
num_write(0) {
if (_ssc) {
@@ -176,8 +182,8 @@ public:
eversion_t v;
OpContext *ctx;
- ObjectContext *obc;
- map<hobject_t,ObjectContext*> src_obc;
+ ObjectContextRef obc;
+ map<hobject_t,ObjectContextRef> src_obc;
tid_t rep_tid;
@@ -197,7 +203,7 @@ public:
list<ObjectStore::Transaction*> tls;
bool queue_snap_trimmer;
- RepGather(OpContext *c, ObjectContext *pi, tid_t rt,
+ RepGather(OpContext *c, ObjectContextRef pi, tid_t rt,
eversion_t lc) :
queue_item(this),
nref(1),
@@ -240,7 +246,7 @@ protected:
void eval_repop(RepGather*);
void issue_repop(RepGather *repop, utime_t now,
eversion_t old_last_update, bool old_exists, uint64_t old_size, eversion_t old_version);
- RepGather *new_repop(OpContext *ctx, ObjectContext *obc, tid_t rep_tid);
+ RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, tid_t rep_tid);
void remove_repop(RepGather *repop);
void repop_ack(RepGather *repop,
int result, int ack_type,
@@ -276,50 +282,42 @@ protected:
friend struct C_OnPushCommit;
// projected object info
- map<hobject_t, ObjectContext*> object_contexts;
+ SharedPtrRegistry<hobject_t, ObjectContext> object_contexts;
map<object_t, SnapSetContext*> snapset_contexts;
+ Mutex snapset_contexts_lock;
// debug order that client ops are applied
map<hobject_t, map<client_t, tid_t> > debug_op_order;
- void populate_obc_watchers(ObjectContext *obc);
- void check_blacklisted_obc_watchers(ObjectContext *);
+ void populate_obc_watchers(ObjectContextRef obc);
+ void check_blacklisted_obc_watchers(ObjectContextRef obc);
void check_blacklisted_watchers();
void get_watchers(list<obj_watch_item_t> &pg_watchers);
- void get_obc_watchers(ObjectContext *obc, list<obj_watch_item_t> &pg_watchers);
+ void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers);
public:
void handle_watch_timeout(WatchRef watch);
protected:
- ObjectContext *lookup_object_context(const hobject_t& soid) {
- if (object_contexts.count(soid)) {
- ObjectContext *obc = object_contexts[soid];
- obc->ref++;
- return obc;
- }
- return NULL;
- }
- ObjectContext *_lookup_object_context(const hobject_t& oid);
- ObjectContext *create_object_context(const object_info_t& oi, SnapSetContext *ssc);
- ObjectContext *get_object_context(const hobject_t& soid, bool can_create);
- void register_object_context(ObjectContext *obc) {
- if (!obc->registered) {
- assert(object_contexts.count(obc->obs.oi.soid) == 0);
- obc->registered = true;
- object_contexts[obc->obs.oi.soid] = obc;
- }
- if (obc->ssc)
- register_snapset_context(obc->ssc);
- }
+ ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
+ ObjectContextRef get_object_context(const hobject_t& soid, bool can_create);
void context_registry_on_change();
- void put_object_context(ObjectContext *obc);
- void put_object_contexts(map<hobject_t,ObjectContext*>& obcv);
+ void object_context_destructor_callback(ObjectContext *obc);
+ struct C_PG_ObjectContext : public Context {
+ ReplicatedPGRef pg;
+ ObjectContext *obc;
+ C_PG_ObjectContext(ReplicatedPG *p, ObjectContext *o) :
+ pg(p), obc(o) {}
+ void finish(int r) {
+ pg->object_context_destructor_callback(obc);
+ }
+ };
+
int find_object_context(const hobject_t& oid,
- ObjectContext **pobc,
+ ObjectContextRef *pobc,
bool can_create, snapid_t *psnapid=NULL);
- void add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t *stat);
+ void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat);
void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);
@@ -327,6 +325,11 @@ protected:
SnapSetContext *get_snapset_context(const object_t& oid, const string &key,
ps_t seed, bool can_create, const string &nspace);
void register_snapset_context(SnapSetContext *ssc) {
+ Mutex::Locker l(snapset_contexts_lock);
+ _register_snapset_context(ssc);
+ }
+ void _register_snapset_context(SnapSetContext *ssc) {
+ assert(snapset_contexts_lock.is_locked());
if (!ssc->registered) {
assert(snapset_contexts.count(ssc->oid) == 0);
ssc->registered = true;
@@ -525,7 +528,7 @@ protected:
int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
int priority,
map<int, vector<PushOp> > *pushes);
- void calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const hobject_t& head,
+ void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
@@ -535,17 +538,17 @@ protected:
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t> >& clone_subsets);
void prep_push_to_replica(
- ObjectContext *obc,
+ ObjectContextRef obc,
const hobject_t& oid,
int dest,
int priority,
PushOp *push_op);
void prep_push(int priority,
- ObjectContext *obc,
+ ObjectContextRef obc,
const hobject_t& oid, int dest,
PushOp *op);
void prep_push(int priority,
- ObjectContext *obc,
+ ObjectContextRef obc,
const hobject_t& soid, int peer,
eversion_t version,
interval_set<uint64_t> &data_subset,
@@ -648,8 +651,8 @@ protected:
}
};
struct C_OSD_OndiskWriteUnlock : public Context {
- ObjectContext *obc, *obc2;
- C_OSD_OndiskWriteUnlock(ObjectContext *o, ObjectContext *o2=0) : obc(o), obc2(o2) {}
+ ObjectContextRef obc, obc2;
+ C_OSD_OndiskWriteUnlock(ObjectContextRef o, ObjectContextRef o2 = ObjectContextRef()) : obc(o), obc2(o2) {}
void finish(int r) {
obc->ondisk_write_unlock();
if (obc2)
@@ -657,17 +660,17 @@ protected:
}
};
struct C_OSD_OndiskWriteUnlockList : public Context {
- list<ObjectContext*> *pls;
- C_OSD_OndiskWriteUnlockList(list<ObjectContext*> *l) : pls(l) {}
+ list<ObjectContextRef> *pls;
+ C_OSD_OndiskWriteUnlockList(list<ObjectContextRef> *l) : pls(l) {}
void finish(int r) {
- for (list<ObjectContext*>::iterator p = pls->begin(); p != pls->end(); ++p)
+ for (list<ObjectContextRef>::iterator p = pls->begin(); p != pls->end(); ++p)
(*p)->ondisk_write_unlock();
}
};
struct C_OSD_AppliedRecoveredObject : public Context {
ReplicatedPGRef pg;
- ObjectContext *obc;
- C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectContext *o) :
+ ObjectContextRef obc;
+ C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectContextRef o) :
pg(p), obc(o) {}
void finish(int r) {
pg->_applied_recovered_object(obc);
@@ -730,7 +733,7 @@ protected:
void sub_op_modify_commit(RepModify *rm);
void sub_op_modify_reply(OpRequestRef op);
- void _applied_recovered_object(ObjectContext *obc);
+ void _applied_recovered_object(ObjectContextRef obc);
void _applied_recovered_object_replica();
void _committed_pushed_object(epoch_t epoch, eversion_t lc);
void recover_got(hobject_t oid, eversion_t v);
@@ -879,10 +882,10 @@ public:
void mark_all_unfound_lost(int what);
eversion_t pick_newest_available(const hobject_t& oid);
- ObjectContext *mark_object_lost(ObjectStore::Transaction *t,
+ ObjectContextRef mark_object_lost(ObjectStore::Transaction *t,
const hobject_t& oid, eversion_t version,
utime_t mtime, int what);
- void _finish_mark_all_unfound_lost(list<ObjectContext*>& obcs);
+ void _finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs);
void on_role_change();
void on_change(ObjectStore::Transaction *t);
diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc
index 8a084ca9aa1..ffa3adced24 100644
--- a/src/osd/Watch.cc
+++ b/src/osd/Watch.cc
@@ -250,15 +250,14 @@ public:
string Watch::gen_dbg_prefix() {
stringstream ss;
ss << pg->gen_prefix() << " -- Watch("
- << make_pair(cookie, entity)
- << ", obc->ref=" << (obc ? obc->ref : -1) << ") ";
+ << make_pair(cookie, entity) << ") ";
return ss.str();
}
Watch::Watch(
ReplicatedPG *pg,
OSDService *osd,
- ObjectContext *obc,
+ ObjectContextRef obc,
uint32_t timeout,
uint64_t cookie,
entity_name_t entity,
@@ -272,7 +271,6 @@ Watch::Watch(
addr(addr),
entity(entity),
discarded(false) {
- obc->get();
dout(10) << "Watch()" << dendl;
}
@@ -292,13 +290,6 @@ Context *Watch::get_delayed_cb()
return cb;
}
-ObjectContext *Watch::get_obc()
-{
- assert(obc);
- obc->get();
- return obc;
-}
-
void Watch::register_cb()
{
Mutex::Locker l(osd->watch_lock);
@@ -370,8 +361,7 @@ void Watch::discard_state()
sessionref->put();
conn = ConnectionRef();
}
- pg->put_object_context(obc);
- obc = NULL;
+ obc = ObjectContextRef();
}
bool Watch::is_discarded()
@@ -428,7 +418,7 @@ void Watch::notify_ack(uint64_t notify_id)
WatchRef Watch::makeWatchRef(
ReplicatedPG *pg, OSDService *osd,
- ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, entity_addr_t addr)
+ ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, entity_addr_t addr)
{
WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
ret->set_self(ret);
diff --git a/src/osd/Watch.h b/src/osd/Watch.h
index 1c9fa28cb65..ecb61ad8b72 100644
--- a/src/osd/Watch.h
+++ b/src/osd/Watch.h
@@ -151,7 +151,7 @@ class Watch {
OSDService *osd;
boost::intrusive_ptr<ReplicatedPG> pg;
- ObjectContext *obc;
+ std::tr1::shared_ptr<ObjectContext> obc;
std::map<uint64_t, NotifyRef> in_progress_notifies;
@@ -165,7 +165,7 @@ class Watch {
Watch(
ReplicatedPG *pg, OSDService *osd,
- ObjectContext *obc, uint32_t timeout,
+ std::tr1::shared_ptr<ObjectContext> obc, uint32_t timeout,
uint64_t cookie, entity_name_t entity,
entity_addr_t addr);
@@ -187,7 +187,7 @@ public:
string gen_dbg_prefix();
static WatchRef makeWatchRef(
ReplicatedPG *pg, OSDService *osd,
- ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, entity_addr_t addr);
+ std::tr1::shared_ptr<ObjectContext> obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, entity_addr_t addr);
void set_self(WatchRef _self) {
self = _self;
}
@@ -195,8 +195,8 @@ public:
/// Does not grant a ref count!
boost::intrusive_ptr<ReplicatedPG> get_pg() { return pg; }
- /// Grants a ref count!
- ObjectContext *get_obc();
+ std::tr1::shared_ptr<ObjectContext> get_obc() { return obc; }
+
uint64_t get_cookie() const { return cookie; }
entity_name_t get_entity() const { return entity; }
entity_addr_t get_peer_addr() const { return addr; }
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 3cafdc2b035..6cdacc9902c 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -2001,6 +2001,8 @@ struct ObjectState {
object_info_t oi;
bool exists;
+ ObjectState() : exists(false) {}
+
ObjectState(const object_info_t &oi_, bool exists_)
: oi(oi_), exists(exists_) {}
};
@@ -2022,13 +2024,18 @@ struct SnapSetContext {
* etc., because we don't send writes down to disk until after
* replicas ack.
*/
+
+struct ObjectContext;
+
+typedef std::tr1::shared_ptr<ObjectContext> ObjectContextRef;
+
struct ObjectContext {
- int ref;
- bool registered;
ObjectState obs;
SnapSetContext *ssc; // may be null
+ Context *destructor_callback;
+
private:
Mutex lock;
public:
@@ -2036,20 +2043,22 @@ public:
int unstable_writes, readers, writers_waiting, readers_waiting;
// set if writes for this object are blocked on another objects recovery
- ObjectContext *blocked_by; // object blocking our writes
- set<ObjectContext*> blocking; // objects whose writes we block
+ ObjectContextRef blocked_by; // object blocking our writes
+ set<ObjectContextRef> blocking; // objects whose writes we block
// any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
- ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_)
- : ref(0), registered(false), obs(oi_, exists_), ssc(ssc_),
+ ObjectContext()
+ : ssc(NULL),
+ destructor_callback(0),
lock("ReplicatedPG::ObjectContext::lock"),
- unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
- blocked_by(0) {}
-
- void get() { ++ref; }
+ unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {}
+ ~ObjectContext() {
+ if (destructor_callback)
+ destructor_callback->complete(0);
+ }
// do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy.
void ondisk_write_lock() {
lock.Lock();
diff --git a/src/test/test_osd_types.cc b/src/test/test_osd_types.cc
index 730a8ffdc5d..e07c9e06592 100644
--- a/src/test/test_osd_types.cc
+++ b/src/test/test_osd_types.cc
@@ -1008,8 +1008,7 @@ protected:
TEST_F(ObjectContextTest, read_write_lock)
{
{
- object_info_t oi;
- ObjectContext obc(oi, false, NULL);
+ ObjectContext obc;
//
// write_lock
@@ -1044,8 +1043,7 @@ TEST_F(ObjectContextTest, read_write_lock)
useconds_t delay = 0;
{
- object_info_t oi;
- ObjectContext obc(oi, false, NULL);
+ ObjectContext obc;
//
// write_lock
@@ -1102,8 +1100,7 @@ TEST_F(ObjectContextTest, read_write_lock)
}
{
- object_info_t oi;
- ObjectContext obc(oi, false, NULL);
+ ObjectContext obc;
//
// read_lock