diff options
author | Samuel Just <sam.just@inktank.com> | 2013-02-15 10:45:47 -0800 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2013-02-20 13:29:20 -0800 |
commit | ebdf66dfbfb8e2509e8fe9aebe5cb3e3da929769 (patch) | |
tree | 213b0c7442c54a97beb1afec2b9ad60b35fa34c2 | |
parent | 7af32997978caaa3c4538334c5fa26df1698b3f5 (diff) | |
download | ceph-ebdf66dfbfb8e2509e8fe9aebe5cb3e3da929769.tar.gz |
Watch/Notify: rework watch/notify
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/osd/OSD.cc | 141 | ||||
-rw-r--r-- | src/osd/OSD.h | 20 | ||||
-rw-r--r-- | src/osd/PG.h | 10 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 509 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 31 | ||||
-rw-r--r-- | src/osd/Watch.cc | 453 | ||||
-rw-r--r-- | src/osd/Watch.h | 303 | ||||
-rw-r--r-- | src/osd/osd_types.cc | 33 | ||||
-rw-r--r-- | src/osd/osd_types.h | 9 |
9 files changed, 856 insertions, 653 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index b20e6d690f2..66f976b1c15 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -168,7 +168,6 @@ OSDService::OSDService(OSD *osd) : scrubs_active(0), watch_lock("OSD::watch_lock"), watch_timer(osd->client_messenger->cct, watch_lock), - watch(NULL), backfill_request_lock("OSD::backfill_request_lock"), backfill_request_timer(g_ceph_context, backfill_request_lock, false), last_tid(0), @@ -252,15 +251,12 @@ void OSDService::shutdown() watch_lock.Lock(); watch_timer.shutdown(); watch_lock.Unlock(); - - delete watch; } void OSDService::init() { reserver_finisher.start(); watch_timer.init(); - watch = new Watch(); } ObjectStore *OSD::create_object_store(const std::string &dev, const std::string &jdev) @@ -2571,152 +2567,17 @@ void OSD::ms_handle_connect(Connection *con) } } -void OSD::put_object_context(void *_obc, pg_t pgid) -{ - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc; - ReplicatedPG *pg = (ReplicatedPG *)lookup_lock_raw_pg(pgid); - // If pg is being deleted, (which is the only case in which - // it will be NULL) it will clean up its object contexts itself - if (pg) { - pg->put_object_context(obc); - pg->unlock(); - } -} - -void OSD::complete_notify(void *_notif, void *_obc) -{ - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc; - Watch::Notification *notif = (Watch::Notification *)_notif; - dout(10) << "complete_notify " << notif << " got the last reply from pending watchers, can send response now" << dendl; - MWatchNotify *reply = notif->reply; - client_messenger->send_message(reply, notif->session->con); - notif->session->put(); - notif->session->con->put(); - service.watch->remove_notification(notif); - if (notif->timeout) - service.watch_timer.cancel_event(notif->timeout); - map<Watch::Notification *, bool>::iterator iter = obc->notifs.find(notif); - if (iter != obc->notifs.end()) - obc->notifs.erase(iter); - delete notif; -} - -void OSD::ack_notification(entity_name_t& name, void *_notif, void *_obc, ReplicatedPG *pg) -{ - assert(service.watch_lock.is_locked()); - pg->assert_locked(); - Watch::Notification *notif = (Watch::Notification *)_notif; - dout(10) << "ack_notification " << name << " notif " << notif << " id " << notif->id << dendl; - if (service.watch->ack_notification(name, notif)) { - complete_notify(notif, _obc); - pg->put_object_context(static_cast<ReplicatedPG::ObjectContext *>(_obc)); - } -} - -void OSD::handle_watch_timeout(void *obc, - ReplicatedPG *pg, - entity_name_t entity, - utime_t expire) -{ - // watch_lock is inside pg->lock; handle_watch_timeout checks for the race. - service.watch_lock.Unlock(); - pg->lock(); - service.watch_lock.Lock(); - - pg->handle_watch_timeout(obc, entity, expire); - pg->unlock(); - pg->put(); -} - -void OSD::disconnect_session_watches(Session *session) -{ - // get any watched obc's - map<ReplicatedPG::ObjectContext *, pg_t> obcs; - service.watch_lock.Lock(); - for (map<void *, pg_t>::iterator iter = session->watches.begin(); iter != session->watches.end(); ++iter) { - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)iter->first; - obcs[obc] = iter->second; - } - service.watch_lock.Unlock(); - - for (map<ReplicatedPG::ObjectContext *, pg_t>::iterator oiter = obcs.begin(); oiter != obcs.end(); ++oiter) { - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)oiter->first; - dout(10) << "obc=" << (void *)obc << dendl; - - ReplicatedPG *pg = static_cast<ReplicatedPG *>(lookup_lock_raw_pg(oiter->second)); - if (!pg) { - /* pg removed between watch_unlock.Unlock() and now, all related - * watch structures would have been cleaned up in remove_watchers_and_notifies - */ - continue; - } - service.watch_lock.Lock(); - - if (!session->watches.count((void*)obc)) { - // Raced with watch removal, obc is invalid - service.watch_lock.Unlock(); - pg->unlock(); - continue; - } - - /* NOTE! fix this one, should be able to just lookup entity name, - however, we currently only keep EntityName on the session and not - entity_name_t. */ - map<entity_name_t, Session *>::iterator witer = obc->watchers.begin(); - while (1) { - while (witer != obc->watchers.end() && witer->second == session) { - dout(10) << "removing watching session entity_name=" << session->entity_name - << " from " << obc->obs.oi << dendl; - entity_name_t entity = witer->first; - watch_info_t& w = obc->obs.oi.watchers[entity]; - utime_t expire = ceph_clock_now(g_ceph_context); - expire += w.timeout_seconds; - pg->register_unconnected_watcher(obc, entity, expire); - dout(10) << " disconnected watch " << w << " by " << entity << " session " << session - << ", expires " << expire << dendl; - obc->watchers.erase(witer++); - pg->put_object_context(obc); - session->con->put(); - session->put(); - } - if (witer == obc->watchers.end()) - break; - ++witer; - } - service.watch_lock.Unlock(); - pg->unlock(); - } -} - bool OSD::ms_handle_reset(Connection *con) { dout(1) << "OSD::ms_handle_reset()" << dendl; OSD::Session *session = (OSD::Session *)con->get_priv(); if (!session) return false; - disconnect_session_watches(session); + session->wstate.reset(); session->put(); return true; } -void OSD::handle_notify_timeout(void *_notif) -{ - assert(service.watch_lock.is_locked()); - Watch::Notification *notif = (Watch::Notification *)_notif; - dout(10) << "OSD::handle_notify_timeout notif " << notif << " id " << notif->id << dendl; - - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)notif->obc; - - pg_t pgid = notif->pgid; - - complete_notify(_notif, obc); - service.watch_lock.Unlock(); /* drop lock to change locking order */ - - put_object_context(obc, pgid); - service.watch_lock.Lock(); - /* exiting with watch_lock held */ -} - struct C_OSD_GetVersion : public Context { OSD *osd; uint64_t oldest, newest; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 1837195d339..25e1eee13ad 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -48,6 +48,7 @@ using namespace std; #include <ext/hash_set> using namespace __gnu_cxx; +#include "Watch.h" #include "common/shared_cache.hpp" #include "common/simple_cache.hpp" #include "common/sharedptr_registry.hpp" @@ -295,7 +296,11 @@ public: // -- Watch -- Mutex watch_lock; SafeTimer watch_timer; - Watch *watch; + uint64_t next_notif_id; + uint64_t get_next_id(epoch_t cur_epoch) { + Mutex::Locker l(watch_lock); + return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++)); + } // -- Backfill Request Scheduling -- Mutex backfill_request_lock; @@ -526,7 +531,7 @@ public: int64_t auid; epoch_t last_sent_epoch; Connection *con; - std::map<void *, pg_t> watches; + WatchConState wstate; Session() : auid(-1), last_sent_epoch(0), con(0) {} }; @@ -1468,17 +1473,6 @@ public: int init_op_flags(OpRequestRef op); - - void put_object_context(void *_obc, pg_t pgid); - void complete_notify(void *notif, void *obc); - void ack_notification(entity_name_t& peer_addr, void *notif, void *obc, - ReplicatedPG *pg); - void handle_notify_timeout(void *notif); - void disconnect_session_watches(Session *session); - void handle_watch_timeout(void *obc, - ReplicatedPG *pg, - entity_name_t entity, - utime_t expire); OSDService service; friend class OSDService; }; diff --git a/src/osd/PG.h b/src/osd/PG.h index 39bc6b5ab98..ed85c4e2946 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1918,16 +1918,6 @@ public: virtual void on_activate() = 0; virtual void on_flushed() = 0; virtual void on_shutdown() = 0; - virtual void remove_watchers_and_notifies() = 0; - - virtual void register_unconnected_watcher(void *obc, - entity_name_t entity, - utime_t expire) = 0; - virtual void unregister_unconnected_watcher(void *obc, - entity_name_t entity) = 0; - virtual void handle_watch_timeout(void *obc, - entity_name_t entity, - utime_t expire) = 0; }; WRITE_CLASS_ENCODER(PG::OndiskLog) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index ffa1a93845b..2e4ea0286d0 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1567,94 +1567,6 @@ int ReplicatedPG::do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr) } } -void ReplicatedPG::dump_watchers(ObjectContext *obc) -{ - assert(osd->watch_lock.is_locked()); - - dout(10) << "dump_watchers " << obc->obs.oi.soid << " " << obc->obs.oi << dendl; - for (map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.begin(); - iter != obc->watchers.end(); - ++iter) - dout(10) << " * obc->watcher: " << iter->first << " session=" << iter->second << dendl; - - for (map<entity_name_t, watch_info_t>::iterator oi_iter = obc->obs.oi.watchers.begin(); - oi_iter != obc->obs.oi.watchers.end(); - oi_iter++) { - watch_info_t& w = oi_iter->second; - dout(10) << " * oi->watcher: " << oi_iter->first << " cookie=" << w.cookie << dendl; - } -} - -void ReplicatedPG::remove_watcher(ObjectContext *obc, entity_name_t entity) -{ - assert_locked(); - assert(osd->watch_lock.is_locked()); - dout(10) << "remove_watcher " << *obc << " " << entity << dendl; - map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity); - assert(iter != obc->watchers.end()); - OSD::Session *session = iter->second; - dout(10) << "remove_watcher removing session " << session << dendl; - - obc->watchers.erase(iter); - assert(session->watches.count(obc)); - session->watches.erase(obc); - - put_object_context(obc); - session->con->put(); - session->put(); -} - -void ReplicatedPG::remove_notify(ObjectContext *obc, Watch::Notification *notif) -{ - assert_locked(); - assert(osd->watch_lock.is_locked()); - map<Watch::Notification *, bool>::iterator niter = obc->notifs.find(notif); - - // Cancel notification - if (notif->timeout) - osd->watch_timer.cancel_event(notif->timeout); - osd->watch->remove_notification(notif); - - assert(niter != obc->notifs.end()); - - niter->first->session->con->put(); - niter->first->session->put(); - obc->notifs.erase(niter); - - put_object_context(obc); - delete notif; -} - -void ReplicatedPG::remove_watchers_and_notifies() -{ - assert_locked(); - - dout(10) << "remove_watchers" << dendl; - - osd->watch_lock.Lock(); - for (map<hobject_t, ObjectContext*>::iterator oiter = object_contexts.begin(); - oiter != object_contexts.end(); - ) { - map<hobject_t, ObjectContext *>::iterator iter = oiter++; - ObjectContext *obc = iter->second; - obc->ref++; - for (map<entity_name_t, OSD::Session *>::iterator witer = obc->watchers.begin(); - witer != obc->watchers.end(); - remove_watcher(obc, (witer++)->first)) ; - for (map<entity_name_t,Watch::C_WatchTimeout*>::iterator iter = obc->unconnected_watchers.begin(); - iter != obc->unconnected_watchers.end(); - ) { - map<entity_name_t,Watch::C_WatchTimeout*>::iterator i = iter++; - unregister_unconnected_watcher(obc, i->first); - } - for (map<Watch::Notification *, bool>::iterator niter = obc->notifs.begin(); - niter != obc->notifs.end(); - remove_notify(obc, (niter++)->first)) ; - put_object_context(obc); - } - osd->watch_lock.Unlock(); -} - // ======================================================================== // low level osd ops @@ -2329,20 +2241,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) case CEPH_OSD_OP_NOTIFY_ACK: { - osd->watch_lock.Lock(); - entity_name_t source = ctx->op->request->get_source(); - map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(source); - Watch::Notification *notif = osd->watch->get_notif(op.watch.cookie); - if (oi_iter != oi.watchers.end() && notif) { - ctx->notify_acks.push_back(op.watch.cookie); - } else { - if (!notif) - dout(10) << " no pending notify for cookie " << op.watch.cookie << dendl; - else - dout(10) << " not registered as a watcher" << dendl; - result = -EINVAL; - } - osd->watch_lock.Unlock(); + ctx->notify_acks.push_back(op.watch.cookie); } break; @@ -2557,27 +2456,27 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) watch_info_t w(cookie, 30); // FIXME: where does the timeout come from? if (do_watch) { - if (oi.watchers.count(entity) && oi.watchers[entity] == w) { + if (oi.watchers.count(make_pair(cookie, entity))) { dout(10) << " found existing watch " << w << " by " << entity << dendl; } else { dout(10) << " registered new watch " << w << " by " << entity << dendl; - oi.watchers[entity] = w; + oi.watchers[make_pair(cookie, entity)] = w; t.nop(); // make sure update the object_info on disk! } ctx->watch_connect = true; ctx->watch_info = w; assert(obc->registered); } else { - map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity); + map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter = + oi.watchers.find(make_pair(cookie, entity)); if (oi_iter != oi.watchers.end()) { - dout(10) << " removed watch " << oi_iter->second << " by " << entity << dendl; - oi.watchers.erase(entity); + dout(10) << " removed watch " << oi_iter->second << " by " + << entity << dendl; + oi.watchers.erase(oi_iter); t.nop(); // update oi on disk ctx->watch_disconnect = true; - - // FIXME: trigger notifies? - + ctx->watch_info = w; } else { dout(10) << " can't remove: no watch by " << entity << dendl; } @@ -3340,154 +3239,88 @@ void ReplicatedPG::add_interval_usage(interval_set<uint64_t>& s, object_stat_sum void ReplicatedPG::do_osd_op_effects(OpContext *ctx) { - if (ctx->watch_connect || ctx->watch_disconnect || - !ctx->notifies.empty() || !ctx->notify_acks.empty()) { - OSD::Session *session = (OSD::Session *)ctx->op->request->get_connection()->get_priv(); - assert(session); - ObjectContext *obc = ctx->obc; - object_info_t& oi = ctx->new_obs.oi; - hobject_t& soid = oi.soid; - entity_name_t entity = ctx->reqid.name; + ConnectionRef conn(ctx->op->request->get_connection()); + boost::intrusive_ptr<OSD::Session> session( + (OSD::Session *)conn->get_priv()); + entity_name_t entity = ctx->reqid.name; - dout(10) << "do_osd_op_effects applying watch/notify effects on session " << session << dendl; - - osd->watch_lock.Lock(); - dump_watchers(obc); - - map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity); - if (ctx->watch_connect) { - watch_info_t w = ctx->watch_info; - - if (iter == obc->watchers.end()) { - dout(10) << " connected to " << w << " by " << entity << " session " << session << dendl; - obc->watchers[entity] = session; - session->con->get(); - session->get(); - session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); - obc->ref++; - } else if (iter->second == session) { - // already there - dout(10) << " already connected to " << w << " by " << entity - << " session " << session << dendl; - } else { - // weird: same entity, different session. - dout(10) << " reconnected (with different session!) watch " << w << " by " << entity - << " session " << session << " (was " << iter->second << ")" << dendl; - session->con->get(); - session->get(); - - iter->second->watches.erase(obc); - iter->second->con->put(); - iter->second->put(); - - iter->second = session; - session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); - } - map<entity_name_t,Watch::C_WatchTimeout*>::iterator un_iter = - obc->unconnected_watchers.find(entity); - if (un_iter != obc->unconnected_watchers.end()) { - unregister_unconnected_watcher(obc, un_iter->first); - } + dout(15) << "do_osd_op_effects on session " << session.get() << dendl; - map<Watch::Notification *, bool>::iterator niter; - for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) { - Watch::Notification *notif = niter->first; - map<entity_name_t, Watch::WatcherState>::iterator iter = notif->watchers.find(entity); - if (iter != notif->watchers.end()) { - /* there is a pending notification for this watcher, we should resend it anyway - even if we already sent it as it might not have received it */ - MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); - osd->send_message_osd_client(notify_msg, session->con); - } - } - } - - if (ctx->watch_disconnect) { - if (iter != obc->watchers.end()) { - remove_watcher(obc, entity); - } else { - assert(obc->unconnected_watchers.count(entity)); - unregister_unconnected_watcher(obc, entity); - } - - // ack any pending notifies - map<Watch::Notification *, bool>::iterator p = obc->notifs.begin(); - while (p != obc->notifs.end()) { - Watch::Notification *notif = p->first; - entity_name_t by = entity; - p++; - assert(notif->obc == obc); - dout(10) << " acking pending notif " << notif->id << " by " << by << dendl; - // TODOSAM: osd->osd-> not good - osd->osd->ack_notification(entity, notif, obc, this); - } + if (ctx->watch_connect) { + pair<uint64_t, entity_name_t> watcher(ctx->watch_info.cookie, entity); + dout(15) << "do_osd_op_effects applying watch connect on session " + << session.get() << " watcher " << watcher << dendl; + WatchRef watch; + if (ctx->obc->watchers.count(watcher)) { + dout(15) << "do_osd_op_effects found existing watch watcher " << watcher + << dendl; + watch = ctx->obc->watchers[watcher]; + } else { + dout(15) << "do_osd_op_effects new watcher " << watcher + << dendl; + watch = Watch::makeWatchRef( + this, osd, ctx->obc, ctx->watch_info.timeout_seconds, + ctx->watch_info.cookie, entity); + ctx->obc->watchers.insert( + make_pair( + watcher, + watch)); + } + watch->connect(conn); + } + + if (ctx->watch_disconnect) { + pair<uint64_t, entity_name_t> watcher(ctx->watch_info.cookie, entity); + dout(15) << "do_osd_op_effects applying watch disconnect on session " + << session.get() << " and watcher " << watcher << dendl; + if (ctx->obc->watchers.count(watcher)) { + WatchRef watch = ctx->obc->watchers[watcher]; + dout(10) << "do_osd_op_effects applying disconnect found watcher " + << watcher << dendl; + ctx->obc->watchers.erase(watcher); + watch->remove(); + } else { + dout(10) << "do_osd_op_effects failed to find watcher " + << watcher << dendl; } + } - for (list<notify_info_t>::iterator p = ctx->notifies.begin(); - p != ctx->notifies.end(); - ++p) { - - dout(10) << " " << *p << dendl; - - Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl); - session->get(); // notif got a reference - session->con->get(); - notif->pgid = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); - - osd->watch->add_notification(notif); - dout(20) << " notify id " << notif->id << dendl; - - // connected - for (map<entity_name_t, watch_info_t>::iterator i = obc->obs.oi.watchers.begin(); - i != obc->obs.oi.watchers.end(); - ++i) { - map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.find(i->first); - if (q != obc->watchers.end()) { - entity_name_t name = q->first; - OSD::Session *s = q->second; - watch_info_t& w = obc->obs.oi.watchers[q->first]; - - notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race - - MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); - osd->send_message_osd_client(notify_msg, s->con); - } else { - // unconnected - entity_name_t name = i->first; - notif->add_watcher(name, Watch::WATCHER_PENDING); - } - } - - notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl); - if (notif->watchers.empty()) { - // TODOSAM: osd->osd-> not good - osd->osd->complete_notify(notif, obc); - } else { - obc->notifs[notif] = true; - obc->ref++; - notif->obc = obc; - // TODOSAM: osd->osd not good - notif->timeout = new Watch::C_NotifyTimeout(osd->osd, notif); - osd->watch_timer.add_event_after(p->timeout, notif->timeout); - } + for (list<notify_info_t>::iterator p = ctx->notifies.begin(); + p != ctx->notifies.end(); + ++p) { + dout(10) << "do_osd_op_effects, notify " << *p << dendl; + NotifyRef notif( + Notify::makeNotifyRef( + conn, + ctx->obc->watchers.size(), + p->bl, + p->timeout, + p->cookie, + osd->get_next_id(get_osdmap()->get_epoch()), + ctx->obc->obs.oi.user_version.version, + osd)); + for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i = + ctx->obc->watchers.begin(); + i != ctx->obc->watchers.end(); + ++i) { + dout(10) << "starting notify on watch " << i->first << dendl; + i->second->start_notify(notif); } + notif->init(); + } - for (list<uint64_t>::iterator p = ctx->notify_acks.begin(); p != ctx->notify_acks.end(); ++p) { - uint64_t cookie = *p; - - dout(10) << " notify_ack " << cookie << dendl; - map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity); - assert(oi_iter != oi.watchers.end()); - - Watch::Notification *notif = osd->watch->get_notif(cookie); - assert(notif); - - // TODOSAM: osd->osd-> not good - osd->osd->ack_notification(entity, notif, obc, this); + for (list<uint64_t>::iterator p = ctx->notify_acks.begin(); + p != ctx->notify_acks.end(); + ++p) { + dout(10) << "notify_ack " << *p << dendl; + for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i = + ctx->obc->watchers.begin(); + i != ctx->obc->watchers.end(); + ++i) { + if (i->first.second != entity) continue; + dout(10) << "acking notify on watch " << i->first << dendl; + i->second->notify_ack(*p); } - - osd->watch_lock.Unlock(); - session->put(); } } @@ -4115,113 +3948,39 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc) log.objects[obc->obs.oi.soid]->reverting_to == obc->obs.oi.version)); dout(10) << "populate_obc_watchers " << obc->obs.oi.soid << dendl; - if (!obc->obs.oi.watchers.empty()) { - Mutex::Locker l(osd->watch_lock); - assert(obc->unconnected_watchers.size() == 0); - assert(obc->watchers.size() == 0); - // populate unconnected_watchers - utime_t now = ceph_clock_now(g_ceph_context); - for (map<entity_name_t, watch_info_t>::iterator p = obc->obs.oi.watchers.begin(); - p != obc->obs.oi.watchers.end(); - p++) { - utime_t expire = now; - expire += p->second.timeout_seconds; - dout(10) << " unconnected watcher " << p->first << " will expire " << expire << dendl; - register_unconnected_watcher(obc, p->first, expire); - } - } -} - -void ReplicatedPG::unregister_unconnected_watcher(void *_obc, - entity_name_t entity) -{ - ObjectContext *obc = static_cast<ObjectContext *>(_obc); - - /* If we failed to cancel the event, the event will fire and the obc - * ref and the pg ref will be taken care of */ - if (osd->watch_timer.cancel_event(obc->unconnected_watchers[entity])) { - put_object_context(obc); - put(); + assert(obc->watchers.size() == 0); + // populate unconnected_watchers + utime_t now = ceph_clock_now(g_ceph_context); + for (map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator p = + obc->obs.oi.watchers.begin(); + p != obc->obs.oi.watchers.end(); + p++) { + utime_t expire = now; + expire += p->second.timeout_seconds; + dout(10) << " unconnected watcher " << p->first << " will expire " << expire << dendl; + WatchRef watch( + Watch::makeWatchRef( + this, osd, obc, p->second.timeout_seconds, p->first.first, p->first.second)); + watch->disconnect(); + obc->watchers.insert( + make_pair( + make_pair(p->first.first, p->first.second), + watch)); } - obc->unconnected_watchers.erase(entity); } -void ReplicatedPG::register_unconnected_watcher(void *_obc, - entity_name_t entity, - utime_t expire) +void ReplicatedPG::handle_watch_timeout(WatchRef watch) { - ObjectContext *obc = static_cast<ObjectContext *>(_obc); - pg_t pgid = info.pgid; - pgid.set_ps(obc->obs.oi.soid.hash); - get(); - obc->ref++; - Watch::C_WatchTimeout *cb = new Watch::C_WatchTimeout(osd->osd, - static_cast<void *>(obc), - this, - entity, expire); - osd->watch_timer.add_event_at(expire, cb); - obc->unconnected_watchers[entity] = cb; -} - -void ReplicatedPG::handle_watch_timeout(void *_obc, - entity_name_t entity, - utime_t expire) -{ - dout(10) << "handle_watch_timeout obc " << _obc << dendl; - struct HandleWatchTimeout : public Context { - epoch_t cur_epoch; - boost::intrusive_ptr<ReplicatedPG> pg; - void *obc; - entity_name_t entity; - utime_t expire; - HandleWatchTimeout( - epoch_t cur_epoch, - ReplicatedPG *pg, - void *obc, - entity_name_t entity, - utime_t expire) : cur_epoch(cur_epoch), - pg(pg), obc(obc), entity(entity), expire(expire) { - assert(pg->is_locked()); - static_cast<ReplicatedPG::ObjectContext*>(obc)->get(); - } - void finish(int) { - assert(pg->is_locked()); - if (cur_epoch < pg->last_peering_reset) - return; - // handle_watch_timeout gets its own ref - static_cast<ReplicatedPG::ObjectContext*>(obc)->get(); - pg->handle_watch_timeout(obc, entity, expire); - } - ~HandleWatchTimeout() { - assert(pg->is_locked()); - pg->put_object_context(static_cast<ReplicatedPG::ObjectContext*>(obc)); - } - }; - - ObjectContext *obc = static_cast<ObjectContext *>(_obc); - - if (obc->unconnected_watchers.count(entity) == 0 || - (obc->unconnected_watchers[entity] && - obc->unconnected_watchers[entity]->expire != expire)) { - /* If obc->unconnected_watchers[entity] == NULL we know at least that - * the watcher for obc,entity should expire. We might not have been - * the intended Context*, but that's ok since the intended one will - * take this branch and assume it raced. */ - dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher " - << entity << dendl; - put_object_context(obc); - return; - } + ObjectContext *obc = watch->get_obc(); // handle_watch_timeout owns this ref + dout(10) << "handle_watch_timeout obc " << obc << dendl; if (is_degraded_object(obc->obs.oi.soid)) { callbacks_for_degraded_object[obc->obs.oi.soid].push_back( - new HandleWatchTimeout(get_osdmap()->get_epoch(), - this, _obc, entity, expire) + watch->get_delayed_cb() ); dout(10) << "handle_watch_timeout waiting for degraded on obj " << obc->obs.oi.soid << dendl; - obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one! put_object_context(obc); // callback got its own ref return; } @@ -4230,15 +3989,16 @@ void ReplicatedPG::handle_watch_timeout(void *_obc, dout(10) << "handle_watch_timeout waiting for scrub on obj " << obc->obs.oi.soid << dendl; - scrubber.add_callback(new HandleWatchTimeout(get_osdmap()->get_epoch(), - this, _obc, entity, expire)); - obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one! - put_object_context(obc); // callback got its own ref + scrubber.add_callback( + watch->get_delayed_cb() // This callback! + ); + put_object_context(obc); return; } - obc->unconnected_watchers.erase(entity); - obc->obs.oi.watchers.erase(entity); + obc->watchers.erase(make_pair(watch->get_cookie(), watch->get_entity())); + obc->obs.oi.watchers.erase(make_pair(watch->get_cookie(), watch->get_entity())); + watch->remove(); vector<OSDOp> ops; tid_t rep_tid = osd->get_tid(); @@ -4283,7 +4043,7 @@ void ReplicatedPG::handle_watch_timeout(void *_obc, eval_repop(repop); } -ReplicatedPG::ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid) +ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid) { map<hobject_t, ObjectContext*>::iterator p = object_contexts.find(oid); if (p != object_contexts.end()) @@ -4291,7 +4051,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_ return NULL; } -ReplicatedPG::ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi, +ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi, SnapSetContext *ssc) { ObjectContext *obc = new ObjectContext(oi, false, ssc); @@ -4302,7 +4062,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::create_object_context(const object_in return obc; } -ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid, +ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid, const object_locator_t& oloc, bool can_create) { @@ -4354,7 +4114,24 @@ ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& s void ReplicatedPG::context_registry_on_change() { - remove_watchers_and_notifies(); + list<ObjectContext *> contexts; + for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin(); + i != object_contexts.end(); + ++i) { + i->second->get(); + contexts.push_back(i->second); + for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j = + i->second->watchers.begin(); + j != i->second->watchers.end(); + i->second->watchers.erase(j++)) { + j->second->discard(); + } + } + for (list<ObjectContext *>::iterator i = contexts.begin(); + i != contexts.end(); + contexts.erase(i++)) { + put_object_context(*i); + } } @@ -4529,7 +4306,7 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t * pgstat->stats.cat_sum[oi.category].add(stat); } -ReplicatedPG::SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid) +SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid) { SnapSetContext *ssc = new SnapSetContext(oid); dout(10) << "create_snapset_context " << ssc << " " << ssc->oid << dendl; @@ -4538,10 +4315,10 @@ ReplicatedPG::SnapSetContext *ReplicatedPG::create_snapset_context(const object_ return ssc; } -ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, - const string& key, - ps_t seed, - bool can_create) +SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, + const string& key, + ps_t seed, + bool can_create) { SnapSetContext *ssc; map<object_t, SnapSetContext*>::iterator p = snapset_contexts.find(oid); @@ -6028,7 +5805,7 @@ eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid) /* Mark an object as lost */ -ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t, +ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t, const hobject_t &oid, eversion_t version, utime_t mtime, int what) { @@ -6065,7 +5842,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transac struct C_PG_MarkUnfoundLost : public Context { ReplicatedPG *pg; - list<ReplicatedPG::ObjectContext*> obcs; + list<ObjectContext*> obcs; C_PG_MarkUnfoundLost(ReplicatedPG *p) : pg(p) { pg->get(); } @@ -6246,14 +6023,14 @@ void ReplicatedPG::on_removal() { dout(10) << "on_removal" << dendl; apply_and_flush_repops(false); - remove_watchers_and_notifies(); + context_registry_on_change(); } void ReplicatedPG::on_shutdown() { dout(10) << "on_shutdown" << dendl; apply_and_flush_repops(false); - remove_watchers_and_notifies(); + context_registry_on_change(); } void ReplicatedPG::on_flushed() diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 397d8cfc63e..47ee76e5aa3 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -60,6 +60,7 @@ public: class ReplicatedPG : public PG { friend class OSD; + friend class Watch; public: /* @@ -424,14 +425,9 @@ protected: map<object_t, SnapSetContext*> snapset_contexts; void populate_obc_watchers(ObjectContext *obc); - void register_unconnected_watcher(void *obc, - entity_name_t entity, - utime_t expire); - void unregister_unconnected_watcher(void *obc, - entity_name_t entity); - void handle_watch_timeout(void *obc, - entity_name_t entity, - utime_t expire); +public: + void handle_watch_timeout(WatchRef watch); +protected: ObjectContext *lookup_object_context(const hobject_t& soid) { if (object_contexts.count(soid)) { @@ -725,11 +721,6 @@ protected: void send_remove_op(const hobject_t& oid, eversion_t v, int peer); - void dump_watchers(ObjectContext *obc); - void remove_watcher(ObjectContext *obc, entity_name_t entity); - void remove_notify(ObjectContext *obc, Watch::Notification *notif); - void remove_watchers_and_notifies(); - struct RepModify { ReplicatedPG *pg; OpRequestRef op; @@ -1020,20 +1011,6 @@ public: void on_shutdown(); }; - -inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectState& obs) -{ - out << obs.oi.soid; - if (!obs.exists) - out << "(dne)"; - return out; -} - -inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectContext& obc) -{ - return out << "obc(" << obc.obs << ")"; -} - inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) { out << "repgather(" << &repop diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc index da90cef4103..1ee5f35b5e3 100644 --- a/src/osd/Watch.cc +++ b/src/osd/Watch.cc @@ -1,7 +1,8 @@ - +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- #include "PG.h" #include "include/types.h" +#include "messages/MWatchNotify.h" #include <map> @@ -11,26 +12,454 @@ #include "common/config.h" -bool Watch::ack_notification(entity_name_t& watcher, Notification *notif) +struct CancelableContext : public Context { + virtual void cancel() = 0; +}; + +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +static ostream& _prefix( + std::ostream* _dout, + Notify *notify) { + return *_dout << notify->gen_dbg_prefix(); +} + +Notify::Notify( + ConnectionRef client, + unsigned num_watchers, + bufferlist &payload, + uint32_t timeout, + uint64_t cookie, + uint64_t notify_id, + uint64_t version, + OSDService *osd) + : client(client), + in_progress_watchers(num_watchers), + complete(false), + discarded(false), + payload(payload), + timeout(timeout), + cookie(cookie), + notify_id(notify_id), + version(version), + osd(osd), + cb(NULL), + lock("Notify::lock") {} + +NotifyRef Notify::makeNotifyRef( + ConnectionRef client, + unsigned num_watchers, + bufferlist &payload, + uint32_t timeout, + uint64_t cookie, + uint64_t notify_id, + uint64_t version, + OSDService *osd) { + NotifyRef ret( + new Notify( + client, num_watchers, + payload, timeout, + cookie, notify_id, + version, osd)); + ret->set_self(ret); + return ret; +} + +class NotifyTimeoutCB : public CancelableContext { + NotifyRef notif; + bool canceled; // protected by notif lock +public: + NotifyTimeoutCB(NotifyRef notif) : notif(notif) {} + void finish(int) { + notif->osd->watch_lock.Unlock(); + notif->lock.Lock(); + if (!canceled) + notif->do_timeout(); // drops lock + else + notif->lock.Unlock(); + notif->osd->watch_lock.Lock(); + } + void cancel() { + assert(notif->lock.is_locked_by_me()); + canceled = true; + } +}; + +void Notify::do_timeout() { - map<entity_name_t, WatcherState>::iterator iter = notif->watchers.find(watcher); + assert(lock.is_locked_by_me()); + dout(10) << "timeout" << dendl; + cb = NULL; + if (is_discarded()) { + lock.Unlock(); + return; + } - if (iter == notif->watchers.end()) // client was not suppose to ack this notification - return false; + in_progress_watchers = 0; // we give up TODO: we should return an error code + maybe_complete_notify(); + assert(complete); + set<WatchRef> _watchers; + _watchers.swap(watchers); + lock.Unlock(); - notif->watchers.erase(iter); + for (set<WatchRef>::iterator i = _watchers.begin(); + i != _watchers.end(); + ++i) { + boost::intrusive_ptr<ReplicatedPG> pg((*i)->get_pg()); + pg->lock(); + if (!(*i)->is_discarded()) { + (*i)->cancel_notify(self.lock()); + } + pg->unlock(); + } +} - return notif->watchers.empty(); // true if there are no more watchers +void Notify::register_cb() +{ + assert(lock.is_locked_by_me()); + { + osd->watch_lock.Lock(); + cb = new NotifyTimeoutCB(self.lock()); + osd->watch_timer.add_event_after( + timeout, + cb); + osd->watch_lock.Unlock(); + } +} + +void Notify::unregister_cb() +{ + assert(lock.is_locked_by_me()); + if (!cb) + return; + cb->cancel(); + { + osd->watch_lock.Lock(); + osd->watch_timer.cancel_event(cb); + cb = NULL; + osd->watch_lock.Unlock(); + } +} + +void Notify::start_watcher(WatchRef watch) +{ + Mutex::Locker l(lock); + dout(10) << "start_watcher" << dendl; + watchers.insert(watch); +} + +void Notify::complete_watcher(WatchRef watch) +{ + Mutex::Locker l(lock); + dout(10) << "complete_watcher" << dendl; + if (is_discarded()) + return; + assert(in_progress_watchers > 0); + watchers.erase(watch); + --in_progress_watchers; + maybe_complete_notify(); +} + +void Notify::maybe_complete_notify() +{ + dout(10) << "maybe_complete_notify -- " + << in_progress_watchers + << " in progress watchers " << dendl; + if (!in_progress_watchers) { + MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id, + WATCH_NOTIFY, payload)); + osd->send_message_osd_client(reply, client.get()); + unregister_cb(); + complete = true; + } +} + +void Notify::discard() +{ + Mutex::Locker l(lock); + discarded = true; + unregister_cb(); + watchers.clear(); +} + +void Notify::init() +{ + Mutex::Locker l(lock); + register_cb(); + maybe_complete_notify(); + assert(in_progress_watchers == watchers.size()); +} + +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix _prefix(_dout, watch.get()) + +static ostream& _prefix( + std::ostream* _dout, + Watch *watch) { + return *_dout << watch->gen_dbg_prefix(); +} + +class HandleWatchTimeout : public CancelableContext { + WatchRef watch; +public: + bool canceled; // protected by watch->pg->lock + HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} + void cancel() { + canceled = true; + } + void finish(int) { assert(0); /* not used */ } + void complete(int) { + dout(10) << "HandleWatchTimeout" << dendl; + boost::intrusive_ptr<ReplicatedPG> pg(watch->pg); + OSDService *osd(watch->osd); + osd->watch_lock.Unlock(); + pg->lock(); + watch->cb = NULL; + if (!watch->is_discarded() && !canceled) + watch->pg->handle_watch_timeout(watch); + delete this; // ~Watch requires pg lock! + pg->unlock(); + osd->watch_lock.Lock(); + } +}; + +class HandleDelayedWatchTimeout : public CancelableContext { + WatchRef watch; +public: + bool canceled; + HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} + void cancel() { + canceled = true; + } + void finish(int) { + dout(10) << "HandleWatchTimeoutDelayed" << dendl; + assert(watch->pg->is_locked()); + watch->cb = NULL; + if (!watch->is_discarded() && !canceled) + watch->pg->handle_watch_timeout(watch); + } +}; + +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +string Watch::gen_dbg_prefix() { + stringstream ss; + ss << pg->gen_prefix() << " -- Watch(" + << make_pair(cookie, entity) + << ", obc->ref=" << (obc ? obc->ref : -1) << ") "; + return ss.str(); +} + +Watch::Watch( + ReplicatedPG *pg, + OSDService *osd, + ObjectContext *obc, + uint32_t timeout, + uint64_t cookie, + entity_name_t entity) + : cb(NULL), + osd(osd), + pg(pg), + obc(obc), + timeout(timeout), + cookie(cookie), + entity(entity), + discarded(false) { + obc->get(); + dout(10) << "Watch()" << dendl; +} + +Watch::~Watch() { + dout(10) << "~Watch" << dendl; + // users must have called remove() or discard() prior to this point + assert(!obc); + assert(!conn); +} + +bool Watch::connected() { return conn; } + +Context *Watch::get_delayed_cb() +{ + assert(!cb); + cb = new HandleDelayedWatchTimeout(self.lock()); + return cb; } -void Watch::C_NotifyTimeout::finish(int r) +ObjectContext *Watch::get_obc() { - osd->handle_notify_timeout(notif); + assert(obc); + obc->get(); + return obc; } -void Watch::C_WatchTimeout::finish(int r) +void Watch::register_cb() { - osd->handle_watch_timeout(obc, static_cast<ReplicatedPG *>(pg), entity, - expire); + Mutex::Locker l(osd->watch_lock); + dout(15) << "registering callback, timeout: " << timeout << dendl; + cb = new HandleWatchTimeout(self.lock()); + osd->watch_timer.add_event_after( + timeout, + cb); } +void Watch::unregister_cb() +{ + dout(15) << "unregister_cb" << dendl; + if (!cb) + return; + dout(15) << "actually registered, cancelling" << dendl; + cb->cancel(); + { + Mutex::Locker l(osd->watch_lock); + osd->watch_timer.cancel_event(cb); // harmless if not registered with timer + } + cb = NULL; +} + +void Watch::connect(ConnectionRef con) +{ + dout(10) << "connecting" << dendl; + conn = con; + OSD::Session* sessionref(static_cast<OSD::Session*>(con->get_priv())); + sessionref->wstate.addWatch(self.lock()); + sessionref->put(); + for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin(); + i != in_progress_notifies.end(); + ++i) { + send_notify(i->second); + } + unregister_cb(); +} + +void Watch::disconnect() +{ + dout(10) << "disconnect" << dendl; + conn = ConnectionRef(); + register_cb(); +} + +void Watch::discard() +{ + dout(10) << "discard" << dendl; + for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin(); + i != in_progress_notifies.end(); + ++i) { + i->second->discard(); + } + discard_state(); +} + +void Watch::discard_state() +{ + assert(pg->is_locked()); + assert(!discarded); + assert(obc); + in_progress_notifies.clear(); + unregister_cb(); + discarded = true; + if (conn) { + OSD::Session* sessionref(static_cast<OSD::Session*>(conn->get_priv())); + sessionref->wstate.removeWatch(self.lock()); + sessionref->put(); + conn = ConnectionRef(); + } + pg->put_object_context(obc); + obc = NULL; +} + +bool Watch::is_discarded() +{ + return discarded; +} + +void Watch::remove() +{ + dout(10) << "remove" << dendl; + for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin(); + i != in_progress_notifies.end(); + ++i) { + i->second->complete_watcher(self.lock()); + } + discard_state(); +} + +void Watch::start_notify(NotifyRef notif) +{ + dout(10) << "start_notify " << notif->notify_id << dendl; + assert(in_progress_notifies.find(notif->notify_id) == + in_progress_notifies.end()); + in_progress_notifies[notif->notify_id] = notif; + notif->start_watcher(self.lock()); + if (connected()) + send_notify(notif); +} + +void Watch::cancel_notify(NotifyRef notif) +{ + dout(10) << "cancel_notify " << notif->notify_id << dendl; + in_progress_notifies.erase(notif->notify_id); +} + +void Watch::send_notify(NotifyRef notif) +{ + dout(10) << "send_notify" << dendl; + MWatchNotify *notify_msg = new MWatchNotify( + cookie, notif->version, notif->notify_id, + WATCH_NOTIFY, notif->payload); + osd->send_message_osd_client(notify_msg, conn.get()); +} + +void Watch::notify_ack(uint64_t notify_id) +{ + dout(10) << "notify_ack" << dendl; + map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id); + if (i != in_progress_notifies.end()) { + i->second->complete_watcher(self.lock()); + in_progress_notifies.erase(i); + } +} + +WatchRef Watch::makeWatchRef( + ReplicatedPG *pg, OSDService *osd, + ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity) +{ + WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity)); + ret->set_self(ret); + return ret; +} + +void WatchConState::addWatch(WatchRef watch) +{ + Mutex::Locker l(lock); + watches.insert(watch); +} + +void WatchConState::removeWatch(WatchRef watch) +{ + Mutex::Locker l(lock); + watches.erase(watch); +} + +void WatchConState::reset() +{ + set<WatchRef> _watches; + { + Mutex::Locker l(lock); + _watches.swap(watches); + } + for (set<WatchRef>::iterator i = _watches.begin(); + i != _watches.end(); + ++i) { + boost::intrusive_ptr<ReplicatedPG> pg((*i)->get_pg()); + pg->lock(); + if (!(*i)->is_discarded()) { + (*i)->disconnect(); + } + pg->unlock(); + } +} diff --git a/src/osd/Watch.h b/src/osd/Watch.h index cb48de4d426..089350f35bb 100644 --- a/src/osd/Watch.h +++ b/src/osd/Watch.h @@ -11,95 +11,252 @@ * Foundation. See file COPYING. * */ - - #ifndef CEPH_WATCH_H #define CEPH_WATCH_H -#include <map> +#include <boost/intrusive_ptr.hpp> +#include <tr1/memory> +#include <set> -#include "OSD.h" -#include "common/config.h" +#include "msg/Messenger.h" +#include "include/Context.h" +#include "common/Mutex.h" + +enum WatcherState { + WATCHER_PENDING, + WATCHER_NOTIFIED, +}; +class OSDService; +class ReplicatedPG; +void intrusive_ptr_add_ref(ReplicatedPG *pg); +void intrusive_ptr_release(ReplicatedPG *pg); +class ObjectContext; class MWatchNotify; -/* keeps track and accounts sessions, watchers and notifiers */ -class Watch { - uint64_t notif_id; +class Watch; +typedef std::tr1::shared_ptr<Watch> WatchRef; +typedef std::tr1::weak_ptr<Watch> WWatchRef; -public: - enum WatcherState { - WATCHER_PENDING, - WATCHER_NOTIFIED, - }; - - struct Notification { - std::map<entity_name_t, WatcherState> watchers; - entity_name_t name; - uint64_t id; - OSD::Session *session; - uint64_t cookie; - MWatchNotify *reply; - Context *timeout; - void *obc; - pg_t pgid; - bufferlist bl; - - void add_watcher(const entity_name_t& name, WatcherState state) { - watchers[name] = state; - } - - Notification(entity_name_t& n, OSD::Session *s, uint64_t c, bufferlist& b) - : name(n), id(0), session(s), cookie(c), reply(0), timeout(0), - obc(0), bl(b) { } - }; - - class C_NotifyTimeout : public Context { - OSD *osd; - Notification *notif; - public: - C_NotifyTimeout(OSD *_osd, Notification *_notif) : osd(_osd), notif(_notif) {} - void finish(int r); - }; - - class C_WatchTimeout : public Context { - OSD *osd; - void *obc; - void *pg; - entity_name_t entity; - public: - utime_t expire; - C_WatchTimeout(OSD *_osd, void *_obc, void *_pg, - entity_name_t _entity, utime_t _expire) : - osd(_osd), obc(_obc), pg(_pg), entity(_entity), expire(_expire) {} - void finish(int r); - }; - -private: - std::map<uint64_t, Notification *> notifs; /* notif_id to notifications */ +class Notify; +typedef std::tr1::shared_ptr<Notify> NotifyRef; +typedef std::tr1::weak_ptr<Notify> WNotifyRef; -public: - Watch() : notif_id(0) {} +class CancelableContext; - void add_notification(Notification *notif) { - notif->id = ++notif_id; - notifs[notif->id] = notif; +/** + * Notify tracks the progress of a particular notify + * + * References are held by Watch and the timeout callback. + */ +class NotifyTimeoutCB; +class Notify { + friend class NotifyTimeoutCB; + friend class Watch; + WNotifyRef self; + ConnectionRef client; + unsigned in_progress_watchers; + bool complete; + bool discarded; + set<WatchRef> watchers; + + bufferlist payload; + uint32_t timeout; + uint64_t cookie; + uint64_t notify_id; + uint64_t version; + + OSDService *osd; + CancelableContext *cb; + Mutex lock; + + + /// true if this notify is being discarded + bool is_discarded() { + return discarded || complete; } - Notification *get_notif(uint64_t id) { - map<uint64_t, Notification *>::iterator iter = notifs.find(id); - if (iter != notifs.end()) - return iter->second; - return NULL; + + /// Sends notify completion if in_progress_watchers == 0 + void maybe_complete_notify(); + + /// Called on Notify timeout + void do_timeout(); + + Notify( + ConnectionRef client, + unsigned num_watchers, + bufferlist &payload, + uint32_t timeout, + uint64_t cookie, + uint64_t notify_id, + uint64_t version, + OSDService *osd); + + /// registers a timeout callback with the watch_timer + void register_cb(); + + /// removes the timeout callback, called on completion or cancellation + void unregister_cb(); +public: + string gen_dbg_prefix() { + stringstream ss; + ss << "Notify(" << make_pair(cookie, notify_id) << " " + << " in_progress_watchers=" << in_progress_watchers + << ") "; + return ss.str(); } - void remove_notification(Notification *notif) { - map<uint64_t, Notification *>::iterator iter = notifs.find(notif->id); - if (iter != notifs.end()) - notifs.erase(iter); + void set_self(NotifyRef _self) { + self = _self; } + static NotifyRef makeNotifyRef( + ConnectionRef client, + unsigned num_watchers, + bufferlist &payload, + uint32_t timeout, + uint64_t cookie, + uint64_t notify_id, + uint64_t version, + OSDService *osd); + + /// Call after creation to initialize + void init(); + + /// Called once per watcher prior to init() + void start_watcher( + WatchRef watcher ///< [in] watcher to complete + ); - bool ack_notification(entity_name_t& watcher, Notification *notif); + /// Called once per NotifyAck + void complete_watcher( + WatchRef watcher ///< [in] watcher to complete + ); + + /// Called when the notify is canceled due to a new peering interval + void discard(); }; +/** + * Watch is a mapping between a Connection and an ObjectContext + * + * References are held by ObjectContext and the timeout callback + */ +class HandleWatchTimeout; +class HandleDelayedWatchTimeout; +class Watch { + WWatchRef self; + friend class HandleWatchTimeout; + friend class HandleDelayedWatchTimeout; + ConnectionRef conn; + CancelableContext *cb; + + OSDService *osd; + boost::intrusive_ptr<ReplicatedPG> pg; + ObjectContext *obc; + + std::map<uint64_t, NotifyRef> in_progress_notifies; + + uint32_t timeout; + uint64_t cookie; + entity_name_t entity; + bool discarded; + Watch( + ReplicatedPG *pg, OSDService *osd, + ObjectContext *obc, uint32_t timeout, + uint64_t cookie, entity_name_t entity); + + /// Registers the timeout callback with watch_timer + void register_cb(); + + /// Unregisters the timeout callback + void unregister_cb(); + + /// send a Notify message when connected for notif + void send_notify(NotifyRef notif); + + /// Cleans up state on discard or remove (including Connection state, obc) + void discard_state(); +public: + /// NOTE: must be called with pg lock held + ~Watch(); + + string gen_dbg_prefix(); + static WatchRef makeWatchRef( + ReplicatedPG *pg, OSDService *osd, + ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity); + void set_self(WatchRef _self) { + self = _self; + } + + /// Does not grant a ref count! + boost::intrusive_ptr<ReplicatedPG> get_pg() { return pg; } + + /// Grants a ref count! + ObjectContext *get_obc(); + uint64_t get_cookie() const { return cookie; } + entity_name_t get_entity() const { return entity; } + + /// Generates context for use if watch timeout is delayed by scrub or recovery + Context *get_delayed_cb(); + + /// True if currently connected + bool connected(); + + /// Transitions Watch to connected, unregister_cb, resends pending Notifies + void connect( + ConnectionRef con ///< [in] Reference to new connection + ); + + /// Transitions watch to disconnected, register_cb + void disconnect(); + + /// Called if Watch state is discarded due to new peering interval + void discard(); + + /// True if removed or discarded + bool is_discarded(); + + /// Called on unwatch + void remove(); + + /// Adds notif as in-progress notify + void start_notify( + NotifyRef notif ///< [in] Reference to new in-progress notify + ); + + /// Removes timed out notify + void cancel_notify( + NotifyRef notif ///< [in] notify which timed out + ); + + /// Call when notify_ack received on notify_id + void notify_ack( + uint64_t notify_id ///< [in] id of acked notify + ); +}; + +/** + * Holds weak refs to Watch structures corresponding to a connection + * Lives in the OSD::Session object of an OSD connection + */ +class WatchConState { + Mutex lock; + std::set<WatchRef> watches; +public: + WatchConState() : lock("WatchConState") {} + + /// Add a watch + void addWatch( + WatchRef watch ///< [in] Ref to new watch object + ); + + /// Remove a watch + void removeWatch( + WatchRef watch ///< [in] Ref to watch object to remove + ); + + /// Called on session reset, disconnects watchers + void reset(); +}; #endif diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 2136191bc19..6953f182000 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -2491,7 +2491,14 @@ ps_t object_info_t::legacy_object_locator_to_ps(const object_t &oid, void object_info_t::encode(bufferlist& bl) const { - ENCODE_START(10, 8, bl); + map<entity_name_t, watch_info_t> old_watchers; + for (map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator i = + watchers.begin(); + i != watchers.end(); + ++i) { + old_watchers.insert(make_pair(i->first.second, i->second)); + } + ENCODE_START(11, 8, bl); ::encode(soid, bl); ::encode(oloc, bl); ::encode(category, bl); @@ -2507,15 +2514,17 @@ void object_info_t::encode(bufferlist& bl) const ::encode(truncate_seq, bl); ::encode(truncate_size, bl); ::encode(lost, bl); - ::encode(watchers, bl); + ::encode(old_watchers, bl); ::encode(user_version, bl); ::encode(uses_tmap, bl); + ::encode(watchers, bl); ENCODE_FINISH(bl); } void object_info_t::decode(bufferlist::iterator& bl) { - DECODE_START_LEGACY_COMPAT_LEN(10, 8, 8, bl); + DECODE_START_LEGACY_COMPAT_LEN(11, 8, 8, bl); + map<entity_name_t, watch_info_t> old_watchers; if (struct_v >= 2 && struct_v <= 5) { sobject_t obj; ::decode(obj, bl); @@ -2549,7 +2558,7 @@ void object_info_t::decode(bufferlist::iterator& bl) else lost = false; if (struct_v >= 4) { - ::decode(watchers, bl); + ::decode(old_watchers, bl); ::decode(user_version, bl); } if (struct_v >= 9) @@ -2558,6 +2567,17 @@ void object_info_t::decode(bufferlist::iterator& bl) uses_tmap = true; if (struct_v < 10) soid.pool = oloc.pool; + if (struct_v >= 11) { + ::decode(watchers, bl); + } else { + for (map<entity_name_t, watch_info_t>::iterator i = old_watchers.begin(); + i != old_watchers.end(); + ++i) { + watchers.insert( + make_pair( + make_pair(i->second.cookie, i->first), i->second)); + } + } DECODE_FINISH(bl); } @@ -2584,9 +2604,10 @@ void object_info_t::dump(Formatter *f) const f->dump_unsigned("truncate_seq", truncate_seq); f->dump_unsigned("truncate_size", truncate_size); f->open_object_section("watchers"); - for (map<entity_name_t,watch_info_t>::const_iterator p = watchers.begin(); p != watchers.end(); ++p) { + for (map<pair<uint64_t, entity_name_t>,watch_info_t>::const_iterator p = + watchers.begin(); p != watchers.end(); ++p) { stringstream ss; - ss << p->first; + ss << p->first.second; f->open_object_section(ss.str().c_str()); p->second.dump(f); f->close_section(); diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 13099bd1bfe..008fe8e9ded 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -27,7 +27,7 @@ #include "common/snap_types.h" #include "common/Formatter.h" #include "os/hobject.h" - +#include "Watch.h" #define CEPH_OSD_ONDISK_MAGIC "ceph osd volume v026" @@ -1728,7 +1728,6 @@ static inline ostream& operator<<(ostream& out, const notify_info_t& n) { } - struct object_info_t { hobject_t soid; object_locator_t oloc; @@ -1748,7 +1747,7 @@ struct object_info_t { uint64_t truncate_seq, truncate_size; - map<entity_name_t, watch_info_t> watchers; + map<pair<uint64_t, entity_name_t>, watch_info_t> watchers; bool uses_tmap; void copy_user_bits(const object_info_t& other); @@ -1823,9 +1822,7 @@ public: set<ObjectContext*> blocking; // objects whose writes we block // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers. - map<entity_name_t, OSD::Session *> watchers; - map<entity_name_t, Watch::C_WatchTimeout *> unconnected_watchers; - map<Watch::Notification *, bool> notifs; + map<pair<uint64_t, entity_name_t>, WatchRef> watchers; ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_) : ref(0), registered(false), obs(oi_, exists_), ssc(ssc_), |