author     Sage Weil <sage@inktank.com>  2012-12-14 14:32:44 -0800
committer  Sage Weil <sage@inktank.com>  2012-12-14 14:32:44 -0800
commit     79db5a40c7adb1a09f1ab7831ac242e721a464ff (patch)
tree       405c6e26051abc06a46a8c0ceb35afe56da8116e
parent     a7de975d9344621201a09b03453682cc7c798892 (diff)
parent     97cc55d599ea1588af2e73d6972e1c9dfd9a545b (diff)
download   ceph-79db5a40c7adb1a09f1ab7831ac242e721a464ff.tar.gz
Merge branch 'wip_watch' into next
-rw-r--r--  src/osd/OSD.cc          |  16
-rw-r--r--  src/osd/PG.cc           |   3
-rw-r--r--  src/osd/PG.h            |  32
-rw-r--r--  src/osd/ReplicatedPG.cc | 109
4 files changed, 143 insertions(+), 17 deletions(-)
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 421d7e1af32..217dd23b152 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2431,8 +2431,21 @@ void OSD::disconnect_session_watches(Session *session)
     dout(10) << "obc=" << (void *)obc << dendl;
 
     ReplicatedPG *pg = static_cast<ReplicatedPG *>(lookup_lock_raw_pg(oiter->second));
-    assert(pg);
+    if (!pg) {
+      /* pg removed between watch_unlock.Unlock() and now, all related
+       * watch structures would have been cleaned up in remove_watchers_and_notifies
+       */
+      continue;
+    }
     service.watch_lock.Lock();
+
+    if (!session->watches.count((void*)obc)) {
+      // Raced with watch removal, obc is invalid
+      service.watch_lock.Unlock();
+      pg->unlock();
+      continue;
+    }
+
     /* NOTE! fix this one, should be able to just lookup entity name,
      * however, we currently only keep EntityName on the session and not
      * entity_name_t. */
@@ -2450,6 +2463,7 @@ void OSD::disconnect_session_watches(Session *session)
          << ", expires " << expire << dendl;
     obc->watchers.erase(witer++);
     pg->put_object_context(obc);
+    session->con->put();
     session->put();
       }
       if (witer == obc->watchers.end())
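The OSD.cc hunks replace an assert with a recheck: between dropping and re-acquiring the watch lock, another thread may have removed the PG or the watch entry, so both must be looked up again rather than assumed to exist. Below is a minimal sketch of that recheck-after-relock pattern; the std::mutex, Session layout, and function names are illustrative stand-ins, not Ceph's actual locking API.

#include <iostream>
#include <map>
#include <mutex>

struct Session {
  std::map<void*, int> watches;  // obc pointer -> pg id (illustrative)
};

std::mutex watch_lock;

// Cleanup path that may race with another thread removing the same watch.
void disconnect_watch(Session *session, void *obc) {
  std::lock_guard<std::mutex> l(watch_lock);
  // Re-check instead of assert: a snapshot taken before the lock was
  // dropped may be stale if a competing thread already erased this watch.
  std::map<void*, int>::iterator it = session->watches.find(obc);
  if (it == session->watches.end()) {
    std::cout << "raced with watch removal, nothing to do\n";
    return;  // the remover already cleaned up
  }
  session->watches.erase(it);
  std::cout << "watch removed\n";
}

int main() {
  Session s;
  int dummy;
  s.watches[&dummy] = 1;
  disconnect_watch(&s, &dummy);  // normal path: removes the watch
  disconnect_watch(&s, &dummy);  // models the race: entry already gone
}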
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 49d12ea35ef..c25cce15df5 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1358,6 +1358,8 @@ void PG::activate(ObjectStore::Transaction& t,
           map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *activator_map)
 {
   assert(!is_active());
+  assert(scrubber.callbacks.empty());
+  assert(callbacks_for_degraded_object.empty());
 
   // -- crash recovery?
   if (is_primary() &&
@@ -3941,6 +3943,7 @@ void PG::chunky_scrub()
       {
         scrub_compare_maps();
         scrubber.block_writes = false;
+        scrubber.run_callbacks();
 
         // requeue the writes from the chunk that just finished
         requeue_ops(waiting_for_active);
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 2cf1173203d..acaa89708d6 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -656,6 +656,8 @@ protected:
   list<OpRequestRef>            waiting_for_all_missing;
   map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
                                       waiting_for_degraded_object;
+  // Callbacks should assume pg (and nothing else) is locked
+  map<hobject_t, list<Context*> > callbacks_for_degraded_object;
   map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
   map<eversion_t,OpRequestRef>   replay_queue;
   void split_ops(PG *child, unsigned split_bits);
@@ -857,6 +859,20 @@ public:
     // deep scrub
     bool deep;
 
+    list<Context*> callbacks;
+    void add_callback(Context *context) {
+      callbacks.push_back(context);
+    }
+    void run_callbacks() {
+      list<Context*> to_run;
+      to_run.swap(callbacks);
+      for (list<Context*>::iterator i = to_run.begin();
+           i != to_run.end();
+           ++i) {
+        (*i)->complete(0);
+      }
+    }
+
     static const char *state_string(const PG::Scrubber::State& state) {
       const char *ret = NULL;
       switch( state )
@@ -875,6 +891,21 @@ public:
     bool is_chunky_scrub_active() const { return state != INACTIVE; }
 
+    // classic (non chunk) scrubs block all writes
+    // chunky scrubs only block writes to a range
+    bool write_blocked_by_scrub(const hobject_t &soid) {
+      if (!block_writes)
+        return false;
+
+      if (!is_chunky)
+        return true;
+
+      if (soid >= start && soid < end)
+        return true;
+
+      return false;
+    }
+
     // clear all state
     void reset() {
       finalizing = false;
@@ -896,6 +927,7 @@ public:
       errors = 0;
       fixed = 0;
       deep = false;
+      run_callbacks();
     }
   } scrubber;
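The new Scrubber::run_callbacks() above drains its list by swapping it into a local before invoking anything. The point of the swap is that a callback which registers a new callback during the drain appends to the now-empty member list instead of mutating the list being iterated. A short self-contained sketch of the idiom, with std::function standing in for Ceph's Context* (names illustrative):

#include <functional>
#include <iostream>
#include <list>

struct Scrubber {
  std::list<std::function<void()> > callbacks;

  void add_callback(std::function<void()> c) {
    callbacks.push_back(c);
  }

  // Swap-then-run: callbacks registered while draining land on the fresh
  // (empty) member list instead of invalidating the iteration in progress.
  void run_callbacks() {
    std::list<std::function<void()> > to_run;
    to_run.swap(callbacks);
    for (std::list<std::function<void()> >::iterator i = to_run.begin();
         i != to_run.end();
         ++i)
      (*i)();
  }
};

int main() {
  Scrubber s;
  s.add_callback([&s] {
    std::cout << "first drain\n";
    s.add_callback([] { std::cout << "second drain\n"; });  // safe re-registration
  });
  s.run_callbacks();  // runs only the first callback
  s.run_callbacks();  // runs the one re-registered during the first drain
}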
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 675973e8f07..4c6c481cc65 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -628,15 +628,11 @@ void ReplicatedPG::do_op(OpRequestRef op)
          CEPH_NOSNAP, m->get_pg().ps(),
          info.pgid.pool());
 
-  if (scrubber.block_writes && m->may_write()) {
-    // classic (non chunk) scrubs block all writes
-    // chunky scrubs only block writes to a range
-    if (!scrubber.is_chunky || (head >= scrubber.start && head < scrubber.end)) {
-      dout(20) << __func__ << ": waiting for scrub" << dendl;
-      waiting_for_active.push_back(op);
-      op->mark_delayed();
-      return;
-    }
+  if (m->may_write() && scrubber.write_blocked_by_scrub(head)) {
+    dout(20) << __func__ << ": waiting for scrub" << dendl;
+    waiting_for_active.push_back(op);
+    op->mark_delayed();
+    return;
   }
 
   // missing object?
@@ -1590,6 +1586,7 @@ void ReplicatedPG::remove_watcher(ObjectContext *obc, entity_name_t entity)
 
   session->watches.erase(obc);
   put_object_context(obc);
+  session->con->put();
   session->put();
 }
 
@@ -1606,8 +1603,8 @@ void ReplicatedPG::remove_notify(ObjectContext *obc, Watch::Notification *notif)
   assert(niter != obc->notifs.end());
 
-  niter->first->session->put();
   niter->first->session->con->put();
+  niter->first->session->put();
   obc->notifs.erase(niter);
 
   put_object_context(obc);
@@ -3349,6 +3346,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
     if (iter == obc->watchers.end()) {
       dout(10) << " connected to " << w << " by " << entity << " session " << session << dendl;
       obc->watchers[entity] = session;
+      session->con->get();
       session->get();
       session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
       obc->ref++;
@@ -3360,10 +3358,14 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
       // weird: same entity, different session.
       dout(10) << " reconnected (with different session!) watch " << w << " by " << entity
            << " session " << session << " (was " << iter->second << ")" << dendl;
+      session->con->get();
+      session->get();
+
       iter->second->watches.erase(obc);
+      iter->second->con->put();
       iter->second->put();
+
       iter->second = session;
-      session->get();
       session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
     }
@@ -4114,10 +4116,14 @@ void ReplicatedPG::unregister_unconnected_watcher(void *_obc,
                           entity_name_t entity)
 {
   ObjectContext *obc = static_cast<ObjectContext *>(_obc);
-  osd->watch_timer.cancel_event(obc->unconnected_watchers[entity]);
+
+  /* If we failed to cancel the event, the event will fire and the obc
+   * ref and the pg ref will be taken care of */
+  if (osd->watch_timer.cancel_event(obc->unconnected_watchers[entity])) {
+    put_object_context(obc);
+    put();
+  }
   obc->unconnected_watchers.erase(entity);
-  put_object_context(obc);
-  put();
 }
 
 void ReplicatedPG::register_unconnected_watcher(void *_obc,
@@ -4141,15 +4147,76 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
                     entity_name_t entity,
                     utime_t expire)
 {
+  dout(10) << "handle_watch_timeout obc " << _obc << dendl;
+  struct HandleWatchTimeout : public Context {
+    epoch_t cur_epoch;
+    boost::intrusive_ptr<ReplicatedPG> pg;
+    void *obc;
+    entity_name_t entity;
+    utime_t expire;
+    HandleWatchTimeout(
+      epoch_t cur_epoch,
+      ReplicatedPG *pg,
+      void *obc,
+      entity_name_t entity,
+      utime_t expire) : cur_epoch(cur_epoch),
+                        pg(pg), obc(obc), entity(entity), expire(expire) {
+      assert(pg->is_locked());
+      static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
+    }
+    void finish(int) {
+      assert(pg->is_locked());
+      if (cur_epoch < pg->last_peering_reset)
+        return;
+      // handle_watch_timeout gets its own ref
+      static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
+      pg->handle_watch_timeout(obc, entity, expire);
+    }
+    ~HandleWatchTimeout() {
+      assert(pg->is_locked());
+      pg->put_object_context(static_cast<ReplicatedPG::ObjectContext*>(obc));
+    }
+  };
+
   ObjectContext *obc = static_cast<ObjectContext *>(_obc);
 
   if (obc->unconnected_watchers.count(entity) == 0 ||
-      obc->unconnected_watchers[entity]->expire != expire) {
-    dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher " << entity << dendl;
+      (obc->unconnected_watchers[entity] &&
+       obc->unconnected_watchers[entity]->expire != expire)) {
+    /* If obc->unconnected_watchers[entity] == NULL we know at least that
+     * the watcher for obc,entity should expire.  We might not have been
+     * the intended Context*, but that's ok since the intended one will
+     * take this branch and assume it raced. */
+    dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher "
+             << entity << dendl;
     put_object_context(obc);
     return;
   }
 
+  if (is_degraded_object(obc->obs.oi.soid)) {
+    callbacks_for_degraded_object[obc->obs.oi.soid].push_back(
+      new HandleWatchTimeout(get_osdmap()->get_epoch(),
+                             this, _obc, entity, expire)
+      );
+    dout(10) << "handle_watch_timeout waiting for degraded on obj "
+             << obc->obs.oi.soid
+             << dendl;
+    obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
+    put_object_context(obc); // callback got its own ref
+    return;
+  }
+
+  if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+    dout(10) << "handle_watch_timeout waiting for scrub on obj "
+             << obc->obs.oi.soid
+             << dendl;
+    scrubber.add_callback(new HandleWatchTimeout(get_osdmap()->get_epoch(),
+                                                 this, _obc, entity, expire));
+    obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
+    put_object_context(obc); // callback got its own ref
+    return;
+  }
+
   obc->unconnected_watchers.erase(entity);
   obc->obs.oi.watchers.erase(entity);
@@ -5619,6 +5686,16 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
     }
     put_object_context(i->second);
   }
+  if (callbacks_for_degraded_object.count(oid)) {
+    list<Context*> contexts;
+    contexts.swap(callbacks_for_degraded_object[oid]);
+    callbacks_for_degraded_object.erase(oid);
+    for (list<Context*>::iterator i = contexts.begin();
+         i != contexts.end();
+         ++i) {
+      (*i)->complete(0);
+    }
+  }
 }
 
 /** op_pull
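handle_watch_timeout now wraps itself in a HandleWatchTimeout Context so the expiry can be parked on callbacks_for_degraded_object or on the scrubber's callback list and re-dispatched later. The epoch recorded at creation and compared against last_peering_reset in finish() is what keeps a parked timeout from acting on state rebuilt by a later peering reset. A simplified, self-contained sketch of that guard; the PG fields and method names here are illustrative stand-ins for the real ones:

#include <cstdint>
#include <iostream>

typedef uint32_t epoch_t;

struct PG {
  epoch_t last_peering_reset;
  void expire_watcher() { std::cout << "expiring watcher\n"; }
};

// Deferred callback that records the epoch it was created in and refuses
// to run if the PG has since gone through a peering reset.
struct HandleWatchTimeout {
  epoch_t cur_epoch;
  PG *pg;
  HandleWatchTimeout(epoch_t e, PG *p) : cur_epoch(e), pg(p) {}
  void finish() {
    if (cur_epoch < pg->last_peering_reset) {
      std::cout << "stale timeout dropped (peering reset intervened)\n";
      return;  // state was rebuilt after we were queued; do nothing
    }
    pg->expire_watcher();
  }
};

int main() {
  PG pg;
  pg.last_peering_reset = 5;
  HandleWatchTimeout cb(5, &pg);  // parked while the object was degraded
  cb.finish();                    // epoch still current: watcher expires

  pg.last_peering_reset = 7;      // an interval change occurs
  HandleWatchTimeout stale(5, &pg);
  stale.finish();                 // guard fires: callback is a no-op
}

The same epoch-guard idea explains the changed cancel_event() call in unregister_unconnected_watcher(): exactly one party drops the references, either the canceller (when cancellation succeeds) or the fired callback itself.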