summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-06-20 15:55:41 -0700
committerSage Weil <sage@inktank.com>2013-06-20 15:55:41 -0700
commite97a2c86de2dbbf911d86419db5913e6d789cd1f (patch)
tree96a0c16c81a1d4d488a55d294afdca41b70b62ed
parent4bf5b732cd8869276e87d4bbc4f261ee9e0c6a4c (diff)
parentb2f1a1ad58b71a86512c5abfea751f174aea044d (diff)
downloadceph-e97a2c86de2dbbf911d86419db5913e6d789cd1f.tar.gz
Merge remote-tracking branch 'yan/wip-mds' into wip-mds
-rw-r--r--src/mds/CDir.cc5
-rw-r--r--src/mds/CDir.h4
-rw-r--r--src/mds/CInode.cc53
-rw-r--r--src/mds/CInode.h1
-rw-r--r--src/mds/Capability.h5
-rw-r--r--src/mds/Locker.cc19
-rw-r--r--src/mds/MDCache.cc117
-rw-r--r--src/mds/MDCache.h1
-rw-r--r--src/mds/MDS.h2
-rw-r--r--src/mds/Migrator.cc8
-rw-r--r--src/mds/Migrator.h3
-rw-r--r--src/mds/Server.cc58
-rw-r--r--src/mds/Server.h1
-rw-r--r--src/messages/MMDSSlaveRequest.h6
14 files changed, 232 insertions, 51 deletions
diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc
index 211cec08b4f..2b991d78fde 100644
--- a/src/mds/CDir.cc
+++ b/src/mds/CDir.cc
@@ -154,6 +154,7 @@ ostream& CDir::print_db_line_prefix(ostream& out)
// CDir
CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
+ mseq(0),
dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
item_dirty(this), item_new(this),
pop_me(ceph_clock_now(g_ceph_context)),
@@ -1211,6 +1212,7 @@ void CDir::finish_waiting(uint64_t mask, int result)
fnode_t *CDir::project_fnode()
{
+ assert(get_version() != 0);
fnode_t *p = new fnode_t;
*p = *get_projected_fnode();
projected_fnode.push_back(p);
@@ -2120,6 +2122,8 @@ void CDir::_committed(version_t v)
void CDir::encode_export(bufferlist& bl)
{
assert(!is_projected());
+ ceph_seq_t seq = mseq + 1;
+ ::encode(seq, bl);
::encode(first, bl);
::encode(fnode, bl);
::encode(dirty_old_rstat, bl);
@@ -2149,6 +2153,7 @@ void CDir::finish_export(utime_t now)
void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
{
+ ::decode(mseq, blp);
::decode(first, blp);
::decode(fnode, blp);
::decode(dirty_old_rstat, blp);
diff --git a/src/mds/CDir.h b/src/mds/CDir.h
index 87c79c2af1b..11f4a76d047 100644
--- a/src/mds/CDir.h
+++ b/src/mds/CDir.h
@@ -170,6 +170,7 @@ public:
fnode_t fnode;
snapid_t first;
+ ceph_seq_t mseq; // migrate sequence
map<snapid_t,old_rstat_t> dirty_old_rstat; // [value.first,key]
// my inodes with dirty rstat data
@@ -547,7 +548,8 @@ public:
// -- import/export --
void encode_export(bufferlist& bl);
void finish_export(utime_t now);
- void abort_export() {
+ void abort_export() {
+ mseq += 2;
put(PIN_TEMPEXPORTING);
}
void decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls);
diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc
index 0e1429377f8..48529948955 100644
--- a/src/mds/CInode.cc
+++ b/src/mds/CInode.cc
@@ -1069,11 +1069,12 @@ void CInode::_stored_backtrace(version_t v, Context *fin)
{
dout(10) << "_stored_backtrace" << dendl;
+ auth_unpin(this);
if (v == inode.backtrace_version)
clear_dirty_parent();
- auth_unpin(this);
if (fin)
fin->complete(0);
+ mdcache->maybe_eval_stray(this);
}
void CInode::_mark_dirty_parent(LogSegment *ls, bool dirty_pool)
@@ -1221,6 +1222,7 @@ void CInode::encode_lock_state(int type, bufferlist& bl)
dout(20) << fg << " fragstat " << pf->fragstat << dendl;
dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
::encode(fg, tmp);
+ ::encode(dir->mseq, tmp);
::encode(dir->first, tmp);
::encode(pf->fragstat, tmp);
::encode(pf->accounted_fragstat, tmp);
@@ -1254,6 +1256,7 @@ void CInode::encode_lock_state(int type, bufferlist& bl)
dout(10) << fg << " " << pf->rstat << dendl;
dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
::encode(fg, tmp);
+ ::encode(dir->mseq, tmp);
::encode(dir->first, tmp);
::encode(pf->rstat, tmp);
::encode(pf->accounted_rstat, tmp);
@@ -1403,10 +1406,12 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
while (n--) {
frag_t fg;
+ ceph_seq_t mseq;
snapid_t fgfirst;
frag_info_t fragstat;
frag_info_t accounted_fragstat;
::decode(fg, p);
+ ::decode(mseq, p);
::decode(fgfirst, p);
::decode(fragstat, p);
::decode(accounted_fragstat, p);
@@ -1419,6 +1424,12 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
assert(dir); // i am auth; i had better have this dir open
dout(10) << fg << " first " << dir->first << " -> " << fgfirst
<< " on " << *dir << dendl;
+ if (dir->fnode.fragstat.version == get_projected_inode()->dirstat.version &&
+ ceph_seq_cmp(mseq, dir->mseq) < 0) {
+ dout(10) << " mseq " << mseq << " < " << dir->mseq << ", ignoring" << dendl;
+ continue;
+ }
+ dir->mseq = mseq;
dir->first = fgfirst;
dir->fnode.fragstat = fragstat;
dir->fnode.accounted_fragstat = accounted_fragstat;
@@ -1461,11 +1472,13 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
::decode(n, p);
while (n--) {
frag_t fg;
+ ceph_seq_t mseq;
snapid_t fgfirst;
nest_info_t rstat;
nest_info_t accounted_rstat;
map<snapid_t,old_rstat_t> dirty_old_rstat;
::decode(fg, p);
+ ::decode(mseq, p);
::decode(fgfirst, p);
::decode(rstat, p);
::decode(accounted_rstat, p);
@@ -1480,6 +1493,12 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
assert(dir); // i am auth; i had better have this dir open
dout(10) << fg << " first " << dir->first << " -> " << fgfirst
<< " on " << *dir << dendl;
+ if (dir->fnode.rstat.version == get_projected_inode()->rstat.version &&
+ ceph_seq_cmp(mseq, dir->mseq) < 0) {
+ dout(10) << " mseq " << mseq << " < " << dir->mseq << ", ignoring" << dendl;
+ continue;
+ }
+ dir->mseq = mseq;
dir->first = fgfirst;
dir->fnode.rstat = rstat;
dir->fnode.accounted_rstat = accounted_rstat;
@@ -1605,6 +1624,36 @@ void CInode::start_scatter(ScatterLock *lock)
}
}
+/*
+ * set dirfrag_version to inode_version - 1. so that we can use dirfrag version
+ * to check if we have gathered scatter state for a given dirfrag.
+ */
+void CInode::start_scatter_gather(ScatterLock *lock, int auth)
+{
+ assert(is_auth());
+ inode_t *pi = get_projected_inode();
+
+ for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
+ p != dirfrags.end();
+ ++p) {
+ CDir *dir = p->second;
+
+ if (dir->is_auth())
+ continue;
+ if (auth >= 0 && dir->authority().first != auth)
+ continue;
+
+ switch (lock->get_type()) {
+ case CEPH_LOCK_IFILE:
+ dir->fnode.fragstat.version = pi->dirstat.version - 1;
+ break;
+ case CEPH_LOCK_INEST:
+ dir->fnode.rstat.version = pi->rstat.version - 1;
+ break;
+ }
+ }
+}
+
struct C_Inode_FragUpdate : public Context {
CInode *in;
CDir *dir;
@@ -1624,6 +1673,8 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
if (dir->is_frozen()) {
dout(10) << "finish_scatter_update " << fg << " frozen, marking " << *lock << " stale " << *dir << dendl;
+ } else if (dir->get_version() == 0) {
+ dout(10) << "finish_scatter_update " << fg << " not loaded, marking " << *lock << " stale " << *dir << dendl;
} else {
if (dir_accounted_version != inode_version) {
dout(10) << "finish_scatter_update " << fg << " journaling accounted scatterstat update v" << inode_version << dendl;
diff --git a/src/mds/CInode.h b/src/mds/CInode.h
index 779bb63f485..82c23474308 100644
--- a/src/mds/CInode.h
+++ b/src/mds/CInode.h
@@ -651,6 +651,7 @@ public:
void clear_scatter_dirty(); // on rejoin ack
void start_scatter(ScatterLock *lock);
+ void start_scatter_gather(ScatterLock *lock, int auth=-1);
void finish_scatter_update(ScatterLock *lock, CDir *dir,
version_t inode_version, version_t dir_accounted_version);
void finish_scatter_gather_update(int type);
diff --git a/src/mds/Capability.h b/src/mds/Capability.h
index 54d2312daeb..fdecb9090b3 100644
--- a/src/mds/Capability.h
+++ b/src/mds/Capability.h
@@ -273,7 +273,7 @@ public:
return Export(_wanted, issued(), pending(), client_follows, mseq+1, last_issue_stamp);
}
void rejoin_import() { mseq++; }
- void merge(Export& other) {
+ void merge(Export& other, bool auth_cap) {
// issued + pending
int newpending = other.pending | pending();
if (other.issued & ~newpending)
@@ -286,7 +286,8 @@ public:
// wanted
_wanted = _wanted | other.wanted;
- mseq = other.mseq;
+ if (auth_cap)
+ mseq = other.mseq;
}
void merge(int otherwanted, int otherissued) {
// issued + pending
diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc
index 8065207011a..30e014ab785 100644
--- a/src/mds/Locker.cc
+++ b/src/mds/Locker.cc
@@ -731,8 +731,9 @@ void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<C
lock->get_parent()->is_replicated()) {
dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
send_lock_message(lock, LOCK_AC_LOCK);
- lock->init_gather();
lock->set_state(LOCK_MIX_LOCK2);
+ lock->init_gather();
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
return;
}
@@ -3430,7 +3431,7 @@ bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
assert(lock->is_stable());
CInode *in = 0;
- if (lock->get_cap_shift())
+ if (lock->get_type() != CEPH_LOCK_DN)
in = static_cast<CInode *>(lock->get_parent());
int old_state = lock->get_state();
@@ -3453,10 +3454,11 @@ bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
send_lock_message(lock, LOCK_AC_SYNC);
lock->init_gather();
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
gather++;
}
- if (in && in->is_head()) {
+ if (lock->get_cap_shift() && in->is_head()) {
if (in->issued_caps_need_gather(lock)) {
if (need_issue)
*need_issue = true;
@@ -3568,7 +3570,7 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
assert(lock->get_state() != LOCK_LOCK);
CInode *in = 0;
- if (lock->get_cap_shift())
+ if (lock->get_type() != CEPH_LOCK_DN)
in = static_cast<CInode *>(lock->get_parent());
int old_state = lock->get_state();
@@ -3596,7 +3598,7 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
}
if (lock->is_rdlocked())
gather++;
- if (in && in->is_head()) {
+ if (lock->get_cap_shift() && in->is_head()) {
if (in->issued_caps_need_gather(lock)) {
if (need_issue)
*need_issue = true;
@@ -3629,6 +3631,8 @@ void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
gather++;
send_lock_message(lock, LOCK_AC_LOCK);
lock->init_gather();
+ if (lock->get_state() == LOCK_MIX_LOCK2)
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
}
}
@@ -4034,8 +4038,9 @@ void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
if (lock->get_state() == LOCK_MIX_TSYN &&
in->is_replicated()) {
- lock->init_gather();
send_lock_message(lock, LOCK_AC_LOCK);
+ lock->init_gather();
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
gather++;
}
@@ -4364,6 +4369,8 @@ void Locker::file_excl(ScatterLock *lock, bool *need_issue)
lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
send_lock_message(lock, LOCK_AC_LOCK);
lock->init_gather();
+ if (lock->get_state() == LOCK_MIX_EXCL)
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
gather++;
}
if (lock->is_leased()) {
diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc
index 2b7ad7152f3..e592dde96ca 100644
--- a/src/mds/MDCache.cc
+++ b/src/mds/MDCache.cc
@@ -2651,6 +2651,15 @@ void MDCache::handle_mds_failure(int who)
if (p->second->slave_to_mds == who) {
if (p->second->slave_did_prepare()) {
dout(10) << " slave request " << *p->second << " uncommitted, will resolve shortly" << dendl;
+ if (!p->second->more()->waiting_on_slave.empty()) {
+ assert(p->second->more()->srcdn_auth_mds == mds->get_nodeid());
+ // will rollback, no need to wait
+ if (p->second->slave_request) {
+ p->second->slave_request->put();
+ p->second->slave_request = 0;
+ }
+ p->second->more()->waiting_on_slave.clear();
+ }
} else {
dout(10) << " slave request " << *p->second << " has no prepare, finishing up" << dendl;
if (p->second->slave_request)
@@ -2660,12 +2669,22 @@ void MDCache::handle_mds_failure(int who)
}
}
- if (p->second->is_slave() &&
- p->second->slave_did_prepare() && p->second->more()->srcdn_auth_mds == who &&
- mds->mdsmap->is_clientreplay_or_active_or_stopping(p->second->slave_to_mds)) {
- // rename srcdn's auth mds failed, resolve even I'm a survivor.
- dout(10) << " slave request " << *p->second << " uncommitted, will resolve shortly" << dendl;
- add_ambiguous_slave_update(p->first, p->second->slave_to_mds);
+ if (p->second->is_slave() && p->second->slave_did_prepare()) {
+ if (p->second->more()->waiting_on_slave.count(who)) {
+ assert(p->second->more()->srcdn_auth_mds == mds->get_nodeid());
+ dout(10) << " slave request " << *p->second << " no longer need rename notity ack from mds."
+ << who << dendl;
+ p->second->more()->waiting_on_slave.erase(who);
+ if (p->second->more()->waiting_on_slave.empty())
+ mds->queue_waiter(new C_MDS_RetryRequest(this, p->second));
+ }
+
+ if (p->second->more()->srcdn_auth_mds == who &&
+ mds->mdsmap->is_clientreplay_or_active_or_stopping(p->second->slave_to_mds)) {
+ // rename srcdn's auth mds failed, resolve even I'm a survivor.
+ dout(10) << " slave request " << *p->second << " uncommitted, will resolve shortly" << dendl;
+ add_ambiguous_slave_update(p->first, p->second->slave_to_mds);
+ }
}
// failed node is slave?
@@ -3745,6 +3764,7 @@ void MDCache::rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
dout(15) << " add_strong_dirfrag " << *dir << dendl;
rejoin->add_strong_dirfrag(dir->dirfrag(), dir->get_replica_nonce(), dir->get_dir_rep());
dir->state_set(CDir::STATE_REJOINING);
+ dir->mseq = 0;
for (CDir::map_t::iterator p = dir->items.begin();
p != dir->items.end();
@@ -3894,11 +3914,15 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
++p) {
CInode *in = get_inode(p->first);
assert(in);
+ if (survivor) {
+ in->start_scatter_gather(&in->filelock, from);
+ in->start_scatter_gather(&in->nestlock, from);
+ } else {
+ rejoin_potential_updated_scatterlocks.insert(in);
+ }
in->decode_lock_state(CEPH_LOCK_IFILE, p->second.file);
in->decode_lock_state(CEPH_LOCK_INEST, p->second.nest);
in->decode_lock_state(CEPH_LOCK_IDFT, p->second.dft);
- if (!survivor)
- rejoin_potential_updated_scatterlocks.insert(in);
}
// recovering peer may send incorrect dirfrags here. we need to
@@ -4495,25 +4519,29 @@ void MDCache::handle_cache_rejoin_strong(MMDSCacheRejoin *strong)
mdr->locks.insert(lock);
}
}
- // wrlock(s)?
- if (strong->wrlocked_inodes.count(in->vino())) {
- for (map<int, list<MMDSCacheRejoin::slave_reqid> >::iterator q = strong->wrlocked_inodes[in->vino()].begin();
- q != strong->wrlocked_inodes[in->vino()].end();
- ++q) {
- SimpleLock *lock = in->get_lock(q->first);
- for (list<MMDSCacheRejoin::slave_reqid>::iterator r = q->second.begin();
- r != q->second.end();
- ++r) {
- dout(10) << " inode wrlock by " << *r << " on " << *lock << " on " << *in << dendl;
- MDRequest *mdr = request_get(r->reqid); // should have this from auth_pin above.
+ }
+ // wrlock(s)?
+ for (map<vinodeno_t, map<int, list<MMDSCacheRejoin::slave_reqid> > >::iterator p = strong->wrlocked_inodes.begin();
+ p != strong->wrlocked_inodes.end();
+ ++p) {
+ CInode *in = get_inode(p->first);
+ for (map<int, list<MMDSCacheRejoin::slave_reqid> >::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q) {
+ SimpleLock *lock = in->get_lock(q->first);
+ for (list<MMDSCacheRejoin::slave_reqid>::iterator r = q->second.begin();
+ r != q->second.end();
+ ++r) {
+ dout(10) << " inode wrlock by " << *r << " on " << *lock << " on " << *in << dendl;
+ MDRequest *mdr = request_get(r->reqid); // should have this from auth_pin above.
+ if (in->is_auth())
assert(mdr->is_auth_pinned(in));
- lock->set_state(LOCK_MIX);
- if (lock == &in->filelock)
- in->loner_cap = -1;
- lock->get_wrlock(true);
- mdr->wrlocks.insert(lock);
- mdr->locks.insert(lock);
- }
+ lock->set_state(LOCK_MIX);
+ if (lock == &in->filelock)
+ in->loner_cap = -1;
+ lock->get_wrlock(true);
+ mdr->wrlocks.insert(lock);
+ mdr->locks.insert(lock);
}
}
}
@@ -8015,6 +8043,29 @@ void MDCache::_open_ino_traverse_dir(inodeno_t ino, open_ino_info_t& info, int r
do_open_ino(ino, info, ret);
}
+void MDCache::_open_ino_fetch_dir(inodeno_t ino, MMDSOpenIno *m, CDir *dir)
+{
+ if (dir->state_test(CDir::STATE_REJOINUNDEF) && dir->get_frag() == frag_t()) {
+ rejoin_undef_dirfrags.erase(dir);
+ dir->state_clear(CDir::STATE_REJOINUNDEF);
+
+ CInode *diri = dir->get_inode();
+ diri->force_dirfrags();
+ list<CDir*> ls;
+ diri->get_dirfrags(ls);
+
+ C_GatherBuilder gather(g_ceph_context, _open_ino_get_waiter(ino, m));
+ for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
+ rejoin_undef_dirfrags.insert(*p);
+ (*p)->state_set(CDir::STATE_REJOINUNDEF);
+ (*p)->fetch(gather.new_sub());
+ }
+ assert(gather.has_subs());
+ gather.activate();
+ } else
+ dir->fetch(_open_ino_get_waiter(ino, m));
+}
+
int MDCache::open_ino_traverse_dir(inodeno_t ino, MMDSOpenIno *m,
vector<inode_backpointer_t>& ancestors,
bool discover, bool want_xlocked, int *hint)
@@ -8032,8 +8083,14 @@ int MDCache::open_ino_traverse_dir(inodeno_t ino, MMDSOpenIno *m,
continue;
}
- if (diri->state_test(CInode::STATE_REJOINUNDEF))
- continue;
+ if (diri->state_test(CInode::STATE_REJOINUNDEF)) {
+ CDir *dir = diri->get_parent_dir();
+ while (dir->state_test(CDir::STATE_REJOINUNDEF) &&
+ dir->get_inode()->state_test(CInode::STATE_REJOINUNDEF))
+ dir = dir->get_inode()->get_parent_dir();
+ _open_ino_fetch_dir(ino, m, dir);
+ return 1;
+ }
if (!diri->is_dir()) {
dout(10) << " " << *diri << " is not dir" << dendl;
@@ -8067,14 +8124,14 @@ int MDCache::open_ino_traverse_dir(inodeno_t ino, MMDSOpenIno *m,
if (dnl && dnl->is_primary() &&
dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF)) {
dout(10) << " fetching undef " << *dnl->get_inode() << dendl;
- dir->fetch(_open_ino_get_waiter(ino, m));
+ _open_ino_fetch_dir(ino, m, dir);
return 1;
}
if (!dnl && !dir->is_complete() &&
(!dir->has_bloom() || dir->is_in_bloom(name))) {
dout(10) << " fetching incomplete " << *dir << dendl;
- dir->fetch(_open_ino_get_waiter(ino, m));
+ _open_ino_fetch_dir(ino, m, dir);
return 1;
}
diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h
index 3da8a36f799..36a322c6324 100644
--- a/src/mds/MDCache.h
+++ b/src/mds/MDCache.h
@@ -790,6 +790,7 @@ protected:
void _open_ino_backtrace_fetched(inodeno_t ino, bufferlist& bl, int err);
void _open_ino_parent_opened(inodeno_t ino, int ret);
void _open_ino_traverse_dir(inodeno_t ino, open_ino_info_t& info, int err);
+ void _open_ino_fetch_dir(inodeno_t ino, MMDSOpenIno *m, CDir *dir);
Context* _open_ino_get_waiter(inodeno_t ino, MMDSOpenIno *m);
int open_ino_traverse_dir(inodeno_t ino, MMDSOpenIno *m,
vector<inode_backpointer_t>& ancestors,
diff --git a/src/mds/MDS.h b/src/mds/MDS.h
index 9e3e2dae9c3..b99815dab34 100644
--- a/src/mds/MDS.h
+++ b/src/mds/MDS.h
@@ -35,7 +35,7 @@
#include "SessionMap.h"
-#define CEPH_MDS_PROTOCOL 17 /* cluster internal */
+#define CEPH_MDS_PROTOCOL 18 /* cluster internal */
enum {
diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc
index 6ea28c9386a..0647448c40c 100644
--- a/src/mds/Migrator.cc
+++ b/src/mds/Migrator.cc
@@ -2223,7 +2223,7 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, int from,
for (map<CInode*, map<client_t,Capability::Export> >::iterator p = import_caps[dir].begin();
p != import_caps[dir].end();
++p) {
- finish_import_inode_caps(p->first, from, p->second);
+ finish_import_inode_caps(p->first, true, p->second);
}
// send notify's etc.
@@ -2398,7 +2398,7 @@ void Migrator::decode_import_inode_caps(CInode *in,
}
}
-void Migrator::finish_import_inode_caps(CInode *in, int from,
+void Migrator::finish_import_inode_caps(CInode *in, bool auth_cap,
map<client_t,Capability::Export> &cap_map)
{
for (map<client_t,Capability::Export>::iterator it = cap_map.begin();
@@ -2412,7 +2412,7 @@ void Migrator::finish_import_inode_caps(CInode *in, int from,
if (!cap) {
cap = in->add_client_cap(it->first, session);
}
- cap->merge(it->second);
+ cap->merge(it->second, auth_cap);
mds->mdcache->do_cap_import(session, in, cap);
}
@@ -2688,7 +2688,7 @@ void Migrator::logged_import_caps(CInode *in,
mds->server->finish_force_open_sessions(client_map, sseqmap);
assert(cap_imports.count(in));
- finish_import_inode_caps(in, from, cap_imports[in]);
+ finish_import_inode_caps(in, false, cap_imports[in]);
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
mds->send_message_mds(new MExportCapsAck(in->ino()), from);
diff --git a/src/mds/Migrator.h b/src/mds/Migrator.h
index 70b59bc0f97..afe2e6cd65a 100644
--- a/src/mds/Migrator.h
+++ b/src/mds/Migrator.h
@@ -256,7 +256,8 @@ public:
void decode_import_inode_caps(CInode *in,
bufferlist::iterator &blp,
map<CInode*, map<client_t,Capability::Export> >& cap_imports);
- void finish_import_inode_caps(CInode *in, int from, map<client_t,Capability::Export> &cap_map);
+ void finish_import_inode_caps(CInode *in, bool auth_cap,
+ map<client_t,Capability::Export> &cap_map);
int decode_import_dir(bufferlist::iterator& blp,
int oldauth,
CDir *import_root,
diff --git a/src/mds/Server.cc b/src/mds/Server.cc
index 253c56d7a37..1d16d04dc3d 100644
--- a/src/mds/Server.cc
+++ b/src/mds/Server.cc
@@ -1280,6 +1280,16 @@ void Server::handle_slave_request(MMDSSlaveRequest *m)
if (m->is_reply())
return handle_slave_request_reply(m);
+ // the purpose of rename notify is enforcing causal message ordering. making sure
+ // bystanders have received all messages from rename srcdn's auth MDS.
+ if (m->get_op() == MMDSSlaveRequest::OP_RENAMENOTIFY) {
+ MMDSSlaveRequest *reply = new MMDSSlaveRequest(m->get_reqid(), m->get_attempt(),
+ MMDSSlaveRequest::OP_RENAMENOTIFYACK);
+ mds->send_message(reply, m->get_connection());
+ m->put();
+ return;
+ }
+
CDentry *straydn = NULL;
if (m->stray.length() > 0) {
straydn = mdcache->add_replica_stray(m->stray, from);
@@ -1432,6 +1442,10 @@ void Server::handle_slave_request_reply(MMDSSlaveRequest *m)
handle_slave_rename_prep_ack(mdr, m);
break;
+ case MMDSSlaveRequest::OP_RENAMENOTIFYACK:
+ handle_slave_rename_notify_ack(mdr, m);
+ break;
+
default:
assert(0);
}
@@ -2204,7 +2218,7 @@ CDir* Server::try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequest *mdr)
}
// not open and inode frozen?
- if (!dir && diri->is_frozen_dir()) {
+ if (!dir && diri->is_frozen()) {
dout(10) << "try_open_auth_dirfrag: dir inode is frozen, waiting " << *diri << dendl;
assert(diri->get_parent_dir());
diri->get_parent_dir()->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
@@ -3996,7 +4010,8 @@ public:
if (newi->inode.is_dir()) {
CDir *dir = newi->get_dirfrag(frag_t());
assert(dir);
- dir->mark_dirty(1, mdr->ls);
+ dir->fnode.version--;
+ dir->mark_dirty(dir->fnode.version + 1, mdr->ls);
dir->mark_new(mdr->ls);
}
@@ -4155,7 +4170,7 @@ void Server::handle_client_mkdir(MDRequest *mdr)
// ...and that new dir is empty.
CDir *newdir = newi->get_or_open_dirfrag(mds->mdcache, frag_t());
newdir->mark_complete();
- newdir->pre_dirty();
+ newdir->fnode.version = newdir->pre_dirty();
// prepare finisher
mdr->ls = mdlog->get_current_segment();
@@ -6560,6 +6575,9 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
// am i srcdn auth?
if (srcdn->is_auth()) {
+ set<int> srcdnrep;
+ srcdn->list_replicas(srcdnrep);
+
bool reply_witness = false;
if (srcdnl->is_primary() && !srcdnl->get_inode()->state_test(CInode::STATE_AMBIGUOUSAUTH)) {
// freeze?
@@ -6594,12 +6612,19 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
if (mdr->slave_request->witnesses.size() > 1) {
dout(10) << " set srci ambiguous auth; providing srcdn replica list" << dendl;
reply_witness = true;
+ for (set<int>::iterator p = srcdnrep.begin(); p != srcdnrep.end(); ++p) {
+ if (*p == mdr->slave_to_mds ||
+ !mds->mdsmap->is_clientreplay_or_active_or_stopping(*p))
+ continue;
+ MMDSSlaveRequest *notify = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_RENAMENOTIFY);
+ mds->send_message_mds(notify, *p);
+ mdr->more()->waiting_on_slave.insert(*p);
+ }
}
}
// is witness list sufficient?
- set<int> srcdnrep;
- srcdn->list_replicas(srcdnrep);
for (set<int>::iterator p = srcdnrep.begin(); p != srcdnrep.end(); ++p) {
if (*p == mdr->slave_to_mds ||
mdr->slave_request->witnesses.count(*p)) continue;
@@ -6619,6 +6644,11 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
return;
}
dout(10) << " witness list sufficient: includes all srcdn replicas" << dendl;
+ if (!mdr->more()->waiting_on_slave.empty()) {
+ dout(10) << " still waiting for rename notify acks from "
+ << mdr->more()->waiting_on_slave << dendl;
+ return;
+ }
} else if (srcdnl->is_primary() && srcdn->authority() != destdn->authority()) {
// set ambiguous auth for srci on witnesses
mdr->set_ambiguous_auth(srcdnl->get_inode());
@@ -7187,6 +7217,24 @@ void Server::handle_slave_rename_prep_ack(MDRequest *mdr, MMDSSlaveRequest *ack)
dout(10) << "still waiting on slaves " << mdr->more()->waiting_on_slave << dendl;
}
+void Server::handle_slave_rename_notify_ack(MDRequest *mdr, MMDSSlaveRequest *ack)
+{
+ dout(10) << "handle_slave_rename_notify_ack " << *mdr << " from mds."
+ << ack->get_source() << dendl;
+ assert(mdr->is_slave());
+ int from = ack->get_source().num();
+
+ if (mdr->more()->waiting_on_slave.count(from)) {
+ mdr->more()->waiting_on_slave.erase(from);
+
+ if (mdr->more()->waiting_on_slave.empty()) {
+ if (mdr->slave_request)
+ dispatch_slave_request(mdr);
+ } else
+ dout(10) << " still waiting for rename notify acks from "
+ << mdr->more()->waiting_on_slave << dendl;
+ }
+}
diff --git a/src/mds/Server.h b/src/mds/Server.h
index 35a405b58eb..269b286b431 100644
--- a/src/mds/Server.h
+++ b/src/mds/Server.h
@@ -242,6 +242,7 @@ public:
// slaving
void handle_slave_rename_prep(MDRequest *mdr);
void handle_slave_rename_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m);
+ void handle_slave_rename_notify_ack(MDRequest *mdr, MMDSSlaveRequest *m);
void _logged_slave_rename(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
void _commit_slave_rename(MDRequest *mdr, int r, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
void do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr, bool finish_mdr=false);
diff --git a/src/messages/MMDSSlaveRequest.h b/src/messages/MMDSSlaveRequest.h
index 35af81d928c..f8f30b273af 100644
--- a/src/messages/MMDSSlaveRequest.h
+++ b/src/messages/MMDSSlaveRequest.h
@@ -43,6 +43,9 @@ class MMDSSlaveRequest : public Message {
static const int OP_DROPLOCKS = 11;
+ static const int OP_RENAMENOTIFY = 12;
+ static const int OP_RENAMENOTIFYACK = -12;
+
static const int OP_FINISH = 17;
static const int OP_COMMITTED = -18;
@@ -77,6 +80,9 @@ class MMDSSlaveRequest : public Message {
case OP_DROPLOCKS: return "drop_locks";
+ case OP_RENAMENOTIFY: return "reame_notify";
+ case OP_RENAMENOTIFYACK: return "rename_notify_ack";
+
case OP_ABORT: return "abort";
//case OP_COMMIT: return "commit";