author     Samuel Just <sam.just@inktank.com>  2013-02-20 13:29:31 -0800
committer  Samuel Just <sam.just@inktank.com>  2013-02-20 13:29:31 -0800
commit     b531aa3688d9e8831837c23abac0bdaba04ec793
tree       3b383cd25a7c96b5d8ea051a11f2b31e5fcd1ae3
parent     8713f18da8f24a4c0ab56aa98764d0845c1ebe46
parent     0202bf29035a93cd31ece08844ef3657f20e43b5
Merge branch 'wip_watch_cleanup'
Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r--  doc/dev/osd_internals/watch_notify.rst  |  78
-rw-r--r--  src/common/Timer.cc                     |   3
-rw-r--r--  src/librados/IoCtxImpl.cc               |  16
-rw-r--r--  src/librados/IoCtxImpl.h                |   5
-rw-r--r--  src/librados/RadosClient.cc             |   9
-rw-r--r--  src/librados/RadosClient.h              |   3
-rw-r--r--  src/osd/OSD.cc                          | 141
-rw-r--r--  src/osd/OSD.h                           |  20
-rw-r--r--  src/osd/PG.cc                           |   1
-rw-r--r--  src/osd/PG.h                            |  11
-rw-r--r--  src/osd/ReplicatedPG.cc                 | 535
-rw-r--r--  src/osd/ReplicatedPG.h                  | 145
-rw-r--r--  src/osd/Watch.cc                        | 453
-rw-r--r--  src/osd/Watch.h                         | 303
-rw-r--r--  src/osd/osd_types.cc                    |  33
-rw-r--r--  src/osd/osd_types.h                     | 108
-rw-r--r--  src/osdc/Objecter.h                     |   4
17 files changed, 1100 insertions, 768 deletions
diff --git a/doc/dev/osd_internals/watch_notify.rst b/doc/dev/osd_internals/watch_notify.rst
new file mode 100644
index 00000000000..e5f48b78a68
--- /dev/null
+++ b/doc/dev/osd_internals/watch_notify.rst
@@ -0,0 +1,78 @@
+============
+Watch Notify
+============
+
+See librados for the watch/notify interface.
+
+Overview
+--------
+The object_info (See osd/osd_types.h) tracks the set of watchers for
+a particular object persistently in the object_info_t::watchers map.
+In order to track notify progress, we also maintain some ephemeral
+structures associated with the ObjectContext.
+
+Each Watch has an associated Watch object (See osd/Watch.h). The
+ObjectContext for a watched object will have a (strong) reference
+to one Watch object per watch, and each Watch object holds a
+reference to the corresponding ObjectContext. This circular reference
+is deliberate and is broken when the Watch state is discarded on
+a new peering interval or removed upon timeout expiration or an
+unwatch operation.
+
+A watch tracks the associated connection via a strong
+ConnectionRef Watch::conn. The associated connection has a
+WatchConState stashed in the OSD::Session for tracking associated
+Watches in order to be able to notify them upon ms_handle_reset()
+(via WatchConState::reset()).
+
+Each Watch object tracks the set of currently un-acked notifies.
+start_notify() on a Watch object adds a reference to a new in-progress
+Notify to the Watch and either:
+ * if the Watch is *connected*, sends a Notify message to the client
+ * if the Watch is *unconnected*, does nothing.
+When the Watch becomes connected (in ReplicatedPG::do_osd_op_effects),
+notify messages are resent for all remaining tracked Notify objects.
+
+Each Notify object tracks the set of un-notified Watchers via
+calls to complete_watcher(). Once the remaining set is empty or the
+timeout expires (cb, registered in init()) a notify completion
+is sent to the client.
+
+Watch Lifecycle
+---------------
+A watch may be in one of 5 states:
+ 1. Non-existent.
+ 2. On disk, but not registered with an object context.
+ 3. Connected.
+ 4. Disconnected, callback registered with timer.
+ 5. Disconnected, callback in queue for scrub or is_degraded.
+
+Case 2 occurs between when an OSD goes active and the ObjectContext
+for an object with watchers is loaded into memory due to an access.
+During Case 2, no state is registered for the watch. Case 2
+transitions to Case 4 in ReplicatedPG::populate_obc_watchers() during
+ReplicatedPG::find_object_context. Case 1 becomes case 3 via
+ReplicatedPG::do_osd_op_effects due to a watch operation. Cases 4 and 5 become
+case 3 in the same way. Case 3 becomes case 4 when the connection resets
+on a watcher's session.
+
+Cases 4 and 5 deserve some explanation. Normally, when a Watch enters Case
+4, a callback is registered with the OSDService::watch_timer to be
+called at timeout expiration. At the time that the callback is
+called, however, the pg might be in a state where it cannot write
+to the object in order to remove the watch (i.e., during a scrub
+or while the object is degraded). In that case, we use
+Watch::get_delayed_cb() to generate another Context for use from
+the callbacks_for_degraded_object and Scrubber::callbacks lists.
+In either case, Watch::unregister_cb() does the right thing
+(SafeTimer::cancel_event() is harmless for contexts not registered
+with the timer).
+
+Notify Lifecycle
+----------------
+The notify timeout is simpler: a timeout callback is registered when
+the notify is init()'d. If all watchers ack notifies before the
+timeout occurs, the timeout is canceled and the client is notified
+of the notify completion. Otherwise, the timeout fires, the Notify
+object pings each Watch via cancel_notify to remove itself, and
+sends the notify completion to the client early.
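
For reference, the librados watch/notify interface mentioned at the top of the
new document, as a minimal client-side sketch against the pre-watch2 C++ API of
this era (the pool name, object name and the absence of error handling are
illustrative assumptions, not part of this patch):

    // Sketch only: create an object, watch it, notify it, then unwatch.
    #include <rados/librados.hpp>
    #include <iostream>

    struct MyWatchCtx : public librados::WatchCtx {
      // Runs on the watching client when a notify arrives.
      void notify(uint8_t opcode, uint64_t ver, librados::bufferlist& bl) {
        std::cout << "got notify: opcode=" << (int)opcode
                  << " ver=" << ver << std::endl;
      }
    };

    int main() {
      librados::Rados cluster;
      cluster.init(NULL);               // connect as client.admin
      cluster.conf_read_file(NULL);     // default ceph.conf search path
      cluster.connect();

      librados::IoCtx io;
      cluster.ioctx_create("data", io); // assumed pool name

      io.create("my_object", false);    // watch needs an existing object

      MyWatchCtx ctx;
      uint64_t cookie = 0;
      io.watch("my_object", 0, &cookie, &ctx);  // persistent watch

      librados::bufferlist bl;
      io.notify("my_object", 0, bl);    // blocks until all watchers ack
                                        // or the notify timeout fires

      io.unwatch("my_object", cookie);
      cluster.shutdown();
      return 0;
    }
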
diff --git a/src/common/Timer.cc b/src/common/Timer.cc
index ac0550c768f..f90d9ab9694 100644
--- a/src/common/Timer.cc
+++ b/src/common/Timer.cc
@@ -102,8 +102,7 @@ void SafeTimer::timer_thread()
if (!safe_callbacks)
lock.Unlock();
- callback->finish(0);
- delete callback;
+ callback->complete(0);
if (!safe_callbacks)
lock.Lock();
}
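
The two-line change above works because Context::complete() already wraps the
finish-then-delete sequence; roughly (a sketch of the pattern, not a verbatim
copy of include/Context.h):

    struct Context {
      virtual ~Context() {}
      virtual void finish(int r) = 0;
      virtual void complete(int r) {
        finish(r);
        delete this;   // subclasses may override complete() to manage their
      }                // own lifetime, as HandleWatchTimeout in Watch.cc does
    };
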
diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc
index ea63b2a59b4..800e27f90b6 100644
--- a/src/librados/IoCtxImpl.cc
+++ b/src/librados/IoCtxImpl.cc
@@ -1403,7 +1403,7 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
lock->Lock();
WatchContext *wc = new WatchContext(this, oid, ctx);
- client->register_watcher(wc, oid, ctx, cookie);
+ client->register_watcher(wc, cookie);
prepare_assert_ops(&rd);
rd.watch(*cookie, ver, 1);
bufferlist bl;
@@ -1431,12 +1431,14 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
/* this is called with IoCtxImpl::lock held */
-int librados::IoCtxImpl::_notify_ack(const object_t& oid,
- uint64_t notify_id, uint64_t ver)
+int librados::IoCtxImpl::_notify_ack(
+ const object_t& oid,
+ uint64_t notify_id, uint64_t ver,
+ uint64_t cookie)
{
::ObjectOperation rd;
prepare_assert_ops(&rd);
- rd.notify_ack(notify_id, ver);
+ rd.notify_ack(notify_id, ver, cookie);
objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
return 0;
@@ -1491,7 +1493,7 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
lock->Lock();
WatchContext *wc = new WatchContext(this, oid, ctx);
- client->register_watcher(wc, oid, ctx, &cookie);
+ client->register_watcher(wc, &cookie);
uint32_t prot_ver = 1;
uint32_t timeout = notify_timeout;
::encode(prot_ver, inbl);
@@ -1687,7 +1689,7 @@ void librados::IoCtxImpl::C_NotifyComplete::notify(uint8_t opcode,
librados::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_,
const object_t& _oc,
librados::WatchCtx *_ctx)
- : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0)
+ : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0), cookie(0)
{
io_ctx_impl->get();
}
@@ -1706,7 +1708,7 @@ void librados::WatchContext::notify(Mutex *client_lock,
ctx->notify(opcode, ver, payload);
if (opcode != WATCH_NOTIFY_COMPLETE) {
client_lock->Lock();
- io_ctx_impl->_notify_ack(oid, notify_id, ver);
+ io_ctx_impl->_notify_ack(oid, notify_id, ver, cookie);
client_lock->Unlock();
}
}
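
The cookie now threaded through _notify_ack() pairs with the
CEPH_OSD_OP_NOTIFY_ACK decode in ReplicatedPG.cc further down: the assumed ack
payload is simply the notify id followed by the acking watch's cookie, while
legacy clients send no payload and rely on op.watch.cookie instead. A small
illustrative fragment of that pairing using the tree's encoding helpers (not
the actual Objecter code):

    #include "include/types.h"
    #include "include/encoding.h"

    // New-style client ack payload: which notify, and which watch acked it.
    void encode_notify_ack_payload(bufferlist& out,
                                   uint64_t notify_id, uint64_t watch_cookie) {
      ::encode(notify_id, out);
      ::encode(watch_cookie, out);
    }

    // OSD side: a short or empty payload throws buffer::error, which the
    // NOTIFY_ACK handler catches and treats as a legacy ack.
    void decode_notify_ack_payload(bufferlist::iterator& bp,
                                   uint64_t& notify_id, uint64_t& watch_cookie) {
      ::decode(notify_id, bp);
      ::decode(watch_cookie, bp);
    }
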
diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h
index 33d1f3ebd14..c5b14cab8e3 100644
--- a/src/librados/IoCtxImpl.h
+++ b/src/librados/IoCtxImpl.h
@@ -194,7 +194,9 @@ struct librados::IoCtxImpl {
int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
int unwatch(const object_t& oid, uint64_t cookie);
int notify(const object_t& oid, uint64_t ver, bufferlist& bl);
- int _notify_ack(const object_t& oid, uint64_t notify_id, uint64_t ver);
+ int _notify_ack(
+ const object_t& oid, uint64_t notify_id, uint64_t ver,
+ uint64_t cookie);
eversion_t last_version();
void set_assert_version(uint64_t ver);
@@ -217,6 +219,7 @@ struct WatchContext : public RefCountedWaitObject {
const object_t oid;
librados::WatchCtx *ctx;
uint64_t linger_id;
+ uint64_t cookie;
WatchContext(IoCtxImpl *io_ctx_impl_,
const object_t& _oc,
diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc
index 7e76b65694d..feb0dfc3602 100644
--- a/src/librados/RadosClient.cc
+++ b/src/librados/RadosClient.cc
@@ -478,14 +478,11 @@ int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompleti
return r;
}
-void librados::RadosClient::register_watcher(WatchContext *wc,
- const object_t& oid,
- librados::WatchCtx *ctx,
- uint64_t *cookie)
+void librados::RadosClient::register_watcher(WatchContext *wc, uint64_t *cookie)
{
assert(lock.is_locked());
- *cookie = ++max_watch_cookie;
- watchers[*cookie] = wc;
+ wc->cookie = *cookie = ++max_watch_cookie;
+ watchers[wc->cookie] = wc;
}
void librados::RadosClient::unregister_watcher(uint64_t cookie)
diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h
index 1f39f22fb3f..6dd6d109c64 100644
--- a/src/librados/RadosClient.h
+++ b/src/librados/RadosClient.h
@@ -97,8 +97,7 @@ public:
uint64_t max_watch_cookie;
map<uint64_t, librados::WatchContext *> watchers;
- void register_watcher(librados::WatchContext *wc, const object_t& oid,
- librados::WatchCtx *ctx, uint64_t *cookie);
+ void register_watcher(librados::WatchContext *wc, uint64_t *cookie);
void unregister_watcher(uint64_t cookie);
void watch_notify(MWatchNotify *m);
void get();
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index b20e6d690f2..66f976b1c15 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -168,7 +168,6 @@ OSDService::OSDService(OSD *osd) :
scrubs_active(0),
watch_lock("OSD::watch_lock"),
watch_timer(osd->client_messenger->cct, watch_lock),
- watch(NULL),
backfill_request_lock("OSD::backfill_request_lock"),
backfill_request_timer(g_ceph_context, backfill_request_lock, false),
last_tid(0),
@@ -252,15 +251,12 @@ void OSDService::shutdown()
watch_lock.Lock();
watch_timer.shutdown();
watch_lock.Unlock();
-
- delete watch;
}
void OSDService::init()
{
reserver_finisher.start();
watch_timer.init();
- watch = new Watch();
}
ObjectStore *OSD::create_object_store(const std::string &dev, const std::string &jdev)
@@ -2571,152 +2567,17 @@ void OSD::ms_handle_connect(Connection *con)
}
}
-void OSD::put_object_context(void *_obc, pg_t pgid)
-{
- ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc;
- ReplicatedPG *pg = (ReplicatedPG *)lookup_lock_raw_pg(pgid);
- // If pg is being deleted, (which is the only case in which
- // it will be NULL) it will clean up its object contexts itself
- if (pg) {
- pg->put_object_context(obc);
- pg->unlock();
- }
-}
-
-void OSD::complete_notify(void *_notif, void *_obc)
-{
- ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc;
- Watch::Notification *notif = (Watch::Notification *)_notif;
- dout(10) << "complete_notify " << notif << " got the last reply from pending watchers, can send response now" << dendl;
- MWatchNotify *reply = notif->reply;
- client_messenger->send_message(reply, notif->session->con);
- notif->session->put();
- notif->session->con->put();
- service.watch->remove_notification(notif);
- if (notif->timeout)
- service.watch_timer.cancel_event(notif->timeout);
- map<Watch::Notification *, bool>::iterator iter = obc->notifs.find(notif);
- if (iter != obc->notifs.end())
- obc->notifs.erase(iter);
- delete notif;
-}
-
-void OSD::ack_notification(entity_name_t& name, void *_notif, void *_obc, ReplicatedPG *pg)
-{
- assert(service.watch_lock.is_locked());
- pg->assert_locked();
- Watch::Notification *notif = (Watch::Notification *)_notif;
- dout(10) << "ack_notification " << name << " notif " << notif << " id " << notif->id << dendl;
- if (service.watch->ack_notification(name, notif)) {
- complete_notify(notif, _obc);
- pg->put_object_context(static_cast<ReplicatedPG::ObjectContext *>(_obc));
- }
-}
-
-void OSD::handle_watch_timeout(void *obc,
- ReplicatedPG *pg,
- entity_name_t entity,
- utime_t expire)
-{
- // watch_lock is inside pg->lock; handle_watch_timeout checks for the race.
- service.watch_lock.Unlock();
- pg->lock();
- service.watch_lock.Lock();
-
- pg->handle_watch_timeout(obc, entity, expire);
- pg->unlock();
- pg->put();
-}
-
-void OSD::disconnect_session_watches(Session *session)
-{
- // get any watched obc's
- map<ReplicatedPG::ObjectContext *, pg_t> obcs;
- service.watch_lock.Lock();
- for (map<void *, pg_t>::iterator iter = session->watches.begin(); iter != session->watches.end(); ++iter) {
- ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)iter->first;
- obcs[obc] = iter->second;
- }
- service.watch_lock.Unlock();
-
- for (map<ReplicatedPG::ObjectContext *, pg_t>::iterator oiter = obcs.begin(); oiter != obcs.end(); ++oiter) {
- ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)oiter->first;
- dout(10) << "obc=" << (void *)obc << dendl;
-
- ReplicatedPG *pg = static_cast<ReplicatedPG *>(lookup_lock_raw_pg(oiter->second));
- if (!pg) {
- /* pg removed between watch_unlock.Unlock() and now, all related
- * watch structures would have been cleaned up in remove_watchers_and_notifies
- */
- continue;
- }
- service.watch_lock.Lock();
-
- if (!session->watches.count((void*)obc)) {
- // Raced with watch removal, obc is invalid
- service.watch_lock.Unlock();
- pg->unlock();
- continue;
- }
-
- /* NOTE! fix this one, should be able to just lookup entity name,
- however, we currently only keep EntityName on the session and not
- entity_name_t. */
- map<entity_name_t, Session *>::iterator witer = obc->watchers.begin();
- while (1) {
- while (witer != obc->watchers.end() && witer->second == session) {
- dout(10) << "removing watching session entity_name=" << session->entity_name
- << " from " << obc->obs.oi << dendl;
- entity_name_t entity = witer->first;
- watch_info_t& w = obc->obs.oi.watchers[entity];
- utime_t expire = ceph_clock_now(g_ceph_context);
- expire += w.timeout_seconds;
- pg->register_unconnected_watcher(obc, entity, expire);
- dout(10) << " disconnected watch " << w << " by " << entity << " session " << session
- << ", expires " << expire << dendl;
- obc->watchers.erase(witer++);
- pg->put_object_context(obc);
- session->con->put();
- session->put();
- }
- if (witer == obc->watchers.end())
- break;
- ++witer;
- }
- service.watch_lock.Unlock();
- pg->unlock();
- }
-}
-
bool OSD::ms_handle_reset(Connection *con)
{
dout(1) << "OSD::ms_handle_reset()" << dendl;
OSD::Session *session = (OSD::Session *)con->get_priv();
if (!session)
return false;
- disconnect_session_watches(session);
+ session->wstate.reset();
session->put();
return true;
}
-void OSD::handle_notify_timeout(void *_notif)
-{
- assert(service.watch_lock.is_locked());
- Watch::Notification *notif = (Watch::Notification *)_notif;
- dout(10) << "OSD::handle_notify_timeout notif " << notif << " id " << notif->id << dendl;
-
- ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)notif->obc;
-
- pg_t pgid = notif->pgid;
-
- complete_notify(_notif, obc);
- service.watch_lock.Unlock(); /* drop lock to change locking order */
-
- put_object_context(obc, pgid);
- service.watch_lock.Lock();
- /* exiting with watch_lock held */
-}
-
struct C_OSD_GetVersion : public Context {
OSD *osd;
uint64_t oldest, newest;
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 1837195d339..25e1eee13ad 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -48,6 +48,7 @@ using namespace std;
#include <ext/hash_set>
using namespace __gnu_cxx;
+#include "Watch.h"
#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "common/sharedptr_registry.hpp"
@@ -295,7 +296,11 @@ public:
// -- Watch --
Mutex watch_lock;
SafeTimer watch_timer;
- Watch *watch;
+ uint64_t next_notif_id;
+ uint64_t get_next_id(epoch_t cur_epoch) {
+ Mutex::Locker l(watch_lock);
+ return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
+ }
// -- Backfill Request Scheduling --
Mutex backfill_request_lock;
@@ -526,7 +531,7 @@ public:
int64_t auid;
epoch_t last_sent_epoch;
Connection *con;
- std::map<void *, pg_t> watches;
+ WatchConState wstate;
Session() : auid(-1), last_sent_epoch(0), con(0) {}
};
@@ -1468,17 +1473,6 @@ public:
int init_op_flags(OpRequestRef op);
-
- void put_object_context(void *_obc, pg_t pgid);
- void complete_notify(void *notif, void *obc);
- void ack_notification(entity_name_t& peer_addr, void *notif, void *obc,
- ReplicatedPG *pg);
- void handle_notify_timeout(void *notif);
- void disconnect_session_watches(Session *session);
- void handle_watch_timeout(void *obc,
- ReplicatedPG *pg,
- entity_name_t entity,
- utime_t expire);
OSDService service;
friend class OSDService;
};
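
The new get_next_id() packs the current epoch into the high 32 bits and a
per-OSD counter into the low 32 bits of every notify id. A quick illustration
of the scheme (helper names are hypothetical, not part of the patch):

    #include <stdint.h>

    uint64_t make_notify_id(uint32_t epoch, uint32_t counter) {
      return ((uint64_t)epoch << 32) | (uint64_t)counter;
    }
    uint32_t notify_id_epoch(uint64_t id)   { return id >> 32; }
    uint32_t notify_id_counter(uint64_t id) { return id & 0xffffffffu; }

    // e.g. make_notify_id(812, 7) == 0x0000032c00000007,
    //      notify_id_epoch(0x0000032c00000007) == 812
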
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 3c10230c41a..09d27a6a037 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -5436,6 +5436,7 @@ PG::RecoveryState::Reset::react(const FlushedEvt&)
{
PG *pg = context< RecoveryMachine >().pg;
pg->flushed = true;
+ pg->on_flushed();
pg->requeue_ops(pg->waiting_for_active);
return discard_event();
}
diff --git a/src/osd/PG.h b/src/osd/PG.h
index ec3d4664a90..ed85c4e2946 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -1916,17 +1916,8 @@ public:
virtual void on_role_change() = 0;
virtual void on_change() = 0;
virtual void on_activate() = 0;
+ virtual void on_flushed() = 0;
virtual void on_shutdown() = 0;
- virtual void remove_watchers_and_notifies() = 0;
-
- virtual void register_unconnected_watcher(void *obc,
- entity_name_t entity,
- utime_t expire) = 0;
- virtual void unregister_unconnected_watcher(void *obc,
- entity_name_t entity) = 0;
- virtual void handle_watch_timeout(void *obc,
- entity_name_t entity,
- utime_t expire) = 0;
};
WRITE_CLASS_ENCODER(PG::OndiskLog)
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index d23db2884ed..83502de7d12 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -1567,94 +1567,6 @@ int ReplicatedPG::do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr)
}
}
-void ReplicatedPG::dump_watchers(ObjectContext *obc)
-{
- assert(osd->watch_lock.is_locked());
-
- dout(10) << "dump_watchers " << obc->obs.oi.soid << " " << obc->obs.oi << dendl;
- for (map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.begin();
- iter != obc->watchers.end();
- ++iter)
- dout(10) << " * obc->watcher: " << iter->first << " session=" << iter->second << dendl;
-
- for (map<entity_name_t, watch_info_t>::iterator oi_iter = obc->obs.oi.watchers.begin();
- oi_iter != obc->obs.oi.watchers.end();
- oi_iter++) {
- watch_info_t& w = oi_iter->second;
- dout(10) << " * oi->watcher: " << oi_iter->first << " cookie=" << w.cookie << dendl;
- }
-}
-
-void ReplicatedPG::remove_watcher(ObjectContext *obc, entity_name_t entity)
-{
- assert_locked();
- assert(osd->watch_lock.is_locked());
- dout(10) << "remove_watcher " << *obc << " " << entity << dendl;
- map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity);
- assert(iter != obc->watchers.end());
- OSD::Session *session = iter->second;
- dout(10) << "remove_watcher removing session " << session << dendl;
-
- obc->watchers.erase(iter);
- assert(session->watches.count(obc));
- session->watches.erase(obc);
-
- put_object_context(obc);
- session->con->put();
- session->put();
-}
-
-void ReplicatedPG::remove_notify(ObjectContext *obc, Watch::Notification *notif)
-{
- assert_locked();
- assert(osd->watch_lock.is_locked());
- map<Watch::Notification *, bool>::iterator niter = obc->notifs.find(notif);
-
- // Cancel notification
- if (notif->timeout)
- osd->watch_timer.cancel_event(notif->timeout);
- osd->watch->remove_notification(notif);
-
- assert(niter != obc->notifs.end());
-
- niter->first->session->con->put();
- niter->first->session->put();
- obc->notifs.erase(niter);
-
- put_object_context(obc);
- delete notif;
-}
-
-void ReplicatedPG::remove_watchers_and_notifies()
-{
- assert_locked();
-
- dout(10) << "remove_watchers" << dendl;
-
- osd->watch_lock.Lock();
- for (map<hobject_t, ObjectContext*>::iterator oiter = object_contexts.begin();
- oiter != object_contexts.end();
- ) {
- map<hobject_t, ObjectContext *>::iterator iter = oiter++;
- ObjectContext *obc = iter->second;
- obc->ref++;
- for (map<entity_name_t, OSD::Session *>::iterator witer = obc->watchers.begin();
- witer != obc->watchers.end();
- remove_watcher(obc, (witer++)->first)) ;
- for (map<entity_name_t,Watch::C_WatchTimeout*>::iterator iter = obc->unconnected_watchers.begin();
- iter != obc->unconnected_watchers.end();
- ) {
- map<entity_name_t,Watch::C_WatchTimeout*>::iterator i = iter++;
- unregister_unconnected_watcher(obc, i->first);
- }
- for (map<Watch::Notification *, bool>::iterator niter = obc->notifs.begin();
- niter != obc->notifs.end();
- remove_notify(obc, (niter++)->first)) ;
- put_object_context(obc);
- }
- osd->watch_lock.Unlock();
-}
-
// ========================================================================
// low level osd ops
@@ -2329,20 +2241,20 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
case CEPH_OSD_OP_NOTIFY_ACK:
{
- osd->watch_lock.Lock();
- entity_name_t source = ctx->op->request->get_source();
- map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(source);
- Watch::Notification *notif = osd->watch->get_notif(op.watch.cookie);
- if (oi_iter != oi.watchers.end() && notif) {
- ctx->notify_acks.push_back(op.watch.cookie);
- } else {
- if (!notif)
- dout(10) << " no pending notify for cookie " << op.watch.cookie << dendl;
- else
- dout(10) << " not registered as a watcher" << dendl;
- result = -EINVAL;
+ try {
+ uint64_t notify_id = 0;
+ uint64_t watch_cookie = 0;
+ ::decode(notify_id, bp);
+ ::decode(watch_cookie, bp);
+ OpContext::NotifyAck ack(notify_id, watch_cookie);
+ ctx->notify_acks.push_back(ack);
+ } catch (const buffer::error &e) {
+ OpContext::NotifyAck ack(
+ // op.watch.cookie is actually the notify_id for historical reasons
+ op.watch.cookie
+ );
+ ctx->notify_acks.push_back(ack);
}
- osd->watch_lock.Unlock();
}
break;
@@ -2557,27 +2469,24 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
watch_info_t w(cookie, 30); // FIXME: where does the timeout come from?
if (do_watch) {
- if (oi.watchers.count(entity) && oi.watchers[entity] == w) {
+ if (oi.watchers.count(make_pair(cookie, entity))) {
dout(10) << " found existing watch " << w << " by " << entity << dendl;
} else {
dout(10) << " registered new watch " << w << " by " << entity << dendl;
- oi.watchers[entity] = w;
+ oi.watchers[make_pair(cookie, entity)] = w;
t.nop(); // make sure update the object_info on disk!
}
- ctx->watch_connect = true;
- ctx->watch_info = w;
+ ctx->watch_connects.push_back(w);
assert(obc->registered);
} else {
- map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity);
+ map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
+ oi.watchers.find(make_pair(cookie, entity));
if (oi_iter != oi.watchers.end()) {
- dout(10) << " removed watch " << oi_iter->second << " by " << entity << dendl;
- oi.watchers.erase(entity);
+ dout(10) << " removed watch " << oi_iter->second << " by "
+ << entity << dendl;
+ oi.watchers.erase(oi_iter);
t.nop(); // update oi on disk
-
- ctx->watch_disconnect = true;
-
- // FIXME: trigger notifies?
-
+ ctx->watch_disconnects.push_back(w);
} else {
dout(10) << " can't remove: no watch by " << entity << dendl;
}
@@ -3340,154 +3249,94 @@ void ReplicatedPG::add_interval_usage(interval_set<uint64_t>& s, object_stat_sum
void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
{
- if (ctx->watch_connect || ctx->watch_disconnect ||
- !ctx->notifies.empty() || !ctx->notify_acks.empty()) {
- OSD::Session *session = (OSD::Session *)ctx->op->request->get_connection()->get_priv();
- assert(session);
- ObjectContext *obc = ctx->obc;
- object_info_t& oi = ctx->new_obs.oi;
- hobject_t& soid = oi.soid;
- entity_name_t entity = ctx->reqid.name;
+ ConnectionRef conn(ctx->op->request->get_connection());
+ boost::intrusive_ptr<OSD::Session> session(
+ (OSD::Session *)conn->get_priv());
+ entity_name_t entity = ctx->reqid.name;
- dout(10) << "do_osd_op_effects applying watch/notify effects on session " << session << dendl;
-
- osd->watch_lock.Lock();
- dump_watchers(obc);
-
- map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity);
- if (ctx->watch_connect) {
- watch_info_t w = ctx->watch_info;
-
- if (iter == obc->watchers.end()) {
- dout(10) << " connected to " << w << " by " << entity << " session " << session << dendl;
- obc->watchers[entity] = session;
- session->con->get();
- session->get();
- session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
- obc->ref++;
- } else if (iter->second == session) {
- // already there
- dout(10) << " already connected to " << w << " by " << entity
- << " session " << session << dendl;
- } else {
- // weird: same entity, different session.
- dout(10) << " reconnected (with different session!) watch " << w << " by " << entity
- << " session " << session << " (was " << iter->second << ")" << dendl;
- session->con->get();
- session->get();
-
- iter->second->watches.erase(obc);
- iter->second->con->put();
- iter->second->put();
-
- iter->second = session;
- session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
- }
- map<entity_name_t,Watch::C_WatchTimeout*>::iterator un_iter =
- obc->unconnected_watchers.find(entity);
- if (un_iter != obc->unconnected_watchers.end()) {
- unregister_unconnected_watcher(obc, un_iter->first);
- }
+ dout(15) << "do_osd_op_effects on session " << session.get() << dendl;
- map<Watch::Notification *, bool>::iterator niter;
- for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) {
- Watch::Notification *notif = niter->first;
- map<entity_name_t, Watch::WatcherState>::iterator iter = notif->watchers.find(entity);
- if (iter != notif->watchers.end()) {
- /* there is a pending notification for this watcher, we should resend it anyway
- even if we already sent it as it might not have received it */
- MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
- osd->send_message_osd_client(notify_msg, session->con);
- }
- }
+ for (list<watch_info_t>::iterator i = ctx->watch_connects.begin();
+ i != ctx->watch_connects.end();
+ ++i) {
+ pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
+ dout(15) << "do_osd_op_effects applying watch connect on session "
+ << session.get() << " watcher " << watcher << dendl;
+ WatchRef watch;
+ if (ctx->obc->watchers.count(watcher)) {
+ dout(15) << "do_osd_op_effects found existing watch watcher " << watcher
+ << dendl;
+ watch = ctx->obc->watchers[watcher];
+ } else {
+ dout(15) << "do_osd_op_effects new watcher " << watcher
+ << dendl;
+ watch = Watch::makeWatchRef(
+ this, osd, ctx->obc, i->timeout_seconds,
+ i->cookie, entity);
+ ctx->obc->watchers.insert(
+ make_pair(
+ watcher,
+ watch));
}
-
- if (ctx->watch_disconnect) {
- if (iter != obc->watchers.end()) {
- remove_watcher(obc, entity);
- } else {
- assert(obc->unconnected_watchers.count(entity));
- unregister_unconnected_watcher(obc, entity);
- }
+ watch->connect(conn);
+ }
- // ack any pending notifies
- map<Watch::Notification *, bool>::iterator p = obc->notifs.begin();
- while (p != obc->notifs.end()) {
- Watch::Notification *notif = p->first;
- entity_name_t by = entity;
- p++;
- assert(notif->obc == obc);
- dout(10) << " acking pending notif " << notif->id << " by " << by << dendl;
- // TODOSAM: osd->osd-> not good
- osd->osd->ack_notification(entity, notif, obc, this);
- }
+ for (list<watch_info_t>::iterator i = ctx->watch_disconnects.begin();
+ i != ctx->watch_disconnects.end();
+ ++i) {
+ pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
+ dout(15) << "do_osd_op_effects applying watch disconnect on session "
+ << session.get() << " and watcher " << watcher << dendl;
+ if (ctx->obc->watchers.count(watcher)) {
+ WatchRef watch = ctx->obc->watchers[watcher];
+ dout(10) << "do_osd_op_effects applying disconnect found watcher "
+ << watcher << dendl;
+ ctx->obc->watchers.erase(watcher);
+ watch->remove();
+ } else {
+ dout(10) << "do_osd_op_effects failed to find watcher "
+ << watcher << dendl;
}
+ }
- for (list<notify_info_t>::iterator p = ctx->notifies.begin();
- p != ctx->notifies.end();
- ++p) {
-
- dout(10) << " " << *p << dendl;
-
- Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl);
- session->get(); // notif got a reference
- session->con->get();
- notif->pgid = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
-
- osd->watch->add_notification(notif);
- dout(20) << " notify id " << notif->id << dendl;
-
- // connected
- for (map<entity_name_t, watch_info_t>::iterator i = obc->obs.oi.watchers.begin();
- i != obc->obs.oi.watchers.end();
- ++i) {
- map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.find(i->first);
- if (q != obc->watchers.end()) {
- entity_name_t name = q->first;
- OSD::Session *s = q->second;
- watch_info_t& w = obc->obs.oi.watchers[q->first];
-
- notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
-
- MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
- osd->send_message_osd_client(notify_msg, s->con);
- } else {
- // unconnected
- entity_name_t name = i->first;
- notif->add_watcher(name, Watch::WATCHER_PENDING);
- }
- }
-
- notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
- if (notif->watchers.empty()) {
- // TODOSAM: osd->osd-> not good
- osd->osd->complete_notify(notif, obc);
- } else {
- obc->notifs[notif] = true;
- obc->ref++;
- notif->obc = obc;
- // TODOSAM: osd->osd not good
- notif->timeout = new Watch::C_NotifyTimeout(osd->osd, notif);
- osd->watch_timer.add_event_after(p->timeout, notif->timeout);
- }
+ for (list<notify_info_t>::iterator p = ctx->notifies.begin();
+ p != ctx->notifies.end();
+ ++p) {
+ dout(10) << "do_osd_op_effects, notify " << *p << dendl;
+ NotifyRef notif(
+ Notify::makeNotifyRef(
+ conn,
+ ctx->obc->watchers.size(),
+ p->bl,
+ p->timeout,
+ p->cookie,
+ osd->get_next_id(get_osdmap()->get_epoch()),
+ ctx->obc->obs.oi.user_version.version,
+ osd));
+ for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i =
+ ctx->obc->watchers.begin();
+ i != ctx->obc->watchers.end();
+ ++i) {
+ dout(10) << "starting notify on watch " << i->first << dendl;
+ i->second->start_notify(notif);
}
+ notif->init();
+ }
- for (list<uint64_t>::iterator p = ctx->notify_acks.begin(); p != ctx->notify_acks.end(); ++p) {
- uint64_t cookie = *p;
-
- dout(10) << " notify_ack " << cookie << dendl;
- map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity);
- assert(oi_iter != oi.watchers.end());
-
- Watch::Notification *notif = osd->watch->get_notif(cookie);
- assert(notif);
-
- // TODOSAM: osd->osd-> not good
- osd->osd->ack_notification(entity, notif, obc, this);
+ for (list<OpContext::NotifyAck>::iterator p = ctx->notify_acks.begin();
+ p != ctx->notify_acks.end();
+ ++p) {
+ dout(10) << "notify_ack " << make_pair(p->watch_cookie, p->notify_id) << dendl;
+ for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i =
+ ctx->obc->watchers.begin();
+ i != ctx->obc->watchers.end();
+ ++i) {
+ if (i->first.second != entity) continue;
+ if (p->watch_cookie &&
+ p->watch_cookie.get() != i->first.first) continue;
+ dout(10) << "acking notify on watch " << i->first << dendl;
+ i->second->notify_ack(p->notify_id);
}
-
- osd->watch_lock.Unlock();
- session->put();
}
}
@@ -4115,113 +3964,39 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
log.objects[obc->obs.oi.soid]->reverting_to == obc->obs.oi.version));
dout(10) << "populate_obc_watchers " << obc->obs.oi.soid << dendl;
- if (!obc->obs.oi.watchers.empty()) {
- Mutex::Locker l(osd->watch_lock);
- assert(obc->unconnected_watchers.size() == 0);
- assert(obc->watchers.size() == 0);
- // populate unconnected_watchers
- utime_t now = ceph_clock_now(g_ceph_context);
- for (map<entity_name_t, watch_info_t>::iterator p = obc->obs.oi.watchers.begin();
- p != obc->obs.oi.watchers.end();
- p++) {
- utime_t expire = now;
- expire += p->second.timeout_seconds;
- dout(10) << " unconnected watcher " << p->first << " will expire " << expire << dendl;
- register_unconnected_watcher(obc, p->first, expire);
- }
- }
-}
-
-void ReplicatedPG::unregister_unconnected_watcher(void *_obc,
- entity_name_t entity)
-{
- ObjectContext *obc = static_cast<ObjectContext *>(_obc);
-
- /* If we failed to cancel the event, the event will fire and the obc
- * ref and the pg ref will be taken care of */
- if (osd->watch_timer.cancel_event(obc->unconnected_watchers[entity])) {
- put_object_context(obc);
- put();
+ assert(obc->watchers.size() == 0);
+ // populate unconnected_watchers
+ utime_t now = ceph_clock_now(g_ceph_context);
+ for (map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator p =
+ obc->obs.oi.watchers.begin();
+ p != obc->obs.oi.watchers.end();
+ p++) {
+ utime_t expire = now;
+ expire += p->second.timeout_seconds;
+ dout(10) << " unconnected watcher " << p->first << " will expire " << expire << dendl;
+ WatchRef watch(
+ Watch::makeWatchRef(
+ this, osd, obc, p->second.timeout_seconds, p->first.first, p->first.second));
+ watch->disconnect();
+ obc->watchers.insert(
+ make_pair(
+ make_pair(p->first.first, p->first.second),
+ watch));
}
- obc->unconnected_watchers.erase(entity);
}
-void ReplicatedPG::register_unconnected_watcher(void *_obc,
- entity_name_t entity,
- utime_t expire)
+void ReplicatedPG::handle_watch_timeout(WatchRef watch)
{
- ObjectContext *obc = static_cast<ObjectContext *>(_obc);
- pg_t pgid = info.pgid;
- pgid.set_ps(obc->obs.oi.soid.hash);
- get();
- obc->ref++;
- Watch::C_WatchTimeout *cb = new Watch::C_WatchTimeout(osd->osd,
- static_cast<void *>(obc),
- this,
- entity, expire);
- osd->watch_timer.add_event_at(expire, cb);
- obc->unconnected_watchers[entity] = cb;
-}
-
-void ReplicatedPG::handle_watch_timeout(void *_obc,
- entity_name_t entity,
- utime_t expire)
-{
- dout(10) << "handle_watch_timeout obc " << _obc << dendl;
- struct HandleWatchTimeout : public Context {
- epoch_t cur_epoch;
- boost::intrusive_ptr<ReplicatedPG> pg;
- void *obc;
- entity_name_t entity;
- utime_t expire;
- HandleWatchTimeout(
- epoch_t cur_epoch,
- ReplicatedPG *pg,
- void *obc,
- entity_name_t entity,
- utime_t expire) : cur_epoch(cur_epoch),
- pg(pg), obc(obc), entity(entity), expire(expire) {
- assert(pg->is_locked());
- static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
- }
- void finish(int) {
- assert(pg->is_locked());
- if (cur_epoch < pg->last_peering_reset)
- return;
- // handle_watch_timeout gets its own ref
- static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
- pg->handle_watch_timeout(obc, entity, expire);
- }
- ~HandleWatchTimeout() {
- assert(pg->is_locked());
- pg->put_object_context(static_cast<ReplicatedPG::ObjectContext*>(obc));
- }
- };
-
- ObjectContext *obc = static_cast<ObjectContext *>(_obc);
-
- if (obc->unconnected_watchers.count(entity) == 0 ||
- (obc->unconnected_watchers[entity] &&
- obc->unconnected_watchers[entity]->expire != expire)) {
- /* If obc->unconnected_watchers[entity] == NULL we know at least that
- * the watcher for obc,entity should expire. We might not have been
- * the intended Context*, but that's ok since the intended one will
- * take this branch and assume it raced. */
- dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher "
- << entity << dendl;
- put_object_context(obc);
- return;
- }
+ ObjectContext *obc = watch->get_obc(); // handle_watch_timeout owns this ref
+ dout(10) << "handle_watch_timeout obc " << obc << dendl;
if (is_degraded_object(obc->obs.oi.soid)) {
callbacks_for_degraded_object[obc->obs.oi.soid].push_back(
- new HandleWatchTimeout(get_osdmap()->get_epoch(),
- this, _obc, entity, expire)
+ watch->get_delayed_cb()
);
dout(10) << "handle_watch_timeout waiting for degraded on obj "
<< obc->obs.oi.soid
<< dendl;
- obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
put_object_context(obc); // callback got its own ref
return;
}
@@ -4230,15 +4005,16 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
dout(10) << "handle_watch_timeout waiting for scrub on obj "
<< obc->obs.oi.soid
<< dendl;
- scrubber.add_callback(new HandleWatchTimeout(get_osdmap()->get_epoch(),
- this, _obc, entity, expire));
- obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
- put_object_context(obc); // callback got its own ref
+ scrubber.add_callback(
+ watch->get_delayed_cb() // This callback!
+ );
+ put_object_context(obc);
return;
}
- obc->unconnected_watchers.erase(entity);
- obc->obs.oi.watchers.erase(entity);
+ obc->watchers.erase(make_pair(watch->get_cookie(), watch->get_entity()));
+ obc->obs.oi.watchers.erase(make_pair(watch->get_cookie(), watch->get_entity()));
+ watch->remove();
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
@@ -4283,7 +4059,7 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
eval_repop(repop);
}
-ReplicatedPG::ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid)
+ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid)
{
map<hobject_t, ObjectContext*>::iterator p = object_contexts.find(oid);
if (p != object_contexts.end())
@@ -4291,7 +4067,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_
return NULL;
}
-ReplicatedPG::ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi,
+ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi,
SnapSetContext *ssc)
{
ObjectContext *obc = new ObjectContext(oi, false, ssc);
@@ -4302,7 +4078,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::create_object_context(const object_in
return obc;
}
-ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
+ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
const object_locator_t& oloc,
bool can_create)
{
@@ -4354,7 +4130,24 @@ ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& s
void ReplicatedPG::context_registry_on_change()
{
- remove_watchers_and_notifies();
+ list<ObjectContext *> contexts;
+ for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin();
+ i != object_contexts.end();
+ ++i) {
+ i->second->get();
+ contexts.push_back(i->second);
+ for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j =
+ i->second->watchers.begin();
+ j != i->second->watchers.end();
+ i->second->watchers.erase(j++)) {
+ j->second->discard();
+ }
+ }
+ for (list<ObjectContext *>::iterator i = contexts.begin();
+ i != contexts.end();
+ contexts.erase(i++)) {
+ put_object_context(*i);
+ }
}
@@ -4529,7 +4322,7 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t *
pgstat->stats.cat_sum[oi.category].add(stat);
}
-ReplicatedPG::SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
+SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
{
SnapSetContext *ssc = new SnapSetContext(oid);
dout(10) << "create_snapset_context " << ssc << " " << ssc->oid << dendl;
@@ -4538,10 +4331,10 @@ ReplicatedPG::SnapSetContext *ReplicatedPG::create_snapset_context(const object_
return ssc;
}
-ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
- const string& key,
- ps_t seed,
- bool can_create)
+SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
+ const string& key,
+ ps_t seed,
+ bool can_create)
{
SnapSetContext *ssc;
map<object_t, SnapSetContext*>::iterator p = snapset_contexts.find(oid);
@@ -6028,7 +5821,7 @@ eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid)
/* Mark an object as lost
*/
-ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
+ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
const hobject_t &oid, eversion_t version,
utime_t mtime, int what)
{
@@ -6065,7 +5858,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transac
struct C_PG_MarkUnfoundLost : public Context {
ReplicatedPG *pg;
- list<ReplicatedPG::ObjectContext*> obcs;
+ list<ObjectContext*> obcs;
C_PG_MarkUnfoundLost(ReplicatedPG *p) : pg(p) {
pg->get();
}
@@ -6246,14 +6039,19 @@ void ReplicatedPG::on_removal()
{
dout(10) << "on_removal" << dendl;
apply_and_flush_repops(false);
- remove_watchers_and_notifies();
+ context_registry_on_change();
}
void ReplicatedPG::on_shutdown()
{
dout(10) << "on_shutdown" << dendl;
apply_and_flush_repops(false);
- remove_watchers_and_notifies();
+ context_registry_on_change();
+}
+
+void ReplicatedPG::on_flushed()
+{
+ assert(object_contexts.empty());
}
void ReplicatedPG::on_activate()
@@ -7581,4 +7379,5 @@ boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim&
return transit< NotTrimming >();
}
-
+void intrusive_ptr_add_ref(ReplicatedPG *pg) { pg->get(); }
+void intrusive_ptr_release(ReplicatedPG *pg) { pg->put(); }
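
The intrusive_ptr_add_ref()/intrusive_ptr_release() pair defined above is what
lets the new Watch/Notify code hold boost::intrusive_ptr<ReplicatedPG> guards
(see HandleWatchTimeout and Notify::do_timeout in Watch.cc). The general shape
of the mechanism, as a self-contained sketch rather than ReplicatedPG itself:

    #include <boost/intrusive_ptr.hpp>
    #include <cassert>

    struct RefCounted {
      int nref;
      RefCounted() : nref(0) {}
      void get() { ++nref; }
      void put() { if (--nref == 0) delete this; }
    };

    // boost::intrusive_ptr<T> finds these by argument-dependent lookup.
    void intrusive_ptr_add_ref(RefCounted *p) { p->get(); }
    void intrusive_ptr_release(RefCounted *p) { p->put(); }

    int main() {
      boost::intrusive_ptr<RefCounted> ref(new RefCounted); // nref -> 1
      assert(ref->nref == 1);
      return 0;  // ref destroyed: nref -> 0, object deleted
    }
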
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index ae506e55074..50af643163a 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -14,6 +14,9 @@
#ifndef CEPH_REPLICATEDPG_H
#define CEPH_REPLICATEDPG_H
+#include <boost/optional.hpp>
+
+#include "include/assert.h"
#include "PG.h"
#include "OSD.h"
@@ -60,6 +63,7 @@ public:
class ReplicatedPG : public PG {
friend class OSD;
+ friend class Watch;
public:
/*
@@ -95,24 +99,6 @@ public:
*/
- struct SnapSetContext {
- object_t oid;
- int ref;
- bool registered;
- SnapSet snapset;
-
- SnapSetContext(const object_t& o) : oid(o), ref(0), registered(false) { }
- };
-
- struct ObjectState {
- object_info_t oi;
- bool exists;
-
- ObjectState(const object_info_t &oi_, bool exists_)
- : oi(oi_), exists(exists_) {}
- };
-
-
struct AccessMode {
typedef enum {
IDLE,
@@ -254,81 +240,6 @@ public:
}
};
-
- /*
- * keep tabs on object modifications that are in flight.
- * we need to know the projected existence, size, snapset,
- * etc., because we don't send writes down to disk until after
- * replicas ack.
- */
- struct ObjectContext {
- int ref;
- bool registered;
- ObjectState obs;
-
- SnapSetContext *ssc; // may be null
-
- private:
- Mutex lock;
- public:
- Cond cond;
- int unstable_writes, readers, writers_waiting, readers_waiting;
-
- // set if writes for this object are blocked on another objects recovery
- ObjectContext *blocked_by; // object blocking our writes
- set<ObjectContext*> blocking; // objects whose writes we block
-
- // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
- map<entity_name_t, OSD::Session *> watchers;
- map<entity_name_t, Watch::C_WatchTimeout *> unconnected_watchers;
- map<Watch::Notification *, bool> notifs;
-
- ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_)
- : ref(0), registered(false), obs(oi_, exists_), ssc(ssc_),
- lock("ReplicatedPG::ObjectContext::lock"),
- unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
- blocked_by(0) {}
-
- void get() { ++ref; }
-
- // do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy.
- void ondisk_write_lock() {
- lock.Lock();
- writers_waiting++;
- while (readers_waiting || readers)
- cond.Wait(lock);
- writers_waiting--;
- unstable_writes++;
- lock.Unlock();
- }
- void ondisk_write_unlock() {
- lock.Lock();
- assert(unstable_writes > 0);
- unstable_writes--;
- if (!unstable_writes && readers_waiting)
- cond.Signal();
- lock.Unlock();
- }
- void ondisk_read_lock() {
- lock.Lock();
- readers_waiting++;
- while (unstable_writes)
- cond.Wait(lock);
- readers_waiting--;
- readers++;
- lock.Unlock();
- }
- void ondisk_read_unlock() {
- lock.Lock();
- assert(readers > 0);
- readers--;
- if (!readers && writers_waiting)
- cond.Signal();
- lock.Unlock();
- }
- };
-
-
/*
* Capture all object state associated with an in-progress read or write.
*/
@@ -349,10 +260,17 @@ public:
bool user_modify; // user-visible modification
// side effects
- bool watch_connect, watch_disconnect;
- watch_info_t watch_info;
+ list<watch_info_t> watch_connects;
+ list<watch_info_t> watch_disconnects;
list<notify_info_t> notifies;
- list<uint64_t> notify_acks;
+ struct NotifyAck {
+ boost::optional<uint64_t> watch_cookie;
+ uint64_t notify_id;
+ NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
+ NotifyAck(uint64_t notify_id, uint64_t cookie)
+ : watch_cookie(cookie), notify_id(notify_id) {}
+ };
+ list<NotifyAck> notify_acks;
uint64_t bytes_written, bytes_read;
@@ -386,7 +304,6 @@ public:
op(_op), reqid(_reqid), ops(_ops), obs(_obs), snapset(0),
new_obs(_obs->oi, _obs->exists),
modify(false), user_modify(false),
- watch_connect(false), watch_disconnect(false),
bytes_written(0), bytes_read(0),
obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) {
if (_ssc) {
@@ -517,14 +434,9 @@ protected:
map<object_t, SnapSetContext*> snapset_contexts;
void populate_obc_watchers(ObjectContext *obc);
- void register_unconnected_watcher(void *obc,
- entity_name_t entity,
- utime_t expire);
- void unregister_unconnected_watcher(void *obc,
- entity_name_t entity);
- void handle_watch_timeout(void *obc,
- entity_name_t entity,
- utime_t expire);
+public:
+ void handle_watch_timeout(WatchRef watch);
+protected:
ObjectContext *lookup_object_context(const hobject_t& soid) {
if (object_contexts.count(soid)) {
@@ -818,11 +730,6 @@ protected:
void send_remove_op(const hobject_t& oid, eversion_t v, int peer);
- void dump_watchers(ObjectContext *obc);
- void remove_watcher(ObjectContext *obc, entity_name_t entity);
- void remove_notify(ObjectContext *obc, Watch::Notification *notif);
- void remove_watchers_and_notifies();
-
struct RepModify {
ReplicatedPG *pg;
OpRequestRef op;
@@ -1108,24 +1015,11 @@ public:
void on_role_change();
void on_change();
void on_activate();
+ void on_flushed();
void on_removal();
void on_shutdown();
};
-
-inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectState& obs)
-{
- out << obs.oi.soid;
- if (!obs.exists)
- out << "(dne)";
- return out;
-}
-
-inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectContext& obc)
-{
- return out << "obc(" << obc.obs << ")";
-}
-
inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
{
out << "repgather(" << &repop
@@ -1151,4 +1045,7 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::AccessMode& mode)
return out;
}
+void intrusive_ptr_add_ref(ReplicatedPG *pg);
+void intrusive_ptr_release(ReplicatedPG *pg);
+
#endif
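
The boost::optional watch_cookie in OpContext::NotifyAck above is what lets
do_osd_op_effects() treat old and new clients uniformly: a new-style ack names
the exact watch, while a legacy ack (no cookie decoded) matches any watch held
by the acking entity. A standalone sketch of that filter (type and function
names are illustrative, not the PG code):

    #include <boost/optional.hpp>
    #include <stdint.h>

    struct AckSketch {
      boost::optional<uint64_t> watch_cookie;  // unset for legacy clients
      uint64_t notify_id;
    };

    bool ack_applies_to_watch(const AckSketch& ack, uint64_t watch_cookie) {
      if (ack.watch_cookie && ack.watch_cookie.get() != watch_cookie)
        return false;   // new-style ack for a different watch on this session
      return true;      // cookie matches, or legacy ack with no cookie at all
    }
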
diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc
index da90cef4103..1ee5f35b5e3 100644
--- a/src/osd/Watch.cc
+++ b/src/osd/Watch.cc
@@ -1,7 +1,8 @@
-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
#include "PG.h"
#include "include/types.h"
+#include "messages/MWatchNotify.h"
#include <map>
@@ -11,26 +12,454 @@
#include "common/config.h"
-bool Watch::ack_notification(entity_name_t& watcher, Notification *notif)
+struct CancelableContext : public Context {
+ virtual void cancel() = 0;
+};
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+
+static ostream& _prefix(
+ std::ostream* _dout,
+ Notify *notify) {
+ return *_dout << notify->gen_dbg_prefix();
+}
+
+Notify::Notify(
+ ConnectionRef client,
+ unsigned num_watchers,
+ bufferlist &payload,
+ uint32_t timeout,
+ uint64_t cookie,
+ uint64_t notify_id,
+ uint64_t version,
+ OSDService *osd)
+ : client(client),
+ in_progress_watchers(num_watchers),
+ complete(false),
+ discarded(false),
+ payload(payload),
+ timeout(timeout),
+ cookie(cookie),
+ notify_id(notify_id),
+ version(version),
+ osd(osd),
+ cb(NULL),
+ lock("Notify::lock") {}
+
+NotifyRef Notify::makeNotifyRef(
+ ConnectionRef client,
+ unsigned num_watchers,
+ bufferlist &payload,
+ uint32_t timeout,
+ uint64_t cookie,
+ uint64_t notify_id,
+ uint64_t version,
+ OSDService *osd) {
+ NotifyRef ret(
+ new Notify(
+ client, num_watchers,
+ payload, timeout,
+ cookie, notify_id,
+ version, osd));
+ ret->set_self(ret);
+ return ret;
+}
+
+class NotifyTimeoutCB : public CancelableContext {
+ NotifyRef notif;
+ bool canceled; // protected by notif lock
+public:
+ NotifyTimeoutCB(NotifyRef notif) : notif(notif) {}
+ void finish(int) {
+ notif->osd->watch_lock.Unlock();
+ notif->lock.Lock();
+ if (!canceled)
+ notif->do_timeout(); // drops lock
+ else
+ notif->lock.Unlock();
+ notif->osd->watch_lock.Lock();
+ }
+ void cancel() {
+ assert(notif->lock.is_locked_by_me());
+ canceled = true;
+ }
+};
+
+void Notify::do_timeout()
{
- map<entity_name_t, WatcherState>::iterator iter = notif->watchers.find(watcher);
+ assert(lock.is_locked_by_me());
+ dout(10) << "timeout" << dendl;
+ cb = NULL;
+ if (is_discarded()) {
+ lock.Unlock();
+ return;
+ }
- if (iter == notif->watchers.end()) // client was not suppose to ack this notification
- return false;
+ in_progress_watchers = 0; // we give up TODO: we should return an error code
+ maybe_complete_notify();
+ assert(complete);
+ set<WatchRef> _watchers;
+ _watchers.swap(watchers);
+ lock.Unlock();
- notif->watchers.erase(iter);
+ for (set<WatchRef>::iterator i = _watchers.begin();
+ i != _watchers.end();
+ ++i) {
+ boost::intrusive_ptr<ReplicatedPG> pg((*i)->get_pg());
+ pg->lock();
+ if (!(*i)->is_discarded()) {
+ (*i)->cancel_notify(self.lock());
+ }
+ pg->unlock();
+ }
+}
- return notif->watchers.empty(); // true if there are no more watchers
+void Notify::register_cb()
+{
+ assert(lock.is_locked_by_me());
+ {
+ osd->watch_lock.Lock();
+ cb = new NotifyTimeoutCB(self.lock());
+ osd->watch_timer.add_event_after(
+ timeout,
+ cb);
+ osd->watch_lock.Unlock();
+ }
+}
+
+void Notify::unregister_cb()
+{
+ assert(lock.is_locked_by_me());
+ if (!cb)
+ return;
+ cb->cancel();
+ {
+ osd->watch_lock.Lock();
+ osd->watch_timer.cancel_event(cb);
+ cb = NULL;
+ osd->watch_lock.Unlock();
+ }
+}
+
+void Notify::start_watcher(WatchRef watch)
+{
+ Mutex::Locker l(lock);
+ dout(10) << "start_watcher" << dendl;
+ watchers.insert(watch);
+}
+
+void Notify::complete_watcher(WatchRef watch)
+{
+ Mutex::Locker l(lock);
+ dout(10) << "complete_watcher" << dendl;
+ if (is_discarded())
+ return;
+ assert(in_progress_watchers > 0);
+ watchers.erase(watch);
+ --in_progress_watchers;
+ maybe_complete_notify();
+}
+
+void Notify::maybe_complete_notify()
+{
+ dout(10) << "maybe_complete_notify -- "
+ << in_progress_watchers
+ << " in progress watchers " << dendl;
+ if (!in_progress_watchers) {
+ MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
+ WATCH_NOTIFY, payload));
+ osd->send_message_osd_client(reply, client.get());
+ unregister_cb();
+ complete = true;
+ }
+}
+
+void Notify::discard()
+{
+ Mutex::Locker l(lock);
+ discarded = true;
+ unregister_cb();
+ watchers.clear();
+}
+
+void Notify::init()
+{
+ Mutex::Locker l(lock);
+ register_cb();
+ maybe_complete_notify();
+ assert(in_progress_watchers == watchers.size());
+}
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, watch.get())
+
+static ostream& _prefix(
+ std::ostream* _dout,
+ Watch *watch) {
+ return *_dout << watch->gen_dbg_prefix();
+}
+
+class HandleWatchTimeout : public CancelableContext {
+ WatchRef watch;
+public:
+ bool canceled; // protected by watch->pg->lock
+ HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
+ void cancel() {
+ canceled = true;
+ }
+ void finish(int) { assert(0); /* not used */ }
+ void complete(int) {
+ dout(10) << "HandleWatchTimeout" << dendl;
+ boost::intrusive_ptr<ReplicatedPG> pg(watch->pg);
+ OSDService *osd(watch->osd);
+ osd->watch_lock.Unlock();
+ pg->lock();
+ watch->cb = NULL;
+ if (!watch->is_discarded() && !canceled)
+ watch->pg->handle_watch_timeout(watch);
+ delete this; // ~Watch requires pg lock!
+ pg->unlock();
+ osd->watch_lock.Lock();
+ }
+};
+
+class HandleDelayedWatchTimeout : public CancelableContext {
+ WatchRef watch;
+public:
+ bool canceled;
+ HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
+ void cancel() {
+ canceled = true;
+ }
+ void finish(int) {
+ dout(10) << "HandleWatchTimeoutDelayed" << dendl;
+ assert(watch->pg->is_locked());
+ watch->cb = NULL;
+ if (!watch->is_discarded() && !canceled)
+ watch->pg->handle_watch_timeout(watch);
+ }
+};
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+
+string Watch::gen_dbg_prefix() {
+ stringstream ss;
+ ss << pg->gen_prefix() << " -- Watch("
+ << make_pair(cookie, entity)
+ << ", obc->ref=" << (obc ? obc->ref : -1) << ") ";
+ return ss.str();
+}
+
+Watch::Watch(
+ ReplicatedPG *pg,
+ OSDService *osd,
+ ObjectContext *obc,
+ uint32_t timeout,
+ uint64_t cookie,
+ entity_name_t entity)
+ : cb(NULL),
+ osd(osd),
+ pg(pg),
+ obc(obc),
+ timeout(timeout),
+ cookie(cookie),
+ entity(entity),
+ discarded(false) {
+ obc->get();
+ dout(10) << "Watch()" << dendl;
+}
+
+Watch::~Watch() {
+ dout(10) << "~Watch" << dendl;
+ // users must have called remove() or discard() prior to this point
+ assert(!obc);
+ assert(!conn);
+}
+
+bool Watch::connected() { return conn; }
+
+Context *Watch::get_delayed_cb()
+{
+ assert(!cb);
+ cb = new HandleDelayedWatchTimeout(self.lock());
+ return cb;
}
-void Watch::C_NotifyTimeout::finish(int r)
+ObjectContext *Watch::get_obc()
{
- osd->handle_notify_timeout(notif);
+ assert(obc);
+ obc->get();
+ return obc;
}
-void Watch::C_WatchTimeout::finish(int r)
+void Watch::register_cb()
{
- osd->handle_watch_timeout(obc, static_cast<ReplicatedPG *>(pg), entity,
- expire);
+ Mutex::Locker l(osd->watch_lock);
+ dout(15) << "registering callback, timeout: " << timeout << dendl;
+ cb = new HandleWatchTimeout(self.lock());
+ osd->watch_timer.add_event_after(
+ timeout,
+ cb);
}
+void Watch::unregister_cb()
+{
+ dout(15) << "unregister_cb" << dendl;
+ if (!cb)
+ return;
+ dout(15) << "actually registered, cancelling" << dendl;
+ cb->cancel();
+ {
+ Mutex::Locker l(osd->watch_lock);
+ osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
+ }
+ cb = NULL;
+}
+
+void Watch::connect(ConnectionRef con)
+{
+ dout(10) << "connecting" << dendl;
+ conn = con;
+ OSD::Session* sessionref(static_cast<OSD::Session*>(con->get_priv()));
+ sessionref->wstate.addWatch(self.lock());
+ sessionref->put();
+ for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
+ i != in_progress_notifies.end();
+ ++i) {
+ send_notify(i->second);
+ }
+ unregister_cb();
+}
+
+void Watch::disconnect()
+{
+ dout(10) << "disconnect" << dendl;
+ conn = ConnectionRef();
+ register_cb();
+}
+
+void Watch::discard()
+{
+ dout(10) << "discard" << dendl;
+ for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
+ i != in_progress_notifies.end();
+ ++i) {
+ i->second->discard();
+ }
+ discard_state();
+}
+
+void Watch::discard_state()
+{
+ assert(pg->is_locked());
+ assert(!discarded);
+ assert(obc);
+ in_progress_notifies.clear();
+ unregister_cb();
+ discarded = true;
+ if (conn) {
+ OSD::Session* sessionref(static_cast<OSD::Session*>(conn->get_priv()));
+ sessionref->wstate.removeWatch(self.lock());
+ sessionref->put();
+ conn = ConnectionRef();
+ }
+ pg->put_object_context(obc);
+ obc = NULL;
+}
+
+bool Watch::is_discarded()
+{
+ return discarded;
+}
+
+void Watch::remove()
+{
+ dout(10) << "remove" << dendl;
+ for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
+ i != in_progress_notifies.end();
+ ++i) {
+ i->second->complete_watcher(self.lock());
+ }
+ discard_state();
+}
+
+void Watch::start_notify(NotifyRef notif)
+{
+ dout(10) << "start_notify " << notif->notify_id << dendl;
+ assert(in_progress_notifies.find(notif->notify_id) ==
+ in_progress_notifies.end());
+ in_progress_notifies[notif->notify_id] = notif;
+ notif->start_watcher(self.lock());
+ if (connected())
+ send_notify(notif);
+}
+
+void Watch::cancel_notify(NotifyRef notif)
+{
+ dout(10) << "cancel_notify " << notif->notify_id << dendl;
+ in_progress_notifies.erase(notif->notify_id);
+}
+
+void Watch::send_notify(NotifyRef notif)
+{
+ dout(10) << "send_notify" << dendl;
+ MWatchNotify *notify_msg = new MWatchNotify(
+ cookie, notif->version, notif->notify_id,
+ WATCH_NOTIFY, notif->payload);
+ osd->send_message_osd_client(notify_msg, conn.get());
+}
+
+void Watch::notify_ack(uint64_t notify_id)
+{
+ dout(10) << "notify_ack" << dendl;
+ map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id);
+ if (i != in_progress_notifies.end()) {
+ i->second->complete_watcher(self.lock());
+ in_progress_notifies.erase(i);
+ }
+}
+
+WatchRef Watch::makeWatchRef(
+ ReplicatedPG *pg, OSDService *osd,
+ ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity)
+{
+ WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity));
+ ret->set_self(ret);
+ return ret;
+}
+
+void WatchConState::addWatch(WatchRef watch)
+{
+ Mutex::Locker l(lock);
+ watches.insert(watch);
+}
+
+void WatchConState::removeWatch(WatchRef watch)
+{
+ Mutex::Locker l(lock);
+ watches.erase(watch);
+}
+
+void WatchConState::reset()
+{
+ set<WatchRef> _watches;
+ {
+ Mutex::Locker l(lock);
+ _watches.swap(watches);
+ }
+ for (set<WatchRef>::iterator i = _watches.begin();
+ i != _watches.end();
+ ++i) {
+ boost::intrusive_ptr<ReplicatedPG> pg((*i)->get_pg());
+ pg->lock();
+ if (!(*i)->is_discarded()) {
+ (*i)->disconnect();
+ }
+ pg->unlock();
+ }
+}
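
Watch.cc hands out owning references to itself via self.lock() (when creating HandleWatchTimeout, when adding itself to the session's WatchConState, when completing notifies), which only works because construction goes through the static factory Watch::makeWatchRef(), which stores a weak pointer back into the freshly built object. A minimal standalone sketch of that factory/weak-self pattern, using std::shared_ptr in place of the tr1 types and illustrative names (Thing, make, get_strong_ref) that are not Ceph's:

    #include <cassert>
    #include <memory>

    // Illustrative stand-in for Watch: constructed only through a factory
    // that records a weak reference to the object inside the object itself.
    class Thing;
    typedef std::shared_ptr<Thing> ThingRef;
    typedef std::weak_ptr<Thing> WThingRef;

    class Thing {
      WThingRef self;                  // same role as Watch::self
      Thing() {}                       // private: callers must use make()
    public:
      void set_self(ThingRef _self) { self = _self; }

      // What Watch does in register_cb()/connect(): recover a strong
      // reference to hand to a callback or a container.
      ThingRef get_strong_ref() { return self.lock(); }

      static ThingRef make() {         // shaped like Watch::makeWatchRef()
        ThingRef ret(new Thing());
        ret->set_self(ret);
        return ret;
      }
    };

    int main() {
      ThingRef t = Thing::make();
      assert(t == t->get_strong_ref());  // self.lock() yields the same object
      return 0;
    }

The weak self-pointer keeps the Watch from owning itself; the strong references live in the ObjectContext, the session's WatchConState, and any queued timeout callback.
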
diff --git a/src/osd/Watch.h b/src/osd/Watch.h
index cb48de4d426..089350f35bb 100644
--- a/src/osd/Watch.h
+++ b/src/osd/Watch.h
@@ -11,95 +11,252 @@
* Foundation. See file COPYING.
*
*/
-
-
#ifndef CEPH_WATCH_H
#define CEPH_WATCH_H
-#include <map>
+#include <boost/intrusive_ptr.hpp>
+#include <tr1/memory>
+#include <set>
-#include "OSD.h"
-#include "common/config.h"
+#include "msg/Messenger.h"
+#include "include/Context.h"
+#include "common/Mutex.h"
+
+enum WatcherState {
+ WATCHER_PENDING,
+ WATCHER_NOTIFIED,
+};
+class OSDService;
+class ReplicatedPG;
+void intrusive_ptr_add_ref(ReplicatedPG *pg);
+void intrusive_ptr_release(ReplicatedPG *pg);
+class ObjectContext;
class MWatchNotify;
-/* keeps track and accounts sessions, watchers and notifiers */
-class Watch {
- uint64_t notif_id;
+class Watch;
+typedef std::tr1::shared_ptr<Watch> WatchRef;
+typedef std::tr1::weak_ptr<Watch> WWatchRef;
-public:
- enum WatcherState {
- WATCHER_PENDING,
- WATCHER_NOTIFIED,
- };
-
- struct Notification {
- std::map<entity_name_t, WatcherState> watchers;
- entity_name_t name;
- uint64_t id;
- OSD::Session *session;
- uint64_t cookie;
- MWatchNotify *reply;
- Context *timeout;
- void *obc;
- pg_t pgid;
- bufferlist bl;
-
- void add_watcher(const entity_name_t& name, WatcherState state) {
- watchers[name] = state;
- }
-
- Notification(entity_name_t& n, OSD::Session *s, uint64_t c, bufferlist& b)
- : name(n), id(0), session(s), cookie(c), reply(0), timeout(0),
- obc(0), bl(b) { }
- };
-
- class C_NotifyTimeout : public Context {
- OSD *osd;
- Notification *notif;
- public:
- C_NotifyTimeout(OSD *_osd, Notification *_notif) : osd(_osd), notif(_notif) {}
- void finish(int r);
- };
-
- class C_WatchTimeout : public Context {
- OSD *osd;
- void *obc;
- void *pg;
- entity_name_t entity;
- public:
- utime_t expire;
- C_WatchTimeout(OSD *_osd, void *_obc, void *_pg,
- entity_name_t _entity, utime_t _expire) :
- osd(_osd), obc(_obc), pg(_pg), entity(_entity), expire(_expire) {}
- void finish(int r);
- };
-
-private:
- std::map<uint64_t, Notification *> notifs; /* notif_id to notifications */
+class Notify;
+typedef std::tr1::shared_ptr<Notify> NotifyRef;
+typedef std::tr1::weak_ptr<Notify> WNotifyRef;
-public:
- Watch() : notif_id(0) {}
+class CancelableContext;
- void add_notification(Notification *notif) {
- notif->id = ++notif_id;
- notifs[notif->id] = notif;
+/**
+ * Notify tracks the progress of a particular notify
+ *
+ * References are held by Watch and the timeout callback.
+ */
+class NotifyTimeoutCB;
+class Notify {
+ friend class NotifyTimeoutCB;
+ friend class Watch;
+ WNotifyRef self;
+ ConnectionRef client;
+ unsigned in_progress_watchers;
+ bool complete;
+ bool discarded;
+ set<WatchRef> watchers;
+
+ bufferlist payload;
+ uint32_t timeout;
+ uint64_t cookie;
+ uint64_t notify_id;
+ uint64_t version;
+
+ OSDService *osd;
+ CancelableContext *cb;
+ Mutex lock;
+
+
+ /// true if this notify is being discarded or has already completed
+ bool is_discarded() {
+ return discarded || complete;
}
- Notification *get_notif(uint64_t id) {
- map<uint64_t, Notification *>::iterator iter = notifs.find(id);
- if (iter != notifs.end())
- return iter->second;
- return NULL;
+
+ /// Sends notify completion if in_progress_watchers == 0
+ void maybe_complete_notify();
+
+ /// Called on Notify timeout
+ void do_timeout();
+
+ Notify(
+ ConnectionRef client,
+ unsigned num_watchers,
+ bufferlist &payload,
+ uint32_t timeout,
+ uint64_t cookie,
+ uint64_t notify_id,
+ uint64_t version,
+ OSDService *osd);
+
+ /// registers a timeout callback with the watch_timer
+ void register_cb();
+
+ /// removes the timeout callback, called on completion or cancellation
+ void unregister_cb();
+public:
+ string gen_dbg_prefix() {
+ stringstream ss;
+ ss << "Notify(" << make_pair(cookie, notify_id) << " "
+ << " in_progress_watchers=" << in_progress_watchers
+ << ") ";
+ return ss.str();
}
- void remove_notification(Notification *notif) {
- map<uint64_t, Notification *>::iterator iter = notifs.find(notif->id);
- if (iter != notifs.end())
- notifs.erase(iter);
+ void set_self(NotifyRef _self) {
+ self = _self;
}
+ static NotifyRef makeNotifyRef(
+ ConnectionRef client,
+ unsigned num_watchers,
+ bufferlist &payload,
+ uint32_t timeout,
+ uint64_t cookie,
+ uint64_t notify_id,
+ uint64_t version,
+ OSDService *osd);
+
+ /// Call after creation to initialize
+ void init();
+
+ /// Called once per watcher prior to init()
+ void start_watcher(
+ WatchRef watcher ///< [in] watcher to complete
+ );
- bool ack_notification(entity_name_t& watcher, Notification *notif);
+ /// Called once per NotifyAck
+ void complete_watcher(
+ WatchRef watcher ///< [in] watcher to complete
+ );
+
+ /// Called when the notify is canceled due to a new peering interval
+ void discard();
};
+/**
+ * Watch is a mapping between a Connection and an ObjectContext
+ *
+ * References are held by ObjectContext and the timeout callback
+ */
+class HandleWatchTimeout;
+class HandleDelayedWatchTimeout;
+class Watch {
+ WWatchRef self;
+ friend class HandleWatchTimeout;
+ friend class HandleDelayedWatchTimeout;
+ ConnectionRef conn;
+ CancelableContext *cb;
+
+ OSDService *osd;
+ boost::intrusive_ptr<ReplicatedPG> pg;
+ ObjectContext *obc;
+
+ std::map<uint64_t, NotifyRef> in_progress_notifies;
+
+ uint32_t timeout;
+ uint64_t cookie;
+ entity_name_t entity;
+ bool discarded;
+ Watch(
+ ReplicatedPG *pg, OSDService *osd,
+ ObjectContext *obc, uint32_t timeout,
+ uint64_t cookie, entity_name_t entity);
+
+ /// Registers the timeout callback with watch_timer
+ void register_cb();
+
+ /// Unregisters the timeout callback
+ void unregister_cb();
+
+ /// send a Notify message for notif (only called while connected)
+ void send_notify(NotifyRef notif);
+
+ /// Cleans up state on discard or remove (including Connection state, obc)
+ void discard_state();
+public:
+ /// NOTE: must be called with pg lock held
+ ~Watch();
+
+ string gen_dbg_prefix();
+ static WatchRef makeWatchRef(
+ ReplicatedPG *pg, OSDService *osd,
+ ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity);
+ void set_self(WatchRef _self) {
+ self = _self;
+ }
+
+ /// Does not grant a ref count!
+ boost::intrusive_ptr<ReplicatedPG> get_pg() { return pg; }
+
+ /// Grants a ref count!
+ ObjectContext *get_obc();
+ uint64_t get_cookie() const { return cookie; }
+ entity_name_t get_entity() const { return entity; }
+
+ /// Generates context for use if watch timeout is delayed by scrub or recovery
+ Context *get_delayed_cb();
+
+ /// True if currently connected
+ bool connected();
+
+ /// Transitions Watch to connected, unregister_cb, resends pending Notifies
+ void connect(
+ ConnectionRef con ///< [in] Reference to new connection
+ );
+
+ /// Transitions watch to disconnected, register_cb
+ void disconnect();
+
+ /// Called if Watch state is discarded due to new peering interval
+ void discard();
+
+ /// True if removed or discarded
+ bool is_discarded();
+
+ /// Called on unwatch
+ void remove();
+
+ /// Adds notif as in-progress notify
+ void start_notify(
+ NotifyRef notif ///< [in] Reference to new in-progress notify
+ );
+
+ /// Removes timed out notify
+ void cancel_notify(
+ NotifyRef notif ///< [in] notify which timed out
+ );
+
+ /// Call when notify_ack received on notify_id
+ void notify_ack(
+ uint64_t notify_id ///< [in] id of acked notify
+ );
+};
+
+/**
+ * Holds weak refs to Watch structures corresponding to a connection
+ * Lives in the OSD::Session object of an OSD connection
+ */
+class WatchConState {
+ Mutex lock;
+ std::set<WatchRef> watches;
+public:
+ WatchConState() : lock("WatchConState") {}
+
+ /// Add a watch
+ void addWatch(
+ WatchRef watch ///< [in] Ref to new watch object
+ );
+
+ /// Remove a watch
+ void removeWatch(
+ WatchRef watch ///< [in] Ref to watch object to remove
+ );
+
+ /// Called on session reset, disconnects watchers
+ void reset();
+};
#endif
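
Watch.h forward-declares ReplicatedPG together with intrusive_ptr_add_ref() and intrusive_ptr_release() because boost::intrusive_ptr<T> manages T's embedded reference count entirely through those two free functions, so the header never needs the ReplicatedPG definition. A self-contained sketch of the mechanism with a toy class (single-threaded counter here; the real PG reference count is thread-safe):

    #include <boost/intrusive_ptr.hpp>
    #include <cassert>

    // Toy refcounted class standing in for ReplicatedPG.
    struct PGish {
      int nref;
      PGish() : nref(0) {}
    };

    // boost::intrusive_ptr<PGish> finds these by argument-dependent lookup;
    // only their declarations need to be visible where the smart-pointer
    // type is used, which is all Watch.h provides for ReplicatedPG.
    void intrusive_ptr_add_ref(PGish *p) { ++p->nref; }
    void intrusive_ptr_release(PGish *p) {
      if (--p->nref == 0)
        delete p;
    }

    int main() {
      boost::intrusive_ptr<PGish> a(new PGish);  // nref == 1
      {
        boost::intrusive_ptr<PGish> b(a);        // nref == 2
        assert(a->nref == 2);
      }
      assert(a->nref == 1);                      // b dropped its ref
      return 0;                                  // last ref released, PGish deleted
    }
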
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index 2136191bc19..6953f182000 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -2491,7 +2491,14 @@ ps_t object_info_t::legacy_object_locator_to_ps(const object_t &oid,
void object_info_t::encode(bufferlist& bl) const
{
- ENCODE_START(10, 8, bl);
+ map<entity_name_t, watch_info_t> old_watchers;
+ for (map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator i =
+ watchers.begin();
+ i != watchers.end();
+ ++i) {
+ old_watchers.insert(make_pair(i->first.second, i->second));
+ }
+ ENCODE_START(11, 8, bl);
::encode(soid, bl);
::encode(oloc, bl);
::encode(category, bl);
@@ -2507,15 +2514,17 @@ void object_info_t::encode(bufferlist& bl) const
::encode(truncate_seq, bl);
::encode(truncate_size, bl);
::encode(lost, bl);
- ::encode(watchers, bl);
+ ::encode(old_watchers, bl);
::encode(user_version, bl);
::encode(uses_tmap, bl);
+ ::encode(watchers, bl);
ENCODE_FINISH(bl);
}
void object_info_t::decode(bufferlist::iterator& bl)
{
- DECODE_START_LEGACY_COMPAT_LEN(10, 8, 8, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(11, 8, 8, bl);
+ map<entity_name_t, watch_info_t> old_watchers;
if (struct_v >= 2 && struct_v <= 5) {
sobject_t obj;
::decode(obj, bl);
@@ -2549,7 +2558,7 @@ void object_info_t::decode(bufferlist::iterator& bl)
else
lost = false;
if (struct_v >= 4) {
- ::decode(watchers, bl);
+ ::decode(old_watchers, bl);
::decode(user_version, bl);
}
if (struct_v >= 9)
@@ -2558,6 +2567,17 @@ void object_info_t::decode(bufferlist::iterator& bl)
uses_tmap = true;
if (struct_v < 10)
soid.pool = oloc.pool;
+ if (struct_v >= 11) {
+ ::decode(watchers, bl);
+ } else {
+ for (map<entity_name_t, watch_info_t>::iterator i = old_watchers.begin();
+ i != old_watchers.end();
+ ++i) {
+ watchers.insert(
+ make_pair(
+ make_pair(i->second.cookie, i->first), i->second));
+ }
+ }
DECODE_FINISH(bl);
}
@@ -2584,9 +2604,10 @@ void object_info_t::dump(Formatter *f) const
f->dump_unsigned("truncate_seq", truncate_seq);
f->dump_unsigned("truncate_size", truncate_size);
f->open_object_section("watchers");
- for (map<entity_name_t,watch_info_t>::const_iterator p = watchers.begin(); p != watchers.end(); ++p) {
+ for (map<pair<uint64_t, entity_name_t>,watch_info_t>::const_iterator p =
+ watchers.begin(); p != watchers.end(); ++p) {
stringstream ss;
- ss << p->first;
+ ss << p->first.second;
f->open_object_section(ss.str().c_str());
p->second.dump(f);
f->close_section();
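
The object_info_t hunks bump the encoding to struct_v 11: the watchers map is now keyed by (cookie, entity) so a single entity can hold several watches, a flattened entity-keyed copy is still written in the old position for older decoders, and pre-v11 encodings are upgraded on decode by pulling the cookie out of each watch_info_t. A standalone model of that upgrade step, using plain std::map with hypothetical stand-ins (watch_info, a string entity name) rather than Ceph's types and encoders:

    #include <stdint.h>
    #include <cassert>
    #include <map>
    #include <string>
    #include <utility>

    // Hypothetical stand-in for watch_info_t; entity names are plain strings.
    struct watch_info { uint64_t cookie; uint32_t timeout_seconds; };

    typedef std::map<std::string, watch_info> old_watchers_t;                      // pre-v11 key
    typedef std::map<std::pair<uint64_t, std::string>, watch_info> new_watchers_t; // v11 key

    // Mirrors the struct_v < 11 branch of object_info_t::decode(): rebuild
    // the (cookie, entity) key from the cookie stored in each watch_info.
    new_watchers_t upgrade(const old_watchers_t &old_watchers) {
      new_watchers_t out;
      for (old_watchers_t::const_iterator i = old_watchers.begin();
           i != old_watchers.end(); ++i) {
        out.insert(std::make_pair(std::make_pair(i->second.cookie, i->first),
                                  i->second));
      }
      return out;
    }

    int main() {
      old_watchers_t old_map;
      watch_info w = { 42, 30 };
      old_map["client.4105"] = w;

      new_watchers_t up = upgrade(old_map);
      assert(up.size() == 1);
      assert(up.count(std::make_pair(uint64_t(42), std::string("client.4105"))) == 1);
      return 0;
    }

Note that the flattened copy written for old decoders can keep only one watch per entity, so the per-cookie distinction survives a round trip only once both ends understand struct_v 11.
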
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index ff8c2c5219e..008fe8e9ded 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -27,7 +27,7 @@
#include "common/snap_types.h"
#include "common/Formatter.h"
#include "os/hobject.h"
-
+#include "Watch.h"
#define CEPH_OSD_ONDISK_MAGIC "ceph osd volume v026"
@@ -1728,7 +1728,6 @@ static inline ostream& operator<<(ostream& out, const notify_info_t& n) {
}
-
struct object_info_t {
hobject_t soid;
object_locator_t oloc;
@@ -1748,7 +1747,7 @@ struct object_info_t {
uint64_t truncate_seq, truncate_size;
- map<entity_name_t, watch_info_t> watchers;
+ map<pair<uint64_t, entity_name_t>, watch_info_t> watchers;
bool uses_tmap;
void copy_user_bits(const object_info_t& other);
@@ -1780,6 +1779,109 @@ struct object_info_t {
};
WRITE_CLASS_ENCODER(object_info_t)
+struct ObjectState {
+ object_info_t oi;
+ bool exists;
+
+ ObjectState(const object_info_t &oi_, bool exists_)
+ : oi(oi_), exists(exists_) {}
+};
+
+
+struct SnapSetContext {
+ object_t oid;
+ int ref;
+ bool registered;
+ SnapSet snapset;
+
+ SnapSetContext(const object_t& o) : oid(o), ref(0), registered(false) { }
+};
+
+
+/*
+ * keep tabs on object modifications that are in flight.
+ * we need to know the projected existence, size, snapset,
+ * etc., because we don't send writes down to disk until after
+ * replicas ack.
+ */
+struct ObjectContext {
+ int ref;
+ bool registered;
+ ObjectState obs;
+
+ SnapSetContext *ssc; // may be null
+
+private:
+ Mutex lock;
+public:
+ Cond cond;
+ int unstable_writes, readers, writers_waiting, readers_waiting;
+
+ // set if writes for this object are blocked on another object's recovery
+ ObjectContext *blocked_by; // object blocking our writes
+ set<ObjectContext*> blocking; // objects whose writes we block
+
+ // every entry in obs.oi.watchers MUST have a corresponding Watch in watchers.
+ map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
+
+ ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_)
+ : ref(0), registered(false), obs(oi_, exists_), ssc(ssc_),
+ lock("ReplicatedPG::ObjectContext::lock"),
+ unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
+ blocked_by(0) {}
+
+ void get() { ++ref; }
+
+ // do simple synchronous mutual exclusion, for now. no waitqueues or anything fancy.
+ void ondisk_write_lock() {
+ lock.Lock();
+ writers_waiting++;
+ while (readers_waiting || readers)
+ cond.Wait(lock);
+ writers_waiting--;
+ unstable_writes++;
+ lock.Unlock();
+ }
+ void ondisk_write_unlock() {
+ lock.Lock();
+ assert(unstable_writes > 0);
+ unstable_writes--;
+ if (!unstable_writes && readers_waiting)
+ cond.Signal();
+ lock.Unlock();
+ }
+ void ondisk_read_lock() {
+ lock.Lock();
+ readers_waiting++;
+ while (unstable_writes)
+ cond.Wait(lock);
+ readers_waiting--;
+ readers++;
+ lock.Unlock();
+ }
+ void ondisk_read_unlock() {
+ lock.Lock();
+ assert(readers > 0);
+ readers--;
+ if (!readers && writers_waiting)
+ cond.Signal();
+ lock.Unlock();
+ }
+};
+
+inline ostream& operator<<(ostream& out, ObjectState& obs)
+{
+ out << obs.oi.soid;
+ if (!obs.exists)
+ out << "(dne)";
+ return out;
+}
+
+inline ostream& operator<<(ostream& out, ObjectContext& obc)
+{
+ return out << "obc(" << obc.obs << ")";
+}
+
ostream& operator<<(ostream& out, const object_info_t& oi);
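
ObjectContext's ondisk_*_lock() methods implement a small reader/writer discipline over in-flight ("unstable") writes: reads of the on-disk object wait for unstable writes to drain, and new writes queue behind any waiting readers. A standalone model of the same protocol, using std::mutex/std::condition_variable in place of Ceph's Mutex/Cond and notify_all() rather than a single Signal() (illustrative only, not the Ceph class):

    #include <cassert>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    struct OndiskLock {
      std::mutex m;
      std::condition_variable cv;
      int unstable_writes = 0, readers = 0, readers_waiting = 0, writers_waiting = 0;

      void write_lock() {                       // cf. ondisk_write_lock()
        std::unique_lock<std::mutex> l(m);
        ++writers_waiting;
        while (readers_waiting || readers)      // writes wait for readers
          cv.wait(l);
        --writers_waiting;
        ++unstable_writes;
      }
      void write_unlock() {                     // cf. ondisk_write_unlock()
        std::lock_guard<std::mutex> l(m);
        assert(unstable_writes > 0);
        if (--unstable_writes == 0 && readers_waiting)
          cv.notify_all();
      }
      void read_lock() {                        // cf. ondisk_read_lock()
        std::unique_lock<std::mutex> l(m);
        ++readers_waiting;
        while (unstable_writes)                 // reads wait for unstable writes
          cv.wait(l);
        --readers_waiting;
        ++readers;
      }
      void read_unlock() {                      // cf. ondisk_read_unlock()
        std::lock_guard<std::mutex> l(m);
        assert(readers > 0);
        if (--readers == 0 && writers_waiting)
          cv.notify_all();
      }
    };

    int main() {
      OndiskLock l;
      l.write_lock();                                          // a write is now unstable
      std::thread r([&] { l.read_lock(); l.read_unlock(); });  // blocks until it settles
      l.write_unlock();                                        // reader may now proceed
      r.join();
      return 0;
    }
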
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index baf600c53be..9ff02f6ab93 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -487,8 +487,10 @@ struct ObjectOperation {
add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1, inbl);
}
- void notify_ack(uint64_t notify_id, uint64_t ver) {
+ void notify_ack(uint64_t notify_id, uint64_t ver, uint64_t cookie) {
bufferlist bl;
+ ::encode(notify_id, bl);
+ ::encode(cookie, bl);
add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
}
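
The Objecter change means a NOTIFY_ACK now carries both the notify_id and the cookie of the acking watch, which is what lets the OSD route the ack to the right Watch (via Watch::notify_ack()) when one entity holds several watches on the object. A tiny model of the payload this builds, with a hand-rolled little-endian writer standing in for Ceph's bufferlist and ::encode (model only, not the real encoder):

    #include <stdint.h>
    #include <cassert>
    #include <vector>

    // Append a u64 in little-endian byte order, the layout Ceph uses for
    // fixed-width integers (simplified; the real code goes through bufferlist).
    static void encode_le64(uint64_t v, std::vector<unsigned char> &bl) {
      for (int i = 0; i < 8; ++i)
        bl.push_back(static_cast<unsigned char>((v >> (8 * i)) & 0xff));
    }

    int main() {
      std::vector<unsigned char> payload;
      uint64_t notify_id = 7, cookie = 42;  // illustrative values
      encode_le64(notify_id, payload);      // cf. ::encode(notify_id, bl)
      encode_le64(cookie, payload);         // cf. ::encode(cookie, bl)
      assert(payload.size() == 16);
      assert(payload[0] == 7 && payload[8] == 42);
      return 0;
    }
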