author     Samuel Just <sam.just@inktank.com>   2013-02-20 13:29:31 -0800
committer  Samuel Just <sam.just@inktank.com>   2013-02-20 13:29:31 -0800
commit     b531aa3688d9e8831837c23abac0bdaba04ec793 (patch)
tree       3b383cd25a7c96b5d8ea051a11f2b31e5fcd1ae3
parent     8713f18da8f24a4c0ab56aa98764d0845c1ebe46 (diff)
parent     0202bf29035a93cd31ece08844ef3657f20e43b5 (diff)
download   ceph-b531aa3688d9e8831837c23abac0bdaba04ec793.tar.gz
Merge branch 'wip_watch_cleanup'
Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r--  doc/dev/osd_internals/watch_notify.rst |  78
-rw-r--r--  src/common/Timer.cc                    |   3
-rw-r--r--  src/librados/IoCtxImpl.cc              |  16
-rw-r--r--  src/librados/IoCtxImpl.h               |   5
-rw-r--r--  src/librados/RadosClient.cc            |   9
-rw-r--r--  src/librados/RadosClient.h             |   3
-rw-r--r--  src/osd/OSD.cc                         | 141
-rw-r--r--  src/osd/OSD.h                          |  20
-rw-r--r--  src/osd/PG.cc                          |   1
-rw-r--r--  src/osd/PG.h                           |  11
-rw-r--r--  src/osd/ReplicatedPG.cc                | 535
-rw-r--r--  src/osd/ReplicatedPG.h                 | 145
-rw-r--r--  src/osd/Watch.cc                       | 453
-rw-r--r--  src/osd/Watch.h                        | 303
-rw-r--r--  src/osd/osd_types.cc                   |  33
-rw-r--r--  src/osd/osd_types.h                    | 108
-rw-r--r--  src/osdc/Objecter.h                    |   4
17 files changed, 1100 insertions(+), 768 deletions(-)
diff --git a/doc/dev/osd_internals/watch_notify.rst b/doc/dev/osd_internals/watch_notify.rst
new file mode 100644
index 00000000000..e5f48b78a68
--- /dev/null
+++ b/doc/dev/osd_internals/watch_notify.rst
@@ -0,0 +1,78 @@
+============
+Watch Notify
+============
+
+See librados for the watch/notify interface.
+
+Overview
+--------
+The object_info (see osd/osd_types.h) tracks the set of watchers for
+a particular object persistently in the object_info_t::watchers map.
+In order to track notify progress, we also maintain some ephemeral
+structures associated with the ObjectContext.
+
+Each watch has an associated Watch object (see osd/Watch.h). The
+ObjectContext for a watched object will have a (strong) reference
+to one Watch object per watch, and each Watch object holds a
+reference to the corresponding ObjectContext. This circular reference
+is deliberate and is broken when the Watch state is discarded on
+a new peering interval or removed upon timeout expiration or an
+unwatch operation.
+
+A watch tracks the associated connection via a strong
+ConnectionRef Watch::conn. The associated connection has a
+WatchConState stashed in the OSD::Session for tracking associated
+Watches in order to be able to notify them upon ms_handle_reset()
+(via WatchConState::reset()).
+
+Each Watch object tracks the set of currently un-acked notifies.
+start_notify() on a Watch object adds a reference to a new in-progress
+Notify to the Watch and either:
+ * if the Watch is *connected*, sends a Notify message to the client
+ * if the Watch is *unconnected*, does nothing.
+When the Watch becomes connected (in ReplicatedPG::do_osd_op_effects),
+the Notify message is resent for each remaining tracked Notify object.
+
+Each Notify object tracks the set of un-notified Watchers via
+calls to complete_watcher(). Once the remaining set is empty or the
+timeout expires (cb, registered in init()), a notify completion
+is sent to the client.
+
+Watch Lifecycle
+---------------
+A watch may be in one of 5 states:
+ 1. Non-existent.
+ 2. On disk, but not registered with an object context.
+ 3. Connected.
+ 4. Disconnected, callback registered with timer.
+ 5. Disconnected, callback in queue for scrub or is_degraded.
+
+Case 2 occurs between the time an OSD goes active and the time the
+ObjectContext for an object with watchers is loaded into memory by an access.
+During Case 2, no state is registered for the watch. Case 2
+transitions to Case 4 in ReplicatedPG::populate_obc_watchers() during
+ReplicatedPG::find_object_context. Case 1 becomes Case 3 via
+ReplicatedPG::do_osd_op_effects due to a watch operation. Cases 4 and 5
+become Case 3 in the same way. Case 3 becomes Case 4 when the connection
+resets on a watcher's session.
+
+Cases 4 and 5 deserve some explanation. Normally, when a Watch enters Case
+4, a callback is registered with the OSDService::watch_timer to be
+called at timeout expiration. At the time that the callback is
+called, however, the pg might be in a state where it cannot write
+to the object in order to remove the watch (i.e., during a scrub
+or while the object is degraded). In that case, we use
+Watch::get_delayed_cb() to generate another Context for use from
+the callbacks_for_degraded_object and Scrubber::callbacks lists.
+In either case, Watch::unregister_cb() does the right thing
+(SafeTimer::cancel_event() is harmless for contexts not registered
+with the timer).
+
+Notify Lifecycle
+----------------
+The notify timeout is simpler: a timeout callback is registered when
+the notify is init()'d.
If all watchers ack notifies before the +timeout occurs, the timeout is canceled and the client is notified +of the notify completion. Otherwise, the timeout fires, the Notify +object pings each Watch via cancel_notify to remove itself, and +sends the notify completion to the client early. diff --git a/src/common/Timer.cc b/src/common/Timer.cc index ac0550c768f..f90d9ab9694 100644 --- a/src/common/Timer.cc +++ b/src/common/Timer.cc @@ -102,8 +102,7 @@ void SafeTimer::timer_thread() if (!safe_callbacks) lock.Unlock(); - callback->finish(0); - delete callback; + callback->complete(0); if (!safe_callbacks) lock.Lock(); } diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index ea63b2a59b4..800e27f90b6 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -1403,7 +1403,7 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver, lock->Lock(); WatchContext *wc = new WatchContext(this, oid, ctx); - client->register_watcher(wc, oid, ctx, cookie); + client->register_watcher(wc, cookie); prepare_assert_ops(&rd); rd.watch(*cookie, ver, 1); bufferlist bl; @@ -1431,12 +1431,14 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver, /* this is called with IoCtxImpl::lock held */ -int librados::IoCtxImpl::_notify_ack(const object_t& oid, - uint64_t notify_id, uint64_t ver) +int librados::IoCtxImpl::_notify_ack( + const object_t& oid, + uint64_t notify_id, uint64_t ver, + uint64_t cookie) { ::ObjectOperation rd; prepare_assert_ops(&rd); - rd.notify_ack(notify_id, ver); + rd.notify_ack(notify_id, ver, cookie); objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0); return 0; @@ -1491,7 +1493,7 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b lock->Lock(); WatchContext *wc = new WatchContext(this, oid, ctx); - client->register_watcher(wc, oid, ctx, &cookie); + client->register_watcher(wc, &cookie); uint32_t prot_ver = 1; uint32_t timeout = notify_timeout; ::encode(prot_ver, inbl); @@ -1687,7 +1689,7 @@ void librados::IoCtxImpl::C_NotifyComplete::notify(uint8_t opcode, librados::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_, const object_t& _oc, librados::WatchCtx *_ctx) - : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0) + : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0), cookie(0) { io_ctx_impl->get(); } @@ -1706,7 +1708,7 @@ void librados::WatchContext::notify(Mutex *client_lock, ctx->notify(opcode, ver, payload); if (opcode != WATCH_NOTIFY_COMPLETE) { client_lock->Lock(); - io_ctx_impl->_notify_ack(oid, notify_id, ver); + io_ctx_impl->_notify_ack(oid, notify_id, ver, cookie); client_lock->Unlock(); } } diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index 33d1f3ebd14..c5b14cab8e3 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -194,7 +194,9 @@ struct librados::IoCtxImpl { int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx); int unwatch(const object_t& oid, uint64_t cookie); int notify(const object_t& oid, uint64_t ver, bufferlist& bl); - int _notify_ack(const object_t& oid, uint64_t notify_id, uint64_t ver); + int _notify_ack( + const object_t& oid, uint64_t notify_id, uint64_t ver, + uint64_t cookie); eversion_t last_version(); void set_assert_version(uint64_t ver); @@ -217,6 +219,7 @@ struct WatchContext : public RefCountedWaitObject { const object_t oid; librados::WatchCtx *ctx; uint64_t linger_id; + uint64_t cookie; WatchContext(IoCtxImpl *io_ctx_impl_, const object_t& _oc, diff --git 
a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 7e76b65694d..feb0dfc3602 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -478,14 +478,11 @@ int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompleti return r; } -void librados::RadosClient::register_watcher(WatchContext *wc, - const object_t& oid, - librados::WatchCtx *ctx, - uint64_t *cookie) +void librados::RadosClient::register_watcher(WatchContext *wc, uint64_t *cookie) { assert(lock.is_locked()); - *cookie = ++max_watch_cookie; - watchers[*cookie] = wc; + wc->cookie = *cookie = ++max_watch_cookie; + watchers[wc->cookie] = wc; } void librados::RadosClient::unregister_watcher(uint64_t cookie) diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h index 1f39f22fb3f..6dd6d109c64 100644 --- a/src/librados/RadosClient.h +++ b/src/librados/RadosClient.h @@ -97,8 +97,7 @@ public: uint64_t max_watch_cookie; map<uint64_t, librados::WatchContext *> watchers; - void register_watcher(librados::WatchContext *wc, const object_t& oid, - librados::WatchCtx *ctx, uint64_t *cookie); + void register_watcher(librados::WatchContext *wc, uint64_t *cookie); void unregister_watcher(uint64_t cookie); void watch_notify(MWatchNotify *m); void get(); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index b20e6d690f2..66f976b1c15 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -168,7 +168,6 @@ OSDService::OSDService(OSD *osd) : scrubs_active(0), watch_lock("OSD::watch_lock"), watch_timer(osd->client_messenger->cct, watch_lock), - watch(NULL), backfill_request_lock("OSD::backfill_request_lock"), backfill_request_timer(g_ceph_context, backfill_request_lock, false), last_tid(0), @@ -252,15 +251,12 @@ void OSDService::shutdown() watch_lock.Lock(); watch_timer.shutdown(); watch_lock.Unlock(); - - delete watch; } void OSDService::init() { reserver_finisher.start(); watch_timer.init(); - watch = new Watch(); } ObjectStore *OSD::create_object_store(const std::string &dev, const std::string &jdev) @@ -2571,152 +2567,17 @@ void OSD::ms_handle_connect(Connection *con) } } -void OSD::put_object_context(void *_obc, pg_t pgid) -{ - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc; - ReplicatedPG *pg = (ReplicatedPG *)lookup_lock_raw_pg(pgid); - // If pg is being deleted, (which is the only case in which - // it will be NULL) it will clean up its object contexts itself - if (pg) { - pg->put_object_context(obc); - pg->unlock(); - } -} - -void OSD::complete_notify(void *_notif, void *_obc) -{ - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc; - Watch::Notification *notif = (Watch::Notification *)_notif; - dout(10) << "complete_notify " << notif << " got the last reply from pending watchers, can send response now" << dendl; - MWatchNotify *reply = notif->reply; - client_messenger->send_message(reply, notif->session->con); - notif->session->put(); - notif->session->con->put(); - service.watch->remove_notification(notif); - if (notif->timeout) - service.watch_timer.cancel_event(notif->timeout); - map<Watch::Notification *, bool>::iterator iter = obc->notifs.find(notif); - if (iter != obc->notifs.end()) - obc->notifs.erase(iter); - delete notif; -} - -void OSD::ack_notification(entity_name_t& name, void *_notif, void *_obc, ReplicatedPG *pg) -{ - assert(service.watch_lock.is_locked()); - pg->assert_locked(); - Watch::Notification *notif = (Watch::Notification *)_notif; - dout(10) << "ack_notification " << name << " notif " << notif << " id " << notif->id << 
dendl; - if (service.watch->ack_notification(name, notif)) { - complete_notify(notif, _obc); - pg->put_object_context(static_cast<ReplicatedPG::ObjectContext *>(_obc)); - } -} - -void OSD::handle_watch_timeout(void *obc, - ReplicatedPG *pg, - entity_name_t entity, - utime_t expire) -{ - // watch_lock is inside pg->lock; handle_watch_timeout checks for the race. - service.watch_lock.Unlock(); - pg->lock(); - service.watch_lock.Lock(); - - pg->handle_watch_timeout(obc, entity, expire); - pg->unlock(); - pg->put(); -} - -void OSD::disconnect_session_watches(Session *session) -{ - // get any watched obc's - map<ReplicatedPG::ObjectContext *, pg_t> obcs; - service.watch_lock.Lock(); - for (map<void *, pg_t>::iterator iter = session->watches.begin(); iter != session->watches.end(); ++iter) { - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)iter->first; - obcs[obc] = iter->second; - } - service.watch_lock.Unlock(); - - for (map<ReplicatedPG::ObjectContext *, pg_t>::iterator oiter = obcs.begin(); oiter != obcs.end(); ++oiter) { - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)oiter->first; - dout(10) << "obc=" << (void *)obc << dendl; - - ReplicatedPG *pg = static_cast<ReplicatedPG *>(lookup_lock_raw_pg(oiter->second)); - if (!pg) { - /* pg removed between watch_unlock.Unlock() and now, all related - * watch structures would have been cleaned up in remove_watchers_and_notifies - */ - continue; - } - service.watch_lock.Lock(); - - if (!session->watches.count((void*)obc)) { - // Raced with watch removal, obc is invalid - service.watch_lock.Unlock(); - pg->unlock(); - continue; - } - - /* NOTE! fix this one, should be able to just lookup entity name, - however, we currently only keep EntityName on the session and not - entity_name_t. 
*/ - map<entity_name_t, Session *>::iterator witer = obc->watchers.begin(); - while (1) { - while (witer != obc->watchers.end() && witer->second == session) { - dout(10) << "removing watching session entity_name=" << session->entity_name - << " from " << obc->obs.oi << dendl; - entity_name_t entity = witer->first; - watch_info_t& w = obc->obs.oi.watchers[entity]; - utime_t expire = ceph_clock_now(g_ceph_context); - expire += w.timeout_seconds; - pg->register_unconnected_watcher(obc, entity, expire); - dout(10) << " disconnected watch " << w << " by " << entity << " session " << session - << ", expires " << expire << dendl; - obc->watchers.erase(witer++); - pg->put_object_context(obc); - session->con->put(); - session->put(); - } - if (witer == obc->watchers.end()) - break; - ++witer; - } - service.watch_lock.Unlock(); - pg->unlock(); - } -} - bool OSD::ms_handle_reset(Connection *con) { dout(1) << "OSD::ms_handle_reset()" << dendl; OSD::Session *session = (OSD::Session *)con->get_priv(); if (!session) return false; - disconnect_session_watches(session); + session->wstate.reset(); session->put(); return true; } -void OSD::handle_notify_timeout(void *_notif) -{ - assert(service.watch_lock.is_locked()); - Watch::Notification *notif = (Watch::Notification *)_notif; - dout(10) << "OSD::handle_notify_timeout notif " << notif << " id " << notif->id << dendl; - - ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)notif->obc; - - pg_t pgid = notif->pgid; - - complete_notify(_notif, obc); - service.watch_lock.Unlock(); /* drop lock to change locking order */ - - put_object_context(obc, pgid); - service.watch_lock.Lock(); - /* exiting with watch_lock held */ -} - struct C_OSD_GetVersion : public Context { OSD *osd; uint64_t oldest, newest; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 1837195d339..25e1eee13ad 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -48,6 +48,7 @@ using namespace std; #include <ext/hash_set> using namespace __gnu_cxx; +#include "Watch.h" #include "common/shared_cache.hpp" #include "common/simple_cache.hpp" #include "common/sharedptr_registry.hpp" @@ -295,7 +296,11 @@ public: // -- Watch -- Mutex watch_lock; SafeTimer watch_timer; - Watch *watch; + uint64_t next_notif_id; + uint64_t get_next_id(epoch_t cur_epoch) { + Mutex::Locker l(watch_lock); + return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++)); + } // -- Backfill Request Scheduling -- Mutex backfill_request_lock; @@ -526,7 +531,7 @@ public: int64_t auid; epoch_t last_sent_epoch; Connection *con; - std::map<void *, pg_t> watches; + WatchConState wstate; Session() : auid(-1), last_sent_epoch(0), con(0) {} }; @@ -1468,17 +1473,6 @@ public: int init_op_flags(OpRequestRef op); - - void put_object_context(void *_obc, pg_t pgid); - void complete_notify(void *notif, void *obc); - void ack_notification(entity_name_t& peer_addr, void *notif, void *obc, - ReplicatedPG *pg); - void handle_notify_timeout(void *notif); - void disconnect_session_watches(Session *session); - void handle_watch_timeout(void *obc, - ReplicatedPG *pg, - entity_name_t entity, - utime_t expire); OSDService service; friend class OSDService; }; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 3c10230c41a..09d27a6a037 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -5436,6 +5436,7 @@ PG::RecoveryState::Reset::react(const FlushedEvt&) { PG *pg = context< RecoveryMachine >().pg; pg->flushed = true; + pg->on_flushed(); pg->requeue_ops(pg->waiting_for_active); return discard_event(); } diff --git a/src/osd/PG.h b/src/osd/PG.h 
index ec3d4664a90..ed85c4e2946 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1916,17 +1916,8 @@ public: virtual void on_role_change() = 0; virtual void on_change() = 0; virtual void on_activate() = 0; + virtual void on_flushed() = 0; virtual void on_shutdown() = 0; - virtual void remove_watchers_and_notifies() = 0; - - virtual void register_unconnected_watcher(void *obc, - entity_name_t entity, - utime_t expire) = 0; - virtual void unregister_unconnected_watcher(void *obc, - entity_name_t entity) = 0; - virtual void handle_watch_timeout(void *obc, - entity_name_t entity, - utime_t expire) = 0; }; WRITE_CLASS_ENCODER(PG::OndiskLog) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index d23db2884ed..83502de7d12 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1567,94 +1567,6 @@ int ReplicatedPG::do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr) } } -void ReplicatedPG::dump_watchers(ObjectContext *obc) -{ - assert(osd->watch_lock.is_locked()); - - dout(10) << "dump_watchers " << obc->obs.oi.soid << " " << obc->obs.oi << dendl; - for (map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.begin(); - iter != obc->watchers.end(); - ++iter) - dout(10) << " * obc->watcher: " << iter->first << " session=" << iter->second << dendl; - - for (map<entity_name_t, watch_info_t>::iterator oi_iter = obc->obs.oi.watchers.begin(); - oi_iter != obc->obs.oi.watchers.end(); - oi_iter++) { - watch_info_t& w = oi_iter->second; - dout(10) << " * oi->watcher: " << oi_iter->first << " cookie=" << w.cookie << dendl; - } -} - -void ReplicatedPG::remove_watcher(ObjectContext *obc, entity_name_t entity) -{ - assert_locked(); - assert(osd->watch_lock.is_locked()); - dout(10) << "remove_watcher " << *obc << " " << entity << dendl; - map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity); - assert(iter != obc->watchers.end()); - OSD::Session *session = iter->second; - dout(10) << "remove_watcher removing session " << session << dendl; - - obc->watchers.erase(iter); - assert(session->watches.count(obc)); - session->watches.erase(obc); - - put_object_context(obc); - session->con->put(); - session->put(); -} - -void ReplicatedPG::remove_notify(ObjectContext *obc, Watch::Notification *notif) -{ - assert_locked(); - assert(osd->watch_lock.is_locked()); - map<Watch::Notification *, bool>::iterator niter = obc->notifs.find(notif); - - // Cancel notification - if (notif->timeout) - osd->watch_timer.cancel_event(notif->timeout); - osd->watch->remove_notification(notif); - - assert(niter != obc->notifs.end()); - - niter->first->session->con->put(); - niter->first->session->put(); - obc->notifs.erase(niter); - - put_object_context(obc); - delete notif; -} - -void ReplicatedPG::remove_watchers_and_notifies() -{ - assert_locked(); - - dout(10) << "remove_watchers" << dendl; - - osd->watch_lock.Lock(); - for (map<hobject_t, ObjectContext*>::iterator oiter = object_contexts.begin(); - oiter != object_contexts.end(); - ) { - map<hobject_t, ObjectContext *>::iterator iter = oiter++; - ObjectContext *obc = iter->second; - obc->ref++; - for (map<entity_name_t, OSD::Session *>::iterator witer = obc->watchers.begin(); - witer != obc->watchers.end(); - remove_watcher(obc, (witer++)->first)) ; - for (map<entity_name_t,Watch::C_WatchTimeout*>::iterator iter = obc->unconnected_watchers.begin(); - iter != obc->unconnected_watchers.end(); - ) { - map<entity_name_t,Watch::C_WatchTimeout*>::iterator i = iter++; - unregister_unconnected_watcher(obc, i->first); - } - for 
(map<Watch::Notification *, bool>::iterator niter = obc->notifs.begin(); - niter != obc->notifs.end(); - remove_notify(obc, (niter++)->first)) ; - put_object_context(obc); - } - osd->watch_lock.Unlock(); -} - // ======================================================================== // low level osd ops @@ -2329,20 +2241,20 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) case CEPH_OSD_OP_NOTIFY_ACK: { - osd->watch_lock.Lock(); - entity_name_t source = ctx->op->request->get_source(); - map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(source); - Watch::Notification *notif = osd->watch->get_notif(op.watch.cookie); - if (oi_iter != oi.watchers.end() && notif) { - ctx->notify_acks.push_back(op.watch.cookie); - } else { - if (!notif) - dout(10) << " no pending notify for cookie " << op.watch.cookie << dendl; - else - dout(10) << " not registered as a watcher" << dendl; - result = -EINVAL; + try { + uint64_t notify_id = 0; + uint64_t watch_cookie = 0; + ::decode(notify_id, bp); + ::decode(watch_cookie, bp); + OpContext::NotifyAck ack(notify_id, watch_cookie); + ctx->notify_acks.push_back(ack); + } catch (const buffer::error &e) { + OpContext::NotifyAck ack( + // op.watch.cookie is actually the notify_id for historical reasons + op.watch.cookie + ); + ctx->notify_acks.push_back(ack); } - osd->watch_lock.Unlock(); } break; @@ -2557,27 +2469,24 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) watch_info_t w(cookie, 30); // FIXME: where does the timeout come from? if (do_watch) { - if (oi.watchers.count(entity) && oi.watchers[entity] == w) { + if (oi.watchers.count(make_pair(cookie, entity))) { dout(10) << " found existing watch " << w << " by " << entity << dendl; } else { dout(10) << " registered new watch " << w << " by " << entity << dendl; - oi.watchers[entity] = w; + oi.watchers[make_pair(cookie, entity)] = w; t.nop(); // make sure update the object_info on disk! } - ctx->watch_connect = true; - ctx->watch_info = w; + ctx->watch_connects.push_back(w); assert(obc->registered); } else { - map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity); + map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter = + oi.watchers.find(make_pair(cookie, entity)); if (oi_iter != oi.watchers.end()) { - dout(10) << " removed watch " << oi_iter->second << " by " << entity << dendl; - oi.watchers.erase(entity); + dout(10) << " removed watch " << oi_iter->second << " by " + << entity << dendl; + oi.watchers.erase(oi_iter); t.nop(); // update oi on disk - - ctx->watch_disconnect = true; - - // FIXME: trigger notifies? 
- + ctx->watch_disconnects.push_back(w); } else { dout(10) << " can't remove: no watch by " << entity << dendl; } @@ -3340,154 +3249,94 @@ void ReplicatedPG::add_interval_usage(interval_set<uint64_t>& s, object_stat_sum void ReplicatedPG::do_osd_op_effects(OpContext *ctx) { - if (ctx->watch_connect || ctx->watch_disconnect || - !ctx->notifies.empty() || !ctx->notify_acks.empty()) { - OSD::Session *session = (OSD::Session *)ctx->op->request->get_connection()->get_priv(); - assert(session); - ObjectContext *obc = ctx->obc; - object_info_t& oi = ctx->new_obs.oi; - hobject_t& soid = oi.soid; - entity_name_t entity = ctx->reqid.name; + ConnectionRef conn(ctx->op->request->get_connection()); + boost::intrusive_ptr<OSD::Session> session( + (OSD::Session *)conn->get_priv()); + entity_name_t entity = ctx->reqid.name; - dout(10) << "do_osd_op_effects applying watch/notify effects on session " << session << dendl; - - osd->watch_lock.Lock(); - dump_watchers(obc); - - map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity); - if (ctx->watch_connect) { - watch_info_t w = ctx->watch_info; - - if (iter == obc->watchers.end()) { - dout(10) << " connected to " << w << " by " << entity << " session " << session << dendl; - obc->watchers[entity] = session; - session->con->get(); - session->get(); - session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); - obc->ref++; - } else if (iter->second == session) { - // already there - dout(10) << " already connected to " << w << " by " << entity - << " session " << session << dendl; - } else { - // weird: same entity, different session. - dout(10) << " reconnected (with different session!) watch " << w << " by " << entity - << " session " << session << " (was " << iter->second << ")" << dendl; - session->con->get(); - session->get(); - - iter->second->watches.erase(obc); - iter->second->con->put(); - iter->second->put(); - - iter->second = session; - session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); - } - map<entity_name_t,Watch::C_WatchTimeout*>::iterator un_iter = - obc->unconnected_watchers.find(entity); - if (un_iter != obc->unconnected_watchers.end()) { - unregister_unconnected_watcher(obc, un_iter->first); - } + dout(15) << "do_osd_op_effects on session " << session.get() << dendl; - map<Watch::Notification *, bool>::iterator niter; - for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) { - Watch::Notification *notif = niter->first; - map<entity_name_t, Watch::WatcherState>::iterator iter = notif->watchers.find(entity); - if (iter != notif->watchers.end()) { - /* there is a pending notification for this watcher, we should resend it anyway - even if we already sent it as it might not have received it */ - MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); - osd->send_message_osd_client(notify_msg, session->con); - } - } + for (list<watch_info_t>::iterator i = ctx->watch_connects.begin(); + i != ctx->watch_connects.end(); + ++i) { + pair<uint64_t, entity_name_t> watcher(i->cookie, entity); + dout(15) << "do_osd_op_effects applying watch connect on session " + << session.get() << " watcher " << watcher << dendl; + WatchRef watch; + if (ctx->obc->watchers.count(watcher)) { + dout(15) << "do_osd_op_effects found existing watch watcher " << watcher + << dendl; + watch = ctx->obc->watchers[watcher]; + } else { + dout(15) << "do_osd_op_effects new watcher " << watcher + << dendl; + watch = 
Watch::makeWatchRef( + this, osd, ctx->obc, i->timeout_seconds, + i->cookie, entity); + ctx->obc->watchers.insert( + make_pair( + watcher, + watch)); } - - if (ctx->watch_disconnect) { - if (iter != obc->watchers.end()) { - remove_watcher(obc, entity); - } else { - assert(obc->unconnected_watchers.count(entity)); - unregister_unconnected_watcher(obc, entity); - } + watch->connect(conn); + } - // ack any pending notifies - map<Watch::Notification *, bool>::iterator p = obc->notifs.begin(); - while (p != obc->notifs.end()) { - Watch::Notification *notif = p->first; - entity_name_t by = entity; - p++; - assert(notif->obc == obc); - dout(10) << " acking pending notif " << notif->id << " by " << by << dendl; - // TODOSAM: osd->osd-> not good - osd->osd->ack_notification(entity, notif, obc, this); - } + for (list<watch_info_t>::iterator i = ctx->watch_disconnects.begin(); + i != ctx->watch_disconnects.end(); + ++i) { + pair<uint64_t, entity_name_t> watcher(i->cookie, entity); + dout(15) << "do_osd_op_effects applying watch disconnect on session " + << session.get() << " and watcher " << watcher << dendl; + if (ctx->obc->watchers.count(watcher)) { + WatchRef watch = ctx->obc->watchers[watcher]; + dout(10) << "do_osd_op_effects applying disconnect found watcher " + << watcher << dendl; + ctx->obc->watchers.erase(watcher); + watch->remove(); + } else { + dout(10) << "do_osd_op_effects failed to find watcher " + << watcher << dendl; } + } - for (list<notify_info_t>::iterator p = ctx->notifies.begin(); - p != ctx->notifies.end(); - ++p) { - - dout(10) << " " << *p << dendl; - - Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl); - session->get(); // notif got a reference - session->con->get(); - notif->pgid = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); - - osd->watch->add_notification(notif); - dout(20) << " notify id " << notif->id << dendl; - - // connected - for (map<entity_name_t, watch_info_t>::iterator i = obc->obs.oi.watchers.begin(); - i != obc->obs.oi.watchers.end(); - ++i) { - map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.find(i->first); - if (q != obc->watchers.end()) { - entity_name_t name = q->first; - OSD::Session *s = q->second; - watch_info_t& w = obc->obs.oi.watchers[q->first]; - - notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race - - MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); - osd->send_message_osd_client(notify_msg, s->con); - } else { - // unconnected - entity_name_t name = i->first; - notif->add_watcher(name, Watch::WATCHER_PENDING); - } - } - - notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl); - if (notif->watchers.empty()) { - // TODOSAM: osd->osd-> not good - osd->osd->complete_notify(notif, obc); - } else { - obc->notifs[notif] = true; - obc->ref++; - notif->obc = obc; - // TODOSAM: osd->osd not good - notif->timeout = new Watch::C_NotifyTimeout(osd->osd, notif); - osd->watch_timer.add_event_after(p->timeout, notif->timeout); - } + for (list<notify_info_t>::iterator p = ctx->notifies.begin(); + p != ctx->notifies.end(); + ++p) { + dout(10) << "do_osd_op_effects, notify " << *p << dendl; + NotifyRef notif( + Notify::makeNotifyRef( + conn, + ctx->obc->watchers.size(), + p->bl, + p->timeout, + p->cookie, + osd->get_next_id(get_osdmap()->get_epoch()), + ctx->obc->obs.oi.user_version.version, + osd)); + for 
(map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i = + ctx->obc->watchers.begin(); + i != ctx->obc->watchers.end(); + ++i) { + dout(10) << "starting notify on watch " << i->first << dendl; + i->second->start_notify(notif); } + notif->init(); + } - for (list<uint64_t>::iterator p = ctx->notify_acks.begin(); p != ctx->notify_acks.end(); ++p) { - uint64_t cookie = *p; - - dout(10) << " notify_ack " << cookie << dendl; - map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity); - assert(oi_iter != oi.watchers.end()); - - Watch::Notification *notif = osd->watch->get_notif(cookie); - assert(notif); - - // TODOSAM: osd->osd-> not good - osd->osd->ack_notification(entity, notif, obc, this); + for (list<OpContext::NotifyAck>::iterator p = ctx->notify_acks.begin(); + p != ctx->notify_acks.end(); + ++p) { + dout(10) << "notify_ack " << make_pair(p->watch_cookie, p->notify_id) << dendl; + for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i = + ctx->obc->watchers.begin(); + i != ctx->obc->watchers.end(); + ++i) { + if (i->first.second != entity) continue; + if (p->watch_cookie && + p->watch_cookie.get() != i->first.first) continue; + dout(10) << "acking notify on watch " << i->first << dendl; + i->second->notify_ack(p->notify_id); } - - osd->watch_lock.Unlock(); - session->put(); } } @@ -4115,113 +3964,39 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc) log.objects[obc->obs.oi.soid]->reverting_to == obc->obs.oi.version)); dout(10) << "populate_obc_watchers " << obc->obs.oi.soid << dendl; - if (!obc->obs.oi.watchers.empty()) { - Mutex::Locker l(osd->watch_lock); - assert(obc->unconnected_watchers.size() == 0); - assert(obc->watchers.size() == 0); - // populate unconnected_watchers - utime_t now = ceph_clock_now(g_ceph_context); - for (map<entity_name_t, watch_info_t>::iterator p = obc->obs.oi.watchers.begin(); - p != obc->obs.oi.watchers.end(); - p++) { - utime_t expire = now; - expire += p->second.timeout_seconds; - dout(10) << " unconnected watcher " << p->first << " will expire " << expire << dendl; - register_unconnected_watcher(obc, p->first, expire); - } - } -} - -void ReplicatedPG::unregister_unconnected_watcher(void *_obc, - entity_name_t entity) -{ - ObjectContext *obc = static_cast<ObjectContext *>(_obc); - - /* If we failed to cancel the event, the event will fire and the obc - * ref and the pg ref will be taken care of */ - if (osd->watch_timer.cancel_event(obc->unconnected_watchers[entity])) { - put_object_context(obc); - put(); + assert(obc->watchers.size() == 0); + // populate unconnected_watchers + utime_t now = ceph_clock_now(g_ceph_context); + for (map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator p = + obc->obs.oi.watchers.begin(); + p != obc->obs.oi.watchers.end(); + p++) { + utime_t expire = now; + expire += p->second.timeout_seconds; + dout(10) << " unconnected watcher " << p->first << " will expire " << expire << dendl; + WatchRef watch( + Watch::makeWatchRef( + this, osd, obc, p->second.timeout_seconds, p->first.first, p->first.second)); + watch->disconnect(); + obc->watchers.insert( + make_pair( + make_pair(p->first.first, p->first.second), + watch)); } - obc->unconnected_watchers.erase(entity); } -void ReplicatedPG::register_unconnected_watcher(void *_obc, - entity_name_t entity, - utime_t expire) +void ReplicatedPG::handle_watch_timeout(WatchRef watch) { - ObjectContext *obc = static_cast<ObjectContext *>(_obc); - pg_t pgid = info.pgid; - pgid.set_ps(obc->obs.oi.soid.hash); - get(); - obc->ref++; - 
Watch::C_WatchTimeout *cb = new Watch::C_WatchTimeout(osd->osd, - static_cast<void *>(obc), - this, - entity, expire); - osd->watch_timer.add_event_at(expire, cb); - obc->unconnected_watchers[entity] = cb; -} - -void ReplicatedPG::handle_watch_timeout(void *_obc, - entity_name_t entity, - utime_t expire) -{ - dout(10) << "handle_watch_timeout obc " << _obc << dendl; - struct HandleWatchTimeout : public Context { - epoch_t cur_epoch; - boost::intrusive_ptr<ReplicatedPG> pg; - void *obc; - entity_name_t entity; - utime_t expire; - HandleWatchTimeout( - epoch_t cur_epoch, - ReplicatedPG *pg, - void *obc, - entity_name_t entity, - utime_t expire) : cur_epoch(cur_epoch), - pg(pg), obc(obc), entity(entity), expire(expire) { - assert(pg->is_locked()); - static_cast<ReplicatedPG::ObjectContext*>(obc)->get(); - } - void finish(int) { - assert(pg->is_locked()); - if (cur_epoch < pg->last_peering_reset) - return; - // handle_watch_timeout gets its own ref - static_cast<ReplicatedPG::ObjectContext*>(obc)->get(); - pg->handle_watch_timeout(obc, entity, expire); - } - ~HandleWatchTimeout() { - assert(pg->is_locked()); - pg->put_object_context(static_cast<ReplicatedPG::ObjectContext*>(obc)); - } - }; - - ObjectContext *obc = static_cast<ObjectContext *>(_obc); - - if (obc->unconnected_watchers.count(entity) == 0 || - (obc->unconnected_watchers[entity] && - obc->unconnected_watchers[entity]->expire != expire)) { - /* If obc->unconnected_watchers[entity] == NULL we know at least that - * the watcher for obc,entity should expire. We might not have been - * the intended Context*, but that's ok since the intended one will - * take this branch and assume it raced. */ - dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher " - << entity << dendl; - put_object_context(obc); - return; - } + ObjectContext *obc = watch->get_obc(); // handle_watch_timeout owns this ref + dout(10) << "handle_watch_timeout obc " << obc << dendl; if (is_degraded_object(obc->obs.oi.soid)) { callbacks_for_degraded_object[obc->obs.oi.soid].push_back( - new HandleWatchTimeout(get_osdmap()->get_epoch(), - this, _obc, entity, expire) + watch->get_delayed_cb() ); dout(10) << "handle_watch_timeout waiting for degraded on obj " << obc->obs.oi.soid << dendl; - obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one! put_object_context(obc); // callback got its own ref return; } @@ -4230,15 +4005,16 @@ void ReplicatedPG::handle_watch_timeout(void *_obc, dout(10) << "handle_watch_timeout waiting for scrub on obj " << obc->obs.oi.soid << dendl; - scrubber.add_callback(new HandleWatchTimeout(get_osdmap()->get_epoch(), - this, _obc, entity, expire)); - obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one! - put_object_context(obc); // callback got its own ref + scrubber.add_callback( + watch->get_delayed_cb() // This callback! 
+ ); + put_object_context(obc); return; } - obc->unconnected_watchers.erase(entity); - obc->obs.oi.watchers.erase(entity); + obc->watchers.erase(make_pair(watch->get_cookie(), watch->get_entity())); + obc->obs.oi.watchers.erase(make_pair(watch->get_cookie(), watch->get_entity())); + watch->remove(); vector<OSDOp> ops; tid_t rep_tid = osd->get_tid(); @@ -4283,7 +4059,7 @@ void ReplicatedPG::handle_watch_timeout(void *_obc, eval_repop(repop); } -ReplicatedPG::ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid) +ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid) { map<hobject_t, ObjectContext*>::iterator p = object_contexts.find(oid); if (p != object_contexts.end()) @@ -4291,7 +4067,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_ return NULL; } -ReplicatedPG::ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi, +ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi, SnapSetContext *ssc) { ObjectContext *obc = new ObjectContext(oi, false, ssc); @@ -4302,7 +4078,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::create_object_context(const object_in return obc; } -ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid, +ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid, const object_locator_t& oloc, bool can_create) { @@ -4354,7 +4130,24 @@ ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& s void ReplicatedPG::context_registry_on_change() { - remove_watchers_and_notifies(); + list<ObjectContext *> contexts; + for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin(); + i != object_contexts.end(); + ++i) { + i->second->get(); + contexts.push_back(i->second); + for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j = + i->second->watchers.begin(); + j != i->second->watchers.end(); + i->second->watchers.erase(j++)) { + j->second->discard(); + } + } + for (list<ObjectContext *>::iterator i = contexts.begin(); + i != contexts.end(); + contexts.erase(i++)) { + put_object_context(*i); + } } @@ -4529,7 +4322,7 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t * pgstat->stats.cat_sum[oi.category].add(stat); } -ReplicatedPG::SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid) +SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid) { SnapSetContext *ssc = new SnapSetContext(oid); dout(10) << "create_snapset_context " << ssc << " " << ssc->oid << dendl; @@ -4538,10 +4331,10 @@ ReplicatedPG::SnapSetContext *ReplicatedPG::create_snapset_context(const object_ return ssc; } -ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, - const string& key, - ps_t seed, - bool can_create) +SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, + const string& key, + ps_t seed, + bool can_create) { SnapSetContext *ssc; map<object_t, SnapSetContext*>::iterator p = snapset_contexts.find(oid); @@ -6028,7 +5821,7 @@ eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid) /* Mark an object as lost */ -ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t, +ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t, const hobject_t &oid, eversion_t version, utime_t mtime, int what) { @@ -6065,7 +5858,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transac struct C_PG_MarkUnfoundLost : public 
Context { ReplicatedPG *pg; - list<ReplicatedPG::ObjectContext*> obcs; + list<ObjectContext*> obcs; C_PG_MarkUnfoundLost(ReplicatedPG *p) : pg(p) { pg->get(); } @@ -6246,14 +6039,19 @@ void ReplicatedPG::on_removal() { dout(10) << "on_removal" << dendl; apply_and_flush_repops(false); - remove_watchers_and_notifies(); + context_registry_on_change(); } void ReplicatedPG::on_shutdown() { dout(10) << "on_shutdown" << dendl; apply_and_flush_repops(false); - remove_watchers_and_notifies(); + context_registry_on_change(); +} + +void ReplicatedPG::on_flushed() +{ + assert(object_contexts.empty()); } void ReplicatedPG::on_activate() @@ -7581,4 +7379,5 @@ boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim& return transit< NotTrimming >(); } - +void intrusive_ptr_add_ref(ReplicatedPG *pg) { pg->get(); } +void intrusive_ptr_release(ReplicatedPG *pg) { pg->put(); } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index ae506e55074..50af643163a 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -14,6 +14,9 @@ #ifndef CEPH_REPLICATEDPG_H #define CEPH_REPLICATEDPG_H +#include <boost/optional.hpp> + +#include "include/assert.h" #include "PG.h" #include "OSD.h" @@ -60,6 +63,7 @@ public: class ReplicatedPG : public PG { friend class OSD; + friend class Watch; public: /* @@ -95,24 +99,6 @@ public: */ - struct SnapSetContext { - object_t oid; - int ref; - bool registered; - SnapSet snapset; - - SnapSetContext(const object_t& o) : oid(o), ref(0), registered(false) { } - }; - - struct ObjectState { - object_info_t oi; - bool exists; - - ObjectState(const object_info_t &oi_, bool exists_) - : oi(oi_), exists(exists_) {} - }; - - struct AccessMode { typedef enum { IDLE, @@ -254,81 +240,6 @@ public: } }; - - /* - * keep tabs on object modifications that are in flight. - * we need to know the projected existence, size, snapset, - * etc., because we don't send writes down to disk until after - * replicas ack. - */ - struct ObjectContext { - int ref; - bool registered; - ObjectState obs; - - SnapSetContext *ssc; // may be null - - private: - Mutex lock; - public: - Cond cond; - int unstable_writes, readers, writers_waiting, readers_waiting; - - // set if writes for this object are blocked on another objects recovery - ObjectContext *blocked_by; // object blocking our writes - set<ObjectContext*> blocking; // objects whose writes we block - - // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers. - map<entity_name_t, OSD::Session *> watchers; - map<entity_name_t, Watch::C_WatchTimeout *> unconnected_watchers; - map<Watch::Notification *, bool> notifs; - - ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_) - : ref(0), registered(false), obs(oi_, exists_), ssc(ssc_), - lock("ReplicatedPG::ObjectContext::lock"), - unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0), - blocked_by(0) {} - - void get() { ++ref; } - - // do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy. 
- void ondisk_write_lock() { - lock.Lock(); - writers_waiting++; - while (readers_waiting || readers) - cond.Wait(lock); - writers_waiting--; - unstable_writes++; - lock.Unlock(); - } - void ondisk_write_unlock() { - lock.Lock(); - assert(unstable_writes > 0); - unstable_writes--; - if (!unstable_writes && readers_waiting) - cond.Signal(); - lock.Unlock(); - } - void ondisk_read_lock() { - lock.Lock(); - readers_waiting++; - while (unstable_writes) - cond.Wait(lock); - readers_waiting--; - readers++; - lock.Unlock(); - } - void ondisk_read_unlock() { - lock.Lock(); - assert(readers > 0); - readers--; - if (!readers && writers_waiting) - cond.Signal(); - lock.Unlock(); - } - }; - - /* * Capture all object state associated with an in-progress read or write. */ @@ -349,10 +260,17 @@ public: bool user_modify; // user-visible modification // side effects - bool watch_connect, watch_disconnect; - watch_info_t watch_info; + list<watch_info_t> watch_connects; + list<watch_info_t> watch_disconnects; list<notify_info_t> notifies; - list<uint64_t> notify_acks; + struct NotifyAck { + boost::optional<uint64_t> watch_cookie; + uint64_t notify_id; + NotifyAck(uint64_t notify_id) : notify_id(notify_id) {} + NotifyAck(uint64_t notify_id, uint64_t cookie) + : watch_cookie(cookie), notify_id(notify_id) {} + }; + list<NotifyAck> notify_acks; uint64_t bytes_written, bytes_read; @@ -386,7 +304,6 @@ public: op(_op), reqid(_reqid), ops(_ops), obs(_obs), snapset(0), new_obs(_obs->oi, _obs->exists), modify(false), user_modify(false), - watch_connect(false), watch_disconnect(false), bytes_written(0), bytes_read(0), obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) { if (_ssc) { @@ -517,14 +434,9 @@ protected: map<object_t, SnapSetContext*> snapset_contexts; void populate_obc_watchers(ObjectContext *obc); - void register_unconnected_watcher(void *obc, - entity_name_t entity, - utime_t expire); - void unregister_unconnected_watcher(void *obc, - entity_name_t entity); - void handle_watch_timeout(void *obc, - entity_name_t entity, - utime_t expire); +public: + void handle_watch_timeout(WatchRef watch); +protected: ObjectContext *lookup_object_context(const hobject_t& soid) { if (object_contexts.count(soid)) { @@ -818,11 +730,6 @@ protected: void send_remove_op(const hobject_t& oid, eversion_t v, int peer); - void dump_watchers(ObjectContext *obc); - void remove_watcher(ObjectContext *obc, entity_name_t entity); - void remove_notify(ObjectContext *obc, Watch::Notification *notif); - void remove_watchers_and_notifies(); - struct RepModify { ReplicatedPG *pg; OpRequestRef op; @@ -1108,24 +1015,11 @@ public: void on_role_change(); void on_change(); void on_activate(); + void on_flushed(); void on_removal(); void on_shutdown(); }; - -inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectState& obs) -{ - out << obs.oi.soid; - if (!obs.exists) - out << "(dne)"; - return out; -} - -inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectContext& obc) -{ - return out << "obc(" << obc.obs << ")"; -} - inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) { out << "repgather(" << &repop @@ -1151,4 +1045,7 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::AccessMode& mode) return out; } +void intrusive_ptr_add_ref(ReplicatedPG *pg); +void intrusive_ptr_release(ReplicatedPG *pg); + #endif diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc index da90cef4103..1ee5f35b5e3 100644 --- a/src/osd/Watch.cc +++ b/src/osd/Watch.cc @@ -1,7 +1,8 @@ - +// -*- mode:C++; tab-width:8; 
c-basic-offset:2; indent-tabs-mode:t -*- #include "PG.h" #include "include/types.h" +#include "messages/MWatchNotify.h" #include <map> @@ -11,26 +12,454 @@ #include "common/config.h" -bool Watch::ack_notification(entity_name_t& watcher, Notification *notif) +struct CancelableContext : public Context { + virtual void cancel() = 0; +}; + +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +static ostream& _prefix( + std::ostream* _dout, + Notify *notify) { + return *_dout << notify->gen_dbg_prefix(); +} + +Notify::Notify( + ConnectionRef client, + unsigned num_watchers, + bufferlist &payload, + uint32_t timeout, + uint64_t cookie, + uint64_t notify_id, + uint64_t version, + OSDService *osd) + : client(client), + in_progress_watchers(num_watchers), + complete(false), + discarded(false), + payload(payload), + timeout(timeout), + cookie(cookie), + notify_id(notify_id), + version(version), + osd(osd), + cb(NULL), + lock("Notify::lock") {} + +NotifyRef Notify::makeNotifyRef( + ConnectionRef client, + unsigned num_watchers, + bufferlist &payload, + uint32_t timeout, + uint64_t cookie, + uint64_t notify_id, + uint64_t version, + OSDService *osd) { + NotifyRef ret( + new Notify( + client, num_watchers, + payload, timeout, + cookie, notify_id, + version, osd)); + ret->set_self(ret); + return ret; +} + +class NotifyTimeoutCB : public CancelableContext { + NotifyRef notif; + bool canceled; // protected by notif lock +public: + NotifyTimeoutCB(NotifyRef notif) : notif(notif) {} + void finish(int) { + notif->osd->watch_lock.Unlock(); + notif->lock.Lock(); + if (!canceled) + notif->do_timeout(); // drops lock + else + notif->lock.Unlock(); + notif->osd->watch_lock.Lock(); + } + void cancel() { + assert(notif->lock.is_locked_by_me()); + canceled = true; + } +}; + +void Notify::do_timeout() { - map<entity_name_t, WatcherState>::iterator iter = notif->watchers.find(watcher); + assert(lock.is_locked_by_me()); + dout(10) << "timeout" << dendl; + cb = NULL; + if (is_discarded()) { + lock.Unlock(); + return; + } - if (iter == notif->watchers.end()) // client was not suppose to ack this notification - return false; + in_progress_watchers = 0; // we give up TODO: we should return an error code + maybe_complete_notify(); + assert(complete); + set<WatchRef> _watchers; + _watchers.swap(watchers); + lock.Unlock(); - notif->watchers.erase(iter); + for (set<WatchRef>::iterator i = _watchers.begin(); + i != _watchers.end(); + ++i) { + boost::intrusive_ptr<ReplicatedPG> pg((*i)->get_pg()); + pg->lock(); + if (!(*i)->is_discarded()) { + (*i)->cancel_notify(self.lock()); + } + pg->unlock(); + } +} - return notif->watchers.empty(); // true if there are no more watchers +void Notify::register_cb() +{ + assert(lock.is_locked_by_me()); + { + osd->watch_lock.Lock(); + cb = new NotifyTimeoutCB(self.lock()); + osd->watch_timer.add_event_after( + timeout, + cb); + osd->watch_lock.Unlock(); + } +} + +void Notify::unregister_cb() +{ + assert(lock.is_locked_by_me()); + if (!cb) + return; + cb->cancel(); + { + osd->watch_lock.Lock(); + osd->watch_timer.cancel_event(cb); + cb = NULL; + osd->watch_lock.Unlock(); + } +} + +void Notify::start_watcher(WatchRef watch) +{ + Mutex::Locker l(lock); + dout(10) << "start_watcher" << dendl; + watchers.insert(watch); +} + +void Notify::complete_watcher(WatchRef watch) +{ + Mutex::Locker l(lock); + dout(10) << "complete_watcher" << dendl; + if (is_discarded()) + return; + assert(in_progress_watchers > 0); + watchers.erase(watch); + 
--in_progress_watchers; + maybe_complete_notify(); +} + +void Notify::maybe_complete_notify() +{ + dout(10) << "maybe_complete_notify -- " + << in_progress_watchers + << " in progress watchers " << dendl; + if (!in_progress_watchers) { + MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id, + WATCH_NOTIFY, payload)); + osd->send_message_osd_client(reply, client.get()); + unregister_cb(); + complete = true; + } +} + +void Notify::discard() +{ + Mutex::Locker l(lock); + discarded = true; + unregister_cb(); + watchers.clear(); +} + +void Notify::init() +{ + Mutex::Locker l(lock); + register_cb(); + maybe_complete_notify(); + assert(in_progress_watchers == watchers.size()); +} + +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix _prefix(_dout, watch.get()) + +static ostream& _prefix( + std::ostream* _dout, + Watch *watch) { + return *_dout << watch->gen_dbg_prefix(); +} + +class HandleWatchTimeout : public CancelableContext { + WatchRef watch; +public: + bool canceled; // protected by watch->pg->lock + HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} + void cancel() { + canceled = true; + } + void finish(int) { assert(0); /* not used */ } + void complete(int) { + dout(10) << "HandleWatchTimeout" << dendl; + boost::intrusive_ptr<ReplicatedPG> pg(watch->pg); + OSDService *osd(watch->osd); + osd->watch_lock.Unlock(); + pg->lock(); + watch->cb = NULL; + if (!watch->is_discarded() && !canceled) + watch->pg->handle_watch_timeout(watch); + delete this; // ~Watch requires pg lock! + pg->unlock(); + osd->watch_lock.Lock(); + } +}; + +class HandleDelayedWatchTimeout : public CancelableContext { + WatchRef watch; +public: + bool canceled; + HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} + void cancel() { + canceled = true; + } + void finish(int) { + dout(10) << "HandleWatchTimeoutDelayed" << dendl; + assert(watch->pg->is_locked()); + watch->cb = NULL; + if (!watch->is_discarded() && !canceled) + watch->pg->handle_watch_timeout(watch); + } +}; + +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +string Watch::gen_dbg_prefix() { + stringstream ss; + ss << pg->gen_prefix() << " -- Watch(" + << make_pair(cookie, entity) + << ", obc->ref=" << (obc ? 
obc->ref : -1) << ") "; + return ss.str(); +} + +Watch::Watch( + ReplicatedPG *pg, + OSDService *osd, + ObjectContext *obc, + uint32_t timeout, + uint64_t cookie, + entity_name_t entity) + : cb(NULL), + osd(osd), + pg(pg), + obc(obc), + timeout(timeout), + cookie(cookie), + entity(entity), + discarded(false) { + obc->get(); + dout(10) << "Watch()" << dendl; +} + +Watch::~Watch() { + dout(10) << "~Watch" << dendl; + // users must have called remove() or discard() prior to this point + assert(!obc); + assert(!conn); +} + +bool Watch::connected() { return conn; } + +Context *Watch::get_delayed_cb() +{ + assert(!cb); + cb = new HandleDelayedWatchTimeout(self.lock()); + return cb; } -void Watch::C_NotifyTimeout::finish(int r) +ObjectContext *Watch::get_obc() { - osd->handle_notify_timeout(notif); + assert(obc); + obc->get(); + return obc; } -void Watch::C_WatchTimeout::finish(int r) +void Watch::register_cb() { - osd->handle_watch_timeout(obc, static_cast<ReplicatedPG *>(pg), entity, - expire); + Mutex::Locker l(osd->watch_lock); + dout(15) << "registering callback, timeout: " << timeout << dendl; + cb = new HandleWatchTimeout(self.lock()); + osd->watch_timer.add_event_after( + timeout, + cb); } +void Watch::unregister_cb() +{ + dout(15) << "unregister_cb" << dendl; + if (!cb) + return; + dout(15) << "actually registered, cancelling" << dendl; + cb->cancel(); + { + Mutex::Locker l(osd->watch_lock); + osd->watch_timer.cancel_event(cb); // harmless if not registered with timer + } + cb = NULL; +} + +void Watch::connect(ConnectionRef con) +{ + dout(10) << "connecting" << dendl; + conn = con; + OSD::Session* sessionref(static_cast<OSD::Session*>(con->get_priv())); + sessionref->wstate.addWatch(self.lock()); + sessionref->put(); + for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin(); + i != in_progress_notifies.end(); + ++i) { + send_notify(i->second); + } + unregister_cb(); +} + +void Watch::disconnect() +{ + dout(10) << "disconnect" << dendl; + conn = ConnectionRef(); + register_cb(); +} + +void Watch::discard() +{ + dout(10) << "discard" << dendl; + for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin(); + i != in_progress_notifies.end(); + ++i) { + i->second->discard(); + } + discard_state(); +} + +void Watch::discard_state() +{ + assert(pg->is_locked()); + assert(!discarded); + assert(obc); + in_progress_notifies.clear(); + unregister_cb(); + discarded = true; + if (conn) { + OSD::Session* sessionref(static_cast<OSD::Session*>(conn->get_priv())); + sessionref->wstate.removeWatch(self.lock()); + sessionref->put(); + conn = ConnectionRef(); + } + pg->put_object_context(obc); + obc = NULL; +} + +bool Watch::is_discarded() +{ + return discarded; +} + +void Watch::remove() +{ + dout(10) << "remove" << dendl; + for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin(); + i != in_progress_notifies.end(); + ++i) { + i->second->complete_watcher(self.lock()); + } + discard_state(); +} + +void Watch::start_notify(NotifyRef notif) +{ + dout(10) << "start_notify " << notif->notify_id << dendl; + assert(in_progress_notifies.find(notif->notify_id) == + in_progress_notifies.end()); + in_progress_notifies[notif->notify_id] = notif; + notif->start_watcher(self.lock()); + if (connected()) + send_notify(notif); +} + +void Watch::cancel_notify(NotifyRef notif) +{ + dout(10) << "cancel_notify " << notif->notify_id << dendl; + in_progress_notifies.erase(notif->notify_id); +} + +void Watch::send_notify(NotifyRef notif) +{ + dout(10) << "send_notify" << dendl; + 
MWatchNotify *notify_msg = new MWatchNotify( + cookie, notif->version, notif->notify_id, + WATCH_NOTIFY, notif->payload); + osd->send_message_osd_client(notify_msg, conn.get()); +} + +void Watch::notify_ack(uint64_t notify_id) +{ + dout(10) << "notify_ack" << dendl; + map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id); + if (i != in_progress_notifies.end()) { + i->second->complete_watcher(self.lock()); + in_progress_notifies.erase(i); + } +} + +WatchRef Watch::makeWatchRef( + ReplicatedPG *pg, OSDService *osd, + ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity) +{ + WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity)); + ret->set_self(ret); + return ret; +} + +void WatchConState::addWatch(WatchRef watch) +{ + Mutex::Locker l(lock); + watches.insert(watch); +} + +void WatchConState::removeWatch(WatchRef watch) +{ + Mutex::Locker l(lock); + watches.erase(watch); +} + +void WatchConState::reset() +{ + set<WatchRef> _watches; + { + Mutex::Locker l(lock); + _watches.swap(watches); + } + for (set<WatchRef>::iterator i = _watches.begin(); + i != _watches.end(); + ++i) { + boost::intrusive_ptr<ReplicatedPG> pg((*i)->get_pg()); + pg->lock(); + if (!(*i)->is_discarded()) { + (*i)->disconnect(); + } + pg->unlock(); + } +} diff --git a/src/osd/Watch.h b/src/osd/Watch.h index cb48de4d426..089350f35bb 100644 --- a/src/osd/Watch.h +++ b/src/osd/Watch.h @@ -11,95 +11,252 @@ * Foundation. See file COPYING. * */ - - #ifndef CEPH_WATCH_H #define CEPH_WATCH_H -#include <map> +#include <boost/intrusive_ptr.hpp> +#include <tr1/memory> +#include <set> -#include "OSD.h" -#include "common/config.h" +#include "msg/Messenger.h" +#include "include/Context.h" +#include "common/Mutex.h" + +enum WatcherState { + WATCHER_PENDING, + WATCHER_NOTIFIED, +}; +class OSDService; +class ReplicatedPG; +void intrusive_ptr_add_ref(ReplicatedPG *pg); +void intrusive_ptr_release(ReplicatedPG *pg); +class ObjectContext; class MWatchNotify; -/* keeps track and accounts sessions, watchers and notifiers */ -class Watch { - uint64_t notif_id; +class Watch; +typedef std::tr1::shared_ptr<Watch> WatchRef; +typedef std::tr1::weak_ptr<Watch> WWatchRef; -public: - enum WatcherState { - WATCHER_PENDING, - WATCHER_NOTIFIED, - }; - - struct Notification { - std::map<entity_name_t, WatcherState> watchers; - entity_name_t name; - uint64_t id; - OSD::Session *session; - uint64_t cookie; - MWatchNotify *reply; - Context *timeout; - void *obc; - pg_t pgid; - bufferlist bl; - - void add_watcher(const entity_name_t& name, WatcherState state) { - watchers[name] = state; - } - - Notification(entity_name_t& n, OSD::Session *s, uint64_t c, bufferlist& b) - : name(n), id(0), session(s), cookie(c), reply(0), timeout(0), - obc(0), bl(b) { } - }; - - class C_NotifyTimeout : public Context { - OSD *osd; - Notification *notif; - public: - C_NotifyTimeout(OSD *_osd, Notification *_notif) : osd(_osd), notif(_notif) {} - void finish(int r); - }; - - class C_WatchTimeout : public Context { - OSD *osd; - void *obc; - void *pg; - entity_name_t entity; - public: - utime_t expire; - C_WatchTimeout(OSD *_osd, void *_obc, void *_pg, - entity_name_t _entity, utime_t _expire) : - osd(_osd), obc(_obc), pg(_pg), entity(_entity), expire(_expire) {} - void finish(int r); - }; - -private: - std::map<uint64_t, Notification *> notifs; /* notif_id to notifications */ +class Notify; +typedef std::tr1::shared_ptr<Notify> NotifyRef; +typedef std::tr1::weak_ptr<Notify> WNotifyRef; -public: - Watch() : notif_id(0) {} +class 
CancelableContext; - void add_notification(Notification *notif) { - notif->id = ++notif_id; - notifs[notif->id] = notif; +/** + * Notify tracks the progress of a particular notify + * + * References are held by Watch and the timeout callback. + */ +class NotifyTimeoutCB; +class Notify { + friend class NotifyTimeoutCB; + friend class Watch; + WNotifyRef self; + ConnectionRef client; + unsigned in_progress_watchers; + bool complete; + bool discarded; + set<WatchRef> watchers; + + bufferlist payload; + uint32_t timeout; + uint64_t cookie; + uint64_t notify_id; + uint64_t version; + + OSDService *osd; + CancelableContext *cb; + Mutex lock; + + + /// true if this notify is being discarded + bool is_discarded() { + return discarded || complete; } - Notification *get_notif(uint64_t id) { - map<uint64_t, Notification *>::iterator iter = notifs.find(id); - if (iter != notifs.end()) - return iter->second; - return NULL; + + /// Sends notify completion if in_progress_watchers == 0 + void maybe_complete_notify(); + + /// Called on Notify timeout + void do_timeout(); + + Notify( + ConnectionRef client, + unsigned num_watchers, + bufferlist &payload, + uint32_t timeout, + uint64_t cookie, + uint64_t notify_id, + uint64_t version, + OSDService *osd); + + /// registers a timeout callback with the watch_timer + void register_cb(); + + /// removes the timeout callback, called on completion or cancellation + void unregister_cb(); +public: + string gen_dbg_prefix() { + stringstream ss; + ss << "Notify(" << make_pair(cookie, notify_id) << " " + << " in_progress_watchers=" << in_progress_watchers + << ") "; + return ss.str(); } - void remove_notification(Notification *notif) { - map<uint64_t, Notification *>::iterator iter = notifs.find(notif->id); - if (iter != notifs.end()) - notifs.erase(iter); + void set_self(NotifyRef _self) { + self = _self; } + static NotifyRef makeNotifyRef( + ConnectionRef client, + unsigned num_watchers, + bufferlist &payload, + uint32_t timeout, + uint64_t cookie, + uint64_t notify_id, + uint64_t version, + OSDService *osd); + + /// Call after creation to initialize + void init(); + + /// Called once per watcher prior to init() + void start_watcher( + WatchRef watcher ///< [in] watcher to complete + ); - bool ack_notification(entity_name_t& watcher, Notification *notif); + /// Called once per NotifyAck + void complete_watcher( + WatchRef watcher ///< [in] watcher to complete + ); + + /// Called when the notify is canceled due to a new peering interval + void discard(); }; +/** + * Watch is a mapping between a Connection and an ObjectContext + * + * References are held by ObjectContext and the timeout callback + */ +class HandleWatchTimeout; +class HandleDelayedWatchTimeout; +class Watch { + WWatchRef self; + friend class HandleWatchTimeout; + friend class HandleDelayedWatchTimeout; + ConnectionRef conn; + CancelableContext *cb; + + OSDService *osd; + boost::intrusive_ptr<ReplicatedPG> pg; + ObjectContext *obc; + + std::map<uint64_t, NotifyRef> in_progress_notifies; + + uint32_t timeout; + uint64_t cookie; + entity_name_t entity; + bool discarded; + Watch( + ReplicatedPG *pg, OSDService *osd, + ObjectContext *obc, uint32_t timeout, + uint64_t cookie, entity_name_t entity); + + /// Registers the timeout callback with watch_timer + void register_cb(); + + /// Unregisters the timeout callback + void unregister_cb(); + + /// send a Notify message when connected for notif + void send_notify(NotifyRef notif); + + /// Cleans up state on discard or remove (including Connection state, obc) + 
void discard_state(); +public: + /// NOTE: must be called with pg lock held + ~Watch(); + + string gen_dbg_prefix(); + static WatchRef makeWatchRef( + ReplicatedPG *pg, OSDService *osd, + ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity); + void set_self(WatchRef _self) { + self = _self; + } + + /// Does not grant a ref count! + boost::intrusive_ptr<ReplicatedPG> get_pg() { return pg; } + + /// Grants a ref count! + ObjectContext *get_obc(); + uint64_t get_cookie() const { return cookie; } + entity_name_t get_entity() const { return entity; } + + /// Generates context for use if watch timeout is delayed by scrub or recovery + Context *get_delayed_cb(); + + /// True if currently connected + bool connected(); + + /// Transitions Watch to connected, unregister_cb, resends pending Notifies + void connect( + ConnectionRef con ///< [in] Reference to new connection + ); + + /// Transitions watch to disconnected, register_cb + void disconnect(); + + /// Called if Watch state is discarded due to new peering interval + void discard(); + + /// True if removed or discarded + bool is_discarded(); + + /// Called on unwatch + void remove(); + + /// Adds notif as in-progress notify + void start_notify( + NotifyRef notif ///< [in] Reference to new in-progress notify + ); + + /// Removes timed out notify + void cancel_notify( + NotifyRef notif ///< [in] notify which timed out + ); + + /// Call when notify_ack received on notify_id + void notify_ack( + uint64_t notify_id ///< [in] id of acked notify + ); +}; + +/** + * Holds weak refs to Watch structures corresponding to a connection + * Lives in the OSD::Session object of an OSD connection + */ +class WatchConState { + Mutex lock; + std::set<WatchRef> watches; +public: + WatchConState() : lock("WatchConState") {} + + /// Add a watch + void addWatch( + WatchRef watch ///< [in] Ref to new watch object + ); + + /// Remove a watch + void removeWatch( + WatchRef watch ///< [in] Ref to watch object to remove + ); + + /// Called on session reset, disconnects watchers + void reset(); +}; #endif diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 2136191bc19..6953f182000 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -2491,7 +2491,14 @@ ps_t object_info_t::legacy_object_locator_to_ps(const object_t &oid, void object_info_t::encode(bufferlist& bl) const { - ENCODE_START(10, 8, bl); + map<entity_name_t, watch_info_t> old_watchers; + for (map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator i = + watchers.begin(); + i != watchers.end(); + ++i) { + old_watchers.insert(make_pair(i->first.second, i->second)); + } + ENCODE_START(11, 8, bl); ::encode(soid, bl); ::encode(oloc, bl); ::encode(category, bl); @@ -2507,15 +2514,17 @@ void object_info_t::encode(bufferlist& bl) const ::encode(truncate_seq, bl); ::encode(truncate_size, bl); ::encode(lost, bl); - ::encode(watchers, bl); + ::encode(old_watchers, bl); ::encode(user_version, bl); ::encode(uses_tmap, bl); + ::encode(watchers, bl); ENCODE_FINISH(bl); } void object_info_t::decode(bufferlist::iterator& bl) { - DECODE_START_LEGACY_COMPAT_LEN(10, 8, 8, bl); + DECODE_START_LEGACY_COMPAT_LEN(11, 8, 8, bl); + map<entity_name_t, watch_info_t> old_watchers; if (struct_v >= 2 && struct_v <= 5) { sobject_t obj; ::decode(obj, bl); @@ -2549,7 +2558,7 @@ void object_info_t::decode(bufferlist::iterator& bl) else lost = false; if (struct_v >= 4) { - ::decode(watchers, bl); + ::decode(old_watchers, bl); ::decode(user_version, bl); } if (struct_v >= 9) @@ -2558,6 +2567,17 
@@ void object_info_t::decode(bufferlist::iterator& bl)
     uses_tmap = true;
   if (struct_v < 10)
     soid.pool = oloc.pool;
+  if (struct_v >= 11) {
+    ::decode(watchers, bl);
+  } else {
+    for (map<entity_name_t, watch_info_t>::iterator i = old_watchers.begin();
+         i != old_watchers.end();
+         ++i) {
+      watchers.insert(
+        make_pair(
+          make_pair(i->second.cookie, i->first), i->second));
+    }
+  }
   DECODE_FINISH(bl);
 }
 
@@ -2584,9 +2604,10 @@ void object_info_t::dump(Formatter *f) const
   f->dump_unsigned("truncate_seq", truncate_seq);
   f->dump_unsigned("truncate_size", truncate_size);
   f->open_object_section("watchers");
-  for (map<entity_name_t,watch_info_t>::const_iterator p = watchers.begin(); p != watchers.end(); ++p) {
+  for (map<pair<uint64_t, entity_name_t>,watch_info_t>::const_iterator p =
+         watchers.begin(); p != watchers.end(); ++p) {
     stringstream ss;
-    ss << p->first;
+    ss << p->first.second;
     f->open_object_section(ss.str().c_str());
     p->second.dump(f);
     f->close_section();
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index ff8c2c5219e..008fe8e9ded 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -27,7 +27,7 @@
 #include "common/snap_types.h"
 #include "common/Formatter.h"
 #include "os/hobject.h"
-
+#include "Watch.h"
 
 #define CEPH_OSD_ONDISK_MAGIC "ceph osd volume v026"
 
@@ -1728,7 +1728,6 @@ static inline ostream& operator<<(ostream& out, const notify_info_t& n) {
 }
 
 
-
 struct object_info_t {
   hobject_t soid;
   object_locator_t oloc;
@@ -1748,7 +1747,7 @@ struct object_info_t {
 
   uint64_t truncate_seq, truncate_size;
 
-  map<entity_name_t, watch_info_t> watchers;
+  map<pair<uint64_t, entity_name_t>, watch_info_t> watchers;
   bool uses_tmap;
 
   void copy_user_bits(const object_info_t& other);
@@ -1780,6 +1779,109 @@ struct object_info_t {
 };
 WRITE_CLASS_ENCODER(object_info_t)
 
+struct ObjectState {
+  object_info_t oi;
+  bool exists;
+
+  ObjectState(const object_info_t &oi_, bool exists_)
+    : oi(oi_), exists(exists_) {}
+};
+
+
+struct SnapSetContext {
+  object_t oid;
+  int ref;
+  bool registered;
+  SnapSet snapset;
+
+  SnapSetContext(const object_t& o) : oid(o), ref(0), registered(false) { }
+};
+
+
+/*
+ * keep tabs on object modifications that are in flight.
+ * we need to know the projected existence, size, snapset,
+ * etc., because we don't send writes down to disk until after
+ * replicas ack.
+ */
+struct ObjectContext {
+  int ref;
+  bool registered;
+  ObjectState obs;
+
+  SnapSetContext *ssc;  // may be null
+
+private:
+  Mutex lock;
+public:
+  Cond cond;
+  int unstable_writes, readers, writers_waiting, readers_waiting;
+
+  // set if writes for this object are blocked on another object's recovery
+  ObjectContext *blocked_by;      // object blocking our writes
+  set<ObjectContext*> blocking;   // objects whose writes we block
+
+  // any entity in obs.oi.watchers MUST have a corresponding Watch in watchers.
+  map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
+
+  ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_)
+    : ref(0), registered(false), obs(oi_, exists_), ssc(ssc_),
+      lock("ReplicatedPG::ObjectContext::lock"),
+      unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
+      blocked_by(0) {}
+
+  void get() { ++ref; }
+
+  // do simple synchronous mutual exclusion, for now. No waitqueues or anything fancy.
+  void ondisk_write_lock() {
+    lock.Lock();
+    writers_waiting++;
+    while (readers_waiting || readers)
+      cond.Wait(lock);
+    writers_waiting--;
+    unstable_writes++;
+    lock.Unlock();
+  }
+  void ondisk_write_unlock() {
+    lock.Lock();
+    assert(unstable_writes > 0);
+    unstable_writes--;
+    if (!unstable_writes && readers_waiting)
+      cond.Signal();
+    lock.Unlock();
+  }
+  void ondisk_read_lock() {
+    lock.Lock();
+    readers_waiting++;
+    while (unstable_writes)
+      cond.Wait(lock);
+    readers_waiting--;
+    readers++;
+    lock.Unlock();
+  }
+  void ondisk_read_unlock() {
+    lock.Lock();
+    assert(readers > 0);
+    readers--;
+    if (!readers && writers_waiting)
+      cond.Signal();
+    lock.Unlock();
+  }
+};
+
+inline ostream& operator<<(ostream& out, ObjectState& obs)
+{
+  out << obs.oi.soid;
+  if (!obs.exists)
+    out << "(dne)";
+  return out;
+}
+
+inline ostream& operator<<(ostream& out, ObjectContext& obc)
+{
+  return out << "obc(" << obc.obs << ")";
+}
+
 ostream& operator<<(ostream& out, const object_info_t& oi);
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index baf600c53be..9ff02f6ab93 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -487,8 +487,10 @@ struct ObjectOperation {
     add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1, inbl);
   }
-  void notify_ack(uint64_t notify_id, uint64_t ver) {
+  void notify_ack(uint64_t notify_id, uint64_t ver, uint64_t cookie) {
     bufferlist bl;
+    ::encode(notify_id, bl);
+    ::encode(cookie, bl);
     add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
   }
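The object_info_t encode/decode hunks above amount to a small compatibility shim: struct_v 11 encoders write the legacy entity-keyed watcher map in its old position and append the new (cookie, entity)-keyed map at the end, while decoders rebuild the new map from the legacy one when reading older encodings. Below is a minimal standalone sketch of that key conversion; WatchInfo, Entity, and the helper names are illustrative stand-ins rather than the real watch_info_t/entity_name_t or code from this commit.

#include <stdint.h>
#include <map>
#include <string>
#include <utility>

struct WatchInfo {                 // stand-in for watch_info_t
  uint64_t cookie;
  uint32_t timeout_seconds;
};
typedef std::string Entity;        // stand-in for entity_name_t

// pre-v11 on-disk form: at most one watch per entity
typedef std::map<Entity, WatchInfo> OldWatchers;
// v11+ form: keyed by (cookie, entity), so one entity may hold several watches
typedef std::map<std::pair<uint64_t, Entity>, WatchInfo> NewWatchers;

// decode path for struct_v < 11: lift legacy entries into the new keying
NewWatchers upgrade(const OldWatchers& old_watchers)
{
  NewWatchers watchers;
  for (OldWatchers::const_iterator i = old_watchers.begin();
       i != old_watchers.end(); ++i)
    watchers.insert(std::make_pair(std::make_pair(i->second.cookie, i->first),
                                   i->second));
  return watchers;
}

// encode path: flatten back to the legacy keying so old decoders still work;
// multiple cookies for one entity collapse to a single entry, matching the
// old format's one-watch-per-entity limit
OldWatchers downgrade(const NewWatchers& watchers)
{
  OldWatchers old_watchers;
  for (NewWatchers::const_iterator i = watchers.begin();
       i != watchers.end(); ++i)
    old_watchers.insert(std::make_pair(i->first.second, i->second));
  return old_watchers;
}

Keying the map by (cookie, entity) is what lets a single client entity hold more than one watch on the same object; the per-entity map could represent only one, and is presumably retained in the encoding purely for backward compatibility with older decoders.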