author     Sage Weil <sage@inktank.com>    2012-12-14 14:32:44 -0800
committer  Sage Weil <sage@inktank.com>    2012-12-14 14:32:44 -0800
commit     79db5a40c7adb1a09f1ab7831ac242e721a464ff (patch)
tree       405c6e26051abc06a46a8c0ceb35afe56da8116e
parent     a7de975d9344621201a09b03453682cc7c798892 (diff)
parent     97cc55d599ea1588af2e73d6972e1c9dfd9a545b (diff)
download   ceph-79db5a40c7adb1a09f1ab7831ac242e721a464ff.tar.gz
Merge branch 'wip_watch' into next
-rw-r--r--  src/osd/OSD.cc            16
-rw-r--r--  src/osd/PG.cc              3
-rw-r--r--  src/osd/PG.h              32
-rw-r--r--  src/osd/ReplicatedPG.cc  109
4 files changed, 143 insertions(+), 17 deletions(-)
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 421d7e1af32..217dd23b152 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2431,8 +2431,21 @@ void OSD::disconnect_session_watches(Session *session)
dout(10) << "obc=" << (void *)obc << dendl;
ReplicatedPG *pg = static_cast<ReplicatedPG *>(lookup_lock_raw_pg(oiter->second));
- assert(pg);
+ if (!pg) {
+ /* pg removed between watch_lock.Unlock() and now, all related
+ * watch structures would have been cleaned up in remove_watchers_and_notifies
+ */
+ continue;
+ }
service.watch_lock.Lock();
+
+ if (!session->watches.count((void*)obc)) {
+ // Raced with watch removal, obc is invalid
+ service.watch_lock.Unlock();
+ pg->unlock();
+ continue;
+ }
+
/* NOTE! fix this one, should be able to just lookup entity name,
however, we currently only keep EntityName on the session and not
entity_name_t. */
@@ -2450,6 +2463,7 @@ void OSD::disconnect_session_watches(Session *session)
<< ", expires " << expire << dendl;
obc->watchers.erase(witer++);
pg->put_object_context(obc);
+ session->con->put();
session->put();
}
if (witer == obc->watchers.end())
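The hunk above replaces an assert with a re-check: the PG can be torn down between dropping watch_lock and looking it up again, and the session's watch entry can be removed concurrently, so both conditions are re-validated once the locks are held again rather than assumed to still hold. A minimal sketch of that re-validate-after-relock pattern, using hypothetical Registry/Entry names rather than the Ceph types:

// Sketch only: Registry, Entry, and disconnect() are hypothetical stand-ins.
#include <map>
#include <mutex>

struct Entry { int refs = 1; };

struct Registry {
  std::mutex lock;
  std::map<int, Entry*> entries;

  // Look up under the lock and return nullptr instead of asserting existence,
  // because the entry may have been torn down while the lock was dropped.
  Entry* lookup(int id) {
    std::lock_guard<std::mutex> g(lock);
    auto it = entries.find(id);
    return it == entries.end() ? nullptr : it->second;
  }
};

void disconnect(Registry& reg, int id) {
  Entry* e = reg.lookup(id);
  if (!e)
    return;  // raced with removal; cleanup already ran (mirrors `if (!pg) continue;`)
  // ... re-check any per-entry bookkeeping under its own lock before using it,
  // mirroring the session->watches.count((void*)obc) re-check in the hunk above.
}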
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 49d12ea35ef..c25cce15df5 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1358,6 +1358,8 @@ void PG::activate(ObjectStore::Transaction& t,
map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *activator_map)
{
assert(!is_active());
+ assert(scrubber.callbacks.empty());
+ assert(callbacks_for_degraded_object.empty());
// -- crash recovery?
if (is_primary() &&
@@ -3941,6 +3943,7 @@ void PG::chunky_scrub() {
scrub_compare_maps();
scrubber.block_writes = false;
+ scrubber.run_callbacks();
// requeue the writes from the chunk that just finished
requeue_ops(waiting_for_active);
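The run_callbacks() call added above fires every Context queued with scrubber.add_callback() as soon as the finished chunk stops blocking writes. The pattern relies on Ceph's single-shot Context contract, where complete() runs the callback once and then destroys it; that is what lets HandleWatchTimeout (added in ReplicatedPG.cc below) release its references from its destructor. A minimal sketch of that contract, assuming the usual single-shot semantics rather than quoting src/include/Context.h:

// Minimal sketch of the Context contract assumed by run_callbacks().
struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  // complete() runs the callback exactly once and then destroys it, so a
  // queued Context's destructor is a safe place to release resources.
  void complete(int r) {
    finish(r);
    delete this;
  }
};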
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 2cf1173203d..acaa89708d6 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -656,6 +656,8 @@ protected:
list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
waiting_for_degraded_object;
+ // Callbacks should assume pg (and nothing else) is locked
+ map<hobject_t, list<Context*> > callbacks_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
map<eversion_t,OpRequestRef> replay_queue;
void split_ops(PG *child, unsigned split_bits);
@@ -857,6 +859,20 @@ public:
// deep scrub
bool deep;
+ list<Context*> callbacks;
+ void add_callback(Context *context) {
+ callbacks.push_back(context);
+ }
+ void run_callbacks() {
+ list<Context*> to_run;
+ to_run.swap(callbacks);
+ for (list<Context*>::iterator i = to_run.begin();
+ i != to_run.end();
+ ++i) {
+ (*i)->complete(0);
+ }
+ }
+
static const char *state_string(const PG::Scrubber::State& state) {
const char *ret = NULL;
switch( state )
@@ -875,6 +891,21 @@ public:
bool is_chunky_scrub_active() const { return state != INACTIVE; }
+ // classic (non chunk) scrubs block all writes
+ // chunky scrubs only block writes to a range
+ bool write_blocked_by_scrub(const hobject_t &soid) {
+ if (!block_writes)
+ return false;
+
+ if (!is_chunky)
+ return true;
+
+ if (soid >= start && soid < end)
+ return true;
+
+ return false;
+ }
+
// clear all state
void reset() {
finalizing = false;
@@ -896,6 +927,7 @@ public:
errors = 0;
fixed = 0;
deep = false;
+ run_callbacks();
}
} scrubber;
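Two details of the Scrubber changes above are worth calling out: run_callbacks() swaps the pending list into a local before completing anything, so a callback may register further callbacks without invalidating the iteration, and reset() drains through run_callbacks() so queued Contexts are completed instead of leaked when scrub state is cleared. A small illustration of why the swap matters, using a hypothetical Notifier type rather than the Scrubber itself:

// Hypothetical Notifier showing the swap-then-run idiom used by run_callbacks().
#include <list>

struct Callback {
  virtual ~Callback() {}
  virtual void run() = 0;
};

struct Notifier {
  std::list<Callback*> pending;

  void add(Callback* c) { pending.push_back(c); }

  void fire_all() {
    std::list<Callback*> to_run;
    to_run.swap(pending);  // detach first: run() may call add() again
    for (std::list<Callback*>::iterator i = to_run.begin();
         i != to_run.end();
         ++i) {
      (*i)->run();
      delete *i;           // single-shot, like Context::complete()
    }
    // Anything re-registered during the loop stays in `pending` and is
    // fired on the next pass instead of being iterated while it mutates.
  }
};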
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 675973e8f07..4c6c481cc65 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -628,15 +628,11 @@ void ReplicatedPG::do_op(OpRequestRef op)
CEPH_NOSNAP, m->get_pg().ps(),
info.pgid.pool());
- if (scrubber.block_writes && m->may_write()) {
- // classic (non chunk) scrubs block all writes
- // chunky scrubs only block writes to a range
- if (!scrubber.is_chunky || (head >= scrubber.start && head < scrubber.end)) {
- dout(20) << __func__ << ": waiting for scrub" << dendl;
- waiting_for_active.push_back(op);
- op->mark_delayed();
- return;
- }
+ if (m->may_write() && scrubber.write_blocked_by_scrub(head)) {
+ dout(20) << __func__ << ": waiting for scrub" << dendl;
+ waiting_for_active.push_back(op);
+ op->mark_delayed();
+ return;
}
// missing object?
@@ -1590,6 +1586,7 @@ void ReplicatedPG::remove_watcher(ObjectContext *obc, entity_name_t entity)
session->watches.erase(obc);
put_object_context(obc);
+ session->con->put();
session->put();
}
@@ -1606,8 +1603,8 @@ void ReplicatedPG::remove_notify(ObjectContext *obc, Watch::Notification *notif)
assert(niter != obc->notifs.end());
- niter->first->session->put();
niter->first->session->con->put();
+ niter->first->session->put();
obc->notifs.erase(niter);
put_object_context(obc);
@@ -3349,6 +3346,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
if (iter == obc->watchers.end()) {
dout(10) << " connected to " << w << " by " << entity << " session " << session << dendl;
obc->watchers[entity] = session;
+ session->con->get();
session->get();
session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
obc->ref++;
@@ -3360,10 +3358,14 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
// weird: same entity, different session.
dout(10) << " reconnected (with different session!) watch " << w << " by " << entity
<< " session " << session << " (was " << iter->second << ")" << dendl;
+ session->con->get();
+ session->get();
+
iter->second->watches.erase(obc);
+ iter->second->con->put();
iter->second->put();
+
iter->second = session;
- session->get();
session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
}
map<entity_name_t,Watch::C_WatchTimeout*>::iterator un_iter =
@@ -4114,10 +4116,14 @@ void ReplicatedPG::unregister_unconnected_watcher(void *_obc,
entity_name_t entity)
{
ObjectContext *obc = static_cast<ObjectContext *>(_obc);
- osd->watch_timer.cancel_event(obc->unconnected_watchers[entity]);
+
+ /* If we failed to cancel the event, the event will fire and the obc
+ * ref and the pg ref will be taken care of */
+ if (osd->watch_timer.cancel_event(obc->unconnected_watchers[entity])) {
+ put_object_context(obc);
+ put();
+ }
obc->unconnected_watchers.erase(entity);
- put_object_context(obc);
- put();
}
void ReplicatedPG::register_unconnected_watcher(void *_obc,
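The change above drops the obc and pg references only when cancel_event() reports that the pending timeout was actually removed; if cancellation fails, the callback is already committed to running and will consume those references itself, so dropping them here as well would over-release. A sketch of that conditional ownership hand-off, with a hypothetical Timer whose cancel() returns whether the event was removed before firing (an assumption for illustration, not the SafeTimer API verbatim):

// Hypothetical Timer/Resource types; the point is the ownership rule:
// whoever wins the race (canceller or callback) releases the references.
struct Resource {
  void get() {}  // take a reference (body elided)
  void put() {}  // drop a reference (body elided)
};

struct Timer {
  // Returns true if the event was removed before it could fire,
  // false if the callback is already going to run.
  bool cancel(void* event) { (void)event; return true; }
};

void unregister(Timer& timer, void* event, Resource& res) {
  if (timer.cancel(event)) {
    // We cancelled it: the callback will never run, so drop the
    // reference the callback would otherwise have consumed.
    res.put();
  }
  // Otherwise leave the reference alone; the in-flight callback owns it.
}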
@@ -4141,15 +4147,76 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
entity_name_t entity,
utime_t expire)
{
+ dout(10) << "handle_watch_timeout obc " << _obc << dendl;
+ struct HandleWatchTimeout : public Context {
+ epoch_t cur_epoch;
+ boost::intrusive_ptr<ReplicatedPG> pg;
+ void *obc;
+ entity_name_t entity;
+ utime_t expire;
+ HandleWatchTimeout(
+ epoch_t cur_epoch,
+ ReplicatedPG *pg,
+ void *obc,
+ entity_name_t entity,
+ utime_t expire) : cur_epoch(cur_epoch),
+ pg(pg), obc(obc), entity(entity), expire(expire) {
+ assert(pg->is_locked());
+ static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
+ }
+ void finish(int) {
+ assert(pg->is_locked());
+ if (cur_epoch < pg->last_peering_reset)
+ return;
+ // handle_watch_timeout gets its own ref
+ static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
+ pg->handle_watch_timeout(obc, entity, expire);
+ }
+ ~HandleWatchTimeout() {
+ assert(pg->is_locked());
+ pg->put_object_context(static_cast<ReplicatedPG::ObjectContext*>(obc));
+ }
+ };
+
ObjectContext *obc = static_cast<ObjectContext *>(_obc);
if (obc->unconnected_watchers.count(entity) == 0 ||
- obc->unconnected_watchers[entity]->expire != expire) {
- dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher " << entity << dendl;
+ (obc->unconnected_watchers[entity] &&
+ obc->unconnected_watchers[entity]->expire != expire)) {
+ /* If obc->unconnected_watchers[entity] == NULL we know at least that
+ * the watcher for obc,entity should expire. We might not have been
+ * the intended Context*, but that's ok since the intended one will
+ * take this branch and assume it raced. */
+ dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher "
+ << entity << dendl;
put_object_context(obc);
return;
}
+ if (is_degraded_object(obc->obs.oi.soid)) {
+ callbacks_for_degraded_object[obc->obs.oi.soid].push_back(
+ new HandleWatchTimeout(get_osdmap()->get_epoch(),
+ this, _obc, entity, expire)
+ );
+ dout(10) << "handle_watch_timeout waiting for degraded on obj "
+ << obc->obs.oi.soid
+ << dendl;
+ obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
+ put_object_context(obc); // callback got its own ref
+ return;
+ }
+
+ if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+ dout(10) << "handle_watch_timeout waiting for scrub on obj "
+ << obc->obs.oi.soid
+ << dendl;
+ scrubber.add_callback(new HandleWatchTimeout(get_osdmap()->get_epoch(),
+ this, _obc, entity, expire));
+ obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
+ put_object_context(obc); // callback got its own ref
+ return;
+ }
+
obc->unconnected_watchers.erase(entity);
obc->obs.oi.watchers.erase(entity);
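HandleWatchTimeout above pins both the PG (via boost::intrusive_ptr) and the object context for as long as the callback object exists: the constructor takes an obc reference, the destructor returns it, and finish() takes one extra reference because handle_watch_timeout() consumes a reference of its own. The references therefore balance whether the Context is completed once the degraded or scrub wait clears, or is simply destroyed. A stripped-down sketch of the same hold-a-ref-for-the-callback's-lifetime shape, with hypothetical Obj/Pg stand-ins rather than the Ceph ObjectContext/ReplicatedPG types:

// Hypothetical sketch of the ref-holding callback pattern.
struct Obj {
  int nref;
  Obj() : nref(1) {}
  void get() { ++nref; }
  void put() { --nref; }
};

struct Pg {
  void handle_timeout(Obj* o) {
    o->put();  // consumes one reference, mirroring handle_watch_timeout()
  }
};

struct TimeoutCallback {
  Pg* pg;
  Obj* obj;

  TimeoutCallback(Pg* p, Obj* o) : pg(p), obj(o) {
    obj->get();  // pin the object for the callback's lifetime
  }
  ~TimeoutCallback() {
    obj->put();  // always returned, whether or not finish() ever ran
  }
  void finish() {
    obj->get();  // extra ref: handle_timeout() consumes one of its own
    pg->handle_timeout(obj);
  }
};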
@@ -5619,6 +5686,16 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
}
put_object_context(i->second);
}
+ if (callbacks_for_degraded_object.count(oid)) {
+ list<Context*> contexts;
+ contexts.swap(callbacks_for_degraded_object[oid]);
+ callbacks_for_degraded_object.erase(oid);
+ for (list<Context*>::iterator i = contexts.begin();
+ i != contexts.end();
+ ++i) {
+ (*i)->complete(0);
+ }
+ }
}
/** op_pull