diff options
author | Yan, Zheng <zheng.z.yan@intel.com> | 2013-09-19 09:55:31 +0800 |
---|---|---|
committer | Yan, Zheng <zheng.z.yan@intel.com> | 2013-09-24 08:45:55 +0800 |
commit | 623e31c1a230fe40edf7e669a8d73e9a55c07258 (patch) | |
tree | afa9a5779e115262927c34c3954eabed8f379d7e | |
parent | 08c386f54254bb5652d811e4caf339b619a39109 (diff) | |
download | ceph-623e31c1a230fe40edf7e669a8d73e9a55c07258.tar.gz |
mds: start internal MDS request for fragmenting directory
Start an internal MDS request for the directory fragmenting operation. With
an MDS request, we can easily acquire the locks required by the directory
fragmenting operation. (The old way of getting the locks was 'try lock' style,
which is not reliable.)
Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
-rw-r--r-- | src/common/ceph_strings.cc | 1 | ||||
-rw-r--r-- | src/include/ceph_fs.h | 3 | ||||
-rw-r--r-- | src/mds/MDCache.cc | 163 | ||||
-rw-r--r-- | src/mds/MDCache.h | 12 |
4 files changed, 93 insertions, 86 deletions
diff --git a/src/common/ceph_strings.cc b/src/common/ceph_strings.cc index cd08083967a..f2e6045a546 100644 --- a/src/common/ceph_strings.cc +++ b/src/common/ceph_strings.cc @@ -181,6 +181,7 @@ const char *ceph_mds_op_name(int op) case CEPH_MDS_OP_RMSNAP: return "rmsnap"; case CEPH_MDS_OP_SETFILELOCK: return "setfilelock"; case CEPH_MDS_OP_GETFILELOCK: return "getfilelock"; + case CEPH_MDS_OP_FRAGMENTDIR: return "fragmentdir"; } return "???"; } diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 6c41d14f5da..7d0eb07735c 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -332,6 +332,9 @@ enum { CEPH_MDS_OP_MKSNAP = 0x01400, CEPH_MDS_OP_RMSNAP = 0x01401, CEPH_MDS_OP_LSSNAP = 0x00402, + + // internal op + CEPH_MDS_OP_FRAGMENTDIR= 0x01500, }; extern const char *ceph_mds_op_name(int op); diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 9dc1229fbb9..fabcd0c5bc7 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -8666,9 +8666,9 @@ void MDCache::dispatch_request(MDRequest *mdr) mds->server->dispatch_slave_request(mdr); } else { switch (mdr->internal_op) { - - // ... 
- + case CEPH_MDS_OP_FRAGMENTDIR: + dispatch_fragment_dir(mdr); + break; default: assert(0); } @@ -10862,17 +10862,6 @@ public: } }; - -bool MDCache::can_fragment_lock(CInode *diri) -{ - if (!diri->dirfragtreelock.can_wrlock(-1)) { - dout(7) << "can_fragment: can't wrlock dftlock" << dendl; - mds->locker->scatter_nudge(&diri->dirfragtreelock, NULL); - return false; - } - return true; -} - bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs) { if (mds->mdsmap->is_degraded()) { @@ -10920,11 +10909,6 @@ void MDCache::split_dir(CDir *dir, int bits) if (!can_fragment(diri, dirs)) return; - if (!can_fragment_lock(diri)) { - dout(10) << " requeuing dir " << dir->dirfrag() << dendl; - mds->balancer->queue_split(dir); - return; - } C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits)); @@ -10952,11 +10936,6 @@ void MDCache::merge_dir(CInode *diri, frag_t frag) if (!can_fragment(diri, dirs)) return; - if (!can_fragment_lock(diri)) { - //dout(10) << " requeuing dir " << dir->dirfrag() << dendl; - //mds->mdbalancer->split_queue.insert(dir->dirfrag()); - return; - } CDir *first = dirs.front(); int bits = first->get_frag().bits() - frag.bits(); @@ -11064,64 +11043,88 @@ void MDCache::fragment_unmark_unfreeze_dirs(list<CDir*>& dirs) class C_MDC_FragmentLoggedAndStored : public Context { MDCache *mdcache; - Mutation *mut; - list<CDir*> resultfrags; - frag_t basefrag; - int bits; + MDRequest *mdr; public: - C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) : - mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {} + C_MDC_FragmentLoggedAndStored(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {} virtual void finish(int r) { - mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits); + mdcache->fragment_logged_and_stored(mdr); } }; void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits) { - CInode *diri = dirs.front()->get_inode(); + dout(10) << 
"fragment_frozen " << dirs << " " << basefrag << " by " << bits + << " on " << dirs.front()->get_inode() << dendl; - if (bits > 0) { + if (bits > 0) assert(dirs.size() == 1); - } else { - assert(bits < 0); - } + else if (bits < 0) + assert(dirs.size() > 1); + else + assert(0); - dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits - << " on " << *diri << dendl; + MDRequest *mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR); + fragment_info_t &info = fragment_requests[mdr->reqid]; + info.basefrag = basefrag; + info.bits = bits; + info.dirs = dirs; - // wrlock dirfragtreelock - if (!diri->dirfragtreelock.can_wrlock(-1)) { - dout(10) << " can't wrlock " << diri->dirfragtreelock << " on " << *diri << dendl; - fragment_unmark_unfreeze_dirs(dirs); - return; + dispatch_fragment_dir(mdr); +} + +void MDCache::dispatch_fragment_dir(MDRequest *mdr) +{ + assert(fragment_requests.count(mdr->reqid)); + fragment_info_t &info = fragment_requests[mdr->reqid]; + CInode *diri = info.dirs.front()->get_inode(); + + dout(10) << "dispatch_fragment_dir " << info.resultfrags << " " + << info.basefrag << " bits " << info.bits << " on " << *diri << dendl; + + // avoid freeze dir deadlock + if (!mdr->is_auth_pinned(diri)) { + if (!diri->can_auth_pin()) { + dout(10) << " can't auth_pin " << *diri << ", requeuing dir " + << info.dirs.front()->dirfrag() << dendl; + if (info.bits > 0) + mds->balancer->queue_split(info.dirs.front()); + else + mds->balancer->queue_merge(info.dirs.front()); + fragment_unmark_unfreeze_dirs(info.dirs); + fragment_requests.erase(mdr->reqid); + request_finish(mdr); + return; + } + mdr->auth_pin(diri); } - diri->dirfragtreelock.get_wrlock(true); + set<SimpleLock*> rdlocks, wrlocks, xlocks; + wrlocks.insert(&diri->dirfragtreelock); // prevent a racing gather on any other scatterlocks too - diri->nestlock.get_wrlock(true); - diri->filelock.get_wrlock(true); + wrlocks.insert(&diri->nestlock); + wrlocks.insert(&diri->filelock); + if 
(!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; // refragment - list<CDir*> resultfrags; list<Context*> waiters; - adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false); + adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits, + info.resultfrags, waiters, false); if (g_conf->mds_debug_frag) diri->verify_dirfrags(); mds->queue_waiters(waiters); - // journal - Mutation *mut = new Mutation; - - mut->ls = mds->mdlog->get_current_segment(); - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits); + mdr->ls = mds->mdlog->get_current_segment(); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), + info.basefrag, info.bits); mds->mdlog->start_entry(le); - le->metablob.add_dir_context(*resultfrags.begin()); + le->metablob.add_dir_context(*info.resultfrags.begin()); // dft lock mds->locker->mark_updated_scatterlock(&diri->dirfragtreelock); - mut->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree); - mut->add_updated_lock(&diri->dirfragtreelock); + mdr->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree); + mdr->add_updated_lock(&diri->dirfragtreelock); /* // filelock @@ -11136,12 +11139,10 @@ void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits) */ // freeze, journal, and store resulting frags - C_GatherBuilder gather(g_ceph_context, - new C_MDC_FragmentLoggedAndStored(this, mut, - resultfrags, basefrag, bits)); + C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentLoggedAndStored(this, mdr)); - for (list<CDir*>::iterator p = resultfrags.begin(); - p != resultfrags.end(); + for (list<CDir*>::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); ++p) { CDir *dir = *p; dout(10) << " result frag " << *dir << dendl; @@ -11157,26 +11158,28 @@ void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits) gather.activate(); } -void 
MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits) +void MDCache::fragment_logged_and_stored(MDRequest *mdr) { - CInode *diri = resultfrags.front()->get_inode(); + assert(fragment_requests.count(mdr->reqid)); + fragment_info_t &info = fragment_requests[mdr->reqid]; + CInode *diri = info.resultfrags.front()->get_inode(); - dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits - << " on " << *diri << dendl; + dout(10) << "fragment_logged_and_stored " << info.resultfrags << " " << info.basefrag + << " bits " << info.bits << " on " << *diri << dendl; // journal commit - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits); - mds->mdlog->start_entry(le); - mds->mdlog->submit_entry(le); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), + info.basefrag, info.bits); + mds->mdlog->start_submit_entry(le); // tell peers - CDir *first = *resultfrags.begin(); + CDir *first = *info.resultfrags.begin(); for (map<int,int>::iterator p = first->replica_map.begin(); p != first->replica_map.end(); ++p) { if (mds->mdsmap->get_state(p->first) <= MDSMap::STATE_REJOIN) continue; - MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), basefrag, bits); + MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), info.basefrag, info.bits); /* // freshly replicate new dirs to peers @@ -11187,20 +11190,12 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags mds->send_message_mds(notify, p->first); } - mut->apply(); // mark scatterlock - mds->locker->drop_locks(mut); - mut->cleanup(); - delete mut; - - // drop dft wrlock - bool need_issue = false; - mds->locker->wrlock_finish(&diri->dirfragtreelock, NULL, &need_issue); - mds->locker->wrlock_finish(&diri->nestlock, NULL, &need_issue); - mds->locker->wrlock_finish(&diri->filelock, NULL, &need_issue); + mdr->apply(); // mark scatterlock + 
mds->locker->drop_locks(mdr); // unfreeze resulting frags - for (list<CDir*>::iterator p = resultfrags.begin(); - p != resultfrags.end(); + for (list<CDir*>::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); ++p) { CDir *dir = *p; dout(10) << " result frag " << *dir << dendl; @@ -11220,8 +11215,8 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags dir->unfreeze_dir(); } - if (need_issue) - mds->locker->issue_caps(diri); + fragment_requests.erase(mdr->reqid); + request_finish(mdr); } diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index d8f2a9486fb..cb219360ccf 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -946,6 +946,14 @@ public: set< pair<dirfrag_t,int> > uncommitted_fragments; // prepared but uncommitted refragmentations private: + struct fragment_info_t { + frag_t basefrag; + int bits; + list<CDir*> dirs; + list<CDir*> resultfrags; + }; + map<metareqid_t, fragment_info_t> fragment_requests; + void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits, list<CDir*>& frags, list<Context*>& waiters, bool replay); void adjust_dir_fragments(CInode *diri, @@ -960,7 +968,6 @@ private: friend class EFragment; - bool can_fragment_lock(CInode *diri); bool can_fragment(CInode *diri, list<CDir*>& dirs); public: @@ -972,7 +979,8 @@ private: void fragment_mark_and_complete(list<CDir*>& dirs); void fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits); void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs); - void fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits); + void dispatch_fragment_dir(MDRequest *mdr); + void fragment_logged_and_stored(MDRequest *mdr); public: void rollback_uncommitted_fragments(); private: |