diff options
44 files changed, 2269 insertions, 984 deletions
diff --git a/doc/dev/osd_internals/osd_throttles.rst b/doc/dev/osd_internals/osd_throttles.rst new file mode 100644 index 00000000000..4fa3044f986 --- /dev/null +++ b/doc/dev/osd_internals/osd_throttles.rst @@ -0,0 +1,21 @@ + Messenger throttle (number and size) + |-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| + FileStore op_queue throttle (number and size) + |--------------------------------------------------------| + WBThrottle + |---------------------------------------------------------------------------------------------------------| + Journal (size) + |-----------------------------------------------------------------------------------------------------------------------------------------------------------------| + |----------------------------------------------------------------------------------------------------> flushed ----------------> synced + | +Op: Read Header --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> op_applied -------------------------------------------------------------> Complete + | | +SubOp: --Messenger--> ReadHeader --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> sub_op_applied - + | + |-----------------------------> flushed ----------------> synced + |------------------------------------------------------------------------------------------| + Journal (size) + |---------------------------------| + WBThrottle + |-----------------------------------------------------| + FileStore op_queue throttle (number and size) diff --git a/doc/dev/osd_internals/wbthrottle.rst b/doc/dev/osd_internals/wbthrottle.rst new file mode 100644 index 00000000000..14ba0140d4d --- /dev/null +++ b/doc/dev/osd_internals/wbthrottle.rst @@ -0,0 +1,28 @@ +================== +Writeback Throttle +================== + +Previously, the filestore had a problem when handling large numbers of +small ios. We throttle dirty data implicitely via the journal, but +a large number of inodes can be dirtied without filling the journal +resulting in a very long sync time when the sync finally does happen. +The flusher was not an adequate solution to this problem since it +forced writeback of small writes too eagerly killing performance. + +WBThrottle tracks unflushed io per hobject_t and ::fsyncs in lru +order once the start_flusher threshhold is exceeded for any of +dirty bytes, dirty ios, or dirty inodes. While any of these exceed +the hard_limit, we block on throttle() in _do_op. + +See src/os/WBThrottle.h, src/osd/WBThrottle.cc + +To track the open FDs through the writeback process, there is now an +fdcache to cache open fds. lfn_open now returns a cached FDRef which +implicitely closes the fd once all references have expired. + +Filestore syncs have a sideeffect of flushing all outstanding objects +in the wbthrottle. + +lfn_unlink clears the cached FDRef and wbthrottle entries for the +unlinked object when then last link is removed and asserts that all +outstanding FDRefs for that object are dead. diff --git a/src/Makefile.am b/src/Makefile.am index 7a08e1f5a2a..5e176874b11 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1490,7 +1490,8 @@ libos_a_SOURCES = \ os/IndexManager.cc \ os/FlatIndex.cc \ os/DBObjectMap.cc \ - os/LevelDBStore.cc + os/LevelDBStore.cc \ + os/WBThrottle.cc libos_a_CXXFLAGS= ${AM_CXXFLAGS} noinst_LIBRARIES += libos.a @@ -1873,6 +1874,8 @@ noinst_HEADERS = \ messages/MMDSFindInoReply.h\ messages/MMDSFragmentNotify.h\ messages/MMDSMap.h\ + messages/MMDSOpenIno.h \ + messages/MMDSOpenInoReply.h \ messages/MMDSResolve.h\ messages/MMDSResolveAck.h\ messages/MMDSSlaveRequest.h\ @@ -1973,6 +1976,8 @@ noinst_HEADERS = \ os/FileStore.h\ os/FlatIndex.h\ os/HashIndex.h\ + os/FDCache.h\ + os/WBThrottle.h\ os/IndexManager.h\ os/Journal.h\ os/JournalingObjectStore.h\ diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 01e8b5a5a15..5e87b2f1782 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -338,6 +338,7 @@ OPTION(mds_kill_openc_at, OPT_INT, 0) OPTION(mds_kill_journal_at, OPT_INT, 0) OPTION(mds_kill_journal_expire_at, OPT_INT, 0) OPTION(mds_kill_journal_replay_at, OPT_INT, 0) +OPTION(mds_open_remote_link_mode, OPT_INT, 0) OPTION(mds_inject_traceless_reply_probability, OPT_DOUBLE, 0) /* percentage of MDS modify replies to skip sending the client a trace on [0-1]*/ @@ -478,6 +479,20 @@ OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5) OPTION(filestore, OPT_BOOL, false) +/// filestore wb throttle limits +OPTION(filestore_wbthrottle_btrfs_bytes_start_flusher, OPT_U64, 10<<20) +OPTION(filestore_wbthrottle_btrfs_bytes_hard_limit, OPT_U64, 100<<20) +OPTION(filestore_wbthrottle_btrfs_ios_start_flusher, OPT_U64, 100) +OPTION(filestore_wbthrottle_btrfs_ios_hard_limit, OPT_U64, 1000) +OPTION(filestore_wbthrottle_btrfs_inodes_start_flusher, OPT_U64, 100) +OPTION(filestore_wbthrottle_btrfs_inodes_hard_limit, OPT_U64, 1000) +OPTION(filestore_wbthrottle_xfs_bytes_start_flusher, OPT_U64, 10<<20) +OPTION(filestore_wbthrottle_xfs_bytes_hard_limit, OPT_U64, 100<<20) +OPTION(filestore_wbthrottle_xfs_ios_start_flusher, OPT_U64, 10) +OPTION(filestore_wbthrottle_xfs_ios_hard_limit, OPT_U64, 100) +OPTION(filestore_wbthrottle_xfs_inodes_start_flusher, OPT_U64, 10) +OPTION(filestore_wbthrottle_xfs_inodes_hard_limit, OPT_U64, 100) + // Tests index failure paths OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0) @@ -498,10 +513,6 @@ OPTION(filestore_btrfs_snap, OPT_BOOL, true) OPTION(filestore_btrfs_clone_range, OPT_BOOL, true) OPTION(filestore_fsync_flushes_journal_data, OPT_BOOL, false) OPTION(filestore_fiemap, OPT_BOOL, false) // (try to) use fiemap -OPTION(filestore_flusher, OPT_BOOL, true) -OPTION(filestore_flusher_max_fds, OPT_INT, 512) -OPTION(filestore_flush_min, OPT_INT, 65536) -OPTION(filestore_sync_flush, OPT_BOOL, false) OPTION(filestore_journal_parallel, OPT_BOOL, false) OPTION(filestore_journal_writeahead, OPT_BOOL, false) OPTION(filestore_journal_trailing, OPT_BOOL, false) @@ -518,6 +529,7 @@ OPTION(filestore_merge_threshold, OPT_INT, 10) OPTION(filestore_split_multiple, OPT_INT, 2) OPTION(filestore_update_to, OPT_INT, 1000) OPTION(filestore_blackhole, OPT_BOOL, false) // drop any new transactions on the floor +OPTION(filestore_fd_cache_size, OPT_INT, 128) // FD lru size OPTION(filestore_dump_file, OPT_STR, "") // file onto which store transaction dumps OPTION(filestore_kill_at, OPT_INT, 0) // inject a failure at the n'th opportunity OPTION(filestore_inject_stall, OPT_INT, 0) // artificially stall for N seconds in op queue thread diff --git a/src/common/shared_cache.hpp b/src/common/shared_cache.hpp index 69a4c06dfbf..178d1001be3 100644 --- a/src/common/shared_cache.hpp +++ b/src/common/shared_cache.hpp @@ -85,12 +85,23 @@ public: assert(weak_refs.empty()); } + void clear(K key) { + VPtr val; // release any ref we have after we drop the lock + { + Mutex::Locker l(lock); + if (weak_refs.count(key)) { + val = weak_refs[key].lock(); + } + lru_remove(key); + } + } + void set_size(size_t new_size) { list<VPtr> to_release; { Mutex::Locker l(lock); max_size = new_size; - trim_cache(to_release); + trim_cache(&to_release); } } diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc index 4ef6e8f19fa..211cec08b4f 100644 --- a/src/mds/CDir.cc +++ b/src/mds/CDir.cc @@ -1055,7 +1055,7 @@ void CDir::assimilate_dirty_rstat_inodes_finish(Mutation *mut, EMetaBlob *blob) mut->add_projected_inode(in); in->clear_dirty_rstat(); - blob->add_primary_dentry(dn, true, in); + blob->add_primary_dentry(dn, in, true); } if (!dirty_rstat_inodes.empty()) @@ -1651,7 +1651,7 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn) dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl; dn->mark_clean(); - if (dn->get_linkage()->get_inode()) { + if (dn->get_linkage()->is_primary()) { assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version); dout(10) << "_fetched had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl; dn->get_linkage()->get_inode()->mark_clean(); @@ -1728,11 +1728,11 @@ public: class C_Dir_Committed : public Context { CDir *dir; - version_t version, last_renamed_version; + version_t version; public: - C_Dir_Committed(CDir *d, version_t v, version_t lrv) : dir(d), version(v), last_renamed_version(lrv) { } + C_Dir_Committed(CDir *d, version_t v) : dir(d), version(v) { } void finish(int r) { - dir->_committed(version, last_renamed_version); + dir->_committed(version); } }; @@ -1993,12 +1993,9 @@ void CDir::_commit(version_t want) if (committed_dn == items.end()) cache->mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0, NULL, - new C_Dir_Committed(this, get_version(), - inode->inode.last_renamed_version)); + new C_Dir_Committed(this, get_version())); else { // send in a different Context - C_GatherBuilder gather(g_ceph_context, - new C_Dir_Committed(this, get_version(), - inode->inode.last_renamed_version)); + C_GatherBuilder gather(g_ceph_context, new C_Dir_Committed(this, get_version())); while (committed_dn != items.end()) { ObjectOperation n = ObjectOperation(); committed_dn = _commit_partial(n, snaps, max_write_size, committed_dn); @@ -2027,9 +2024,9 @@ void CDir::_commit(version_t want) * * @param v version i just committed */ -void CDir::_committed(version_t v, version_t lrv) +void CDir::_committed(version_t v) { - dout(10) << "_committed v " << v << " (last renamed " << lrv << ") on " << *this << dendl; + dout(10) << "_committed v " << v << " on " << *this << dendl; assert(is_auth()); bool stray = inode->is_stray(); @@ -2142,6 +2139,7 @@ void CDir::encode_export(bufferlist& bl) void CDir::finish_export(utime_t now) { + state &= MASK_STATE_EXPORT_KEPT; pop_auth_subtree_nested.sub(now, cache->decayrate, pop_auth_subtree); pop_me.zero(now); pop_auth_subtree.zero(now); diff --git a/src/mds/CDir.h b/src/mds/CDir.h index 7e1db73af06..87c79c2af1b 100644 --- a/src/mds/CDir.h +++ b/src/mds/CDir.h @@ -494,7 +494,7 @@ private: unsigned max_write_size=-1, map_t::iterator last_committed_dn=map_t::iterator()); void _encode_dentry(CDentry *dn, bufferlist& bl, const set<snapid_t> *snaps); - void _committed(version_t v, version_t last_renamed_version); + void _committed(version_t v); void wait_for_commit(Context *c, version_t v=0); // -- dirtyness -- diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index 781ed727f5f..0e1429377f8 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -127,6 +127,7 @@ ostream& operator<<(ostream& out, CInode& in) if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH"; if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover"; if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering"; + if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent"; if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance; if (in.is_frozen_inode()) out << " FROZEN"; if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN"; @@ -328,9 +329,14 @@ void CInode::pop_and_dirty_projected_inode(LogSegment *ls) assert(!projected_nodes.empty()); dout(15) << "pop_and_dirty_projected_inode " << projected_nodes.front()->inode << " v" << projected_nodes.front()->inode->version << dendl; + int64_t old_pool = inode.layout.fl_pg_pool; + mark_dirty(projected_nodes.front()->inode->version, ls); inode = *projected_nodes.front()->inode; + if (inode.is_backtrace_updated()) + _mark_dirty_parent(ls, old_pool != inode.layout.fl_pg_pool); + map<string,bufferptr> *px = projected_nodes.front()->xattrs; if (px) { xattrs = *px; @@ -967,67 +973,134 @@ void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin) delete fin; } -class C_CInode_FetchedBacktrace : public Context { - CInode *in; - inode_backtrace_t *backtrace; - Context *fin; -public: - bufferlist bl; - C_CInode_FetchedBacktrace(CInode *i, inode_backtrace_t *bt, Context *f) : - in(i), backtrace(bt), fin(f) {} - - void finish(int r) { - if (r == 0) { - in->_fetched_backtrace(&bl, backtrace, fin); - } else { - fin->finish(r); - } - } -}; - -void CInode::fetch_backtrace(inode_backtrace_t *bt, Context *fin) +void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt) { - object_t oid = get_object_name(ino(), frag_t(), ""); - object_locator_t oloc(inode.layout.fl_pg_pool); - - SnapContext snapc; - C_CInode_FetchedBacktrace *c = new C_CInode_FetchedBacktrace(this, bt, fin); - mdcache->mds->objecter->getxattr(oid, oloc, "parent", CEPH_NOSNAP, &c->bl, 0, c); -} - -void CInode::_fetched_backtrace(bufferlist *bl, inode_backtrace_t *bt, Context *fin) -{ - ::decode(*bt, *bl); - if (fin) { - fin->finish(0); - } -} - -void CInode::build_backtrace(int64_t location, inode_backtrace_t* bt) -{ - bt->ino = inode.ino; - bt->ancestors.clear(); + bt.ino = inode.ino; + bt.ancestors.clear(); + bt.pool = pool; CInode *in = this; CDentry *pdn = get_parent_dn(); while (pdn) { CInode *diri = pdn->get_dir()->get_inode(); - bt->ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version)); + bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version)); in = diri; pdn = in->get_parent_dn(); } vector<int64_t>::iterator i = inode.old_pools.begin(); while(i != inode.old_pools.end()) { // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0) - if (*i == location) { + if (*i == pool) { ++i; continue; } - bt->old_pools.insert(*i); + bt.old_pools.insert(*i); ++i; } } +struct C_Inode_StoredBacktrace : public Context { + CInode *in; + version_t version; + Context *fin; + C_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : in(i), version(v), fin(f) {} + void finish(int r) { + in->_stored_backtrace(version, fin); + } +}; + +void CInode::store_backtrace(Context *fin) +{ + dout(10) << "store_backtrace on " << *this << dendl; + assert(is_dirty_parent()); + + auth_pin(this); + + int64_t pool; + if (is_dir()) + pool = mdcache->mds->mdsmap->get_metadata_pool(); + else + pool = inode.layout.fl_pg_pool; + + inode_backtrace_t bt; + build_backtrace(pool, bt); + bufferlist bl; + ::encode(bt, bl); + + ObjectOperation op; + op.create(false); + op.setxattr("parent", bl); + + SnapContext snapc; + object_t oid = get_object_name(ino(), frag_t(), ""); + object_locator_t oloc(pool); + Context *fin2 = new C_Inode_StoredBacktrace(this, inode.backtrace_version, fin); + + if (!state_test(STATE_DIRTYPOOL)) { + mdcache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context), + 0, NULL, fin2); + return; + } + + C_GatherBuilder gather(g_ceph_context, fin2); + mdcache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context), + 0, NULL, gather.new_sub()); + + set<int64_t> old_pools; + for (vector<int64_t>::iterator p = inode.old_pools.begin(); + p != inode.old_pools.end(); + ++p) { + if (*p == pool || old_pools.count(*p)) + continue; + + ObjectOperation op; + op.create(false); + op.setxattr("parent", bl); + + object_locator_t oloc(*p); + mdcache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context), + 0, NULL, gather.new_sub()); + old_pools.insert(*p); + } + gather.activate(); +} + +void CInode::_stored_backtrace(version_t v, Context *fin) +{ + dout(10) << "_stored_backtrace" << dendl; + + if (v == inode.backtrace_version) + clear_dirty_parent(); + auth_unpin(this); + if (fin) + fin->complete(0); +} + +void CInode::_mark_dirty_parent(LogSegment *ls, bool dirty_pool) +{ + if (!state_test(STATE_DIRTYPARENT)) { + dout(10) << "mark_dirty_parent" << dendl; + state_set(STATE_DIRTYPARENT); + get(PIN_DIRTYPARENT); + assert(ls); + } + if (dirty_pool) + state_set(STATE_DIRTYPOOL); + if (ls) + ls->dirty_parent_inodes.push_back(&item_dirty_parent); +} + +void CInode::clear_dirty_parent() +{ + if (state_test(STATE_DIRTYPARENT)) { + dout(10) << "clear_dirty_parent" << dendl; + state_clear(STATE_DIRTYPARENT); + state_clear(STATE_DIRTYPOOL); + put(PIN_DIRTYPARENT); + item_dirty_parent.remove_myself(); + } +} + // ------------------ // parent dir @@ -2989,11 +3062,10 @@ void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<Context*>& waite void CInode::encode_export(bufferlist& bl) { - ENCODE_START(3, 3, bl) + ENCODE_START(4, 4, bl) _encode_base(bl); - bool dirty = is_dirty(); - ::encode(dirty, bl); + ::encode(state, bl); ::encode(pop, bl); @@ -3024,6 +3096,8 @@ void CInode::encode_export(bufferlist& bl) void CInode::finish_export(utime_t now) { + state &= MASK_STATE_EXPORT_KEPT; + pop.zero(now); // just in case! @@ -3037,14 +3111,21 @@ void CInode::finish_export(utime_t now) void CInode::decode_import(bufferlist::iterator& p, LogSegment *ls) { - DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p); + DECODE_START_LEGACY_COMPAT_LEN(4, 4, 4, p); _decode_base(p); - bool dirty; - ::decode(dirty, p); - if (dirty) + unsigned s; + ::decode(s, p); + state |= (s & MASK_STATE_EXPORTED); + if (is_dirty()) { + get(PIN_DIRTY); _mark_dirty(ls); + } + if (is_dirty_parent()) { + get(PIN_DIRTYPARENT); + _mark_dirty_parent(ls); + } ::decode(pop, ceph_clock_now(g_ceph_context), p); diff --git a/src/mds/CInode.h b/src/mds/CInode.h index 7c63593c73c..779bb63f485 100644 --- a/src/mds/CInode.h +++ b/src/mds/CInode.h @@ -151,9 +151,16 @@ public: static const int STATE_NEEDSRECOVER = (1<<11); static const int STATE_RECOVERING = (1<<12); static const int STATE_PURGING = (1<<13); + static const int STATE_DIRTYPARENT = (1<<14); static const int STATE_DIRTYRSTAT = (1<<15); static const int STATE_STRAYPINNED = (1<<16); static const int STATE_FROZENAUTHPIN = (1<<17); + static const int STATE_DIRTYPOOL = (1<<18); + + static const int MASK_STATE_EXPORTED = + (STATE_DIRTY|STATE_NEEDSRECOVER|STATE_DIRTYPARENT|STATE_DIRTYPOOL); + static const int MASK_STATE_EXPORT_KEPT = + (STATE_FROZEN|STATE_AMBIGUOUSAUTH|STATE_EXPORTINGCAPS); // -- waiters -- static const uint64_t WAIT_DIR = (1<<0); @@ -364,7 +371,7 @@ public: protected: // file capabilities map<client_t, Capability*> client_caps; // client -> caps - map<int, int> mds_caps_wanted; // [auth] mds -> caps wanted + map<int32_t, int32_t> mds_caps_wanted; // [auth] mds -> caps wanted int replica_caps_wanted; // [replica] what i've requested from auth map<int, set<client_t> > client_snap_caps; // [auth] [snap] dirty metadata we still need from the head @@ -384,6 +391,7 @@ public: elist<CInode*>::item item_dirty; elist<CInode*>::item item_caps; elist<CInode*>::item item_open_file; + elist<CInode*>::item item_dirty_parent; elist<CInode*>::item item_dirty_dirfrag_dir; elist<CInode*>::item item_dirty_dirfrag_nest; elist<CInode*>::item item_dirty_dirfrag_dirfragtree; @@ -424,7 +432,7 @@ private: parent(0), inode_auth(CDIR_AUTH_DEFAULT), replica_caps_wanted(0), - item_dirty(this), item_caps(this), item_open_file(this), + item_dirty(this), item_caps(this), item_open_file(this), item_dirty_parent(this), item_dirty_dirfrag_dir(this), item_dirty_dirfrag_nest(this), item_dirty_dirfrag_dirfragtree(this), @@ -527,10 +535,13 @@ private: void fetch(Context *fin); void _fetched(bufferlist& bl, bufferlist& bl2, Context *fin); - void fetch_backtrace(inode_backtrace_t *bt, Context *fin); - void _fetched_backtrace(bufferlist *bl, inode_backtrace_t *bt, Context *fin); - - void build_backtrace(int64_t location, inode_backtrace_t* bt); + void build_backtrace(int64_t pool, inode_backtrace_t& bt); + void store_backtrace(Context *fin); + void _stored_backtrace(version_t v, Context *fin); + void _mark_dirty_parent(LogSegment *ls, bool dirty_pool=false); + void clear_dirty_parent(); + bool is_dirty_parent() { return state_test(STATE_DIRTYPARENT); } + bool is_dirty_pool() { return state_test(STATE_DIRTYPOOL); } void encode_store(bufferlist& bl); void decode_store(bufferlist::iterator& bl); @@ -704,7 +715,7 @@ public: bool is_any_caps() { return !client_caps.empty(); } bool is_any_nonstale_caps() { return count_nonstale_caps(); } - map<int,int>& get_mds_caps_wanted() { return mds_caps_wanted; } + map<int32_t,int32_t>& get_mds_caps_wanted() { return mds_caps_wanted; } map<client_t,Capability*>& get_client_caps() { return client_caps; } Capability *get_client_cap(client_t client) { diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index 4a23e0bc47f..57154b3d9f6 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -327,6 +327,14 @@ bool Locker::acquire_locks(MDRequest *mdr, p != mustpin_remote.end(); ++p) { dout(10) << "requesting remote auth_pins from mds." << p->first << dendl; + + // wait for active auth + if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) { + dout(10) << " mds." << p->first << " is not active" << dendl; + if (mdr->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr)); + return false; + } MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPIN); @@ -1332,10 +1340,11 @@ void Locker::remote_wrlock_start(SimpleLock *lock, int target, MDRequest *mut) { dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl; - // wait for single auth - if (lock->get_parent()->is_ambiguous_auth()) { - lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, - new C_MDS_RetryRequest(mdcache, mut)); + // wait for active target + if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) { + dout(7) << " mds." << target << " is not active" << dendl; + if (mut->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut)); return; } @@ -1422,8 +1431,16 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequest *mut) return false; } - // send lock request + // wait for active auth int auth = lock->get_parent()->authority().first; + if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { + dout(7) << " mds." << auth << " is not active" << dendl; + if (mut->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut)); + return false; + } + + // send lock request mut->more()->slaves.insert(auth); mut->start_locking(lock, auth); MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt, @@ -1915,8 +1932,7 @@ void Locker::request_inode_file_caps(CInode *in) } int auth = in->authority().first; - if (in->is_rejoining() && - mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) { + if (mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) { mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in)); return; } @@ -1937,7 +1953,7 @@ void Locker::request_inode_file_caps(CInode *in) void Locker::handle_inode_file_caps(MInodeFileCaps *m) { // nobody should be talking to us during recovery. - assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping()); + assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping()); // ok CInode *in = mdcache->get_inode(m->get_ino()); @@ -2112,7 +2128,7 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock, mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY); // no cow, here! CDentry *parent = in->get_projected_parent_dn(); - metablob->add_primary_dentry(parent, true, in); + metablob->add_primary_dentry(parent, in, true); } else { metablob->add_dir_context(in->get_projected_parent_dn()->get_dir()); mdcache->journal_dirty_inode(mut, metablob, in); @@ -2183,8 +2199,11 @@ void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq) } CInode *cur = cap->get_inode(); - if (!cur->is_auth()) + if (!cur->is_auth()) { + request_inode_file_caps(cur); return; + } + if (cap->wanted() == 0) { if (cur->item_open_file.is_on_list() && !cur->is_any_caps_wanted()) { @@ -2203,7 +2222,6 @@ void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq) mds->mdlog->submit_entry(le); } } - } @@ -2903,41 +2921,65 @@ void Locker::handle_client_cap_release(MClientCapRelease *m) return; } - for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) { - inodeno_t ino((uint64_t)p->ino); - CInode *in = mdcache->get_inode(ino); - if (!in) { - dout(10) << " missing ino " << ino << dendl; - continue; - } - Capability *cap = in->get_client_cap(client); - if (!cap) { - dout(10) << " no cap on " << *in << dendl; - continue; - } - if (cap->get_cap_id() != p->cap_id) { - dout(7) << " ignoring client capid " << p->cap_id << " != my " << cap->get_cap_id() << " on " << *in << dendl; - continue; - } - if (ceph_seq_cmp(p->migrate_seq, cap->get_mseq()) < 0) { - dout(7) << " mseq " << p->migrate_seq << " < " << cap->get_mseq() - << " on " << *in << dendl; - continue; - } - if (p->seq != cap->get_last_issue()) { - dout(10) << " issue_seq " << p->seq << " != " << cap->get_last_issue() << " on " << *in << dendl; - - // clean out any old revoke history - cap->clean_revoke_from(p->seq); - eval_cap_gather(in); - continue; - } + for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) + _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq); + + m->put(); +} + +class C_Locker_RetryCapRelease : public Context { + Locker *locker; + client_t client; + inodeno_t ino; + uint64_t cap_id; + ceph_seq_t migrate_seq; + ceph_seq_t issue_seq; +public: + C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id, + ceph_seq_t mseq, ceph_seq_t seq) : + locker(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {} + void finish(int r) { + locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq); + } +}; - dout(7) << "removing cap on " << *in << dendl; - remove_client_cap(in, client); +void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id, + ceph_seq_t mseq, ceph_seq_t seq) +{ + CInode *in = mdcache->get_inode(ino); + if (!in) { + dout(7) << "_do_cap_release missing ino " << ino << dendl; + return; + } + Capability *cap = in->get_client_cap(client); + if (!cap) { + dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl; + return; } - m->put(); + dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl; + if (cap->get_cap_id() != cap_id) { + dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl; + return; + } + if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) { + dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl; + return; + } + if (should_defer_client_cap_frozen(in)) { + dout(7) << " freezing|frozen, deferring" << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, + new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq)); + return; + } + if (seq != cap->get_last_issue()) { + dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl; + // clean out any old revoke history + cap->clean_revoke_from(seq); + eval_cap_gather(in); + return; + } + remove_client_cap(in, client); } /* This function DOES put the passed message before returning */ @@ -4108,6 +4150,10 @@ void Locker::file_eval(ScatterLock *lock, bool *need_issue) if (lock->get_parent()->is_freezing_or_frozen()) return; + // wait for scan + if (lock->get_state() == LOCK_SCAN) + return; + // excl -> *? if (lock->get_state() == LOCK_EXCL) { dout(20) << " is excl" << dendl; diff --git a/src/mds/Locker.h b/src/mds/Locker.h index f4d9861a384..b97307d6cb2 100644 --- a/src/mds/Locker.h +++ b/src/mds/Locker.h @@ -225,6 +225,7 @@ public: bool _do_cap_update(CInode *in, Capability *cap, int dirty, snapid_t follows, MClientCaps *m, MClientCaps *ack=0); void handle_client_cap_release(class MClientCapRelease *m); + void _do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id, ceph_seq_t mseq, ceph_seq_t seq); // local @@ -284,6 +285,7 @@ private: friend class C_MDL_CheckMaxSize; friend class C_MDL_RequestInodeFileCaps; friend class C_Locker_FileUpdate_finish; + friend class C_Locker_RetryCapRelease; // -- client leases -- diff --git a/src/mds/LogSegment.h b/src/mds/LogSegment.h index 8cf58a18306..44c79425738 100644 --- a/src/mds/LogSegment.h +++ b/src/mds/LogSegment.h @@ -33,19 +33,6 @@ class CDentry; class MDS; class MDSlaveUpdate; -// The backtrace info struct here is used to maintain the backtrace in -// a queue that we will eventually want to write out (on journal segment -// expiry). -class BacktraceInfo { -public: - int64_t location; - int64_t pool; - struct inode_backtrace_t bt; - elist<BacktraceInfo*>::item item_logseg; - BacktraceInfo(int64_t l, CInode *i, LogSegment *ls, int64_t p = -1); - ~BacktraceInfo(); -}; - class LogSegment { public: uint64_t offset, end; @@ -58,12 +45,11 @@ class LogSegment { elist<CDentry*> dirty_dentries; elist<CInode*> open_files; + elist<CInode*> dirty_parent_inodes; elist<CInode*> dirty_dirfrag_dir; elist<CInode*> dirty_dirfrag_nest; elist<CInode*> dirty_dirfrag_dirfragtree; - elist<BacktraceInfo*> update_backtraces; - elist<MDSlaveUpdate*> slave_updates; set<CInode*> truncating_inodes; @@ -90,20 +76,13 @@ class LogSegment { dirty_inodes(member_offset(CInode, item_dirty)), dirty_dentries(member_offset(CDentry, item_dirty)), open_files(member_offset(CInode, item_open_file)), + dirty_parent_inodes(member_offset(CInode, item_dirty_parent)), dirty_dirfrag_dir(member_offset(CInode, item_dirty_dirfrag_dir)), dirty_dirfrag_nest(member_offset(CInode, item_dirty_dirfrag_nest)), dirty_dirfrag_dirfragtree(member_offset(CInode, item_dirty_dirfrag_dirfragtree)), - update_backtraces(member_offset(BacktraceInfo, item_logseg)), slave_updates(0), // passed to begin() manually inotablev(0), sessionmapv(0) { } - - // backtrace handling - void queue_backtrace_update(CInode *in, int64_t location, int64_t pool = -1); - void remove_pending_backtraces(inodeno_t ino, int64_t pool); - void store_backtrace_update(MDS *mds, BacktraceInfo *info, Context *fin); - void _stored_backtrace(BacktraceInfo *info, Context *fin); - unsigned encode_parent_mutation(ObjectOperation& m, BacktraceInfo *info); }; #endif diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index f0a2cc2a7f0..0c279b66a91 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -79,6 +79,9 @@ #include "messages/MMDSFindIno.h" #include "messages/MMDSFindInoReply.h" +#include "messages/MMDSOpenIno.h" +#include "messages/MMDSOpenInoReply.h" + #include "messages/MClientRequest.h" #include "messages/MClientCaps.h" #include "messages/MClientSnap.h" @@ -235,6 +238,8 @@ void MDCache::remove_inode(CInode *o) if (o->is_dirty()) o->mark_clean(); + if (o->is_dirty_parent()) + o->clear_dirty_parent(); o->filelock.remove_dirty(); o->nestlock.remove_dirty(); @@ -461,7 +466,7 @@ void MDCache::_create_system_file(CDir *dir, const char *name, CInode *in, Conte if (!in->is_mdsdir()) { predirty_journal_parents(mut, &le->metablob, in, dir, PREDIRTY_PRIMARY|PREDIRTY_DIR, 1); - le->metablob.add_primary_dentry(dn, true, in); + le->metablob.add_primary_dentry(dn, in, true); } else { predirty_journal_parents(mut, &le->metablob, in, dir, PREDIRTY_DIR, 1); journal_dirty_inode(mut, &le->metablob, in); @@ -1552,7 +1557,7 @@ void MDCache::journal_cow_dentry(Mutation *mut, EMetaBlob *metablob, CDentry *dn CDentry *olddn = dn->dir->add_primary_dentry(dn->name, oldin, oldfirst, follows); oldin->inode.version = olddn->pre_dirty(); dout(10) << " olddn " << *olddn << dendl; - metablob->add_primary_dentry(olddn, true, 0); + metablob->add_primary_dentry(olddn, 0, true); mut->add_cow_dentry(olddn); } else { assert(dnl->is_remote()); @@ -1585,7 +1590,13 @@ void MDCache::journal_dirty_inode(Mutation *mut, EMetaBlob *metablob, CInode *in CDentry *dn = in->get_projected_parent_dn(); if (!dn->get_projected_linkage()->is_null()) // no need to cow a null dentry journal_cow_dentry(mut, metablob, dn, follows); - metablob->add_primary_dentry(dn, true, in); + if (in->get_projected_inode()->is_backtrace_updated()) { + bool dirty_pool = in->get_projected_inode()->layout.fl_pg_pool != + in->get_previous_projected_inode()->layout.fl_pg_pool; + metablob->add_primary_dentry(dn, in, true, true, dirty_pool); + } else { + metablob->add_primary_dentry(dn, in, true); + } } } @@ -2144,32 +2155,27 @@ void MDCache::predirty_journal_parents(Mutation *mut, EMetaBlob *blob, struct C_MDC_CommittedMaster : public Context { MDCache *cache; metareqid_t reqid; - LogSegment *ls; - list<Context*> waiters; - C_MDC_CommittedMaster(MDCache *s, metareqid_t r, LogSegment *l, list<Context*> &w) : - cache(s), reqid(r), ls(l) { - waiters.swap(w); - } + C_MDC_CommittedMaster(MDCache *s, metareqid_t r) : cache(s), reqid(r) {} void finish(int r) { - cache->_logged_master_commit(reqid, ls, waiters); + cache->_logged_master_commit(reqid); } }; void MDCache::log_master_commit(metareqid_t reqid) { dout(10) << "log_master_commit " << reqid << dendl; + uncommitted_masters[reqid].committing = true; mds->mdlog->start_submit_entry(new ECommitted(reqid), - new C_MDC_CommittedMaster(this, reqid, - uncommitted_masters[reqid].ls, - uncommitted_masters[reqid].waiters)); - mds->mdcache->uncommitted_masters.erase(reqid); + new C_MDC_CommittedMaster(this, reqid)); } -void MDCache::_logged_master_commit(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters) +void MDCache::_logged_master_commit(metareqid_t reqid) { dout(10) << "_logged_master_commit " << reqid << dendl; - ls->uncommitted_masters.erase(reqid); - mds->queue_waiters(waiters); + assert(uncommitted_masters.count(reqid)); + uncommitted_masters[reqid].ls->uncommitted_masters.erase(reqid); + mds->queue_waiters(uncommitted_masters[reqid].waiters); + uncommitted_masters.erase(reqid); } // while active... @@ -2179,7 +2185,7 @@ void MDCache::committed_master_slave(metareqid_t r, int from) dout(10) << "committed_master_slave mds." << from << " on " << r << dendl; assert(uncommitted_masters.count(r)); uncommitted_masters[r].slaves.erase(from); - if (uncommitted_masters[r].slaves.empty()) + if (!uncommitted_masters[r].recovering && uncommitted_masters[r].slaves.empty()) log_master_commit(r); } @@ -2196,20 +2202,20 @@ void MDCache::logged_master_update(metareqid_t reqid) } /* - * The mds could crash after receiving all slaves' commit acknowledgement, - * but before journalling the ECommitted. + * Master may crash after receiving all slaves' commit acks, but before journalling + * the final commit. Slaves may crash after journalling the slave commit, but before + * sending commit ack to the master. Commit masters with no uncommitted slave when + * resolve finishes. */ void MDCache::finish_committed_masters() { - map<metareqid_t, umaster>::iterator p = uncommitted_masters.begin(); - while (p != uncommitted_masters.end()) { - if (p->second.slaves.empty()) { - metareqid_t reqid = p->first; - dout(10) << "finish_committed_masters " << reqid << dendl; - ++p; - log_master_commit(reqid); - } else { - ++p; + for (map<metareqid_t, umaster>::iterator p = uncommitted_masters.begin(); + p != uncommitted_masters.end(); + ++p) { + p->second.recovering = false; + if (!p->second.committing && p->second.slaves.empty()) { + dout(10) << "finish_committed_masters " << p->first << dendl; + log_master_commit(p->first); } } } @@ -2450,8 +2456,6 @@ void MDCache::resolve_start() adjust_subtree_auth(rootdir, CDIR_AUTH_UNKNOWN); } resolve_gather = recovery_set; - resolve_gather.erase(mds->get_nodeid()); - rejoin_gather = resolve_gather; } void MDCache::send_resolves() @@ -2705,6 +2709,16 @@ void MDCache::handle_mds_failure(int who) } } + for (map<metareqid_t, umaster>::iterator p = uncommitted_masters.begin(); + p != uncommitted_masters.end(); + ++p) { + // The failed MDS may have already committed the slave update + if (p->second.slaves.count(who)) { + p->second.recovering = true; + p->second.slaves.erase(who); + } + } + while (!finish.empty()) { dout(10) << "cleaning up slave request " << *finish.front() << dendl; request_finish(finish.front()); @@ -2712,6 +2726,7 @@ void MDCache::handle_mds_failure(int who) } kick_find_ino_peers(who); + kick_open_ino_peers(who); show_subtrees(); } @@ -2771,7 +2786,7 @@ void MDCache::handle_mds_recovery(int who) } kick_discovers(who); - + kick_open_ino_peers(who); kick_find_ino_peers(who); // queue them up. @@ -2964,17 +2979,17 @@ void MDCache::maybe_resolve_finish() dout(10) << "maybe_resolve_finish still waiting for resolves (" << resolve_gather << ")" << dendl; return; + } + + dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl; + disambiguate_imports(); + finish_committed_masters(); + if (mds->is_resolve()) { + trim_unlinked_inodes(); + recalc_auth_bits(); + mds->resolve_done(); } else { - dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl; - disambiguate_imports(); - if (mds->is_resolve()) { - trim_unlinked_inodes(); - recalc_auth_bits(); - trim_non_auth(); - mds->resolve_done(); - } else { - maybe_send_pending_rejoins(); - } + maybe_send_pending_rejoins(); } } @@ -3397,6 +3412,8 @@ void MDCache::recalc_auth_bits() dnl->get_inode()->state_clear(CInode::STATE_AUTH); if (dnl->get_inode()->is_dirty()) dnl->get_inode()->mark_clean(); + if (dnl->get_inode()->is_dirty_parent()) + dnl->get_inode()->clear_dirty_parent(); // avoid touching scatterlocks for our subtree roots! if (subtree_inodes.count(dnl->get_inode()) == 0) dnl->get_inode()->clear_scatter_dirty(); @@ -3451,6 +3468,15 @@ void MDCache::recalc_auth_bits() * after recovery. */ +void MDCache::rejoin_start() +{ + dout(10) << "rejoin_start" << dendl; + + rejoin_gather = recovery_set; + // need finish opening cap inodes before sending cache rejoins + rejoin_gather.insert(mds->get_nodeid()); + process_imported_caps(); +} /* * rejoin phase! @@ -3467,6 +3493,11 @@ void MDCache::rejoin_send_rejoins() { dout(10) << "rejoin_send_rejoins with recovery_set " << recovery_set << dendl; + if (rejoin_gather.count(mds->get_nodeid())) { + dout(7) << "rejoin_send_rejoins still processing imported caps, delaying" << dendl; + rejoins_pending = true; + return; + } if (!resolve_gather.empty()) { dout(7) << "rejoin_send_rejoins still waiting for resolves (" << resolve_gather << ")" << dendl; @@ -3476,12 +3507,6 @@ void MDCache::rejoin_send_rejoins() map<int, MMDSCacheRejoin*> rejoins; - // encode cap list once. - bufferlist cap_export_bl; - if (mds->is_rejoin()) { - ::encode(cap_exports, cap_export_bl); - ::encode(cap_export_paths, cap_export_bl); - } // if i am rejoining, send a rejoin to everyone. // otherwise, just send to others who are rejoining. @@ -3490,12 +3515,20 @@ void MDCache::rejoin_send_rejoins() ++p) { if (*p == mds->get_nodeid()) continue; // nothing to myself! if (rejoin_sent.count(*p)) continue; // already sent a rejoin to this node! - if (mds->is_rejoin()) { + if (mds->is_rejoin()) rejoins[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_WEAK); - rejoins[*p]->copy_cap_exports(cap_export_bl); - } else if (mds->mdsmap->is_rejoin(*p)) + else if (mds->mdsmap->is_rejoin(*p)) rejoins[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_STRONG); - } + } + + if (mds->is_rejoin()) { + for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator p = cap_exports.begin(); + p != cap_exports.end(); + p++) { + assert(cap_export_targets.count(p->first)); + rejoins[cap_export_targets[p->first]]->cap_exports[p->first] = p->second; + } + } assert(!migrator->is_importing()); assert(!migrator->is_exporting()); @@ -3821,7 +3854,7 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak) p != weak->cap_exports.end(); ++p) { CInode *in = get_inode(p->first); - if (!in || !in->is_auth()) continue; + assert(!in || in->is_auth()); for (map<client_t,ceph_mds_cap_reconnect>::iterator q = p->second.begin(); q != p->second.end(); ++q) { @@ -3838,16 +3871,7 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak) p != weak->cap_exports.end(); ++p) { CInode *in = get_inode(p->first); - if (in && !in->is_auth()) - continue; - filepath& path = weak->cap_export_paths[p->first]; - if (!in) { - if (!path_is_mine(path)) - continue; - cap_import_paths[p->first] = path; - dout(10) << " noting cap import " << p->first << " path " << path << dendl; - } - + assert(in && in->is_auth()); // note for (map<client_t,ceph_mds_cap_reconnect>::iterator q = p->second.begin(); q != p->second.end(); @@ -4016,6 +4040,7 @@ public: } }; +#if 0 /** * parallel_fetch -- make a pass at fetching a bunch of paths in parallel * @@ -4134,9 +4159,7 @@ bool MDCache::parallel_fetch_traverse_dir(inodeno_t ino, filepath& path, missing.insert(ino); return true; } - - - +#endif /* * rejoin_scour_survivor_replica - remove source from replica list on unmentioned objects @@ -4505,7 +4528,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack) int from = ack->get_source().num(); // for sending cache expire message - list<CInode*> isolated_inodes; + set<CInode*> isolated_inodes; // dirs for (map<dirfrag_t, MMDSCacheRejoin::dirfrag_strong>::iterator p = ack->strong_dirfrags.begin(); @@ -4521,19 +4544,20 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack) diri = new CInode(this, false); diri->inode.ino = p->first.ino; diri->inode.mode = S_IFDIR; - if (MDS_INO_MDSDIR(p->first.ino)) { + add_inode(diri); + if (MDS_INO_MDSDIR(from) == p->first.ino) { diri->inode_auth = pair<int,int>(from, CDIR_AUTH_UNKNOWN); - add_inode(diri); dout(10) << " add inode " << *diri << dendl; } else { - diri->inode_auth = CDIR_AUTH_UNDEF; - isolated_inodes.push_back(diri); + diri->inode_auth = CDIR_AUTH_DEFAULT; + isolated_inodes.insert(diri); dout(10) << " unconnected dirfrag " << p->first << dendl; } } // barebones dirfrag; the full dirfrag loop below will clean up. dir = diri->add_dirfrag(new CDir(diri, p->first.frag, this, false)); - if (dir->authority().first != from) + if (dir->authority() != CDIR_AUTH_UNDEF && + dir->authority().first != from) adjust_subtree_auth(dir, from); dout(10) << " add dirfrag " << *dir << dendl; } @@ -4598,6 +4622,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack) in->get_parent_dir()->unlink_inode(in->get_parent_dn()); } dn->dir->link_primary_inode(dn, in); + isolated_inodes.erase(in); } } @@ -4659,20 +4684,9 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack) dout(10) << " got inode locks " << *in << dendl; } - // trim unconnected subtree - if (!isolated_inodes.empty()) { - map<int, MCacheExpire*> expiremap; - for (list<CInode*>::iterator p = isolated_inodes.begin(); - p != isolated_inodes.end(); - ++p) { - list<CDir*> ls; - (*p)->get_dirfrags(ls); - trim_dirfrag(*ls.begin(), 0, expiremap); - assert((*p)->get_num_ref() == 0); - delete *p; - } - send_expire_messages(expiremap); - } + // FIXME: This can happen if entire subtree, together with the inode subtree root + // belongs to, were trimmed between sending cache rejoin and receiving rejoin ack. + assert(isolated_inodes.empty()); // done? assert(rejoin_ack_gather.count(from)); @@ -4840,16 +4854,9 @@ void MDCache::rejoin_gather_finish() if (open_undef_inodes_dirfrags()) return; - // fetch paths? - // do this before ack, since some inodes we may have already gotten - // from surviving MDSs. - if (!cap_import_paths.empty()) { - if (parallel_fetch(cap_import_paths, cap_imports_missing)) { - return; - } - } - - process_imported_caps(); + if (process_imported_caps()) + return; + choose_lock_states_and_reconnect_caps(); identify_files_to_recover(rejoin_recover_q, rejoin_check_q); @@ -4867,34 +4874,123 @@ void MDCache::rejoin_gather_finish() } } -void MDCache::process_imported_caps() +class C_MDC_RejoinOpenInoFinish: public Context { + MDCache *cache; + inodeno_t ino; +public: + C_MDC_RejoinOpenInoFinish(MDCache *c, inodeno_t i) : cache(c), ino(i) {} + void finish(int r) { + cache->rejoin_open_ino_finish(ino, r); + } +}; + +void MDCache::rejoin_open_ino_finish(inodeno_t ino, int ret) +{ + dout(10) << "open_caps_inode_finish ino " << ino << " ret " << ret << dendl; + + if (ret < 0) { + cap_imports_missing.insert(ino); + } else if (ret == mds->get_nodeid()) { + assert(get_inode(ino)); + } else { + map<inodeno_t,map<client_t,map<int,ceph_mds_cap_reconnect> > >::iterator p; + p = cap_imports.find(ino); + assert(p != cap_imports.end()); + for (map<client_t,map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + assert(q->second.count(-1)); + assert(q->second.size() == 1); + rejoin_export_caps(p->first, q->first, q->second[-1], ret); + } + cap_imports.erase(p); + } + + assert(cap_imports_num_opening > 0); + cap_imports_num_opening--; + + if (cap_imports_num_opening == 0) { + if (rejoin_gather.count(mds->get_nodeid())) + process_imported_caps(); + else + rejoin_gather_finish(); + } +} + +bool MDCache::process_imported_caps() { dout(10) << "process_imported_caps" << dendl; - // process cap imports - // ino -> client -> frommds -> capex - map<inodeno_t,map<client_t, map<int,ceph_mds_cap_reconnect> > >::iterator p = cap_imports.begin(); - while (p != cap_imports.end()) { + map<inodeno_t,map<client_t, map<int,ceph_mds_cap_reconnect> > >::iterator p; + for (p = cap_imports.begin(); p != cap_imports.end(); ++p) { CInode *in = get_inode(p->first); - if (!in) { - dout(10) << "process_imported_caps still missing " << p->first - << ", will try again after replayed client requests" - << dendl; - ++p; + if (in) { + assert(in->is_auth()); + cap_imports_missing.erase(p->first); continue; } - for (map<client_t, map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin(); - q != p->second.end(); - ++q) - for (map<int,ceph_mds_cap_reconnect>::iterator r = q->second.begin(); + if (cap_imports_missing.count(p->first) > 0) + continue; + + cap_imports_num_opening++; + dout(10) << " opening missing ino " << p->first << dendl; + open_ino(p->first, (int64_t)-1, new C_MDC_RejoinOpenInoFinish(this, p->first), false); + } + + if (cap_imports_num_opening > 0) + return true; + + // called by rejoin_gather_finish() ? + if (rejoin_gather.count(mds->get_nodeid()) == 0) { + // process cap imports + // ino -> client -> frommds -> capex + p = cap_imports.begin(); + while (p != cap_imports.end()) { + CInode *in = get_inode(p->first); + if (!in) { + dout(10) << " still missing ino " << p->first + << ", will try again after replayed client requests" << dendl; + ++p; + continue; + } + assert(in->is_auth()); + for (map<client_t,map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin(); + q != p->second.end(); + ++q) + for (map<int,ceph_mds_cap_reconnect>::iterator r = q->second.begin(); + r != q->second.end(); + ++r) { + dout(20) << " add_reconnected_cap " << in->ino() << " client." << q->first << dendl; + add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm)); + rejoin_import_cap(in, q->first, r->second, r->first); + } + cap_imports.erase(p++); // remove and move on + } + } else { + for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator q = cap_exports.begin(); + q != cap_exports.end(); + q++) { + for (map<client_t,ceph_mds_cap_reconnect>::iterator r = q->second.begin(); r != q->second.end(); ++r) { - dout(20) << " add_reconnected_cap " << in->ino() << " client." << q->first << dendl; - add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm)); - rejoin_import_cap(in, q->first, r->second, r->first); + dout(10) << " exporting caps for client." << r->first << " ino " << q->first << dendl; + Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(r->first.v)); + assert(session); + // mark client caps stale. + MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, q->first, 0, 0, 0); + mds->send_message_client_counted(m, session); } - cap_imports.erase(p++); // remove and move on + } + + trim_non_auth(); + + rejoin_gather.erase(mds->get_nodeid()); + maybe_send_pending_rejoins(); + + if (rejoin_gather.empty() && rejoin_ack_gather.count(mds->get_nodeid())) + rejoin_gather_finish(); } + return false; } void MDCache::check_realm_past_parents(SnapRealm *realm) @@ -5056,9 +5152,12 @@ void MDCache::export_remaining_imported_caps() { dout(10) << "export_remaining_imported_caps" << dendl; + stringstream warn_str; + for (map<inodeno_t,map<client_t,map<int,ceph_mds_cap_reconnect> > >::iterator p = cap_imports.begin(); p != cap_imports.end(); ++p) { + warn_str << " ino " << p->first << "\n"; for (map<client_t,map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin(); q != p->second.end(); ++q) { @@ -5072,6 +5171,11 @@ void MDCache::export_remaining_imported_caps() } cap_imports.clear(); + + if (warn_str.peek() != EOF) { + mds->clog.warn() << "failed to reconnect caps for missing inodes:" << "\n"; + mds->clog.warn(warn_str); + } } void MDCache::try_reconnect_cap(CInode *in, Session *session) @@ -5216,9 +5320,22 @@ void MDCache::open_snap_parents() gather.set_finisher(new C_MDC_OpenSnapParents(this)); gather.activate(); } else { + if (!reconnected_snaprealms.empty()) { + stringstream warn_str; + for (map<inodeno_t,map<client_t,snapid_t> >::iterator p = reconnected_snaprealms.begin(); + p != reconnected_snaprealms.end(); + ++p) { + warn_str << " unconnected snaprealm " << p->first << "\n"; + for (map<client_t,snapid_t>::iterator q = p->second.begin(); + q != p->second.end(); + ++q) + warn_str << " client." << q->first << " snapid " << q->second << "\n"; + } + mds->clog.warn() << "open_snap_parents has:" << "\n"; + mds->clog.warn(warn_str); + } assert(rejoin_waiters.empty()); assert(missing_snap_parents.empty()); - //assert(reconnected_snaprealms.empty()); // FIXME: need to properly address #5031 dout(10) << "open_snap_parents - all open" << dendl; do_delayed_cap_imports(); @@ -5504,7 +5621,7 @@ void MDCache::queue_file_recover(CInode *in) } in->parent->first = in->first; - le->metablob.add_primary_dentry(in->parent, true, in); + le->metablob.add_primary_dentry(in->parent, in, true); mds->mdlog->submit_entry(le, new C_MDC_QueuedCow(this, in, mut)); mds->mdlog->flush(); } @@ -5784,7 +5901,7 @@ void MDCache::truncate_inode_finish(CInode *in, LogSegment *ls) EUpdate *le = new EUpdate(mds->mdlog, "truncate finish"); mds->mdlog->start_entry(le); le->metablob.add_dir_context(in->get_parent_dir()); - le->metablob.add_primary_dentry(in->get_projected_parent_dn(), true, in); + le->metablob.add_primary_dentry(in->get_projected_parent_dn(), in, true); le->metablob.add_truncate_finish(in->ino(), ls->offset); journal_dirty_inode(mut, &le->metablob, in); @@ -6133,7 +6250,6 @@ void MDCache::trim_inode(CDentry *dn, CInode *in, CDir *con, map<int, MCacheExpi void MDCache::trim_non_auth() { dout(7) << "trim_non_auth" << dendl; - stringstream warn_str_dirs; // temporarily pin all subtree roots for (map<CDir*, set<CDir*> >::iterator p = subtrees.begin(); @@ -6167,22 +6283,18 @@ void MDCache::trim_non_auth() assert(dir); // unlink the dentry - dout(15) << "trim_non_auth removing " << *dn << dendl; + dout(10) << " removing " << *dn << dendl; if (dnl->is_remote()) { dir->unlink_inode(dn); } else if (dnl->is_primary()) { CInode *in = dnl->get_inode(); + dout(10) << " removing " << *in << dendl; list<CDir*> ls; - warn_str_dirs << in->get_parent_dn()->get_name() << "\n"; in->get_dirfrags(ls); for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) { CDir *subdir = *p; - filepath fp; - subdir->get_inode()->make_path(fp); - warn_str_dirs << fp << "\n"; - if (subdir->is_subtree_root()) - remove_subtree(subdir); + assert(!subdir->is_subtree_root()); in->close_dirfrag(subdir->dirfrag().frag); } dir->unlink_inode(dn); @@ -6221,18 +6333,13 @@ void MDCache::trim_non_auth() for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) { - dout(0) << " ... " << **p << dendl; - CInode *diri = (*p)->get_inode(); - filepath fp; - diri->make_path(fp); - warn_str_dirs << fp << "\n"; + dout(10) << " removing " << **p << dendl; assert((*p)->get_num_ref() == 1); // SUBTREE remove_subtree((*p)); in->close_dirfrag((*p)->dirfrag().frag); } - dout(0) << " ... " << *in << dendl; - if (in->get_parent_dn()) - warn_str_dirs << in->get_parent_dn()->get_name() << "\n"; + dout(10) << " removing " << *in << dendl; + assert(!in->get_parent_dn()); assert(in->get_num_ref() == 0); remove_inode(in); } @@ -6241,10 +6348,6 @@ void MDCache::trim_non_auth() } show_subtrees(); - if (warn_str_dirs.peek() != EOF) { - mds->clog.info() << "trim_non_auth has deleted paths: " << "\n"; - mds->clog.info(warn_str_dirs); - } } /** @@ -7024,6 +7127,13 @@ void MDCache::dispatch(Message *m) case MSG_MDS_FINDINOREPLY: handle_find_ino_reply(static_cast<MMDSFindInoReply *>(m)); break; + + case MSG_MDS_OPENINO: + handle_open_ino(static_cast<MMDSOpenIno *>(m)); + break; + case MSG_MDS_OPENINOREPLY: + handle_open_ino_reply(static_cast<MMDSOpenInoReply *>(m)); + break; default: dout(7) << "cache unknown message " << m->get_type() << dendl; @@ -7232,8 +7342,8 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, Context *fin, // wh } else { dout(7) << "remote link to " << dnl->get_remote_ino() << ", which i don't have" << dendl; assert(mdr); // we shouldn't hit non-primary dentries doing a non-mdr traversal! - open_remote_ino(dnl->get_remote_ino(), _get_waiter(mdr, req, fin), - (null_okay && depth == path.depth() - 1)); + open_remote_dentry(dn, true, _get_waiter(mdr, req, fin), + (null_okay && depth == path.depth() - 1)); if (mds->logger) mds->logger->inc(l_mds_trino); return 1; } @@ -7390,6 +7500,7 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, Context *fin, // wh return 0; } +#if 0 /** * Find out if the MDS is auth for a given path. * @@ -7422,6 +7533,7 @@ bool MDCache::path_is_mine(filepath& path) return cur->is_auth(); } +#endif CInode *MDCache::cache_traverse(const filepath& fp) { @@ -7678,36 +7790,51 @@ void MDCache::open_remote_ino_2(inodeno_t ino, vector<Anchor>& anchortrace, bool struct C_MDC_OpenRemoteDentry : public Context { MDCache *mdc; CDentry *dn; - bool projected; + inodeno_t ino; Context *onfinish; - C_MDC_OpenRemoteDentry(MDCache *m, CDentry *d, bool p, Context *f) : - mdc(m), dn(d), projected(p), onfinish(f) {} + bool want_xlocked; + int mode; + C_MDC_OpenRemoteDentry(MDCache *m, CDentry *d, inodeno_t i, Context *f, + bool wx, int md) : + mdc(m), dn(d), ino(i), onfinish(f), want_xlocked(wx), mode(md) {} void finish(int r) { - mdc->_open_remote_dentry_finish(r, dn, projected, onfinish); + mdc->_open_remote_dentry_finish(dn, ino, onfinish, want_xlocked, mode, r); } }; -void MDCache::open_remote_dentry(CDentry *dn, bool projected, Context *fin) +void MDCache::open_remote_dentry(CDentry *dn, bool projected, Context *fin, bool want_xlocked) { dout(10) << "open_remote_dentry " << *dn << dendl; CDentry::linkage_t *dnl = projected ? dn->get_projected_linkage() : dn->get_linkage(); - open_remote_ino(dnl->get_remote_ino(), - new C_MDC_OpenRemoteDentry(this, dn, projected, fin)); + inodeno_t ino = dnl->get_remote_ino(); + int mode = g_conf->mds_open_remote_link_mode; + Context *fin2 = new C_MDC_OpenRemoteDentry(this, dn, ino, fin, want_xlocked, mode); + if (mode == 0) + open_remote_ino(ino, fin2, want_xlocked); // anchor + else + open_ino(ino, -1, fin2, true, want_xlocked); // backtrace } -void MDCache::_open_remote_dentry_finish(int r, CDentry *dn, bool projected, Context *fin) +void MDCache::_open_remote_dentry_finish(CDentry *dn, inodeno_t ino, Context *fin, + bool want_xlocked, int mode, int r) { - if (r == -ENOENT) { - dout(0) << "open_remote_dentry_finish bad remote dentry " << *dn << dendl; - dn->state_set(CDentry::STATE_BADREMOTEINO); - } else if (r != 0) - assert(0); - fin->finish(r); - delete fin; + if (r < 0) { + if (mode == 0) { + dout(0) << "open_remote_dentry_finish bad remote dentry " << *dn << dendl; + dn->state_set(CDentry::STATE_BADREMOTEINO); + } else { + dout(7) << "open_remote_dentry_finish failed to open ino " << ino + << " for " << *dn << ", retry using anchortable" << dendl; + assert(mode == 1); + Context *fin2 = new C_MDC_OpenRemoteDentry(this, dn, ino, fin, want_xlocked, 0); + open_remote_ino(ino, fin2, want_xlocked); + return; + } + } + fin->complete(r < 0 ? r : 0); } - void MDCache::make_trace(vector<CDentry*>& trace, CInode *in) { // empty trace if we're a base inode @@ -7724,6 +7851,443 @@ void MDCache::make_trace(vector<CDentry*>& trace, CInode *in) } +// ------------------------------------------------------------------------------- +// Open inode by inode number + +class C_MDC_OpenInoBacktraceFetched : public Context { + MDCache *cache; + inodeno_t ino; + public: + bufferlist bl; + C_MDC_OpenInoBacktraceFetched(MDCache *c, inodeno_t i) : + cache(c), ino(i) {} + void finish(int r) { + cache->_open_ino_backtrace_fetched(ino, bl, r); + } +}; + +struct C_MDC_OpenInoTraverseDir : public Context { + MDCache *cache; + inodeno_t ino; + public: + C_MDC_OpenInoTraverseDir(MDCache *c, inodeno_t i) : cache(c), ino(i) {} + void finish(int r) { + assert(cache->opening_inodes.count(ino)); + cache->_open_ino_traverse_dir(ino, cache->opening_inodes[ino], r); + } +}; + +struct C_MDC_OpenInoParentOpened : public Context { + MDCache *cache; + inodeno_t ino; + public: + C_MDC_OpenInoParentOpened(MDCache *c, inodeno_t i) : cache(c), ino(i) {} + void finish(int r) { + cache->_open_ino_parent_opened(ino, r); + } +}; + +void MDCache::_open_ino_backtrace_fetched(inodeno_t ino, bufferlist& bl, int err) +{ + dout(10) << "_open_ino_backtrace_fetched ino " << ino << " errno " << err << dendl; + + assert(opening_inodes.count(ino)); + open_ino_info_t& info = opening_inodes[ino]; + + CInode *in = get_inode(ino); + if (in) { + dout(10) << " found cached " << *in << dendl; + open_ino_finish(ino, info, in->authority().first); + return; + } + + inode_backtrace_t backtrace; + if (err == 0) { + ::decode(backtrace, bl); + if (backtrace.pool != info.pool) { + dout(10) << " old object in pool " << info.pool + << ", retrying pool " << backtrace.pool << dendl; + info.pool = backtrace.pool; + C_MDC_OpenInoBacktraceFetched *fin = new C_MDC_OpenInoBacktraceFetched(this, ino); + fetch_backtrace(ino, info.pool, fin->bl, fin); + return; + } + } else if (err == -ENOENT) { + int64_t meta_pool = mds->mdsmap->get_metadata_pool(); + if (info.pool != meta_pool) { + dout(10) << " no object in pool " << info.pool + << ", retrying pool " << meta_pool << dendl; + info.pool = meta_pool; + C_MDC_OpenInoBacktraceFetched *fin = new C_MDC_OpenInoBacktraceFetched(this, ino); + fetch_backtrace(ino, info.pool, fin->bl, fin); + return; + } + } + + if (err == 0) { + if (backtrace.ancestors.empty()) { + dout(10) << " got empty backtrace " << dendl; + err = -EIO; + } else if (!info.ancestors.empty()) { + if (info.ancestors[0] == backtrace.ancestors[0]) { + dout(10) << " got same parents " << info.ancestors[0] << " 2 times" << dendl; + err = -EINVAL; + } + } + } + if (err) { + dout(10) << " failed to open ino " << ino << dendl; + open_ino_finish(ino, info, err); + return; + } + + dout(10) << " got backtrace " << backtrace << dendl; + info.ancestors = backtrace.ancestors; + + _open_ino_traverse_dir(ino, info, 0); +} + +void MDCache::_open_ino_parent_opened(inodeno_t ino, int ret) +{ + dout(10) << "_open_ino_parent_opened ino " << ino << " ret " << ret << dendl; + + assert(opening_inodes.count(ino)); + open_ino_info_t& info = opening_inodes[ino]; + + CInode *in = get_inode(ino); + if (in) { + dout(10) << " found cached " << *in << dendl; + open_ino_finish(ino, info, in->authority().first); + return; + } + + if (ret == mds->get_nodeid()) { + _open_ino_traverse_dir(ino, info, 0); + } else { + if (ret >= 0) { + info.check_peers = true; + info.auth_hint = ret; + info.checked.erase(ret); + } + do_open_ino(ino, info, ret); + } +} + +Context* MDCache::_open_ino_get_waiter(inodeno_t ino, MMDSOpenIno *m) +{ + if (m) + return new C_MDS_RetryMessage(mds, m); + else + return new C_MDC_OpenInoTraverseDir(this, ino); +} + +void MDCache::_open_ino_traverse_dir(inodeno_t ino, open_ino_info_t& info, int ret) +{ + dout(10) << "_open_ino_trvserse_dir ino " << ino << " ret " << ret << dendl; + + CInode *in = get_inode(ino); + if (in) { + dout(10) << " found cached " << *in << dendl; + open_ino_finish(ino, info, in->authority().first); + return; + } + + if (ret) { + do_open_ino(ino, info, ret); + return; + } + + int hint = info.auth_hint; + ret = open_ino_traverse_dir(ino, NULL, info.ancestors, + info.discover, info.want_xlocked, &hint); + if (ret > 0) + return; + if (hint != mds->get_nodeid()) + info.auth_hint = hint; + do_open_ino(ino, info, ret); +} + +int MDCache::open_ino_traverse_dir(inodeno_t ino, MMDSOpenIno *m, + vector<inode_backpointer_t>& ancestors, + bool discover, bool want_xlocked, int *hint) +{ + dout(10) << "open_ino_traverse_dir ino " << ino << " " << ancestors << dendl; + int err = 0; + for (unsigned i = 0; i < ancestors.size(); i++) { + CInode *diri = get_inode(ancestors[i].dirino); + + if (!diri) { + if (discover && MDS_INO_IS_MDSDIR(ancestors[i].dirino)) { + open_foreign_mdsdir(ancestors[i].dirino, _open_ino_get_waiter(ino, m)); + return 1; + } + continue; + } + + if (diri->state_test(CInode::STATE_REJOINUNDEF)) + continue; + + if (!diri->is_dir()) { + dout(10) << " " << *diri << " is not dir" << dendl; + if (i == 0) + err = -ENOTDIR; + break; + } + + string &name = ancestors[i].dname; + frag_t fg = diri->pick_dirfrag(name); + CDir *dir = diri->get_dirfrag(fg); + if (!dir) { + if (diri->is_auth()) { + if (diri->is_frozen()) { + dout(10) << " " << *diri << " is frozen, waiting " << dendl; + diri->add_waiter(CDir::WAIT_UNFREEZE, _open_ino_get_waiter(ino, m)); + return 1; + } + dir = diri->get_or_open_dirfrag(this, fg); + } else if (discover) { + open_remote_dirfrag(diri, fg, _open_ino_get_waiter(ino, m)); + return 1; + } + } + if (dir) { + inodeno_t next_ino = i > 0 ? ancestors[i - 1].dirino : ino; + if (dir->is_auth()) { + CDentry *dn = dir->lookup(name); + CDentry::linkage_t *dnl = dn ? dn->get_linkage() : NULL; + + if (dnl && dnl->is_primary() && + dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF)) { + dout(10) << " fetching undef " << *dnl->get_inode() << dendl; + dir->fetch(_open_ino_get_waiter(ino, m)); + return 1; + } + + if (!dnl && !dir->is_complete() && + (!dir->has_bloom() || dir->is_in_bloom(name))) { + dout(10) << " fetching incomplete " << *dir << dendl; + dir->fetch(_open_ino_get_waiter(ino, m)); + return 1; + } + + dout(10) << " no ino " << next_ino << " in " << *dir << dendl; + if (i == 0) + err = -ENOENT; + } else if (discover) { + discover_ino(dir, next_ino, _open_ino_get_waiter(ino, m), + (i == 0 && want_xlocked)); + return 1; + } + } + if (hint && i == 0) + *hint = dir ? dir->authority().first : diri->authority().first; + break; + } + return err; +} + +void MDCache::open_ino_finish(inodeno_t ino, open_ino_info_t& info, int ret) +{ + dout(10) << "open_ino_finish ino " << ino << " ret " << ret << dendl; + + finish_contexts(g_ceph_context, info.waiters, ret); + opening_inodes.erase(ino); +} + +void MDCache::do_open_ino(inodeno_t ino, open_ino_info_t& info, int err) +{ + if (err < 0) { + info.checked.clear(); + info.checked.insert(mds->get_nodeid()); + info.checking = -1; + info.check_peers = true; + info.fetch_backtrace = true; + if (info.discover) { + info.discover = false; + info.ancestors.clear(); + } + } + + if (info.check_peers) { + info.check_peers = false; + info.checking = -1; + do_open_ino_peer(ino, info); + } else if (info.fetch_backtrace) { + info.check_peers = true; + info.fetch_backtrace = false; + info.checking = mds->get_nodeid(); + info.checked.clear(); + info.checked.insert(mds->get_nodeid()); + C_MDC_OpenInoBacktraceFetched *fin = new C_MDC_OpenInoBacktraceFetched(this, ino); + fetch_backtrace(ino, info.pool, fin->bl, fin); + } else { + assert(!info.ancestors.empty()); + info.checking = mds->get_nodeid(); + open_ino(info.ancestors[0].dirino, mds->mdsmap->get_metadata_pool(), + new C_MDC_OpenInoParentOpened(this, ino), info.want_replica); + } +} + +void MDCache::do_open_ino_peer(inodeno_t ino, open_ino_info_t& info) +{ + set<int> all, active; + mds->mdsmap->get_mds_set(all); + mds->mdsmap->get_clientreplay_or_active_or_stopping_mds_set(active); + if (mds->get_state() == MDSMap::STATE_REJOIN) + mds->mdsmap->get_mds_set(active, MDSMap::STATE_REJOIN); + + dout(10) << "do_open_ino_peer " << ino << " active " << active + << " all " << all << " checked " << info.checked << dendl; + + int peer = -1; + if (info.auth_hint >= 0) { + if (active.count(info.auth_hint)) { + peer = info.auth_hint; + info.auth_hint = -1; + } + } else { + for (set<int>::iterator p = active.begin(); p != active.end(); ++p) + if (*p != mds->get_nodeid() && info.checked.count(*p) == 0) { + peer = *p; + break; + } + } + if (peer < 0) { + if (all.size() > active.size() && all != info.checked) { + dout(10) << " waiting for more peers to be active" << dendl; + } else { + dout(10) << " all MDS peers have been checked " << dendl; + do_open_ino(ino, info, 0); + } + } else { + info.checking = peer; + mds->send_message_mds(new MMDSOpenIno(info.tid, ino, info.ancestors), peer); + } +} + +void MDCache::handle_open_ino(MMDSOpenIno *m) +{ + dout(10) << "handle_open_ino " << *m << dendl; + + inodeno_t ino = m->ino; + MMDSOpenInoReply *reply; + CInode *in = get_inode(ino); + if (in) { + dout(10) << " have " << *in << dendl; + reply = new MMDSOpenInoReply(m->get_tid(), ino, 0); + if (in->is_auth()) { + touch_inode(in); + while (1) { + CDentry *pdn = in->get_parent_dn(); + if (!pdn) + break; + CInode *diri = pdn->get_dir()->get_inode(); + reply->ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, + in->inode.version)); + in = diri; + } + } else { + reply->hint = in->authority().first; + } + } else { + int hint = -1; + int ret = open_ino_traverse_dir(ino, m, m->ancestors, false, false, &hint); + if (ret > 0) + return; + reply = new MMDSOpenInoReply(m->get_tid(), ino, hint, ret); + } + mds->messenger->send_message(reply, m->get_connection()); + m->put(); +} + +void MDCache::handle_open_ino_reply(MMDSOpenInoReply *m) +{ + dout(10) << "handle_open_ino_reply " << *m << dendl; + + inodeno_t ino = m->ino; + int from = m->get_source().num(); + if (opening_inodes.count(ino)) { + open_ino_info_t& info = opening_inodes[ino]; + + if (info.checking == from) + info.checking = -1; + info.checked.insert(from); + + CInode *in = get_inode(ino); + if (in) { + dout(10) << " found cached " << *in << dendl; + open_ino_finish(ino, info, in->authority().first); + } else if (!m->ancestors.empty()) { + dout(10) << " found ino " << ino << " on mds." << from << dendl; + if (!info.want_replica) { + open_ino_finish(ino, info, from); + return; + } + + info.ancestors = m->ancestors; + info.auth_hint = from; + info.checking = mds->get_nodeid(); + info.discover = true; + _open_ino_traverse_dir(ino, info, 0); + } else if (m->error) { + dout(10) << " error " << m->error << " from mds." << from << dendl; + do_open_ino(ino, info, m->error); + } else { + if (m->hint >= 0 && m->hint != mds->get_nodeid()) { + info.auth_hint = m->hint; + info.checked.erase(m->hint); + } + do_open_ino_peer(ino, info); + } + } + m->put(); +} + +void MDCache::kick_open_ino_peers(int who) +{ + dout(10) << "kick_open_ino_peers mds." << who << dendl; + + for (map<inodeno_t, open_ino_info_t>::iterator p = opening_inodes.begin(); + p != opening_inodes.end(); + ++p) { + open_ino_info_t& info = p->second; + if (info.checking == who) { + dout(10) << " kicking ino " << p->first << " who was checking mds." << who << dendl; + info.checking = -1; + do_open_ino_peer(p->first, info); + } else if (info.checking == -1) { + dout(10) << " kicking ino " << p->first << " who was waiting" << dendl; + do_open_ino_peer(p->first, info); + } + } +} + +void MDCache::open_ino(inodeno_t ino, int64_t pool, Context* fin, + bool want_replica, bool want_xlocked) +{ + dout(10) << "open_ino " << ino << " pool " << pool << " want_replica " + << want_replica << dendl; + + if (opening_inodes.count(ino)) { + open_ino_info_t& info = opening_inodes[ino]; + if (want_replica) { + info.want_replica = true; + if (want_xlocked) + info.want_xlocked = true; + } + info.waiters.push_back(fin); + } else { + open_ino_info_t& info = opening_inodes[ino]; + info.checked.insert(mds->get_nodeid()); + info.want_replica = want_replica; + info.want_xlocked = want_xlocked; + info.tid = ++open_ino_last_tid; + info.pool = pool >= 0 ? pool : mds->mdsmap->get_first_data_pool(); + info.waiters.push_back(fin); + do_open_ino(ino, info, 0); + } +} + /* ---------------------------- */ /* @@ -8388,7 +8952,7 @@ void MDCache::snaprealm_create(MDRequest *mdr, CInode *in) predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY); journal_cow_inode(mut, &le->metablob, in); - le->metablob.add_primary_dentry(in->get_projected_parent_dn(), true, in); + le->metablob.add_primary_dentry(in->get_projected_parent_dn(), in, true); mds->mdlog->submit_entry(le, new C_MDC_snaprealm_create_finish(this, mdr, mut, in)); mds->mdlog->flush(); @@ -8631,6 +9195,20 @@ void MDCache::eval_remote(CDentry *dn) } } +void MDCache::fetch_backtrace(inodeno_t ino, int64_t pool, bufferlist& bl, Context *fin) +{ + object_t oid = CInode::get_object_name(ino, frag_t(), ""); + mds->objecter->getxattr(oid, object_locator_t(pool), "parent", CEPH_NOSNAP, &bl, 0, fin); +} + +void MDCache::remove_backtrace(inodeno_t ino, int64_t pool, Context *fin) +{ + SnapContext snapc; + object_t oid = CInode::get_object_name(ino, frag_t(), ""); + mds->objecter->removexattr(oid, object_locator_t(pool), "parent", snapc, + ceph_clock_now(g_ceph_context), 0, NULL, fin); +} + class C_MDC_PurgeStrayPurged : public Context { MDCache *cache; CDentry *dn; @@ -8645,13 +9223,12 @@ public: class C_MDC_PurgeForwardingPointers : public Context { MDCache *cache; CDentry *dn; - Context *fin; public: - inode_backtrace_t backtrace; - C_MDC_PurgeForwardingPointers(MDCache *c, CDentry *d, Context *f) : - cache(c), dn(d), fin(f) {} + bufferlist bl; + C_MDC_PurgeForwardingPointers(MDCache *c, CDentry *d) : + cache(c), dn(d) {} void finish(int r) { - cache->_purge_forwarding_pointers(&backtrace, dn, r, fin); + cache->_purge_forwarding_pointers(bl, dn, r); } }; @@ -8666,18 +9243,22 @@ public: } }; -void MDCache::_purge_forwarding_pointers(inode_backtrace_t *backtrace, CDentry *d, int r, Context *fin) +void MDCache::_purge_forwarding_pointers(bufferlist& bl, CDentry *dn, int r) { assert(r == 0 || r == -ENOENT || r == -ENODATA); + inode_backtrace_t backtrace; + if (r == 0) + ::decode(backtrace, bl); + // setup gathering context C_GatherBuilder gather_bld(g_ceph_context); // remove all the objects with forwarding pointer backtraces (aka sentinels) - for (set<int64_t>::const_iterator i = backtrace->old_pools.begin(); - i != backtrace->old_pools.end(); + for (set<int64_t>::const_iterator i = backtrace.old_pools.begin(); + i != backtrace.old_pools.end(); ++i) { SnapContext snapc; - object_t oid = CInode::get_object_name(backtrace->ino, frag_t(), ""); + object_t oid = CInode::get_object_name(backtrace.ino, frag_t(), ""); object_locator_t oloc(*i); mds->objecter->remove(oid, oloc, snapc, ceph_clock_now(g_ceph_context), 0, @@ -8685,10 +9266,10 @@ void MDCache::_purge_forwarding_pointers(inode_backtrace_t *backtrace, CDentry * } if (gather_bld.has_subs()) { - gather_bld.set_finisher(fin); + gather_bld.set_finisher(new C_MDC_PurgeStray(this, dn)); gather_bld.activate(); } else { - fin->finish(r); + _purge_stray(dn, r); } } @@ -8752,17 +9333,12 @@ void MDCache::purge_stray(CDentry *dn) if (in->is_dir()) { dout(10) << "purge_stray dir ... implement me!" << dendl; // FIXME XXX // remove the backtrace - SnapContext snapc; - object_t oid = CInode::get_object_name(in->ino(), frag_t(), ""); - object_locator_t oloc(mds->mdsmap->get_metadata_pool()); - - mds->objecter->removexattr(oid, oloc, "parent", snapc, ceph_clock_now(g_ceph_context), 0, - NULL, new C_MDC_PurgeStrayPurged(this, dn)); + remove_backtrace(in->ino(), mds->mdsmap->get_metadata_pool(), + new C_MDC_PurgeStrayPurged(this, dn)); } else if (in->is_file()) { // get the backtrace before blowing away the object - C_MDC_PurgeStray *strayfin = new C_MDC_PurgeStray(this, dn); - C_MDC_PurgeForwardingPointers *fpfin = new C_MDC_PurgeForwardingPointers(this, dn, strayfin); - in->fetch_backtrace(&fpfin->backtrace, fpfin); + C_MDC_PurgeForwardingPointers *fin = new C_MDC_PurgeForwardingPointers(this, dn); + fetch_backtrace(in->ino(), in->get_inode().layout.fl_pg_pool, fin->bl, fin); } else { // not a dir or file; purged! _purge_stray_purged(dn); @@ -8837,7 +9413,7 @@ void MDCache::_purge_stray_purged(CDentry *dn, int r) pi->version = in->pre_dirty(); le->metablob.add_dir_context(dn->dir); - le->metablob.add_primary_dentry(dn, true, in); + le->metablob.add_primary_dentry(dn, in, true); mds->mdlog->submit_entry(le, new C_MDC_PurgeStrayLoggedTruncate(this, dn, mds->mdlog->get_current_segment())); } @@ -9178,7 +9754,8 @@ void MDCache::handle_discover(MDiscover *dis) snapid_t snapid = dis->get_snapid(); // get started. - if (MDS_INO_IS_BASE(dis->get_base_ino())) { + if (MDS_INO_IS_BASE(dis->get_base_ino()) && + !dis->wants_base_dir() && dis->get_want().depth() == 0) { // wants root dout(7) << "handle_discover from mds." << from << " wants base + " << dis->get_want().get_path() diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index d837586a3ac..3da8a36f799 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -53,6 +53,8 @@ class MDentryUnlink; class MLock; class MMDSFindIno; class MMDSFindInoReply; +class MMDSOpenIno; +class MMDSOpenInoReply; class Message; class MClientRequest; @@ -291,7 +293,7 @@ public: } void log_master_commit(metareqid_t reqid); void logged_master_update(metareqid_t reqid); - void _logged_master_commit(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters); + void _logged_master_commit(metareqid_t reqid); void committed_master_slave(metareqid_t r, int from); void finish_committed_masters(); @@ -323,6 +325,9 @@ protected: LogSegment *ls; list<Context*> waiters; bool safe; + bool committing; + bool recovering; + umaster() : committing(false), recovering(false) {} }; map<metareqid_t, umaster> uncommitted_masters; // master: req -> slave set @@ -407,11 +412,12 @@ protected: set<int> rejoin_ack_gather; // nodes from whom i need a rejoin ack map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> > cap_exports; // ino -> client -> capex - map<inodeno_t,filepath> cap_export_paths; + map<inodeno_t,int> cap_export_targets; // ino -> auth mds map<inodeno_t,map<client_t,map<int,ceph_mds_cap_reconnect> > > cap_imports; // ino -> client -> frommds -> capex map<inodeno_t,filepath> cap_import_paths; set<inodeno_t> cap_imports_missing; + int cap_imports_num_opening; set<CInode*> rejoin_undef_inodes; set<CInode*> rejoin_potential_updated_scatterlocks; @@ -426,7 +432,6 @@ protected: void handle_cache_rejoin_weak(MMDSCacheRejoin *m); CInode* rejoin_invent_inode(inodeno_t ino, snapid_t last); CDir* rejoin_invent_dirfrag(dirfrag_t df); - bool rejoin_fetch_dirfrags(MMDSCacheRejoin *m); void handle_cache_rejoin_strong(MMDSCacheRejoin *m); void rejoin_scour_survivor_replicas(int from, MMDSCacheRejoin *ack, set<SimpleLock *>& gather_locks, @@ -442,11 +447,13 @@ protected: rejoin_send_rejoins(); } public: + void rejoin_start(); void rejoin_gather_finish(); void rejoin_send_rejoins(); - void rejoin_export_caps(inodeno_t ino, client_t client, cap_reconnect_t& icr) { - cap_exports[ino][client] = icr.capinfo; - cap_export_paths[ino] = filepath(icr.path, (uint64_t)icr.capinfo.pathbase); + void rejoin_export_caps(inodeno_t ino, client_t client, ceph_mds_cap_reconnect& capinfo, + int target=-1) { + cap_exports[ino][client] = capinfo; + cap_export_targets[ino] = target; } void rejoin_recovered_caps(inodeno_t ino, client_t client, cap_reconnect_t& icr, int frommds=-1) { @@ -477,7 +484,10 @@ public: void add_reconnected_snaprealm(client_t client, inodeno_t ino, snapid_t seq) { reconnected_snaprealms[ino][client] = seq; } - void process_imported_caps(); + + friend class C_MDC_RejoinOpenInoFinish; + void rejoin_open_ino_finish(inodeno_t ino, int ret); + bool process_imported_caps(); void choose_lock_states_and_reconnect_caps(); void prepare_realm_split(SnapRealm *realm, client_t client, inodeno_t ino, map<client_t,MClientSnap*>& splits); @@ -744,15 +754,59 @@ public: void open_remote_ino_2(inodeno_t ino, vector<Anchor>& anchortrace, bool want_xlocked, inodeno_t hadino, version_t hadv, Context *onfinish); - void open_remote_dentry(CDentry *dn, bool projected, Context *fin); - void _open_remote_dentry_finish(int r, CDentry *dn, bool projected, Context *fin); bool parallel_fetch(map<inodeno_t,filepath>& pathmap, set<inodeno_t>& missing); bool parallel_fetch_traverse_dir(inodeno_t ino, filepath& path, set<CDir*>& fetch_queue, set<inodeno_t>& missing, C_GatherBuilder &gather_bld); + void open_remote_dentry(CDentry *dn, bool projected, Context *fin, + bool want_xlocked=false); + void _open_remote_dentry_finish(CDentry *dn, inodeno_t ino, Context *fin, + bool want_xlocked, int mode, int r); + void make_trace(vector<CDentry*>& trace, CInode *in); + +protected: + struct open_ino_info_t { + vector<inode_backpointer_t> ancestors; + set<int> checked; + int checking; + int auth_hint; + bool check_peers; + bool fetch_backtrace; + bool discover; + bool want_replica; + bool want_xlocked; + version_t tid; + int64_t pool; + list<Context*> waiters; + open_ino_info_t() : checking(-1), auth_hint(-1), + check_peers(true), fetch_backtrace(true), discover(false) {} + }; + tid_t open_ino_last_tid; + map<inodeno_t,open_ino_info_t> opening_inodes; + + void _open_ino_backtrace_fetched(inodeno_t ino, bufferlist& bl, int err); + void _open_ino_parent_opened(inodeno_t ino, int ret); + void _open_ino_traverse_dir(inodeno_t ino, open_ino_info_t& info, int err); + Context* _open_ino_get_waiter(inodeno_t ino, MMDSOpenIno *m); + int open_ino_traverse_dir(inodeno_t ino, MMDSOpenIno *m, + vector<inode_backpointer_t>& ancestors, + bool discover, bool want_xlocked, int *hint); + void open_ino_finish(inodeno_t ino, open_ino_info_t& info, int err); + void do_open_ino(inodeno_t ino, open_ino_info_t& info, int err); + void do_open_ino_peer(inodeno_t ino, open_ino_info_t& info); + void handle_open_ino(MMDSOpenIno *m); + void handle_open_ino_reply(MMDSOpenInoReply *m); + friend class C_MDC_OpenInoBacktraceFetched; + friend class C_MDC_OpenInoTraverseDir; + friend class C_MDC_OpenInoParentOpened; + +public: + void kick_open_ino_peers(int who); + void open_ino(inodeno_t ino, int64_t pool, Context *fin, + bool want_replica=true, bool want_xlocked=false); // -- find_ino_peer -- struct find_ino_peer_info_t { @@ -817,12 +871,15 @@ public: eval_stray(dn); } protected: - void _purge_forwarding_pointers(inode_backtrace_t *backtrace, CDentry *dn, int r, Context *fin); + void fetch_backtrace(inodeno_t ino, int64_t pool, bufferlist& bl, Context *fin); + void remove_backtrace(inodeno_t ino, int64_t pool, Context *fin); + void _purge_forwarding_pointers(bufferlist& bl, CDentry *dn, int r); void _purge_stray(CDentry *dn, int r); void purge_stray(CDentry *dn); void _purge_stray_purged(CDentry *dn, int r=0); void _purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls); void _purge_stray_logged_truncate(CDentry *dn, LogSegment *ls); + friend class C_MDC_FetchedBacktrace; friend class C_MDC_PurgeForwardingPointers; friend class C_MDC_PurgeStray; friend class C_MDC_PurgeStrayLogged; diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 53897432522..c4773131d3c 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -619,10 +619,10 @@ void MDLog::standby_trim_segments() seg->dirty_inodes.clear_list(); seg->dirty_dentries.clear_list(); seg->open_files.clear_list(); + seg->dirty_parent_inodes.clear_list(); seg->dirty_dirfrag_dir.clear_list(); seg->dirty_dirfrag_nest.clear_list(); seg->dirty_dirfrag_dirfragtree.clear_list(); - seg->update_backtraces.clear_list(); remove_oldest_segment(); removed_segment = true; } diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index d5d6001151e..552f103f126 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -976,6 +976,8 @@ void MDS::handle_mds_map(MMDSMap *m) resolve_start(); } else if (is_reconnect()) { reconnect_start(); + } else if (is_rejoin()) { + rejoin_start(); } else if (is_clientreplay()) { clientreplay_start(); } else if (is_creating()) { @@ -1012,12 +1014,7 @@ void MDS::handle_mds_map(MMDSMap *m) if (g_conf->mds_dump_cache_after_rejoin && oldmap->is_rejoining() && !mdsmap->is_rejoining()) mdcache->dump_cache(); // for DEBUG only - } - if (oldmap->is_degraded() && !mdsmap->is_degraded() && state >= MDSMap::STATE_ACTIVE) - dout(1) << "cluster recovered." << dendl; - // did someone go active? - if (is_clientreplay() || is_active() || is_stopping()) { // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them. set<int> olddis, dis; oldmap->get_mds_set(olddis, MDSMap::STATE_ACTIVE); @@ -1028,9 +1025,17 @@ void MDS::handle_mds_map(MMDSMap *m) mdsmap->get_mds_set(dis, MDSMap::STATE_REJOIN); for (set<int>::iterator p = dis.begin(); p != dis.end(); ++p) if (*p != whoami && // not me - olddis.count(*p) == 0) // newly so? + olddis.count(*p) == 0) { // newly so? mdcache->kick_discovers(*p); + mdcache->kick_open_ino_peers(*p); + } + } + if (oldmap->is_degraded() && !mdsmap->is_degraded() && state >= MDSMap::STATE_ACTIVE) + dout(1) << "cluster recovered." << dendl; + + // did someone go active? + if (is_clientreplay() || is_active() || is_stopping()) { set<int> oldactive, active; oldmap->get_mds_set(oldactive, MDSMap::STATE_ACTIVE); oldmap->get_mds_set(oldactive, MDSMap::STATE_CLIENTREPLAY); @@ -1461,9 +1466,13 @@ void MDS::reconnect_done() void MDS::rejoin_joint_start() { dout(1) << "rejoin_joint_start" << dendl; - mdcache->finish_committed_masters(); mdcache->rejoin_send_rejoins(); } +void MDS::rejoin_start() +{ + dout(1) << "rejoin_start" << dendl; + mdcache->rejoin_start(); +} void MDS::rejoin_done() { dout(1) << "rejoin_done" << dendl; diff --git a/src/mds/MDS.h b/src/mds/MDS.h index 88d9fe2931e..4e69dcaf8f9 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -35,7 +35,7 @@ #include "SessionMap.h" -#define CEPH_MDS_PROTOCOL 16 /* cluster internal */ +#define CEPH_MDS_PROTOCOL 17 /* cluster internal */ enum { @@ -376,6 +376,7 @@ class MDS : public Dispatcher { void reconnect_start(); void reconnect_done(); void rejoin_joint_start(); + void rejoin_start(); void rejoin_done(); void recovery_done(); void clientreplay_start(); diff --git a/src/mds/MDSMap.h b/src/mds/MDSMap.h index c5bc1c36460..3e2f67e01de 100644 --- a/src/mds/MDSMap.h +++ b/src/mds/MDSMap.h @@ -308,6 +308,13 @@ public: if (p->second.state >= STATE_REPLAY && p->second.state <= STATE_STOPPING) s.insert(p->second.rank); } + void get_clientreplay_or_active_or_stopping_mds_set(set<int>& s) { + for (map<uint64_t,mds_info_t>::const_iterator p = mds_info.begin(); + p != mds_info.end(); + ++p) + if (p->second.state >= STATE_CLIENTREPLAY && p->second.state <= STATE_STOPPING) + s.insert(p->second.rank); + } void get_mds_set(set<int>& s, int state) { for (map<uint64_t,mds_info_t>::const_iterator p = mds_info.begin(); p != mds_info.end(); diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index e7f4e27fecb..92962424e46 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -397,7 +397,7 @@ void Migrator::handle_mds_failure_or_stop(int who) cache->get_subtree_bounds(dir, bounds); import_remove_pins(dir, bounds); - // adjust auth back to me + // adjust auth back to the exporter cache->adjust_subtree_auth(dir, import_peer[df]); cache->try_subtree_merge(dir); // NOTE: may journal subtree_map as side-effect @@ -1026,6 +1026,7 @@ void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl, map<client_t,Capability::Export> cap_map; in->export_client_caps(cap_map); ::encode(cap_map, bl); + ::encode(in->get_mds_caps_wanted(), bl); in->state_set(CInode::STATE_EXPORTINGCAPS); in->get(CInode::PIN_EXPORTINGCAPS); @@ -1067,10 +1068,6 @@ void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& fini { dout(12) << "finish_export_inode " << *in << dendl; - in->finish_export(now); - - finish_export_inode_caps(in); - // clean if (in->is_dirty()) in->mark_clean(); @@ -1102,9 +1099,15 @@ void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& fini in->item_open_file.remove_myself(); + in->clear_dirty_parent(); + // waiters in->take_waiting(CInode::WAIT_ANY_MASK, finished); + + in->finish_export(now); + finish_export_inode_caps(in); + // *** other state too? // move to end of LRU so we drop out of cache quickly! @@ -1219,9 +1222,6 @@ void Migrator::finish_export_dir(CDir *dir, list<Context*>& finished, utime_t no if (dir->is_dirty()) dir->mark_clean(); - - // discard most dir state - dir->state &= CDir::MASK_STATE_EXPORT_KEPT; // i only retain a few things. // suck up all waiters dir->take_waiting(CDir::WAIT_ANY_MASK, finished); // all dir waiters @@ -1587,27 +1587,26 @@ void Migrator::handle_export_discover(MExportDirDiscover *m) dout(7) << "handle_export_discover on " << m->get_path() << dendl; - if (!mds->mdcache->is_open()) { - dout(5) << " waiting for root" << dendl; - mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m)); - return; - } - // note import state dirfrag_t df = m->get_dirfrag(); - // only start discovering on this message once. if (!m->started) { m->started = true; + import_pending_msg[df] = m; import_state[df] = IMPORT_DISCOVERING; import_peer[df] = from; + } else { + // am i retrying after ancient path_traverse results? + if (import_pending_msg.count(df) == 0 || import_pending_msg[df] != m) { + dout(7) << " dropping obsolete message" << dendl; + m->put(); + return; + } } - // am i retrying after ancient path_traverse results? - if (import_state.count(df) == 0 || - import_state[df] != IMPORT_DISCOVERING) { - dout(7) << "hmm import_state is off, i must be obsolete lookup" << dendl; - m->put(); + if (!mds->mdcache->is_open()) { + dout(5) << " waiting for root" << dendl; + mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m)); return; } @@ -1633,6 +1632,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m) dout(7) << "handle_export_discover have " << df << " inode " << *in << dendl; import_state[m->get_dirfrag()] = IMPORT_DISCOVERED; + import_pending_msg.erase(m->get_dirfrag()); // pin inode in the cache (for now) assert(in->is_dir()); @@ -1647,6 +1647,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m) void Migrator::import_reverse_discovering(dirfrag_t df) { + import_pending_msg.erase(df); import_state.erase(df); import_peer.erase(df); } @@ -1661,6 +1662,7 @@ void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri) void Migrator::import_reverse_prepping(CDir *dir) { + import_pending_msg.erase(dir->dirfrag()); set<CDir*> bounds; cache->map_dirfrag_set(import_bound_ls[dir], bounds); import_remove_pins(dir, bounds); @@ -1685,6 +1687,12 @@ void Migrator::handle_export_cancel(MExportDirCancel *m) } else if (import_state[df] == IMPORT_PREPPED) { CDir *dir = mds->mdcache->get_dirfrag(df); assert(dir); + set<CDir*> bounds; + cache->get_subtree_bounds(dir, bounds); + import_remove_pins(dir, bounds); + // adjust auth back to the exportor + cache->adjust_subtree_auth(dir, import_peer[df]); + cache->try_subtree_merge(dir); import_reverse_unfreeze(dir); } else { assert(0 == "got export_cancel in weird state"); @@ -1698,32 +1706,29 @@ void Migrator::handle_export_prep(MExportDirPrep *m) int oldauth = m->get_source().num(); assert(oldauth != mds->get_nodeid()); - // make sure we didn't abort - if (import_state.count(m->get_dirfrag()) == 0 || - (import_state[m->get_dirfrag()] != IMPORT_DISCOVERED && - import_state[m->get_dirfrag()] != IMPORT_PREPPING) || - import_peer[m->get_dirfrag()] != oldauth) { - dout(10) << "handle_export_prep import has aborted, dropping" << dendl; - m->put(); - return; - } - - CInode *diri = cache->get_inode(m->get_dirfrag().ino); - assert(diri); - + CDir *dir; + CInode *diri; list<Context*> finished; // assimilate root dir. - CDir *dir; - if (!m->did_assim()) { + diri = cache->get_inode(m->get_dirfrag().ino); + assert(diri); bufferlist::iterator p = m->basedir.begin(); dir = cache->add_replica_dir(p, diri, oldauth, finished); dout(7) << "handle_export_prep on " << *dir << " (first pass)" << dendl; } else { + if (import_pending_msg.count(m->get_dirfrag()) == 0 || + import_pending_msg[m->get_dirfrag()] != m) { + dout(7) << "handle_export_prep obsolete message, dropping" << dendl; + m->put(); + return; + } + dir = cache->get_dirfrag(m->get_dirfrag()); assert(dir); dout(7) << "handle_export_prep on " << *dir << " (subsequent pass)" << dendl; + diri = dir->get_inode(); } assert(dir->is_auth() == false); @@ -1742,16 +1747,17 @@ void Migrator::handle_export_prep(MExportDirPrep *m) if (!m->did_assim()) { dout(7) << "doing assim on " << *dir << dendl; m->mark_assim(); // only do this the first time! + import_pending_msg[dir->dirfrag()] = m; + + // change import state + import_state[dir->dirfrag()] = IMPORT_PREPPING; + import_bound_ls[dir] = m->get_bounds(); + assert(g_conf->mds_kill_import_at != 3); // move pin to dir diri->put(CInode::PIN_IMPORTING); dir->get(CDir::PIN_IMPORTING); dir->state_set(CDir::STATE_IMPORTING); - - // change import state - import_state[dir->dirfrag()] = IMPORT_PREPPING; - assert(g_conf->mds_kill_import_at != 3); - import_bound_ls[dir] = m->get_bounds(); // bystander list import_bystanders[dir] = m->get_bystanders(); @@ -1868,6 +1874,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m) // note new state import_state[dir->dirfrag()] = IMPORT_PREPPED; + import_pending_msg.erase(dir->dirfrag()); assert(g_conf->mds_kill_import_at != 4); // done m->put(); @@ -2072,6 +2079,8 @@ void Migrator::import_reverse(CDir *dir) if (!in->has_subtree_root_dirfrag(mds->get_nodeid())) in->clear_scatter_dirty(); + in->clear_dirty_parent(); + in->authlock.clear_gather(); in->linklock.clear_gather(); in->dirfragtreelock.clear_gather(); @@ -2379,7 +2388,8 @@ void Migrator::decode_import_inode_caps(CInode *in, { map<client_t,Capability::Export> cap_map; ::decode(cap_map, blp); - if (!cap_map.empty()) { + ::decode(in->get_mds_caps_wanted(), blp); + if (!cap_map.empty() || !in->get_mds_caps_wanted().empty()) { cap_imports[in].swap(cap_map); in->get(CInode::PIN_IMPORTINGCAPS); } @@ -2388,8 +2398,6 @@ void Migrator::decode_import_inode_caps(CInode *in, void Migrator::finish_import_inode_caps(CInode *in, int from, map<client_t,Capability::Export> &cap_map) { - assert(!cap_map.empty()); - for (map<client_t,Capability::Export>::iterator it = cap_map.begin(); it != cap_map.end(); ++it) { @@ -2406,6 +2414,7 @@ void Migrator::finish_import_inode_caps(CInode *in, int from, mds->mdcache->do_cap_import(session, in, cap); } + in->replica_caps_wanted = 0; in->put(CInode::PIN_IMPORTINGCAPS); } @@ -2514,7 +2523,7 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp, // add dentry to journal entry if (le) - le->metablob.add_dentry(dn, dn->is_dirty()); + le->metablob.add_import_dentry(dn); } #ifdef MDS_VERIFY_FRAGSTAT diff --git a/src/mds/Migrator.h b/src/mds/Migrator.h index f395bc1d237..70b59bc0f97 100644 --- a/src/mds/Migrator.h +++ b/src/mds/Migrator.h @@ -116,6 +116,7 @@ public: protected: map<dirfrag_t,int> import_state; // FIXME make these dirfrags map<dirfrag_t,int> import_peer; + map<dirfrag_t,Message*> import_pending_msg; map<CDir*,set<int> > import_bystanders; map<CDir*,list<dirfrag_t> > import_bound_ls; map<CDir*,list<ScatterLock*> > import_updated_scatterlocks; diff --git a/src/mds/Mutation.cc b/src/mds/Mutation.cc index 4e4f69cf31e..3916b2a1a33 100644 --- a/src/mds/Mutation.cc +++ b/src/mds/Mutation.cc @@ -30,6 +30,13 @@ void Mutation::pin(MDSCacheObject *o) } } +void Mutation::unpin(MDSCacheObject *o) +{ + assert(pins.count(o)); + o->put(MDSCacheObject::PIN_REQUEST); + pins.erase(o); +} + void Mutation::set_stickydirs(CInode *in) { if (stickydirs.count(in) == 0) { diff --git a/src/mds/Mutation.h b/src/mds/Mutation.h index de122a57552..c0bea19d16e 100644 --- a/src/mds/Mutation.h +++ b/src/mds/Mutation.h @@ -113,6 +113,7 @@ struct Mutation { // pin items in cache void pin(MDSCacheObject *o); + void unpin(MDSCacheObject *o); void set_stickydirs(CInode *in); void drop_pins(); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 7889a2cb73c..98dafc3e285 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -635,25 +635,16 @@ void Server::handle_client_reconnect(MClientReconnect *m) continue; } - filepath path(p->second.path, (uint64_t)p->second.capinfo.pathbase); if (in && !in->is_auth()) { // not mine. - dout(0) << "non-auth " << p->first << " " << path - << ", will pass off to authority" << dendl; - - // mark client caps stale. - MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, 0, 0); - //stale->head.migrate_seq = 0; // FIXME ****** - mds->send_message_client_counted(stale, session); - + dout(10) << "non-auth " << *in << ", will pass off to authority" << dendl; // add to cap export list. - mdcache->rejoin_export_caps(p->first, from, p->second); + mdcache->rejoin_export_caps(p->first, from, p->second.capinfo, + in->authority().first); } else { // don't know if the inode is mine - dout(0) << "missing " << p->first << " " << path - << " will load or export later" << dendl; + dout(10) << "missing ino " << p->first << ", will load later" << dendl; mdcache->rejoin_recovered_caps(p->first, from, p->second, -1); - mdcache->rejoin_export_caps(p->first, from, p->second); } } @@ -1797,6 +1788,24 @@ CDentry* Server::prepare_null_dentry(MDRequest *mdr, CDir *dir, const string& dn return dn; } +CDentry* Server::prepare_stray_dentry(MDRequest *mdr, CInode *in) +{ + CDentry *straydn = mdr->straydn; + if (straydn) { + string name; + in->name_stray_dentry(name); + if (straydn->get_name() == name) + return straydn; + + assert(!mdr->done_locking); + mdr->unpin(straydn); + } + + straydn = mdcache->get_or_create_stray_dentry(in); + mdr->straydn = straydn; + mdr->pin(straydn); + return straydn; +} /** prepare_new_inode * @@ -2670,6 +2679,7 @@ public: // dirty inode, dn, dir newi->inode.version--; // a bit hacky, see C_MDS_mknod_finish newi->mark_dirty(newi->inode.version+1, mdr->ls); + newi->_mark_dirty_parent(mdr->ls); mdr->apply(); @@ -2679,8 +2689,6 @@ public: mds->balancer->hit_inode(mdr->now, newi, META_POP_IWR); - mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool); - MClientReply *reply = new MClientReply(mdr->client_request, 0); reply->set_extra_bl(mdr->reply_extra_bl); mds->server->reply_request(mdr, reply); @@ -2803,6 +2811,7 @@ void Server::handle_client_openc(MDRequest *mdr) dn->push_projected_linkage(in); in->inode.version = dn->pre_dirty(); + in->inode.update_backtrace(); if (cmode & CEPH_FILE_MODE_WR) { in->inode.client_ranges[client].range.first = 0; in->inode.client_ranges[client].range.last = in->inode.get_layout_size_increment(); @@ -2821,7 +2830,7 @@ void Server::handle_client_openc(MDRequest *mdr) le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid()); journal_allocated_inos(mdr, &le->metablob); mdcache->predirty_journal_parents(mdr, &le->metablob, in, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1); - le->metablob.add_primary_dentry(dn, true, in); + le->metablob.add_primary_dentry(dn, in, true, true); // do the open mds->locker->issue_new_caps(in, cmode, mdr->session, realm, req->is_replay()); @@ -3086,8 +3095,6 @@ public: void finish(int r) { assert(r == 0); - int64_t old_pool = in->inode.layout.fl_pg_pool; - // apply in->pop_and_dirty_projected_inode(mdr->ls); mdr->apply(); @@ -3104,16 +3111,6 @@ public: if (changed_ranges) mds->locker->share_inode_max_size(in); - - // if pool changed, queue a new backtrace and set forward pointer on old - if (old_pool != in->inode.layout.fl_pg_pool) { - mdr->ls->remove_pending_backtraces(in->ino(), in->inode.layout.fl_pg_pool); - mdr->ls->queue_backtrace_update(in, in->inode.layout.fl_pg_pool); - - // set forwarding pointer on old backtrace - mdr->ls->remove_pending_backtraces(in->ino(), old_pool); - mdr->ls->queue_backtrace_update(in, old_pool, in->inode.layout.fl_pg_pool); - } } }; @@ -3494,8 +3491,6 @@ void Server::handle_client_setlayout(MDRequest *mdr) EUpdate *le = new EUpdate(mdlog, "setlayout"); mdlog->start_entry(le); le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid()); - // add the old pool to the metablob to indicate the pool changed with this event - le->metablob.add_old_pool(old_pool); mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY, false); mdcache->journal_dirty_inode(mdr, &le->metablob, cur); @@ -3753,16 +3748,14 @@ void Server::handle_set_vxattr(MDRequest *mdr, CInode *cur, } pi->version = cur->pre_dirty(); + if (cur->is_file()) + pi->update_backtrace(); // log + wait mdr->ls = mdlog->get_current_segment(); EUpdate *le = new EUpdate(mdlog, "set vxattr layout"); mdlog->start_entry(le); le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid()); - if (cur->is_file()) { - assert(old_pool != -1); - le->metablob.add_old_pool(old_pool); - } mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY, false); mdcache->journal_dirty_inode(mdr, &le->metablob, cur); @@ -3995,6 +3988,7 @@ public: // a new version of hte inode since it's just been created) newi->inode.version--; newi->mark_dirty(newi->inode.version + 1, mdr->ls); + newi->_mark_dirty_parent(mdr->ls); // mkdir? if (newi->inode.is_dir()) { @@ -4014,15 +4008,6 @@ public: // hit pop mds->balancer->hit_inode(mdr->now, newi, META_POP_IWR); - // store the backtrace on the 'parent' xattr - if (newi->inode.is_dir()) { - // if its a dir, put it in the metadata pool - mdr->ls->queue_backtrace_update(newi, mds->mdsmap->get_metadata_pool()); - } else { - // if its a file, put it in the data pool for that file - mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool); - } - // reply MClientReply *reply = new MClientReply(mdr->client_request, 0); reply->set_result(0); @@ -4077,6 +4062,7 @@ void Server::handle_client_mknod(MDRequest *mdr) newi->inode.mode |= S_IFREG; newi->inode.version = dn->pre_dirty(); newi->inode.rstat.rfiles = 1; + newi->inode.update_backtrace(); // if the client created a _regular_ file via MKNOD, it's highly likely they'll // want to write to it (e.g., if they are reexporting NFS) @@ -4117,7 +4103,7 @@ void Server::handle_client_mknod(MDRequest *mdr) mdcache->predirty_journal_parents(mdr, &le->metablob, newi, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1); - le->metablob.add_primary_dentry(dn, true, newi); + le->metablob.add_primary_dentry(dn, newi, true, true); journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(mds, mdr, dn, newi, follows)); } @@ -4157,6 +4143,7 @@ void Server::handle_client_mkdir(MDRequest *mdr) newi->inode.version = dn->pre_dirty(); newi->inode.rstat.rsubdirs = 1; + newi->inode.update_backtrace(); dout(12) << " follows " << follows << dendl; if (follows >= dn->first) @@ -4175,7 +4162,7 @@ void Server::handle_client_mkdir(MDRequest *mdr) le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid()); journal_allocated_inos(mdr, &le->metablob); mdcache->predirty_journal_parents(mdr, &le->metablob, newi, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1); - le->metablob.add_primary_dentry(dn, true, newi); + le->metablob.add_primary_dentry(dn, newi, true, true); le->metablob.add_new_dir(newdir); // dirty AND complete AND new // issue a cap on the directory @@ -4233,6 +4220,7 @@ void Server::handle_client_symlink(MDRequest *mdr) newi->inode.rstat.rbytes = newi->inode.size; newi->inode.rstat.rfiles = 1; newi->inode.version = dn->pre_dirty(); + newi->inode.update_backtrace(); if (follows >= dn->first) dn->first = follows + 1; @@ -4245,7 +4233,7 @@ void Server::handle_client_symlink(MDRequest *mdr) le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid()); journal_allocated_inos(mdr, &le->metablob); mdcache->predirty_journal_parents(mdr, &le->metablob, newi, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1); - le->metablob.add_primary_dentry(dn, true, newi); + le->metablob.add_primary_dentry(dn, newi, true, true); journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(mds, mdr, dn, newi, follows)); } @@ -4435,8 +4423,14 @@ void Server::_link_remote(MDRequest *mdr, bool inc, CDentry *dn, CInode *targeti // 1. send LinkPrepare to dest (journal nlink++ prepare) int linkauth = targeti->authority().first; if (mdr->more()->witnessed.count(linkauth) == 0) { - dout(10) << " targeti auth must prepare nlink++/--" << dendl; + if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(linkauth)) { + dout(10) << " targeti auth mds." << linkauth << " is not active" << dendl; + if (mdr->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(linkauth, new C_MDS_RetryRequest(mdcache, mdr)); + return; + } + dout(10) << " targeti auth must prepare nlink++/--" << dendl; int op; if (inc) op = MMDSSlaveRequest::OP_LINKPREP; @@ -4777,7 +4771,7 @@ void Server::do_link_rollback(bufferlist &rbl, int master, MDRequest *mdr) mdlog->start_entry(le); le->commit.add_dir_context(parent); le->commit.add_dir(parent, true); - le->commit.add_primary_dentry(in->get_projected_parent_dn(), true, 0); + le->commit.add_primary_dentry(in->get_projected_parent_dn(), 0, true); mdlog->submit_entry(le, new C_MDS_LoggedLinkRollback(this, mut, mdr)); mdlog->flush(); @@ -4899,18 +4893,14 @@ void Server::handle_client_unlink(MDRequest *mdr) } // -- create stray dentry? -- - CDentry *straydn = mdr->straydn; + CDentry *straydn = NULL; if (dnl->is_primary()) { - if (!straydn) { - straydn = mdcache->get_or_create_stray_dentry(dnl->get_inode()); - mdr->pin(straydn); - mdr->straydn = straydn; - } - } else if (straydn) - straydn = NULL; - if (straydn) + straydn = prepare_stray_dentry(mdr, dnl->get_inode()); dout(10) << " straydn is " << *straydn << dendl; - + } else if (mdr->straydn) { + mdr->unpin(mdr->straydn); + mdr->straydn = NULL; + } // lock set<SimpleLock*> rdlocks, wrlocks, xlocks; @@ -4996,7 +4986,8 @@ void Server::handle_client_unlink(MDRequest *mdr) } else if (mdr->more()->waiting_on_slave.count(*p)) { dout(10) << " already waiting on witness mds." << *p << dendl; } else { - _rmdir_prepare_witness(mdr, *p, dn, straydn); + if (!_rmdir_prepare_witness(mdr, *p, dn, straydn)) + return; } } if (!mdr->more()->waiting_on_slave.empty()) @@ -5075,7 +5066,8 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn, CDentry *straydn) if (in->snaprealm || follows + 1 > dn->first) in->project_past_snaprealm_parent(straydn->get_dir()->inode->find_snaprealm()); - le->metablob.add_primary_dentry(straydn, true, in); + pi->update_backtrace(); + le->metablob.add_primary_dentry(straydn, in, true, true); } else { // remote link. update remote inode. mdcache->predirty_journal_parents(mdr, &le->metablob, in, dn->get_dir(), PREDIRTY_DIR, -1); @@ -5158,10 +5150,16 @@ void Server::_unlink_local_finish(MDRequest *mdr, dn->get_dir()->try_remove_unlinked_dn(dn); } -void Server::_rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentry *straydn) +bool Server::_rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentry *straydn) { - dout(10) << "_rmdir_prepare_witness mds." << who << " for " << *mdr << dendl; + if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(who)) { + dout(10) << "_rmdir_prepare_witness mds." << who << " is not active" << dendl; + if (mdr->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(who, new C_MDS_RetryRequest(mdcache, mdr)); + return false; + } + dout(10) << "_rmdir_prepare_witness mds." << who << dendl; MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_RMDIRPREP); dn->make_path(req->srcdnpath); @@ -5174,6 +5172,7 @@ void Server::_rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentr assert(mdr->more()->waiting_on_slave.count(who) == 0); mdr->more()->waiting_on_slave.insert(who); + return true; } struct C_MDS_SlaveRmdirPrep : public Context { @@ -5228,7 +5227,7 @@ void Server::handle_slave_rmdir_prep(MDRequest *mdr) le->rollback = mdr->more()->rollback_bl; le->commit.add_dir_context(straydn->get_dir()); - le->commit.add_primary_dentry(straydn, true, in); + le->commit.add_primary_dentry(straydn, in, true); // slave: no need to journal original dentry dout(10) << " noting renamed (unlinked) dir ino " << in->ino() << " in metablob" << dendl; @@ -5362,7 +5361,7 @@ void Server::do_rmdir_rollback(bufferlist &rbl, int master, MDRequest *mdr) mdlog->start_entry(le); le->commit.add_dir_context(dn->get_dir()); - le->commit.add_primary_dentry(dn, true, in); + le->commit.add_primary_dentry(dn, in, true); // slave: no need to journal straydn dout(10) << " noting renamed (unlinked) dir ino " << in->ino() << " in metablob" << dendl; @@ -5654,17 +5653,14 @@ void Server::handle_client_rename(MDRequest *mdr) dout(10) << " this is a link merge" << dendl; // -- create stray dentry? -- - CDentry *straydn = mdr->straydn; + CDentry *straydn = NULL; if (destdnl->is_primary() && !linkmerge) { - if (!straydn) { - straydn = mdcache->get_or_create_stray_dentry(destdnl->get_inode()); - mdr->pin(straydn); - mdr->straydn = straydn; - } - } else if (straydn) - straydn = NULL; - if (straydn) + straydn = prepare_stray_dentry(mdr, destdnl->get_inode()); dout(10) << " straydn is " << *straydn << dendl; + } else if (mdr->straydn) { + mdr->unpin(mdr->straydn); + mdr->straydn = NULL; + } // -- prepare witness list -- /* @@ -5873,7 +5869,8 @@ void Server::handle_client_rename(MDRequest *mdr) } else if (mdr->more()->waiting_on_slave.count(*p)) { dout(10) << " already waiting on witness mds." << *p << dendl; } else { - _rename_prepare_witness(mdr, *p, witnesses, srcdn, destdn, straydn); + if (!_rename_prepare_witness(mdr, *p, witnesses, srcdn, destdn, straydn)) + return; } } if (!mdr->more()->waiting_on_slave.empty()) @@ -5951,20 +5948,6 @@ void Server::_rename_finish(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDe // did we import srci? if so, explicitly ack that import that, before we unlock and reply. assert(g_conf->mds_kill_rename_at != 7); - // backtrace - if (destdnl->inode->is_dir()) { - // replace previous backtrace on this inode with myself - mdr->ls->remove_pending_backtraces(destdnl->inode->ino(), mds->mdsmap->get_metadata_pool()); - // queue an updated backtrace - mdr->ls->queue_backtrace_update(destdnl->inode, mds->mdsmap->get_metadata_pool()); - - } else { - // remove all pending backtraces going to the same pool - mdr->ls->remove_pending_backtraces(destdnl->inode->ino(), destdnl->inode->inode.layout.fl_pg_pool); - // queue an updated backtrace - mdr->ls->queue_backtrace_update(destdnl->inode, destdnl->inode->inode.layout.fl_pg_pool); - } - assert(g_conf->mds_kill_rename_at != 8); // reply MClientReply *reply = new MClientReply(mdr->client_request, 0); @@ -5979,9 +5962,16 @@ void Server::_rename_finish(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDe // helpers -void Server::_rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse, +bool Server::_rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse, CDentry *srcdn, CDentry *destdn, CDentry *straydn) { + if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(who)) { + dout(10) << "_rename_prepare_witness mds." << who << " is not active" << dendl; + if (mdr->more()->waiting_on_slave.empty()) + mds->wait_for_active_peer(who, new C_MDS_RetryRequest(mdcache, mdr)); + return false; + } + dout(10) << "_rename_prepare_witness mds." << who << dendl; MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_RENAMEPREP); @@ -5999,6 +5989,7 @@ void Server::_rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse assert(mdr->more()->waiting_on_slave.count(who) == 0); mdr->more()->waiting_on_slave.insert(who); + return true; } version_t Server::_rename_prepare_import(MDRequest *mdr, CDentry *srcdn, bufferlist *client_map_bl) @@ -6133,6 +6124,7 @@ void Server::_rename_prepare(MDRequest *mdr, if (destdn->is_auth()) { tpi = oldin->project_inode(); //project_snaprealm tpi->version = straydn->pre_dirty(tpi->version); + tpi->update_backtrace(); } straydn->push_projected_linkage(oldin); } else if (destdnl->is_remote()) { @@ -6187,6 +6179,7 @@ void Server::_rename_prepare(MDRequest *mdr, pi = srci->project_inode(); // project snaprealm if srcdnl->is_primary // & srcdnl->snaprealm pi->version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldpv); + pi->update_backtrace(); } destdn->push_projected_linkage(srci); } @@ -6198,7 +6191,6 @@ void Server::_rename_prepare(MDRequest *mdr, if (!silent) { if (pi) { - pi->last_renamed_version = pi->version; pi->ctime = mdr->now; if (linkmerge) pi->nlink--; @@ -6252,11 +6244,11 @@ void Server::_rename_prepare(MDRequest *mdr, if (oldin->snaprealm || src_realm->get_newest_seq() + 1 > srcdn->first) oldin->project_past_snaprealm_parent(straydn->get_dir()->inode->find_snaprealm()); straydn->first = MAX(oldin->first, next_dest_snap); - metablob->add_primary_dentry(straydn, true, oldin); + metablob->add_primary_dentry(straydn, oldin, true, true); } else if (force_journal_stray) { dout(10) << " forced journaling straydn " << *straydn << dendl; metablob->add_dir_context(straydn->get_dir()); - metablob->add_primary_dentry(straydn, true, oldin); + metablob->add_primary_dentry(straydn, oldin, true); } } else if (destdnl->is_remote()) { if (oldin->is_auth()) { @@ -6264,7 +6256,7 @@ void Server::_rename_prepare(MDRequest *mdr, metablob->add_dir_context(oldin->get_projected_parent_dir()); mdcache->journal_cow_dentry(mdr, metablob, oldin->get_projected_parent_dn(), CEPH_NOSNAP, 0, destdnl); - metablob->add_primary_dentry(oldin->get_projected_parent_dn(), true, oldin); + metablob->add_primary_dentry(oldin->get_projected_parent_dn(), oldin, true); } } } @@ -6282,7 +6274,7 @@ void Server::_rename_prepare(MDRequest *mdr, if (srci->get_projected_parent_dn()->is_auth()) { // it's remote metablob->add_dir_context(srci->get_projected_parent_dir()); mdcache->journal_cow_dentry(mdr, metablob, srci->get_projected_parent_dn(), CEPH_NOSNAP, 0, srcdnl); - metablob->add_primary_dentry(srci->get_projected_parent_dn(), true, srci); + metablob->add_primary_dentry(srci->get_projected_parent_dn(), srci, true); } } else { if (destdn->is_auth() && !destdnl->is_null()) @@ -6291,7 +6283,7 @@ void Server::_rename_prepare(MDRequest *mdr, destdn->first = MAX(destdn->first, next_dest_snap); if (destdn->is_auth()) - metablob->add_primary_dentry(destdn, true, destdnl->get_inode()); + metablob->add_primary_dentry(destdn, destdnl->get_inode(), true, true); } } else if (srcdnl->is_primary()) { // project snap parent update? @@ -6305,11 +6297,21 @@ void Server::_rename_prepare(MDRequest *mdr, destdn->first = MAX(destdn->first, next_dest_snap); if (destdn->is_auth()) - metablob->add_primary_dentry(destdn, true, srci); + metablob->add_primary_dentry(destdn, srci, true, true); else if (force_journal_dest) { dout(10) << " forced journaling destdn " << *destdn << dendl; metablob->add_dir_context(destdn->get_dir()); - metablob->add_primary_dentry(destdn, true, srci); + metablob->add_primary_dentry(destdn, srci, true); + if (srcdn->is_auth() && srci->is_dir()) { + // journal new subtrees root dirfrags + list<CDir*> ls; + srci->get_dirfrags(ls); + for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) { + CDir *dir = *p; + if (dir->is_auth()) + metablob->add_dir(dir, true); + } + } } } @@ -6321,7 +6323,7 @@ void Server::_rename_prepare(MDRequest *mdr, // both primary and NULL dentries. Because during journal replay, null dentry is // processed after primary dentry. if (srcdnl->is_primary() && !srci->is_dir() && !destdn->is_auth()) - metablob->add_primary_dentry(srcdn, true, srci); + metablob->add_primary_dentry(srcdn, srci, true); metablob->add_null_dentry(srcdn, true); } else dout(10) << " NOT journaling srcdn " << *srcdn << dendl; @@ -6341,8 +6343,6 @@ void Server::_rename_prepare(MDRequest *mdr, if (srci->is_dir()) mdcache->project_subtree_rename(srci, srcdn->get_dir(), destdn->get_dir()); - // always update the backtrace - metablob->update_backtrace(); } @@ -6789,23 +6789,10 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r, mdlog->flush(); } else { if (srcdn->is_auth() && destdnl->is_primary()) { - dout(10) << " reversing inode export of " << *destdnl->get_inode() << dendl; destdnl->get_inode()->abort_export(); - - // unfreeze - assert(destdnl->get_inode()->is_frozen_inode()); - destdnl->get_inode()->unfreeze_inode(finished); } - // singleauth - if (mdr->more()->is_ambiguous_auth) { - mdr->more()->rename_inode->clear_ambiguous_auth(finished); - mdr->more()->is_ambiguous_auth = false; - } - - mds->queue_waiters(finished); - // abort // rollback_bl may be empty if we froze the inode but had to provide an expanded // witness list from the master, and they failed before we tried prep again. @@ -6813,11 +6800,20 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r, if (mdcache->is_ambiguous_slave_update(mdr->reqid, mdr->slave_to_mds)) { mdcache->remove_ambiguous_slave_update(mdr->reqid, mdr->slave_to_mds); // rollback but preserve the slave request - do_rename_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, NULL); + do_rename_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr, false); } else - do_rename_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr); + do_rename_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr, true); } else { dout(10) << " rollback_bl empty, not rollback back rename (master failed after getting extra witnesses?)" << dendl; + // singleauth + if (mdr->more()->is_ambiguous_auth) { + if (srcdn->is_auth()) + mdr->more()->rename_inode->unfreeze_inode(finished); + + mdr->more()->rename_inode->clear_ambiguous_auth(finished); + mdr->more()->is_ambiguous_auth = false; + } + mds->queue_waiters(finished); mds->mdcache->request_finish(mdr); } } @@ -6862,15 +6858,20 @@ struct C_MDS_LoggedRenameRollback : public Context { version_t srcdnpv; CDentry *destdn; CDentry *straydn; + bool finish_mdr; C_MDS_LoggedRenameRollback(Server *s, Mutation *m, MDRequest *r, - CDentry *sd, version_t pv, CDentry *dd, CDentry *st) : - server(s), mut(m), mdr(r), srcdn(sd), srcdnpv(pv), destdn(dd), straydn(st) {} + CDentry *sd, version_t pv, CDentry *dd, + CDentry *st, bool f) : + server(s), mut(m), mdr(r), srcdn(sd), srcdnpv(pv), destdn(dd), + straydn(st), finish_mdr(f) {} void finish(int r) { - server->_rename_rollback_finish(mut, mdr, srcdn, srcdnpv, destdn, straydn); + server->_rename_rollback_finish(mut, mdr, srcdn, srcdnpv, + destdn, straydn, finish_mdr); } }; -void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr) +void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr, + bool finish_mdr) { rename_rollback rollback; bufferlist::iterator p = rbl.begin(); @@ -7000,7 +7001,7 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr) } if (straydn) - destdn->push_projected_linkage(); + straydn->push_projected_linkage(); if (target) { inode_t *ti = NULL; @@ -7032,7 +7033,7 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr) if (srcdn && (srcdn->authority().first == whoami || force_journal_src)) { le->commit.add_dir_context(srcdir); if (rollback.orig_src.ino) - le->commit.add_primary_dentry(srcdn, true); + le->commit.add_primary_dentry(srcdn, 0, true); else le->commit.add_remote_dentry(srcdn, true); } @@ -7040,7 +7041,7 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr) if (force_journal_dest) { assert(rollback.orig_dest.ino); le->commit.add_dir_context(destdir); - le->commit.add_primary_dentry(destdn, true); + le->commit.add_primary_dentry(destdn, 0, true); } // slave: no need to journal straydn @@ -7048,7 +7049,7 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr) if (target && target->authority().first == whoami) { assert(rollback.orig_dest.remote_ino); le->commit.add_dir_context(target->get_projected_parent_dir()); - le->commit.add_primary_dentry(target->get_projected_parent_dn(), true, target); + le->commit.add_primary_dentry(target->get_projected_parent_dn(), target, true); } if (force_journal_dest) { @@ -7069,15 +7070,16 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr) mdcache->project_subtree_rename(in, destdir, srcdir); } - mdlog->submit_entry(le, new C_MDS_LoggedRenameRollback(this, mut, mdr, - srcdn, srcdnpv, destdn, straydn)); + mdlog->submit_entry(le, new C_MDS_LoggedRenameRollback(this, mut, mdr, srcdn, srcdnpv, + destdn, straydn, finish_mdr)); mdlog->flush(); } void Server::_rename_rollback_finish(Mutation *mut, MDRequest *mdr, CDentry *srcdn, - version_t srcdnpv, CDentry *destdn, CDentry *straydn) + version_t srcdnpv, CDentry *destdn, + CDentry *straydn, bool finish_mdr) { - dout(10) << "_rename_rollback_finish" << mut->reqid << dendl; + dout(10) << "_rename_rollback_finish " << mut->reqid << dendl; if (straydn) { straydn->get_dir()->unlink_inode(straydn); @@ -7123,8 +7125,19 @@ void Server::_rename_rollback_finish(Mutation *mut, MDRequest *mdr, CDentry *src mdcache->try_trim_non_auth_subtree(root); } - if (mdr) - mds->mdcache->request_finish(mdr); + if (mdr) { + list<Context*> finished; + if (mdr->more()->is_ambiguous_auth) { + if (srcdn->is_auth()) + mdr->more()->rename_inode->unfreeze_inode(finished); + + mdr->more()->rename_inode->clear_ambiguous_auth(finished); + mdr->more()->is_ambiguous_auth = false; + } + mds->queue_waiters(finished); + if (finish_mdr) + mds->mdcache->request_finish(mdr); + } mds->mdcache->finish_rollback(mut->reqid); diff --git a/src/mds/Server.h b/src/mds/Server.h index 15c8077c984..35a405b58eb 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -120,6 +120,7 @@ public: CDir *validate_dentry_dir(MDRequest *mdr, CInode *diri, const string& dname); CDir *traverse_to_auth_dir(MDRequest *mdr, vector<CDentry*> &trace, filepath refpath); CDentry *prepare_null_dentry(MDRequest *mdr, CDir *dir, const string& dname, bool okexist=false); + CDentry *prepare_stray_dentry(MDRequest *mdr, CInode *in); CInode* prepare_new_inode(MDRequest *mdr, CDir *dir, inodeno_t useino, unsigned mode, ceph_file_layout *layout=NULL); void journal_allocated_inos(MDRequest *mdr, EMetaBlob *blob); @@ -206,7 +207,7 @@ public: void _unlink_local_finish(MDRequest *mdr, CDentry *dn, CDentry *straydn, version_t); - void _rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentry *straydn); + bool _rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentry *straydn); void handle_slave_rmdir_prep(MDRequest *mdr); void _logged_slave_rmdir(MDRequest *mdr, CDentry *srcdn, CDentry *straydn); void _commit_slave_rmdir(MDRequest *mdr, int r); @@ -226,7 +227,7 @@ public: void _rmsnap_finish(MDRequest *mdr, CInode *diri, snapid_t snapid); // helpers - void _rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse, + bool _rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse, CDentry *srcdn, CDentry *destdn, CDentry *straydn); version_t _rename_prepare_import(MDRequest *mdr, CDentry *srcdn, bufferlist *client_map_bl); bool _need_force_journal(CInode *diri, bool empty); @@ -243,9 +244,9 @@ public: void handle_slave_rename_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m); void _logged_slave_rename(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn); void _commit_slave_rename(MDRequest *mdr, int r, CDentry *srcdn, CDentry *destdn, CDentry *straydn); - void do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr); - void _rename_rollback_finish(Mutation *mut, MDRequest *mdr, CDentry *srcdn, - version_t srcdnpv, CDentry *destdn, CDentry *staydn); + void do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr, bool finish_mdr=false); + void _rename_rollback_finish(Mutation *mut, MDRequest *mdr, CDentry *srcdn, version_t srcdnpv, + CDentry *destdn, CDentry *staydn, bool finish_mdr); }; diff --git a/src/mds/events/EMetaBlob.h b/src/mds/events/EMetaBlob.h index 439bd78bc8f..b91303a1328 100644 --- a/src/mds/events/EMetaBlob.h +++ b/src/mds/events/EMetaBlob.h @@ -59,6 +59,9 @@ public: * the struct_v in the encode function! */ struct fullbit { + static const int STATE_DIRTY = (1<<0); + static const int STATE_DIRTYPARENT = (1<<1); + static const int STATE_DIRTYPOOL = (1<<2); string dn; // dentry snapid_t dnfirst, dnlast; version_t dnv; @@ -67,7 +70,7 @@ public: map<string,bufferptr> xattrs; string symlink; bufferlist snapbl; - bool dirty; + __u8 state; typedef map<snapid_t, old_inode_t> old_inodes_t; old_inodes_t old_inodes; @@ -79,7 +82,7 @@ public: fullbit(const string& d, snapid_t df, snapid_t dl, version_t v, const inode_t& i, const fragtree_t &dft, const map<string,bufferptr> &xa, const string& sym, - const bufferlist &sbl, bool dr, + const bufferlist &sbl, __u8 st, const old_inodes_t *oi = NULL) : //dn(d), dnfirst(df), dnlast(dl), dnv(v), //inode(i), dirfragtree(dft), xattrs(xa), symlink(sym), snapbl(sbl), dirty(dr) @@ -97,7 +100,7 @@ public: ::encode(dft, _enc); ::encode(sbl, _enc); } - ::encode(dr, _enc); + ::encode(st, _enc); ::encode(oi ? true : false, _enc); if (oi) ::encode(*oi, _enc); @@ -114,11 +117,28 @@ public: static void generate_test_instances(list<EMetaBlob::fullbit*>& ls); void update_inode(MDS *mds, CInode *in); + bool is_dirty() const { return (state & STATE_DIRTY); } + bool is_dirty_parent() const { return (state & STATE_DIRTYPARENT); } + bool is_dirty_pool() const { return (state & STATE_DIRTYPOOL); } void print(ostream& out) const { out << " fullbit dn " << dn << " [" << dnfirst << "," << dnlast << "] dnv " << dnv << " inode " << inode.ino - << " dirty=" << dirty << std::endl; + << " state=" << state << std::endl; + } + string state_string() const { + string state_string; + bool marked_already = false; + if (is_dirty()) { + state_string.append("dirty"); + marked_already = true; + } + if (is_dirty_parent()) { + state_string.append(marked_already ? "+dirty_parent" : "dirty_parent"); + if (is_dirty_pool()) + state_string.append("+dirty_pool"); + } + return state_string; } }; WRITE_CLASS_ENCODER(fullbit) @@ -318,9 +338,6 @@ private: // idempotent op(s) list<pair<metareqid_t,uint64_t> > client_reqs; - int64_t old_pool; - bool update_bt; - public: void encode(bufferlist& bl) const; void decode(bufferlist::iterator& bl); @@ -414,11 +431,15 @@ private: } // return remote pointer to to-be-journaled inode - void add_primary_dentry(CDentry *dn, bool dirty, CInode *in=0) { - add_primary_dentry(add_dir(dn->get_dir(), false), - dn, dirty, in); + void add_primary_dentry(CDentry *dn, CInode *in, bool dirty, + bool dirty_parent=false, bool dirty_pool=false) { + __u8 state = 0; + if (dirty) state |= fullbit::STATE_DIRTY; + if (dirty_parent) state |= fullbit::STATE_DIRTYPARENT; + if (dirty_pool) state |= fullbit::STATE_DIRTYPOOL; + add_primary_dentry(add_dir(dn->get_dir(), false), dn, in, state); } - void add_primary_dentry(dirlump& lump, CDentry *dn, bool dirty, CInode *in=0) { + void add_primary_dentry(dirlump& lump, CDentry *dn, CInode *in, __u8 state) { if (!in) in = dn->get_projected_linkage()->get_inode(); @@ -439,16 +460,26 @@ private: *pi, in->dirfragtree, *in->get_projected_xattrs(), in->symlink, snapbl, - dirty, + state, &in->old_inodes))); } // convenience: primary or remote? figure it out. void add_dentry(CDentry *dn, bool dirty) { dirlump& lump = add_dir(dn->get_dir(), false); - add_dentry(lump, dn, dirty); + add_dentry(lump, dn, dirty, false, false); + } + void add_import_dentry(CDentry *dn) { + bool dirty_parent = false; + bool dirty_pool = false; + if (dn->get_linkage()->is_primary()) { + dirty_parent = dn->get_linkage()->get_inode()->is_dirty_parent(); + dirty_pool = dn->get_linkage()->get_inode()->is_dirty_pool(); + } + dirlump& lump = add_dir(dn->get_dir(), false); + add_dentry(lump, dn, dn->is_dirty(), dirty_parent, dirty_pool); } - void add_dentry(dirlump& lump, CDentry *dn, bool dirty) { + void add_dentry(dirlump& lump, CDentry *dn, bool dirty, bool dirty_parent, bool dirty_pool) { // primary or remote if (dn->get_projected_linkage()->is_remote()) { add_remote_dentry(dn, dirty); @@ -458,7 +489,7 @@ private: return; } assert(dn->get_projected_linkage()->is_primary()); - add_primary_dentry(dn, dirty); + add_primary_dentry(dn, 0, dirty, dirty_parent, dirty_pool); } void add_root(bool dirty, CInode *in, inode_t *pi=0, fragtree_t *pdft=0, bufferlist *psnapbl=0, @@ -484,9 +515,9 @@ private: } string empty; - roots.push_back(std::tr1::shared_ptr<fullbit>(new fullbit(empty, in->first, in->last, - 0, *pi, *pdft, *px, in->symlink, - snapbl, dirty, + roots.push_back(std::tr1::shared_ptr<fullbit>(new fullbit(empty, in->first, in->last, 0, *pi, + *pdft, *px, in->symlink, snapbl, + dirty ? fullbit::STATE_DIRTY : 0, &in->old_inodes))); } @@ -522,13 +553,6 @@ private: static const int TO_ROOT = 1; void add_dir_context(CDir *dir, int mode = TO_AUTH_SUBTREE_ROOT); - - void add_old_pool(int64_t pool) { - old_pool = pool; - } - void update_backtrace() { - update_bt = true; - } void print(ostream& out) const { out << "[metablob"; diff --git a/src/mds/events/EOpen.h b/src/mds/events/EOpen.h index 792540ef5da..1267cf0af72 100644 --- a/src/mds/events/EOpen.h +++ b/src/mds/events/EOpen.h @@ -34,7 +34,7 @@ public: void add_clean_inode(CInode *in) { if (!in->is_base()) { metablob.add_dir_context(in->get_projected_parent_dn()->get_dir()); - metablob.add_primary_dentry(in->get_projected_parent_dn(), false, 0); + metablob.add_primary_dentry(in->get_projected_parent_dn(), 0, false); inos.push_back(in->ino()); } } diff --git a/src/mds/inode_backtrace.h b/src/mds/inode_backtrace.h index d223f724a99..2d80ae3efad 100644 --- a/src/mds/inode_backtrace.h +++ b/src/mds/inode_backtrace.h @@ -35,6 +35,10 @@ struct inode_backpointer_t { }; WRITE_CLASS_ENCODER(inode_backpointer_t) +inline bool operator==(const inode_backpointer_t& l, const inode_backpointer_t& r) { + return l.dirino == r.dirino && l.version == r.version && l.dname == r.dname; +} + inline ostream& operator<<(ostream& out, const inode_backpointer_t& ib) { return out << "<" << ib.dirino << "/" << ib.dname << " v" << ib.version << ">"; } diff --git a/src/mds/journal.cc b/src/mds/journal.cc index b8139e3a05b..9eb0e73feba 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -185,9 +185,16 @@ void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld) assert(g_conf->mds_kill_journal_expire_at != 3); // backtraces to be stored/updated - for (elist<BacktraceInfo*>::iterator p = update_backtraces.begin(); !p.end(); ++p) { - BacktraceInfo *btinfo = *p; - store_backtrace_update(mds, btinfo, gather_bld.new_sub()); + for (elist<CInode*>::iterator p = dirty_parent_inodes.begin(); !p.end(); ++p) { + CInode *in = *p; + assert(in->is_auth()); + if (in->can_auth_pin()) { + dout(15) << "try_to_expire waiting for storing backtrace on " << *in << dendl; + in->store_backtrace(gather_bld.new_sub()); + } else { + dout(15) << "try_to_expire waiting for unfreeze on " << *in << dendl; + in->add_waiter(CInode::WAIT_UNFREEZE, gather_bld.new_sub()); + } } assert(g_conf->mds_kill_journal_expire_at != 4); @@ -267,101 +274,6 @@ void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld) } } -// ---------------------------- -// backtrace handling - -// BacktraceInfo is used for keeping the -// current state of the backtrace to be stored later on -// logsegment expire. Constructing a BacktraceInfo -// automatically puts it on the LogSegment list that is passed in, -// after building the backtrace based on the current state of the inode. We -// construct the backtrace here to avoid keeping a ref to the inode. -BacktraceInfo::BacktraceInfo( - int64_t l, CInode *i, LogSegment *ls, int64_t p) : - location(l), pool(p) { - - // on setlayout cases, forward pointers mean - // pool != location, but for all others it does - if (pool == -1) pool = location; - - bt.pool = pool; - i->build_backtrace(l, &bt); - ls->update_backtraces.push_back(&item_logseg); -} - -// When the info_t is destroyed, it just needs to remove itself -// from the LogSegment list -BacktraceInfo::~BacktraceInfo() { - item_logseg.remove_myself(); -} - -// Queue a backtrace for later -void LogSegment::queue_backtrace_update(CInode *inode, int64_t location, int64_t pool) { - // allocating a pointer here and not setting it to anything - // might look strange, but the constructor adds itself to the backtraces - // list of this LogSegment, which is how we keep track of it - new BacktraceInfo(location, inode, this, pool); -} - -void LogSegment::remove_pending_backtraces(inodeno_t ino, int64_t pool) { - elist<BacktraceInfo*>::iterator i = update_backtraces.begin(); - while(!i.end()) { - ++i; - if((*i)->bt.ino == ino && (*i)->location == pool) { - delete (*i); - } - } -} - -unsigned LogSegment::encode_parent_mutation(ObjectOperation& m, BacktraceInfo *info) -{ - bufferlist parent; - ::encode(info->bt, parent); - m.setxattr("parent", parent); - return parent.length(); -} - -struct C_LogSegment_StoredBacktrace : public Context { - LogSegment *ls; - BacktraceInfo *info; - Context *fin; - C_LogSegment_StoredBacktrace(LogSegment *l, BacktraceInfo *c, - Context *f) : ls(l), info(c), fin(f) {} - void finish(int r) { - ls->_stored_backtrace(info, fin); - } -}; - -void LogSegment::store_backtrace_update(MDS *mds, BacktraceInfo *info, Context *fin) -{ - ObjectOperation m; - // prev_pool will be the target pool on create,mkdir,etc. - encode_parent_mutation(m, info); - - // write it. - SnapContext snapc; - - object_t oid = CInode::get_object_name(info->bt.ino, frag_t(), ""); - - dout(10) << "store_parent for oid " << oid << " location " << info->location << " pool " << info->pool << dendl; - - // store the backtrace in the specified pool - object_locator_t oloc(info->location); - - mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0, - NULL, new C_LogSegment_StoredBacktrace(this, info, fin) ); - -} - -void LogSegment::_stored_backtrace(BacktraceInfo *info, Context *fin) -{ - delete info; - if (fin) { - fin->finish(0); - delete fin; - } -} - #undef DOUT_COND #define DOUT_COND(cct, l) (l<=cct->_conf->debug_mds || l <= cct->_conf->debug_mds_log) @@ -372,8 +284,6 @@ void LogSegment::_stored_backtrace(BacktraceInfo *info, Context *fin) EMetaBlob::EMetaBlob(MDLog *mdlog) : opened_ino(0), renamed_dirino(0), inotablev(0), sessionmapv(0), allocated_ino(0), - old_pool(-1), - update_bt(false), last_subtree_map(mdlog ? mdlog->get_last_segment_offset() : 0), my_offset(mdlog ? mdlog->get_write_pos() : 0) //, _segment(0) { } @@ -406,7 +316,7 @@ void EMetaBlob::add_dir_context(CDir *dir, int mode) if (mode == TO_AUTH_SUBTREE_ROOT) { // subtree root? - if (dir->is_subtree_root()) { + if (dir->is_subtree_root() && !dir->state_test(CDir::STATE_EXPORTBOUND)) { if (dir->is_auth() && !dir->is_ambiguous_auth()) { // it's an auth subtree, we don't need maybe (if any), and we're done. dout(20) << "EMetaBlob::add_dir_context(" << dir << ") reached unambig auth subtree, don't need " << maybe @@ -485,10 +395,10 @@ void EMetaBlob::update_segment(LogSegment *ls) // EMetaBlob::fullbit void EMetaBlob::fullbit::encode(bufferlist& bl) const { - ENCODE_START(5, 5, bl); + ENCODE_START(6, 5, bl); if (!_enc.length()) { fullbit copy(dn, dnfirst, dnlast, dnv, inode, dirfragtree, xattrs, symlink, - snapbl, dirty, &old_inodes); + snapbl, state, &old_inodes); bl.append(copy._enc); } else { bl.append(_enc); @@ -497,7 +407,7 @@ void EMetaBlob::fullbit::encode(bufferlist& bl) const { } void EMetaBlob::fullbit::decode(bufferlist::iterator &bl) { - DECODE_START_LEGACY_COMPAT_LEN(5, 5, 5, bl); + DECODE_START_LEGACY_COMPAT_LEN(6, 5, 5, bl); ::decode(dn, bl); ::decode(dnfirst, bl); ::decode(dnlast, bl); @@ -519,7 +429,14 @@ void EMetaBlob::fullbit::decode(bufferlist::iterator &bl) { } } } - ::decode(dirty, bl); + if (struct_v >= 6) { + ::decode(state, bl); + } else { + bool dirty; + ::decode(dirty, bl); + state = dirty ? EMetaBlob::fullbit::STATE_DIRTY : 0; + } + if (struct_v >= 3) { bool old_inodes_present; ::decode(old_inodes_present, bl); @@ -571,7 +488,7 @@ void EMetaBlob::fullbit::dump(Formatter *f) const f->close_section(); // file layout policy } } - f->dump_string("dirty", dirty ? "true" : "false"); + f->dump_string("state", state_string()); if (!old_inodes.empty()) { f->open_array_section("old inodes"); for (old_inodes_t::const_iterator iter = old_inodes.begin(); @@ -824,7 +741,7 @@ void EMetaBlob::dirlump::generate_test_instances(list<dirlump*>& ls) */ void EMetaBlob::encode(bufferlist& bl) const { - ENCODE_START(6, 5, bl); + ENCODE_START(7, 5, bl); ::encode(lump_order, bl); ::encode(lump_map, bl); ::encode(roots, bl); @@ -842,13 +759,18 @@ void EMetaBlob::encode(bufferlist& bl) const ::encode(client_reqs, bl); ::encode(renamed_dirino, bl); ::encode(renamed_dir_frags, bl); - ::encode(old_pool, bl); - ::encode(update_bt, bl); + { + // make MDS use v6 format happy + int64_t i = -1; + bool b = false; + ::encode(i, bl); + ::encode(b, bl); + } ENCODE_FINISH(bl); } void EMetaBlob::decode(bufferlist::iterator &bl) { - DECODE_START_LEGACY_COMPAT_LEN(6, 5, 5, bl); + DECODE_START_LEGACY_COMPAT_LEN(7, 5, 5, bl); ::decode(lump_order, bl); ::decode(lump_map, bl); if (struct_v >= 4) { @@ -887,8 +809,11 @@ void EMetaBlob::decode(bufferlist::iterator &bl) ::decode(renamed_dir_frags, bl); } if (struct_v >= 6) { - ::decode(old_pool, bl); - ::decode(update_bt, bl); + // ignore + int64_t i; + bool b; + ::decode(i, bl); + ::decode(b, bl); } DECODE_FINISH(bl); } @@ -1004,7 +929,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup) if (isnew) mds->mdcache->add_inode(in); - if ((*p)->dirty) in->_mark_dirty(logseg); + if ((*p)->is_dirty()) in->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay " << (isnew ? " added root ":" updated root ") << *in << dendl; } @@ -1106,11 +1031,11 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup) if (!dn) { dn = dir->add_null_dentry(p->dn, p->dnfirst, p->dnlast); dn->set_version(p->dnv); - if (p->dirty) dn->_mark_dirty(logseg); + if (p->is_dirty()) dn->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay added " << *dn << dendl; } else { dn->set_version(p->dnv); - if (p->dirty) dn->_mark_dirty(logseg); + if (p->is_dirty()) dn->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *dn << dendl; dn->first = p->dnfirst; assert(dn->last == p->dnlast); @@ -1135,7 +1060,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup) if (unlinked.count(in)) linked.insert(in); dir->link_primary_inode(dn, in); - if (p->dirty) in->_mark_dirty(logseg); + if (p->is_dirty()) in->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay added " << *in << dendl; } else { if (dn->get_linkage()->get_inode() != in && in->get_parent_dn()) { @@ -1146,7 +1071,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup) if (in->get_parent_dn() && in->inode.anchored != p->inode.anchored) in->get_parent_dn()->adjust_nested_anchors( (int)p->inode.anchored - (int)in->inode.anchored ); p->update_inode(mds, in); - if (p->dirty) in->_mark_dirty(logseg); + if (p->is_dirty()) in->_mark_dirty(logseg); if (dn->get_linkage()->get_inode() != in) { if (!dn->get_linkage()->is_null()) { // note: might be remote. as with stray reintegration. if (dn->get_linkage()->is_primary()) { @@ -1171,35 +1096,8 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup) } assert(g_conf->mds_kill_journal_replay_at != 2); - - // store backtrace for allocated inos (create, mkdir, symlink, mknod) - if (allocated_ino || used_preallocated_ino) { - if (in->inode.is_dir()) { - logseg->queue_backtrace_update(in, mds->mdsmap->get_metadata_pool()); - } else { - logseg->queue_backtrace_update(in, in->inode.layout.fl_pg_pool); - } - } - // handle change of pool with backtrace update - if (old_pool != -1 && old_pool != in->inode.layout.fl_pg_pool) { - // update backtrace on new data pool - logseg->queue_backtrace_update(in, in->inode.layout.fl_pg_pool); - - // set forwarding pointer on old backtrace - logseg->queue_backtrace_update(in, old_pool, in->inode.layout.fl_pg_pool); - } - // handle backtrace update if specified (used by rename) - if (update_bt) { - if (in->is_dir()) { - // replace previous backtrace on this inode with myself - logseg->remove_pending_backtraces(in->ino(), mds->mdsmap->get_metadata_pool()); - logseg->queue_backtrace_update(in, mds->mdsmap->get_metadata_pool()); - } else { - // remove all pending backtraces going to the same pool - logseg->remove_pending_backtraces(in->ino(), in->inode.layout.fl_pg_pool); - logseg->queue_backtrace_update(in, in->inode.layout.fl_pg_pool); - } - } + if (p->is_dirty_parent()) + in->_mark_dirty_parent(logseg, p->is_dirty_pool()); } // remote dentries @@ -1280,7 +1178,8 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup) list<frag_t> leaves; renamed_diri->dirfragtree.get_leaves(leaves); for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p) { - CDir *dir = renamed_diri->get_or_open_dirfrag(mds->mdcache, *p); + CDir *dir = renamed_diri->get_dirfrag(*p); + assert(dir); // preserve subtree bound until slave commit if (dir->get_dir_auth() == CDIR_AUTH_UNDEF) slaveup->olddirs.insert(dir); diff --git a/src/mds/locks.c b/src/mds/locks.c index c7dd5bec0ee..90310874411 100644 --- a/src/mds/locks.c +++ b/src/mds/locks.c @@ -97,8 +97,8 @@ const struct sm_state_t filelock[LOCK_MAX] = { [LOCK_XSYN_SYNC] = { LOCK_SYNC, true, LOCK_LOCK, AUTH, 0, AUTH,0, 0, 0, 0, 0,CEPH_CAP_GCACHE,0,0 }, [LOCK_LOCK] = { 0, false, LOCK_LOCK, AUTH, 0, REQ, AUTH,0, 0, 0, CEPH_CAP_GCACHE|CEPH_CAP_GBUFFER,0,0,0 }, - [LOCK_SYNC_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, AUTH, 0, REQ, 0, 0, 0, 0, CEPH_CAP_GCACHE,0,0,CEPH_CAP_GCACHE }, - [LOCK_EXCL_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, 0, 0, 0, 0, XCL, 0, 0, CEPH_CAP_GCACHE|CEPH_CAP_GBUFFER,0,0,CEPH_CAP_GCACHE }, + [LOCK_SYNC_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, AUTH, 0, REQ, 0, 0, 0, 0, CEPH_CAP_GCACHE,0,0,0 }, + [LOCK_EXCL_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, 0, 0, 0, 0, XCL, 0, 0, CEPH_CAP_GCACHE|CEPH_CAP_GBUFFER,0,0,0 }, [LOCK_MIX_LOCK] = { LOCK_LOCK, false, LOCK_MIX, AUTH, 0, REQ, 0, 0, 0, 0, 0,0,0,0 }, [LOCK_MIX_LOCK2] = { LOCK_LOCK, false, LOCK_LOCK, AUTH, 0, REQ, 0, 0, 0, 0, 0,0,0,0 }, diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc index b1ce640a539..6886786f27e 100644 --- a/src/mds/mdstypes.cc +++ b/src/mds/mdstypes.cc @@ -236,7 +236,7 @@ void inode_t::encode(bufferlist &bl) const ::encode(version, bl); ::encode(file_data_version, bl); ::encode(xattr_version, bl); - ::encode(last_renamed_version, bl); + ::encode(backtrace_version, bl); ::encode(old_pools, bl); ENCODE_FINISH(bl); @@ -291,7 +291,7 @@ void inode_t::decode(bufferlist::iterator &p) ::decode(file_data_version, p); ::decode(xattr_version, p); if (struct_v >= 2) - ::decode(last_renamed_version, p); + ::decode(backtrace_version, p); if (struct_v >= 7) ::decode(old_pools, p); @@ -357,7 +357,7 @@ void inode_t::dump(Formatter *f) const f->dump_unsigned("version", version); f->dump_unsigned("file_data_version", file_data_version); f->dump_unsigned("xattr_version", xattr_version); - f->dump_unsigned("last_renamed_version", last_renamed_version); + f->dump_unsigned("backtrace_version", backtrace_version); } void inode_t::generate_test_instances(list<inode_t*>& ls) diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h index aa9d165b53d..5537407a75d 100644 --- a/src/mds/mdstypes.h +++ b/src/mds/mdstypes.h @@ -347,7 +347,7 @@ struct inode_t { version_t file_data_version; // auth only version_t xattr_version; - version_t last_renamed_version; // when i was last renamed + version_t backtrace_version; inode_t() : ino(0), rdev(0), mode(0), uid(0), gid(0), @@ -355,7 +355,7 @@ struct inode_t { size(0), truncate_seq(0), truncate_size(0), truncate_from(0), truncate_pending(0), time_warp_seq(0), - version(0), file_data_version(0), xattr_version(0), last_renamed_version(0) { + version(0), file_data_version(0), xattr_version(0), backtrace_version(0) { clear_layout(); memset(&dir_layout, 0, sizeof(dir_layout)); } @@ -425,7 +425,15 @@ struct inode_t { } } + bool is_backtrace_updated() { + return backtrace_version == version; + } + void update_backtrace() { + backtrace_version = version; + } + void add_old_pool(int64_t l) { + backtrace_version = version; old_pools.push_back(l); } diff --git a/src/messages/MMDSCacheRejoin.h b/src/messages/MMDSCacheRejoin.h index dc8a1afe114..3ae83553dad 100644 --- a/src/messages/MMDSCacheRejoin.h +++ b/src/messages/MMDSCacheRejoin.h @@ -167,9 +167,7 @@ class MMDSCacheRejoin : public Message { map<vinodeno_t, inode_strong> strong_inodes; // open - bufferlist cap_export_bl; map<inodeno_t,map<client_t, ceph_mds_cap_reconnect> > cap_exports; - map<inodeno_t,filepath> cap_export_paths; // full bufferlist inode_base; @@ -258,10 +256,6 @@ public: in->encode_lock_state(CEPH_LOCK_IDFT, inode_scatterlocks[in->ino()].dft); } - void copy_cap_exports(bufferlist &bl) { - cap_export_bl = bl; - } - // dirfrags void add_strong_dirfrag(dirfrag_t df, int n, int dr) { strong_dirfrags[df] = dirfrag_strong(n, dr); @@ -304,7 +298,7 @@ public: ::encode(frozen_authpin_inodes, payload); ::encode(xlocked_inodes, payload); ::encode(wrlocked_inodes, payload); - ::encode(cap_export_bl, payload); + ::encode(cap_exports, payload); ::encode(strong_dirfrags, payload); ::encode(dirfrag_bases, payload); ::encode(weak, payload); @@ -325,12 +319,7 @@ public: ::decode(frozen_authpin_inodes, p); ::decode(xlocked_inodes, p); ::decode(wrlocked_inodes, p); - ::decode(cap_export_bl, p); - if (cap_export_bl.length()) { - bufferlist::iterator q = cap_export_bl.begin(); - ::decode(cap_exports, q); - ::decode(cap_export_paths, q); - } + ::decode(cap_exports, p); ::decode(strong_dirfrags, p); ::decode(dirfrag_bases, p); ::decode(weak, p); diff --git a/src/messages/MMDSOpenIno.h b/src/messages/MMDSOpenIno.h new file mode 100644 index 00000000000..0918e87e0d9 --- /dev/null +++ b/src/messages/MMDSOpenIno.h @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2011 New Dream Network + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MDSOPENINO_H +#define CEPH_MDSOPENINO_H + +#include "msg/Message.h" + +struct MMDSOpenIno : public Message { + inodeno_t ino; + vector<inode_backpointer_t> ancestors; + + MMDSOpenIno() : Message(MSG_MDS_OPENINO) {} + MMDSOpenIno(tid_t t, inodeno_t i, vector<inode_backpointer_t>& a) : + Message(MSG_MDS_OPENINO), ino(i), ancestors(a) { + header.tid = t; + } + + const char *get_type_name() const { return "openino"; } + void print(ostream &out) const { + out << "openino(" << header.tid << " " << ino << " " << ancestors << ")"; + } + + void encode_payload(uint64_t features) { + ::encode(ino, payload); + ::encode(ancestors, payload); + } + void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(ino, p); + ::decode(ancestors, p); + } +}; + +#endif diff --git a/src/messages/MMDSOpenInoReply.h b/src/messages/MMDSOpenInoReply.h new file mode 100644 index 00000000000..245027f11f3 --- /dev/null +++ b/src/messages/MMDSOpenInoReply.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2011 New Dream Network + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MDSOPENINOREPLY_H +#define CEPH_MDSOPENINOREPLY_H + +#include "msg/Message.h" + +struct MMDSOpenInoReply : public Message { + inodeno_t ino; + vector<inode_backpointer_t> ancestors; + int32_t hint; + int32_t error; + + MMDSOpenInoReply() : Message(MSG_MDS_OPENINOREPLY) {} + MMDSOpenInoReply(tid_t t, inodeno_t i, int h=-1, int e=0) : + Message(MSG_MDS_OPENINOREPLY), ino(i), hint(h), error(e) { + header.tid = t; + } + + const char *get_type_name() const { return "openinoreply"; } + void print(ostream &out) const { + out << "openinoreply(" << header.tid << " " + << ino << " " << hint << " " << ancestors << ")"; + } + + void encode_payload(uint64_t features) { + ::encode(ino, payload); + ::encode(ancestors, payload); + ::encode(hint, payload); + ::encode(error, payload); + } + void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(ino, p); + ::decode(ancestors, p); + ::decode(hint, p); + ::decode(error, p); + } +}; + +#endif diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 39e3fe9bbe0..338b5195af2 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -926,7 +926,8 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi) << dendl; // already pending failure? - if (pending_inc.new_state[target_osd] & CEPH_OSD_UP) { + if (pending_inc.new_state.count(target_osd) && + pending_inc.new_state[target_osd] & CEPH_OSD_UP) { dout(10) << " already pending failure" << dendl; return true; } @@ -3174,6 +3175,8 @@ bool OSDMonitor::prepare_command(MMonCommand *m) done: dout(10) << " creating osd." << i << dendl; + if (pending_inc.new_state.count(i) == 0) + pending_inc.new_state[i] = 0; pending_inc.new_state[i] |= CEPH_OSD_EXISTS | CEPH_OSD_NEW; if (!uuid.is_zero()) pending_inc.new_uuid[i] = uuid; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 77be03a590b..a6889d39fdf 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -112,6 +112,8 @@ using namespace std; #include "messages/MMDSCacheRejoin.h" #include "messages/MMDSFindIno.h" #include "messages/MMDSFindInoReply.h" +#include "messages/MMDSOpenIno.h" +#include "messages/MMDSOpenInoReply.h" #include "messages/MDirUpdate.h" #include "messages/MDiscover.h" @@ -533,6 +535,13 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot m = new MMDSFindInoReply; break; + case MSG_MDS_OPENINO: + m = new MMDSOpenIno; + break; + case MSG_MDS_OPENINOREPLY: + m = new MMDSOpenInoReply; + break; + case MSG_MDS_FRAGMENTNOTIFY: m = new MMDSFragmentNotify; break; diff --git a/src/msg/Message.h b/src/msg/Message.h index 18a64c1d02e..aca91184141 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -124,6 +124,8 @@ #define MSG_MDS_DENTRYLINK 0x20c #define MSG_MDS_FINDINO 0x20d #define MSG_MDS_FINDINOREPLY 0x20e +#define MSG_MDS_OPENINO 0x20f +#define MSG_MDS_OPENINOREPLY 0x210 #define MSG_MDS_LOCK 0x300 #define MSG_MDS_INODEFILECAPS 0x301 diff --git a/src/os/FDCache.h b/src/os/FDCache.h new file mode 100644 index 00000000000..cf07f860aa5 --- /dev/null +++ b/src/os/FDCache.h @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_FDCACHE_H +#define CEPH_FDCACHE_H + +#include <memory> +#include <errno.h> +#include <cstdio> +#include "hobject.h" +#include "common/Mutex.h" +#include "common/Cond.h" +#include "common/shared_cache.hpp" +#include "include/compat.h" + +/** + * FD Cache + */ +class FDCache : public md_config_obs_t { + /** + * FD + * + * Wrapper for an fd. Destructor closes the fd. + */ + class FD { + public: + const int fd; + FD(int _fd) : fd(_fd) { + assert(_fd >= 0); + } + int operator*() const { + return fd; + } + ~FD() { + TEMP_FAILURE_RETRY(::close(fd)); + } + }; + + SharedLRU<hobject_t, FD> registry; + CephContext *cct; +public: + FDCache(CephContext *cct) : cct(cct) { + assert(cct); + cct->_conf->add_observer(this); + registry.set_size(cct->_conf->filestore_fd_cache_size); + } + ~FDCache() { + cct->_conf->remove_observer(this); + } + typedef std::tr1::shared_ptr<FD> FDRef; + + FDRef lookup(const hobject_t &hoid) { + return registry.lookup(hoid); + } + + FDRef add(const hobject_t &hoid, int fd) { + return registry.add(hoid, new FD(fd)); + } + + /// clear cached fd for hoid, subsequent lookups will get an empty FD + void clear(const hobject_t &hoid) { + registry.clear(hoid); + assert(!registry.lookup(hoid)); + } + + /// md_config_obs_t + const char** get_tracked_conf_keys() const { + static const char* KEYS[] = { + "filestore_fd_cache_size", + NULL + }; + return KEYS; + } + void handle_conf_change(const md_config_t *conf, + const std::set<std::string> &changed) { + if (changed.count("filestore_fd_cache_size")) { + registry.set_size(conf->filestore_fd_cache_size); + } + } + +}; +typedef FDCache::FDRef FDRef; + +#endif diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index b32f2875f71..dca8a1bbfea 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -196,10 +196,22 @@ int FileStore::lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf) return r; } -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, +int FileStore::lfn_open(coll_t cid, + const hobject_t& oid, + bool create, + FDRef *outfd, IndexedPath *path, Index *index) { + assert(outfd); + int flags = O_RDWR; + if (create) + flags |= O_CREAT; + Mutex::Locker l(fdcache_lock); + *outfd = fdcache.lookup(oid); + if (*outfd) { + return 0; + } Index index2; IndexedPath path2; if (!path) @@ -224,16 +236,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode goto fail; } - r = ::open((*path)->path(), flags, mode); + r = ::open((*path)->path(), flags, 0644); if (r < 0) { r = -errno; dout(10) << "error opening file " << (*path)->path() << " with flags=" - << flags << " and mode=" << mode << ": " << cpp_strerror(-r) << dendl; + << flags << ": " << cpp_strerror(-r) << dendl; goto fail; } fd = r; - if ((flags & O_CREAT) && (!exist)) { + if (create && (!exist)) { r = (*index)->created(oid, (*path)->path()); if (r < 0) { TEMP_FAILURE_RETRY(::close(fd)); @@ -242,31 +254,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode goto fail; } } - return fd; + *outfd = fdcache.add(oid, fd); + return 0; fail: assert(!m_filestore_fail_eio || r != -EIO); return r; } -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, IndexedPath *path) +void FileStore::lfn_close(FDRef fd) { - return lfn_open(cid, oid, flags, mode, path, 0); -} - -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode) -{ - return lfn_open(cid, oid, flags, mode, 0, 0); -} - -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags) -{ - return lfn_open(cid, oid, flags, 0); -} - -void FileStore::lfn_close(int fd) -{ - TEMP_FAILURE_RETRY(::close(fd)); } int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) @@ -324,6 +321,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos) { + Mutex::Locker l(fdcache_lock); Index index; int r = get_index(cid, &index); if (r < 0) @@ -355,6 +353,8 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, if (g_conf->filestore_debug_inject_read_err) { debug_obj_on_delete(o); } + wbthrottle.clear_object(o); // should be only non-cache ref + fdcache.clear(o); } else { /* Ensure that replay of this op doesn't result in the object_map * going away. @@ -387,6 +387,9 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha sync_entry_timeo_lock("sync_entry_timeo_lock"), timer(g_ceph_context, sync_entry_timeo_lock), stop(false), sync_thread(this), + fdcache_lock("fdcache_lock"), + fdcache(g_ceph_context), + wbthrottle(g_ceph_context), default_osr("default"), op_queue_len(0), op_queue_bytes(0), op_throttle_lock("FileStore::op_throttle_lock"), @@ -394,22 +397,17 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"), op_wq(this, g_conf->filestore_op_thread_timeout, g_conf->filestore_op_thread_suicide_timeout, &op_tp), - flusher_queue_len(0), flusher_thread(this), logger(NULL), read_error_lock("FileStore::read_error_lock"), m_filestore_btrfs_clone_range(g_conf->filestore_btrfs_clone_range), m_filestore_btrfs_snap (g_conf->filestore_btrfs_snap ), m_filestore_commit_timeout(g_conf->filestore_commit_timeout), m_filestore_fiemap(g_conf->filestore_fiemap), - m_filestore_flusher (g_conf->filestore_flusher ), m_filestore_fsync_flushes_journal_data(g_conf->filestore_fsync_flushes_journal_data), m_filestore_journal_parallel(g_conf->filestore_journal_parallel ), m_filestore_journal_trailing(g_conf->filestore_journal_trailing), m_filestore_journal_writeahead(g_conf->filestore_journal_writeahead), m_filestore_fiemap_threshold(g_conf->filestore_fiemap_threshold), - m_filestore_sync_flush(g_conf->filestore_sync_flush), - m_filestore_flusher_max_fds(g_conf->filestore_flusher_max_fds), - m_filestore_flush_min(g_conf->filestore_flush_min), m_filestore_max_sync_interval(g_conf->filestore_max_sync_interval), m_filestore_min_sync_interval(g_conf->filestore_min_sync_interval), m_filestore_fail_eio(g_conf->filestore_fail_eio), @@ -1067,6 +1065,7 @@ int FileStore::_detect_fs() #if defined(__linux__) if (st.f_type == BTRFS_SUPER_MAGIC) { dout(0) << "mount detected btrfs" << dendl; + wbthrottle.set_fs(WBThrottle::BTRFS); btrfs = true; btrfs_stable_commits = btrfs && m_filestore_btrfs_snap; @@ -1778,7 +1777,6 @@ int FileStore::mount() journal_start(); op_tp.start(); - flusher_thread.create(); op_finisher.start(); ondisk_finisher.start(); @@ -1816,11 +1814,9 @@ int FileStore::umount() lock.Lock(); stop = true; sync_cond.Signal(); - flusher_cond.Signal(); lock.Unlock(); sync_thread.join(); op_tp.stop(); - flusher_thread.join(); journal_stop(); @@ -1968,6 +1964,7 @@ void FileStore::op_queue_release_throttle(Op *o) void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle) { + wbthrottle.throttle(); // inject a stall? if (g_conf->filestore_inject_stall) { int orig = g_conf->filestore_inject_stall; @@ -2263,12 +2260,13 @@ int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPos if (!replaying || btrfs_stable_commits) return 1; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { dout(10) << "_check_replay_guard " << cid << " " << oid << " dne" << dendl; return 1; // if file does not exist, there is no guard, and we can replay. } - int ret = _check_replay_guard(fd, spos); + int ret = _check_replay_guard(**fd, spos); lfn_close(fd); return ret; } @@ -2762,22 +2760,24 @@ int FileStore::read( dout(15) << "read " << cid << "/" << oid << " " << offset << "~" << len << dendl; - int fd = lfn_open(cid, oid, O_RDONLY); - if (fd < 0) { - dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: " << cpp_strerror(fd) << dendl; - return fd; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { + dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: " + << cpp_strerror(r) << dendl; + return r; } if (len == 0) { struct stat st; memset(&st, 0, sizeof(struct stat)); - int r = ::fstat(fd, &st); + int r = ::fstat(**fd, &st); assert(r == 0); len = st.st_size; } bufferptr bptr(len); // prealloc space for entire read - got = safe_pread(fd, bptr.c_str(), len, offset); + got = safe_pread(**fd, bptr.c_str(), len, offset); if (got < 0) { dout(10) << "FileStore::read(" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl; lfn_close(fd); @@ -2815,15 +2815,14 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid, dout(15) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << dendl; - int r; - int fd = lfn_open(cid, oid, O_RDONLY); - if (fd < 0) { - r = fd; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl; } else { uint64_t i; - r = do_fiemap(fd, offset, len, &fiemap); + r = do_fiemap(**fd, offset, len, &fiemap); if (r < 0) goto done; @@ -2865,10 +2864,10 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid, } done: - if (fd >= 0) + if (r >= 0) { lfn_close(fd); - if (r >= 0) ::encode(exomap, bl); + } dout(10) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << exomap.size() << " " << exomap << dendl; free(fiemap); @@ -2899,14 +2898,13 @@ int FileStore::_touch(coll_t cid, const hobject_t& oid) { dout(15) << "touch " << cid << "/" << oid << dendl; - int flags = O_WRONLY|O_CREAT; - int fd = lfn_open(cid, oid, flags, 0644); - int r; - if (fd >= 0) { + FDRef fd; + int r = lfn_open(cid, oid, true, &fd); + if (r < 0) { + return r; + } else { lfn_close(fd); - r = 0; - } else - r = fd; + } dout(10) << "touch " << cid << "/" << oid << " = " << r << dendl; return r; } @@ -2920,17 +2918,17 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, int64_t actual; - int flags = O_WRONLY|O_CREAT; - int fd = lfn_open(cid, oid, flags, 0644); - if (fd < 0) { - r = fd; - dout(0) << "write couldn't open " << cid << "/" << oid << " flags " << flags << ": " + FDRef fd; + r = lfn_open(cid, oid, true, &fd); + if (r < 0) { + dout(0) << "write couldn't open " << cid << "/" + << oid << ": " << cpp_strerror(r) << dendl; goto out; } // seek - actual = ::lseek64(fd, offset, SEEK_SET); + actual = ::lseek64(**fd, offset, SEEK_SET); if (actual < 0) { r = -errno; dout(0) << "write lseek64 to " << offset << " failed: " << cpp_strerror(r) << dendl; @@ -2945,43 +2943,13 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, } // write - r = bl.write_fd(fd); + r = bl.write_fd(**fd); if (r == 0) r = bl.length(); // flush? - { - bool should_flush = (ssize_t)len >= m_filestore_flush_min; - bool local_flush = false; -#ifdef HAVE_SYNC_FILE_RANGE - bool async_done = false; - if (!should_flush || - !m_filestore_flusher || - !(async_done = queue_flusher(fd, offset, len, replica))) { - if (should_flush && m_filestore_sync_flush) { - ::sync_file_range(fd, offset, len, SYNC_FILE_RANGE_WRITE); - local_flush = true; - } - } - //Both lfn_close() and possible posix_fadvise() done by flusher - if (async_done) fd = -1; -#else - // no sync_file_range; (maybe) flush inline and close. - if (should_flush && m_filestore_sync_flush) { - ::fdatasync(fd); - local_flush = true; - } -#endif - if (local_flush && replica && m_filestore_replica_fadvise) { - int fa_r = posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED); - if (fa_r) { - dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl; - } else { - dout(10) << "posix_fadvise performed after local flush" << dendl; - } - } - } - if (fd >= 0) lfn_close(fd); + wbthrottle.queue_wb(fd, oid, offset, len, replica); + lfn_close(fd); out: dout(10) << "write " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl; @@ -2996,14 +2964,14 @@ int FileStore::_zero(coll_t cid, const hobject_t& oid, uint64_t offset, size_t l #ifdef CEPH_HAVE_FALLOCATE # if !defined(DARWIN) && !defined(__FreeBSD__) // first try to punch a hole. - int fd = lfn_open(cid, oid, O_RDONLY); - if (fd < 0) { - ret = -errno; + FDRef fd; + ret = lfn_open(cid, oid, false, &fd); + if (ret < 0) { goto out; } // first try fallocate - ret = fallocate(fd, FALLOC_FL_PUNCH_HOLE, offset, len); + ret = fallocate(**fd, FALLOC_FL_PUNCH_HOLE, offset, len); if (ret < 0) ret = -errno; lfn_close(fd); @@ -3039,23 +3007,26 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo if (_check_replay_guard(cid, newoid, spos) < 0) return 0; - int o, n, r; + int r; + FDRef o, n; { Index index; IndexedPath from, to; - o = lfn_open(cid, oldoid, O_RDONLY, 0, &from, &index); - if (o < 0) { - r = o; + r = lfn_open(cid, oldoid, false, &o, &from, &index); + if (r < 0) { goto out2; } - n = lfn_open(cid, newoid, O_CREAT|O_TRUNC|O_WRONLY, 0644, &to, &index); - if (n < 0) { - r = n; + r = lfn_open(cid, newoid, true, &n, &to, &index); + if (r < 0) { + goto out; + } + r = ::ftruncate(**n, 0); + if (r < 0) { goto out; } struct stat st; - ::fstat(o, &st); - r = _do_clone_range(o, n, 0, st.st_size, 0); + ::fstat(**o, &st); + r = _do_clone_range(**o, **n, 0, st.st_size, 0); if (r < 0) { r = -errno; goto out3; @@ -3068,17 +3039,17 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo { map<string, bufferptr> aset; - r = _fgetattrs(o, aset, false); + r = _fgetattrs(**o, aset, false); if (r < 0) goto out3; - r = _fsetattrs(n, aset); + r = _fsetattrs(**n, aset); if (r < 0) goto out3; } // clone is non-idempotent; record our work. - _set_replay_guard(n, spos, &newoid); + _set_replay_guard(**n, spos, &newoid); out3: lfn_close(n); @@ -3248,21 +3219,19 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t return 0; int r; - int o, n; - o = lfn_open(cid, oldoid, O_RDONLY); - if (o < 0) { - r = o; + FDRef o, n; + r = lfn_open(cid, oldoid, false, &o); + if (r < 0) { goto out2; } - n = lfn_open(cid, newoid, O_CREAT|O_WRONLY, 0644); - if (n < 0) { - r = n; + r = lfn_open(cid, newoid, true, &n); + if (r < 0) { goto out; } - r = _do_clone_range(o, n, srcoff, len, dstoff); + r = _do_clone_range(**o, **n, srcoff, len, dstoff); // clone is non-idempotent; record our work. - _set_replay_guard(n, spos, &newoid); + _set_replay_guard(**n, spos, &newoid); lfn_close(n); out: @@ -3273,89 +3242,6 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t return r; } - -bool FileStore::queue_flusher(int fd, uint64_t off, uint64_t len, bool replica) -{ - bool queued; - lock.Lock(); - if (flusher_queue_len < m_filestore_flusher_max_fds) { - flusher_queue.push_back(sync_epoch); - flusher_queue.push_back(fd); - flusher_queue.push_back(off); - flusher_queue.push_back(len); - flusher_queue.push_back(replica); - flusher_queue_len++; - flusher_cond.Signal(); - dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len - << " qlen " << flusher_queue_len - << dendl; - queued = true; - } else { - dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len - << " qlen " << flusher_queue_len - << " hit flusher_max_fds " << m_filestore_flusher_max_fds - << ", skipping async flush" << dendl; - queued = false; - } - lock.Unlock(); - return queued; -} - -void FileStore::flusher_entry() -{ - lock.Lock(); - dout(20) << "flusher_entry start" << dendl; - while (true) { - if (!flusher_queue.empty()) { -#ifdef HAVE_SYNC_FILE_RANGE - list<uint64_t> q; - q.swap(flusher_queue); - - int num = flusher_queue_len; // see how many we're taking, here - - lock.Unlock(); - while (!q.empty()) { - uint64_t ep = q.front(); - q.pop_front(); - int fd = q.front(); - q.pop_front(); - uint64_t off = q.front(); - q.pop_front(); - uint64_t len = q.front(); - q.pop_front(); - bool replica = q.front(); - q.pop_front(); - if (!stop && ep == sync_epoch) { - dout(10) << "flusher_entry flushing+closing " << fd << " ep " << ep << dendl; - ::sync_file_range(fd, off, len, SYNC_FILE_RANGE_WRITE); - if (replica && m_filestore_replica_fadvise) { - int fa_r = posix_fadvise(fd, off, len, POSIX_FADV_DONTNEED); - if (fa_r) { - dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl; - } else { - dout(10) << "posix_fadvise performed after local flush" << dendl; - } - } - } else - dout(10) << "flusher_entry JUST closing " << fd << " (stop=" << stop << ", ep=" << ep - << ", sync_epoch=" << sync_epoch << ")" << dendl; - lfn_close(fd); - } - lock.Lock(); - flusher_queue_len -= num; // they're definitely closed, forget -#endif - } else { - if (stop) - break; - dout(20) << "flusher_entry sleeping" << dendl; - flusher_cond.Wait(lock); - dout(20) << "flusher_entry awoke" << dendl; - } - } - dout(20) << "flusher_entry finish" << dendl; - lock.Unlock(); -} - class SyncEntryTimeout : public Context { public: SyncEntryTimeout(int commit_timeo) @@ -3548,6 +3434,7 @@ void FileStore::sync_entry() logger->tinc(l_os_commit_len, dur); apply_manager.commit_finish(); + wbthrottle.clear(); logger->set(l_os_committing, 0); @@ -3856,15 +3743,14 @@ bool FileStore::debug_mdata_eio(const hobject_t &oid) { int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, bufferptr &bp) { dout(15) << "getattr " << cid << "/" << oid << " '" << name << "'" << dendl; - int r; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } char n[CHAIN_XATTR_MAX_NAME_LEN]; get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN); - r = _fgetattr(fd, n, bp); + r = _fgetattr(**fd, n, bp); lfn_close(fd); if (r == -ENODATA && g_conf->filestore_xattr_use_omap) { map<string, bufferlist> got; @@ -3903,13 +3789,12 @@ int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, buffe int FileStore::getattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>& aset, bool user_only) { dout(15) << "getattrs " << cid << "/" << oid << dendl; - int r; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } - r = _fgetattrs(fd, aset, user_only); + r = _fgetattrs(**fd, aset, user_only); lfn_close(fd); if (g_conf->filestore_xattr_use_omap) { set<string> omap_attrs; @@ -3967,14 +3852,13 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr> set<string> omap_remove; map<string, bufferptr> inline_set; map<string, bufferptr> inline_to_set; - int r = 0; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } if (g_conf->filestore_xattr_use_omap) { - r = _fgetattrs(fd, inline_set, false); + r = _fgetattrs(**fd, inline_set, false); assert(!m_filestore_fail_eio || r != -EIO); } dout(15) << "setattrs " << cid << "/" << oid << dendl; @@ -3988,7 +3872,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr> if (p->second.length() > g_conf->filestore_max_inline_xattr_size) { if (inline_set.count(p->first)) { inline_set.erase(p->first); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r < 0) goto out_close; } @@ -4000,7 +3884,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr> inline_set.size() >= g_conf->filestore_max_inline_xattrs) { if (inline_set.count(p->first)) { inline_set.erase(p->first); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r < 0) goto out_close; } @@ -4015,7 +3899,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr> } - r = _fsetattrs(fd, inline_to_set); + r = _fsetattrs(**fd, inline_to_set); if (r < 0) goto out_close; @@ -4050,15 +3934,14 @@ int FileStore::_rmattr(coll_t cid, const hobject_t& oid, const char *name, const SequencerPosition &spos) { dout(15) << "rmattr " << cid << "/" << oid << " '" << name << "'" << dendl; - int r = 0; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } char n[CHAIN_XATTR_MAX_NAME_LEN]; get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r == -ENODATA && g_conf->filestore_xattr_use_omap) { Index index; r = get_index(cid, &index); @@ -4088,18 +3971,17 @@ int FileStore::_rmattrs(coll_t cid, const hobject_t& oid, dout(15) << "rmattrs " << cid << "/" << oid << dendl; map<string,bufferptr> aset; - int r = 0; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } - r = _fgetattrs(fd, aset, false); + r = _fgetattrs(**fd, aset, false); if (r >= 0) { for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) { char n[CHAIN_XATTR_MAX_NAME_LEN]; get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r < 0) break; } @@ -4687,21 +4569,21 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, // open guard on object so we don't any previous operations on the // new name that will modify the source inode. - int fd = lfn_open(oldcid, o, 0); - if (fd < 0) { + FDRef fd; + int r = lfn_open(oldcid, o, 0, &fd); + if (r < 0) { // the source collection/object does not exist. If we are replaying, we // should be safe, so just return 0 and move on. assert(replaying); dout(10) << "collection_add " << c << "/" << o << " from " - << oldcid << "/" << o << " (dne, continue replay) " << dendl; + << oldcid << "/" << o << " (dne, continue replay) " << dendl; return 0; } - assert(fd >= 0); if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress" - _set_replay_guard(fd, spos, &o, true); + _set_replay_guard(**fd, spos, &o, true); } - int r = lfn_link(oldcid, c, o); + r = lfn_link(oldcid, c, o); if (replaying && !btrfs_stable_commits && r == -EEXIST) // crashed between link() and set_replay_guard() r = 0; @@ -4710,7 +4592,7 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, // close guard on object so we don't do this again if (r == 0) { - _close_replay_guard(fd, spos); + _close_replay_guard(**fd, spos); } lfn_close(fd); @@ -4919,9 +4801,6 @@ const char** FileStore::get_tracked_conf_keys() const "filestore_queue_max_bytes", "filestore_queue_committing_max_ops", "filestore_queue_committing_max_bytes", - "filestore_flusher", - "filestore_flusher_max_fds", - "filestore_sync_flush", "filestore_commit_timeout", "filestore_dump_file", "filestore_kill_at", @@ -4941,9 +4820,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf, changed.count("filestore_queue_max_bytes") || changed.count("filestore_queue_committing_max_ops") || changed.count("filestore_queue_committing_max_bytes") || - changed.count("filestore_flusher") || - changed.count("filestore_flusher_max_fds") || - changed.count("filestore_flush_min") || changed.count("filestore_kill_at") || changed.count("filestore_fail_eio") || changed.count("filestore_replica_fadvise")) { @@ -4954,10 +4830,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf, m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes; m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops; m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes; - m_filestore_flusher = conf->filestore_flusher; - m_filestore_flusher_max_fds = conf->filestore_flusher_max_fds; - m_filestore_flush_min = conf->filestore_flush_min; - m_filestore_sync_flush = conf->filestore_sync_flush; m_filestore_kill_at.set(conf->filestore_kill_at); m_filestore_fail_eio = conf->filestore_fail_eio; m_filestore_replica_fadvise = conf->filestore_replica_fadvise; diff --git a/src/os/FileStore.h b/src/os/FileStore.h index d5ca2a4c237..78668dd92a4 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -40,6 +40,8 @@ using namespace __gnu_cxx; #include "IndexManager.h" #include "ObjectMap.h" #include "SequencerPosition.h" +#include "FDCache.h" +#include "WBThrottle.h" #include "include/uuid.h" @@ -198,6 +200,10 @@ private: friend ostream& operator<<(ostream& out, const OpSequencer& s); + Mutex fdcache_lock; + FDCache fdcache; + WBThrottle wbthrottle; + Sequencer default_osr; deque<OpSequencer*> op_queue; uint64_t op_queue_len, op_queue_bytes; @@ -250,37 +256,22 @@ private: void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk); friend class C_JournaledAhead; - // flusher thread - Cond flusher_cond; - list<uint64_t> flusher_queue; - int flusher_queue_len; - void flusher_entry(); - struct FlusherThread : public Thread { - FileStore *fs; - FlusherThread(FileStore *f) : fs(f) {} - void *entry() { - fs->flusher_entry(); - return 0; - } - } flusher_thread; - bool queue_flusher(int fd, uint64_t off, uint64_t len, bool replica); - int open_journal(); - PerfCounters *logger; public: int lfn_find(coll_t cid, const hobject_t& oid, IndexedPath *path); int lfn_truncate(coll_t cid, const hobject_t& oid, off_t length); int lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf); - int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, - IndexedPath *path); - int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, - IndexedPath *path, Index *index); - int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode); - int lfn_open(coll_t cid, const hobject_t& oid, int flags); - void lfn_close(int fd); + int lfn_open( + coll_t cid, + const hobject_t& oid, + bool create, + FDRef *outfd, + IndexedPath *path = 0, + Index *index = 0); + void lfn_close(FDRef fd); int lfn_link(coll_t c, coll_t cid, const hobject_t& o) ; int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos); @@ -510,15 +501,11 @@ private: bool m_filestore_btrfs_snap; float m_filestore_commit_timeout; bool m_filestore_fiemap; - bool m_filestore_flusher; bool m_filestore_fsync_flushes_journal_data; bool m_filestore_journal_parallel; bool m_filestore_journal_trailing; bool m_filestore_journal_writeahead; int m_filestore_fiemap_threshold; - bool m_filestore_sync_flush; - int m_filestore_flusher_max_fds; - int m_filestore_flush_min; double m_filestore_max_sync_interval; double m_filestore_min_sync_interval; bool m_filestore_fail_eio; diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc new file mode 100644 index 00000000000..4673488f833 --- /dev/null +++ b/src/os/WBThrottle.cc @@ -0,0 +1,239 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "os/WBThrottle.h" +#include "common/perf_counters.h" + +WBThrottle::WBThrottle(CephContext *cct) : + cur_ios(0), cur_size(0), + cct(cct), + logger(NULL), + stopping(false), + lock("WBThrottle::lock", false, true, false, cct), + fs(XFS) +{ + { + Mutex::Locker l(lock); + set_from_conf(); + } + assert(cct); + PerfCountersBuilder b( + cct, string("WBThrottle"), + l_wbthrottle_first, l_wbthrottle_last); + b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied"); + b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb"); + b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied"); + b.add_u64(l_wbthrottle_ios_wb, "ios_wb"); + b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied"); + b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb"); + logger = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i) + logger->set(i, 0); + + cct->_conf->add_observer(this); + create(); +} + +WBThrottle::~WBThrottle() { + assert(cct); + { + Mutex::Locker l(lock); + stopping = true; + cond.Signal(); + } + join(); + cct->get_perfcounters_collection()->remove(logger); + delete logger; + cct->_conf->remove_observer(this); +} + +const char** WBThrottle::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "filestore_wbthrottle_btrfs_bytes_start_flusher", + "filestore_wbthrottle_btrfs_bytes_hard_limit", + "filestore_wbthrottle_btrfs_ios_start_flusher", + "filestore_wbthrottle_btrfs_ios_hard_limit", + "filestore_wbthrottle_btrfs_inodes_start_flusher", + "filestore_wbthrottle_btrfs_inodes_hard_limit", + "filestore_wbthrottle_xfs_bytes_start_flusher", + "filestore_wbthrottle_xfs_bytes_hard_limit", + "filestore_wbthrottle_xfs_ios_start_flusher", + "filestore_wbthrottle_xfs_ios_hard_limit", + "filestore_wbthrottle_xfs_inodes_start_flusher", + "filestore_wbthrottle_xfs_inodes_hard_limit", + NULL + }; + return KEYS; +} + +void WBThrottle::set_from_conf() +{ + assert(lock.is_locked()); + if (fs == BTRFS) { + size_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher; + size_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit; + io_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher; + io_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit; + fd_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher; + fd_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit; + } else if (fs == XFS) { + size_limits.first = + cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher; + size_limits.second = + cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit; + io_limits.first = + cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher; + io_limits.second = + cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit; + fd_limits.first = + cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher; + fd_limits.second = + cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit; + } else { + assert(0 == "invalid value for fs"); + } + cond.Signal(); +} + +void WBThrottle::handle_conf_change(const md_config_t *conf, + const std::set<std::string> &changed) +{ + Mutex::Locker l(lock); + for (const char** i = get_tracked_conf_keys(); *i; ++i) { + if (changed.count(*i)) { + set_from_conf(); + return; + } + } +} + +bool WBThrottle::get_next_should_flush( + boost::tuple<hobject_t, FDRef, PendingWB> *next) +{ + assert(lock.is_locked()); + assert(next); + while (!stopping && + cur_ios < io_limits.first && + pending_wbs.size() < fd_limits.first && + cur_size < size_limits.first) + cond.Wait(lock); + if (stopping) + return false; + assert(!pending_wbs.empty()); + hobject_t obj(pop_object()); + + map<hobject_t, pair<PendingWB, FDRef> >::iterator i = + pending_wbs.find(obj); + *next = boost::make_tuple(obj, i->second.second, i->second.first); + pending_wbs.erase(i); + return true; +} + + +void *WBThrottle::entry() +{ + Mutex::Locker l(lock); + boost::tuple<hobject_t, FDRef, PendingWB> wb; + while (get_next_should_flush(&wb)) { + clearing = wb.get<0>(); + lock.Unlock(); + ::fsync(**wb.get<1>()); + if (wb.get<2>().nocache) + posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED); + lock.Lock(); + clearing = hobject_t(); + cur_ios -= wb.get<2>().ios; + logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios); + cur_size -= wb.get<2>().size; + logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size); + logger->dec(l_wbthrottle_inodes_dirtied); + cond.Signal(); + wb = boost::tuple<hobject_t, FDRef, PendingWB>(); + } + return 0; +} + +void WBThrottle::queue_wb( + FDRef fd, const hobject_t &hoid, uint64_t offset, uint64_t len, + bool nocache) +{ + Mutex::Locker l(lock); + map<hobject_t, pair<PendingWB, FDRef> >::iterator wbiter = + pending_wbs.find(hoid); + if (wbiter == pending_wbs.end()) { + wbiter = pending_wbs.insert( + make_pair(hoid, + make_pair( + PendingWB(), + fd))).first; + logger->inc(l_wbthrottle_inodes_dirtied); + } else { + remove_object(hoid); + } + + cur_ios++; + logger->inc(l_wbthrottle_ios_dirtied); + cur_size += len; + logger->inc(l_wbthrottle_bytes_dirtied, len); + + wbiter->second.first.add(nocache, len, 1); + insert_object(hoid); + cond.Signal(); +} + +void WBThrottle::clear() +{ + Mutex::Locker l(lock); + for (map<hobject_t, pair<PendingWB, FDRef> >::iterator i = + pending_wbs.begin(); + i != pending_wbs.end(); + ++i) { + cur_ios -= i->second.first.ios; + logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios); + cur_size -= i->second.first.size; + logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size); + logger->dec(l_wbthrottle_inodes_dirtied); + } + pending_wbs.clear(); + lru.clear(); + rev_lru.clear(); + cond.Signal(); + assert(cur_ios == 0); + assert(cur_size == 0); +} + +void WBThrottle::clear_object(const hobject_t &hoid) +{ + Mutex::Locker l(lock); + while (clearing == hoid) + cond.Wait(lock); + map<hobject_t, pair<PendingWB, FDRef> >::iterator i = + pending_wbs.find(hoid); + if (i == pending_wbs.end()) + return; + + cur_ios -= i->second.first.ios; + cur_size -= i->second.first.size; + + pending_wbs.erase(i); + remove_object(hoid); +} + +void WBThrottle::throttle() +{ + Mutex::Locker l(lock); + while (!stopping && !( + cur_ios < io_limits.second && + pending_wbs.size() < fd_limits.second && + cur_size < size_limits.second)) { + cond.Wait(lock); + } +} diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h new file mode 100644 index 00000000000..070de08e123 --- /dev/null +++ b/src/os/WBThrottle.h @@ -0,0 +1,171 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef WBTHROTTLE_H +#define WBTHROTTLE_H + +#include <map> +#include <boost/tuple/tuple.hpp> +#include <tr1/memory> +#include "include/buffer.h" +#include "common/Formatter.h" +#include "os/hobject.h" +#include "include/interval_set.h" +#include "FDCache.h" +#include "common/Thread.h" +#include "common/ceph_context.h" + +class PerfCounters; +enum { + l_wbthrottle_first = 999090, + l_wbthrottle_bytes_dirtied, + l_wbthrottle_bytes_wb, + l_wbthrottle_ios_dirtied, + l_wbthrottle_ios_wb, + l_wbthrottle_inodes_dirtied, + l_wbthrottle_inodes_wb, + l_wbthrottle_last +}; + +/** + * WBThrottle + * + * Tracks, throttles, and flushes outstanding IO + */ +class WBThrottle : Thread, public md_config_obs_t { + hobject_t clearing; + + /* *_limits.first is the start_flusher limit and + * *_limits.second is the hard limit + */ + + /// Limits on unflushed bytes + pair<uint64_t, uint64_t> size_limits; + + /// Limits on unflushed ios + pair<uint64_t, uint64_t> io_limits; + + /// Limits on unflushed objects + pair<uint64_t, uint64_t> fd_limits; + + uint64_t cur_ios; /// Currently unflushed IOs + uint64_t cur_size; /// Currently unflushed bytes + + /** + * PendingWB tracks the ios pending on an object. + */ + class PendingWB { + public: + bool nocache; + uint64_t size; + uint64_t ios; + PendingWB() : nocache(true), size(0), ios(0) {} + void add(bool _nocache, uint64_t _size, uint64_t _ios) { + if (!_nocache) + nocache = false; // only nocache if all writes are nocache + size += _size; + ios += _ios; + } + }; + + CephContext *cct; + PerfCounters *logger; + bool stopping; + Mutex lock; + Cond cond; + + + /** + * Flush objects in lru order + */ + list<hobject_t> lru; + map<hobject_t, list<hobject_t>::iterator> rev_lru; + void remove_object(const hobject_t &hoid) { + assert(lock.is_locked()); + map<hobject_t, list<hobject_t>::iterator>::iterator iter = + rev_lru.find(hoid); + if (iter == rev_lru.end()) + return; + + lru.erase(iter->second); + rev_lru.erase(iter); + } + hobject_t pop_object() { + assert(!lru.empty()); + hobject_t hoid(lru.front()); + lru.pop_front(); + rev_lru.erase(hoid); + return hoid; + } + void insert_object(const hobject_t &hoid) { + assert(rev_lru.find(hoid) == rev_lru.end()); + lru.push_back(hoid); + rev_lru.insert(make_pair(hoid, --lru.end())); + } + + map<hobject_t, pair<PendingWB, FDRef> > pending_wbs; + + /// get next flush to perform + bool get_next_should_flush( + boost::tuple<hobject_t, FDRef, PendingWB> *next ///< [out] next to flush + ); ///< @return false if we are shutting down +public: + enum FS { + BTRFS, + XFS + }; + +private: + FS fs; + + void set_from_conf(); +public: + WBThrottle(CephContext *cct); + ~WBThrottle(); + + /// Set fs as XFS or BTRFS + void set_fs(FS new_fs) { + Mutex::Locker l(lock); + fs = new_fs; + set_from_conf(); + } + + /// Queue wb on hoid, fd taking throttle (does not block) + void queue_wb( + FDRef fd, ///< [in] FDRef to hoid + const hobject_t &hoid, ///< [in] object + uint64_t offset, ///< [in] offset written + uint64_t len, ///< [in] length written + bool nocache ///< [in] try to clear out of cache after write + ); + + /// Clear all wb (probably due to sync) + void clear(); + + /// Clear object + void clear_object(const hobject_t &hoid); + + /// Block until there is throttle available + void throttle(); + + /// md_config_obs_t + const char** get_tracked_conf_keys() const; + void handle_conf_change(const md_config_t *conf, + const std::set<std::string> &changed); + + /// Thread + void *entry(); +}; + +#endif diff --git a/src/os/hobject.h b/src/os/hobject.h index 28e0da0d82a..47fcb3dda39 100644 --- a/src/os/hobject.h +++ b/src/os/hobject.h @@ -15,6 +15,9 @@ #ifndef __CEPH_OS_HOBJECT_H #define __CEPH_OS_HOBJECT_H +#include <string.h> +#include "include/types.h" +#include "include/rados.h" #include "include/object.h" #include "include/cmp.h" diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 761a77cd69c..de60a6a9205 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -4383,6 +4383,11 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, if (oid.snap == CEPH_SNAPDIR) { // return head or snapdir, whichever exists. ObjectContext *obc = get_object_context(head, oloc, can_create); + if (obc && !obc->obs.exists) { + // ignore it if the obc exists but the object doesn't + put_object_context(obc); + obc = NULL; + } if (!obc) { obc = get_object_context(snapdir, oloc, can_create); } @@ -5288,7 +5293,6 @@ void ReplicatedPG::submit_push_data( void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info, ObjectStore::Transaction *t) { - remove_snap_mapped_object(*t, recovery_info.soid); t->collection_move(coll, get_temp_coll(t), recovery_info.soid); for (map<hobject_t, interval_set<uint64_t> >::const_iterator p = recovery_info.clone_subset.begin(); |