summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2012-12-11 22:22:31 -0800
committerSamuel Just <sam.just@inktank.com>2012-12-12 15:30:00 -0800
commit047aecd90f1dbfb172f48f9d10b67e82b3a8ce15 (patch)
tree7e72dbf2ecfb2449a0674c9128333188670cca0b
parent0dfe6c84f0a5a5a51a8976174bd1f6d9d3741b17 (diff)
downloadceph-047aecd90f1dbfb172f48f9d10b67e82b3a8ce15.tar.gz
PG,ReplicatedPG: handle_watch_timeout must not write during scrub/degraded
Currently, handle_watch_timeout will gladly write to an object while that object is degraded or is being scrubbed. Now, we queue a callback to be called on scrub completion or finish_degraded_object to recall handle_watch_timeout. The callback mechanism assumes that the registered callbacks will be called with the pg lock -- and no other locks -- already held. The callback will release the obc and pg refs unconditionally. Thus, we need to replace the unconnected_watchers pointer with NULL to ensure that unregister_unconnected_watcher fails to cancel the event and does not release the resources a second time. Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/osd/PG.cc3
-rw-r--r--src/osd/PG.h17
-rw-r--r--src/osd/ReplicatedPG.cc75
3 files changed, 93 insertions, 2 deletions
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 49d12ea35ef..c25cce15df5 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1358,6 +1358,8 @@ void PG::activate(ObjectStore::Transaction& t,
map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *activator_map)
{
assert(!is_active());
+ assert(scrubber.callbacks.empty());
+ assert(callbacks_for_degraded_object.empty());
// -- crash recovery?
if (is_primary() &&
@@ -3941,6 +3943,7 @@ void PG::chunky_scrub() {
scrub_compare_maps();
scrubber.block_writes = false;
+ scrubber.run_callbacks();
// requeue the writes from the chunk that just finished
requeue_ops(waiting_for_active);
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 31201cfa040..acaa89708d6 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -656,6 +656,8 @@ protected:
list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
waiting_for_degraded_object;
+ // Callbacks should assume pg (and nothing else) is locked
+ map<hobject_t, list<Context*> > callbacks_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
map<eversion_t,OpRequestRef> replay_queue;
void split_ops(PG *child, unsigned split_bits);
@@ -857,6 +859,20 @@ public:
// deep scrub
bool deep;
+ list<Context*> callbacks;
+ void add_callback(Context *context) {
+ callbacks.push_back(context);
+ }
+ void run_callbacks() {
+ list<Context*> to_run;
+ to_run.swap(callbacks);
+ for (list<Context*>::iterator i = to_run.begin();
+ i != to_run.end();
+ ++i) {
+ (*i)->complete(0);
+ }
+ }
+
static const char *state_string(const PG::Scrubber::State& state) {
const char *ret = NULL;
switch( state )
@@ -911,6 +927,7 @@ public:
errors = 0;
fixed = 0;
deep = false;
+ run_callbacks();
}
} scrubber;
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 50160094f3a..ba314118f90 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -4147,15 +4147,76 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
entity_name_t entity,
utime_t expire)
{
+ dout(10) << "handle_watch_timeout obc " << _obc << dendl;
+ struct HandleWatchTimeout : public Context {
+ epoch_t cur_epoch;
+ boost::intrusive_ptr<ReplicatedPG> pg;
+ void *obc;
+ entity_name_t entity;
+ utime_t expire;
+ HandleWatchTimeout(
+ epoch_t cur_epoch,
+ ReplicatedPG *pg,
+ void *obc,
+ entity_name_t entity,
+ utime_t expire) : cur_epoch(cur_epoch),
+ pg(pg), obc(obc), entity(entity), expire(expire) {
+ assert(pg->is_locked());
+ static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
+ }
+ void finish(int) {
+ assert(pg->is_locked());
+ if (cur_epoch < pg->last_peering_reset)
+ return;
+ // handle_watch_timeout gets its own ref
+ static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
+ pg->handle_watch_timeout(obc, entity, expire);
+ }
+ ~HandleWatchTimeout() {
+ assert(pg->is_locked());
+ pg->put_object_context(static_cast<ReplicatedPG::ObjectContext*>(obc));
+ }
+ };
+
ObjectContext *obc = static_cast<ObjectContext *>(_obc);
if (obc->unconnected_watchers.count(entity) == 0 ||
- obc->unconnected_watchers[entity]->expire != expire) {
- dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher " << entity << dendl;
+ (obc->unconnected_watchers[entity] &&
+ obc->unconnected_watchers[entity]->expire != expire)) {
+ /* If obc->unconnected_watchers[entity] == NULL we know at least that
+ * the watcher for obc,entity should expire. We might not have been
+ * the intended Context*, but that's ok since the intended one will
+ * take this branch and assume it raced. */
+ dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher "
+ << entity << dendl;
put_object_context(obc);
return;
}
+ if (is_degraded_object(obc->obs.oi.soid)) {
+ callbacks_for_degraded_object[obc->obs.oi.soid].push_back(
+ new HandleWatchTimeout(get_osdmap()->get_epoch(),
+ this, _obc, entity, expire)
+ );
+ dout(10) << "handle_watch_timeout waiting for degraded on obj "
+ << obc->obs.oi.soid
+ << dendl;
+ obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
+ put_object_context(obc); // callback got its own ref
+ return;
+ }
+
+ if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+ dout(10) << "handle_watch_timeout waiting for scrub on obj "
+ << obc->obs.oi.soid
+ << dendl;
+ scrubber.add_callback(new HandleWatchTimeout(get_osdmap()->get_epoch(),
+ this, _obc, entity, expire));
+ obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
+ put_object_context(obc); // callback got its own ref
+ return;
+ }
+
obc->unconnected_watchers.erase(entity);
obc->obs.oi.watchers.erase(entity);
@@ -5625,6 +5686,16 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
}
put_object_context(i->second);
}
+ if (callbacks_for_degraded_object.count(oid)) {
+ list<Context*> contexts;
+ contexts.swap(callbacks_for_degraded_object[oid]);
+ callbacks_for_degraded_object.erase(oid);
+ for (list<Context*>::iterator i = contexts.begin();
+ i != contexts.end();
+ ++i) {
+ (*i)->complete(0);
+ }
+ }
}
/** op_pull