summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/dev/osd_internals/osd_throttles.rst21
-rw-r--r--doc/dev/osd_internals/wbthrottle.rst28
-rw-r--r--src/Makefile.am7
-rw-r--r--src/common/config_opts.h20
-rw-r--r--src/common/shared_cache.hpp13
-rw-r--r--src/mds/CDir.cc22
-rw-r--r--src/mds/CDir.h2
-rw-r--r--src/mds/CInode.cc179
-rw-r--r--src/mds/CInode.h25
-rw-r--r--src/mds/Locker.cc132
-rw-r--r--src/mds/Locker.h2
-rw-r--r--src/mds/LogSegment.h25
-rw-r--r--src/mds/MDCache.cc933
-rw-r--r--src/mds/MDCache.h77
-rw-r--r--src/mds/MDLog.cc2
-rw-r--r--src/mds/MDS.cc23
-rw-r--r--src/mds/MDS.h3
-rw-r--r--src/mds/MDSMap.h7
-rw-r--r--src/mds/Migrator.cc97
-rw-r--r--src/mds/Migrator.h1
-rw-r--r--src/mds/Mutation.cc7
-rw-r--r--src/mds/Mutation.h1
-rw-r--r--src/mds/Server.cc271
-rw-r--r--src/mds/Server.h11
-rw-r--r--src/mds/events/EMetaBlob.h74
-rw-r--r--src/mds/events/EOpen.h2
-rw-r--r--src/mds/inode_backtrace.h4
-rw-r--r--src/mds/journal.cc193
-rw-r--r--src/mds/locks.c4
-rw-r--r--src/mds/mdstypes.cc6
-rw-r--r--src/mds/mdstypes.h12
-rw-r--r--src/messages/MMDSCacheRejoin.h15
-rw-r--r--src/messages/MMDSOpenIno.h46
-rw-r--r--src/messages/MMDSOpenInoReply.h53
-rw-r--r--src/mon/OSDMonitor.cc5
-rw-r--r--src/msg/Message.cc9
-rw-r--r--src/msg/Message.h2
-rw-r--r--src/os/FDCache.h95
-rw-r--r--src/os/FileStore.cc364
-rw-r--r--src/os/FileStore.h41
-rw-r--r--src/os/WBThrottle.cc239
-rw-r--r--src/os/WBThrottle.h171
-rw-r--r--src/os/hobject.h3
-rw-r--r--src/osd/ReplicatedPG.cc6
44 files changed, 2269 insertions, 984 deletions
diff --git a/doc/dev/osd_internals/osd_throttles.rst b/doc/dev/osd_internals/osd_throttles.rst
new file mode 100644
index 00000000000..4fa3044f986
--- /dev/null
+++ b/doc/dev/osd_internals/osd_throttles.rst
@@ -0,0 +1,21 @@
+ Messenger throttle (number and size)
+ |-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+ FileStore op_queue throttle (number and size)
+ |--------------------------------------------------------|
+ WBThrottle
+ |---------------------------------------------------------------------------------------------------------|
+ Journal (size)
+ |-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
+ |----------------------------------------------------------------------------------------------------> flushed ----------------> synced
+ |
+Op: Read Header --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> op_applied -------------------------------------------------------------> Complete
+ | |
+SubOp: --Messenger--> ReadHeader --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> sub_op_applied -
+ |
+ |-----------------------------> flushed ----------------> synced
+ |------------------------------------------------------------------------------------------|
+ Journal (size)
+ |---------------------------------|
+ WBThrottle
+ |-----------------------------------------------------|
+ FileStore op_queue throttle (number and size)
diff --git a/doc/dev/osd_internals/wbthrottle.rst b/doc/dev/osd_internals/wbthrottle.rst
new file mode 100644
index 00000000000..14ba0140d4d
--- /dev/null
+++ b/doc/dev/osd_internals/wbthrottle.rst
@@ -0,0 +1,28 @@
+==================
+Writeback Throttle
+==================
+
+Previously, the filestore had a problem when handling large numbers of
+small ios. We throttle dirty data implicitly via the journal, but
+a large number of inodes can be dirtied without filling the journal,
+resulting in a very long sync time when the sync finally does happen.
+The flusher was not an adequate solution to this problem since it
+forced writeback of small writes too eagerly, killing performance.
+
+WBThrottle tracks unflushed io per hobject_t and ::fsyncs in lru
+order once the start_flusher threshold is exceeded for any of
+dirty bytes, dirty ios, or dirty inodes. While any of these exceed
+the hard_limit, we block on throttle() in _do_op.
+
+See src/os/WBThrottle.h, src/os/WBThrottle.cc
+
+To track the open FDs through the writeback process, there is now an
+fdcache to cache open fds. lfn_open now returns a cached FDRef which
+implicitly closes the fd once all references have expired.
+
+Filestore syncs have a side effect of flushing all outstanding objects
+in the wbthrottle.
+
+lfn_unlink clears the cached FDRef and wbthrottle entries for the
+unlinked object when the last link is removed and asserts that all
+outstanding FDRefs for that object are dead.
diff --git a/src/Makefile.am b/src/Makefile.am
index 7a08e1f5a2a..5e176874b11 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1490,7 +1490,8 @@ libos_a_SOURCES = \
os/IndexManager.cc \
os/FlatIndex.cc \
os/DBObjectMap.cc \
- os/LevelDBStore.cc
+ os/LevelDBStore.cc \
+ os/WBThrottle.cc
libos_a_CXXFLAGS= ${AM_CXXFLAGS}
noinst_LIBRARIES += libos.a
@@ -1873,6 +1874,8 @@ noinst_HEADERS = \
messages/MMDSFindInoReply.h\
messages/MMDSFragmentNotify.h\
messages/MMDSMap.h\
+ messages/MMDSOpenIno.h \
+ messages/MMDSOpenInoReply.h \
messages/MMDSResolve.h\
messages/MMDSResolveAck.h\
messages/MMDSSlaveRequest.h\
@@ -1973,6 +1976,8 @@ noinst_HEADERS = \
os/FileStore.h\
os/FlatIndex.h\
os/HashIndex.h\
+ os/FDCache.h\
+ os/WBThrottle.h\
os/IndexManager.h\
os/Journal.h\
os/JournalingObjectStore.h\
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 01e8b5a5a15..5e87b2f1782 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -338,6 +338,7 @@ OPTION(mds_kill_openc_at, OPT_INT, 0)
OPTION(mds_kill_journal_at, OPT_INT, 0)
OPTION(mds_kill_journal_expire_at, OPT_INT, 0)
OPTION(mds_kill_journal_replay_at, OPT_INT, 0)
+OPTION(mds_open_remote_link_mode, OPT_INT, 0)
OPTION(mds_inject_traceless_reply_probability, OPT_DOUBLE, 0) /* percentage
of MDS modify replies to skip sending the
client a trace on [0-1]*/
@@ -478,6 +479,20 @@ OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5)
OPTION(filestore, OPT_BOOL, false)
+/// filestore wb throttle limits
+OPTION(filestore_wbthrottle_btrfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_btrfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_btrfs_ios_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_ios_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_btrfs_inodes_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_inodes_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_xfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_xfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_xfs_ios_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_ios_hard_limit, OPT_U64, 100)
+OPTION(filestore_wbthrottle_xfs_inodes_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_inodes_hard_limit, OPT_U64, 100)
+
// Tests index failure paths
OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0)
@@ -498,10 +513,6 @@ OPTION(filestore_btrfs_snap, OPT_BOOL, true)
OPTION(filestore_btrfs_clone_range, OPT_BOOL, true)
OPTION(filestore_fsync_flushes_journal_data, OPT_BOOL, false)
OPTION(filestore_fiemap, OPT_BOOL, false) // (try to) use fiemap
-OPTION(filestore_flusher, OPT_BOOL, true)
-OPTION(filestore_flusher_max_fds, OPT_INT, 512)
-OPTION(filestore_flush_min, OPT_INT, 65536)
-OPTION(filestore_sync_flush, OPT_BOOL, false)
OPTION(filestore_journal_parallel, OPT_BOOL, false)
OPTION(filestore_journal_writeahead, OPT_BOOL, false)
OPTION(filestore_journal_trailing, OPT_BOOL, false)
@@ -518,6 +529,7 @@ OPTION(filestore_merge_threshold, OPT_INT, 10)
OPTION(filestore_split_multiple, OPT_INT, 2)
OPTION(filestore_update_to, OPT_INT, 1000)
OPTION(filestore_blackhole, OPT_BOOL, false) // drop any new transactions on the floor
+OPTION(filestore_fd_cache_size, OPT_INT, 128) // FD lru size
OPTION(filestore_dump_file, OPT_STR, "") // file onto which store transaction dumps
OPTION(filestore_kill_at, OPT_INT, 0) // inject a failure at the n'th opportunity
OPTION(filestore_inject_stall, OPT_INT, 0) // artificially stall for N seconds in op queue thread
diff --git a/src/common/shared_cache.hpp b/src/common/shared_cache.hpp
index 69a4c06dfbf..178d1001be3 100644
--- a/src/common/shared_cache.hpp
+++ b/src/common/shared_cache.hpp
@@ -85,12 +85,23 @@ public:
assert(weak_refs.empty());
}
+ void clear(K key) {
+ VPtr val; // release any ref we have after we drop the lock
+ {
+ Mutex::Locker l(lock);
+ if (weak_refs.count(key)) {
+ val = weak_refs[key].lock();
+ }
+ lru_remove(key);
+ }
+ }
+
void set_size(size_t new_size) {
list<VPtr> to_release;
{
Mutex::Locker l(lock);
max_size = new_size;
- trim_cache(to_release);
+ trim_cache(&to_release);
}
}
diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc
index 4ef6e8f19fa..211cec08b4f 100644
--- a/src/mds/CDir.cc
+++ b/src/mds/CDir.cc
@@ -1055,7 +1055,7 @@ void CDir::assimilate_dirty_rstat_inodes_finish(Mutation *mut, EMetaBlob *blob)
mut->add_projected_inode(in);
in->clear_dirty_rstat();
- blob->add_primary_dentry(dn, true, in);
+ blob->add_primary_dentry(dn, in, true);
}
if (!dirty_rstat_inodes.empty())
@@ -1651,7 +1651,7 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn)
dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
dn->mark_clean();
- if (dn->get_linkage()->get_inode()) {
+ if (dn->get_linkage()->is_primary()) {
assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version);
dout(10) << "_fetched had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl;
dn->get_linkage()->get_inode()->mark_clean();
@@ -1728,11 +1728,11 @@ public:
class C_Dir_Committed : public Context {
CDir *dir;
- version_t version, last_renamed_version;
+ version_t version;
public:
- C_Dir_Committed(CDir *d, version_t v, version_t lrv) : dir(d), version(v), last_renamed_version(lrv) { }
+ C_Dir_Committed(CDir *d, version_t v) : dir(d), version(v) { }
void finish(int r) {
- dir->_committed(version, last_renamed_version);
+ dir->_committed(version);
}
};
@@ -1993,12 +1993,9 @@ void CDir::_commit(version_t want)
if (committed_dn == items.end())
cache->mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0, NULL,
- new C_Dir_Committed(this, get_version(),
- inode->inode.last_renamed_version));
+ new C_Dir_Committed(this, get_version()));
else { // send in a different Context
- C_GatherBuilder gather(g_ceph_context,
- new C_Dir_Committed(this, get_version(),
- inode->inode.last_renamed_version));
+ C_GatherBuilder gather(g_ceph_context, new C_Dir_Committed(this, get_version()));
while (committed_dn != items.end()) {
ObjectOperation n = ObjectOperation();
committed_dn = _commit_partial(n, snaps, max_write_size, committed_dn);
@@ -2027,9 +2024,9 @@ void CDir::_commit(version_t want)
*
* @param v version i just committed
*/
-void CDir::_committed(version_t v, version_t lrv)
+void CDir::_committed(version_t v)
{
- dout(10) << "_committed v " << v << " (last renamed " << lrv << ") on " << *this << dendl;
+ dout(10) << "_committed v " << v << " on " << *this << dendl;
assert(is_auth());
bool stray = inode->is_stray();
@@ -2142,6 +2139,7 @@ void CDir::encode_export(bufferlist& bl)
void CDir::finish_export(utime_t now)
{
+ state &= MASK_STATE_EXPORT_KEPT;
pop_auth_subtree_nested.sub(now, cache->decayrate, pop_auth_subtree);
pop_me.zero(now);
pop_auth_subtree.zero(now);
diff --git a/src/mds/CDir.h b/src/mds/CDir.h
index 7e1db73af06..87c79c2af1b 100644
--- a/src/mds/CDir.h
+++ b/src/mds/CDir.h
@@ -494,7 +494,7 @@ private:
unsigned max_write_size=-1,
map_t::iterator last_committed_dn=map_t::iterator());
void _encode_dentry(CDentry *dn, bufferlist& bl, const set<snapid_t> *snaps);
- void _committed(version_t v, version_t last_renamed_version);
+ void _committed(version_t v);
void wait_for_commit(Context *c, version_t v=0);
// -- dirtyness --
diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc
index 781ed727f5f..0e1429377f8 100644
--- a/src/mds/CInode.cc
+++ b/src/mds/CInode.cc
@@ -127,6 +127,7 @@ ostream& operator<<(ostream& out, CInode& in)
if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
+ if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
if (in.is_frozen_inode()) out << " FROZEN";
if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
@@ -328,9 +329,14 @@ void CInode::pop_and_dirty_projected_inode(LogSegment *ls)
assert(!projected_nodes.empty());
dout(15) << "pop_and_dirty_projected_inode " << projected_nodes.front()->inode
<< " v" << projected_nodes.front()->inode->version << dendl;
+ int64_t old_pool = inode.layout.fl_pg_pool;
+
mark_dirty(projected_nodes.front()->inode->version, ls);
inode = *projected_nodes.front()->inode;
+ if (inode.is_backtrace_updated())
+ _mark_dirty_parent(ls, old_pool != inode.layout.fl_pg_pool);
+
map<string,bufferptr> *px = projected_nodes.front()->xattrs;
if (px) {
xattrs = *px;
@@ -967,67 +973,134 @@ void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin)
delete fin;
}
-class C_CInode_FetchedBacktrace : public Context {
- CInode *in;
- inode_backtrace_t *backtrace;
- Context *fin;
-public:
- bufferlist bl;
- C_CInode_FetchedBacktrace(CInode *i, inode_backtrace_t *bt, Context *f) :
- in(i), backtrace(bt), fin(f) {}
-
- void finish(int r) {
- if (r == 0) {
- in->_fetched_backtrace(&bl, backtrace, fin);
- } else {
- fin->finish(r);
- }
- }
-};
-
-void CInode::fetch_backtrace(inode_backtrace_t *bt, Context *fin)
+void CInode::build_backtrace(int64_t pool, inode_backtrace_t& bt)
{
- object_t oid = get_object_name(ino(), frag_t(), "");
- object_locator_t oloc(inode.layout.fl_pg_pool);
-
- SnapContext snapc;
- C_CInode_FetchedBacktrace *c = new C_CInode_FetchedBacktrace(this, bt, fin);
- mdcache->mds->objecter->getxattr(oid, oloc, "parent", CEPH_NOSNAP, &c->bl, 0, c);
-}
-
-void CInode::_fetched_backtrace(bufferlist *bl, inode_backtrace_t *bt, Context *fin)
-{
- ::decode(*bt, *bl);
- if (fin) {
- fin->finish(0);
- }
-}
-
-void CInode::build_backtrace(int64_t location, inode_backtrace_t* bt)
-{
- bt->ino = inode.ino;
- bt->ancestors.clear();
+ bt.ino = inode.ino;
+ bt.ancestors.clear();
+ bt.pool = pool;
CInode *in = this;
CDentry *pdn = get_parent_dn();
while (pdn) {
CInode *diri = pdn->get_dir()->get_inode();
- bt->ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version));
+ bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version));
in = diri;
pdn = in->get_parent_dn();
}
vector<int64_t>::iterator i = inode.old_pools.begin();
while(i != inode.old_pools.end()) {
// don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
- if (*i == location) {
+ if (*i == pool) {
++i;
continue;
}
- bt->old_pools.insert(*i);
+ bt.old_pools.insert(*i);
++i;
}
}
+struct C_Inode_StoredBacktrace : public Context {
+ CInode *in;
+ version_t version;
+ Context *fin;
+ C_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : in(i), version(v), fin(f) {}
+ void finish(int r) {
+ in->_stored_backtrace(version, fin);
+ }
+};
+
+void CInode::store_backtrace(Context *fin)
+{
+ dout(10) << "store_backtrace on " << *this << dendl;
+ assert(is_dirty_parent());
+
+ auth_pin(this);
+
+ int64_t pool;
+ if (is_dir())
+ pool = mdcache->mds->mdsmap->get_metadata_pool();
+ else
+ pool = inode.layout.fl_pg_pool;
+
+ inode_backtrace_t bt;
+ build_backtrace(pool, bt);
+ bufferlist bl;
+ ::encode(bt, bl);
+
+ ObjectOperation op;
+ op.create(false);
+ op.setxattr("parent", bl);
+
+ SnapContext snapc;
+ object_t oid = get_object_name(ino(), frag_t(), "");
+ object_locator_t oloc(pool);
+ Context *fin2 = new C_Inode_StoredBacktrace(this, inode.backtrace_version, fin);
+
+ if (!state_test(STATE_DIRTYPOOL)) {
+ mdcache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ 0, NULL, fin2);
+ return;
+ }
+
+ C_GatherBuilder gather(g_ceph_context, fin2);
+ mdcache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ 0, NULL, gather.new_sub());
+
+ set<int64_t> old_pools;
+ for (vector<int64_t>::iterator p = inode.old_pools.begin();
+ p != inode.old_pools.end();
+ ++p) {
+ if (*p == pool || old_pools.count(*p))
+ continue;
+
+ ObjectOperation op;
+ op.create(false);
+ op.setxattr("parent", bl);
+
+ object_locator_t oloc(*p);
+ mdcache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ 0, NULL, gather.new_sub());
+ old_pools.insert(*p);
+ }
+ gather.activate();
+}
+
+void CInode::_stored_backtrace(version_t v, Context *fin)
+{
+ dout(10) << "_stored_backtrace" << dendl;
+
+ if (v == inode.backtrace_version)
+ clear_dirty_parent();
+ auth_unpin(this);
+ if (fin)
+ fin->complete(0);
+}
+
+void CInode::_mark_dirty_parent(LogSegment *ls, bool dirty_pool)
+{
+ if (!state_test(STATE_DIRTYPARENT)) {
+ dout(10) << "mark_dirty_parent" << dendl;
+ state_set(STATE_DIRTYPARENT);
+ get(PIN_DIRTYPARENT);
+ assert(ls);
+ }
+ if (dirty_pool)
+ state_set(STATE_DIRTYPOOL);
+ if (ls)
+ ls->dirty_parent_inodes.push_back(&item_dirty_parent);
+}
+
+void CInode::clear_dirty_parent()
+{
+ if (state_test(STATE_DIRTYPARENT)) {
+ dout(10) << "clear_dirty_parent" << dendl;
+ state_clear(STATE_DIRTYPARENT);
+ state_clear(STATE_DIRTYPOOL);
+ put(PIN_DIRTYPARENT);
+ item_dirty_parent.remove_myself();
+ }
+}
+
// ------------------
// parent dir
@@ -2989,11 +3062,10 @@ void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<Context*>& waite
void CInode::encode_export(bufferlist& bl)
{
- ENCODE_START(3, 3, bl)
+ ENCODE_START(4, 4, bl)
_encode_base(bl);
- bool dirty = is_dirty();
- ::encode(dirty, bl);
+ ::encode(state, bl);
::encode(pop, bl);
@@ -3024,6 +3096,8 @@ void CInode::encode_export(bufferlist& bl)
void CInode::finish_export(utime_t now)
{
+ state &= MASK_STATE_EXPORT_KEPT;
+
pop.zero(now);
// just in case!
@@ -3037,14 +3111,21 @@ void CInode::finish_export(utime_t now)
void CInode::decode_import(bufferlist::iterator& p,
LogSegment *ls)
{
- DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p);
+ DECODE_START_LEGACY_COMPAT_LEN(4, 4, 4, p);
_decode_base(p);
- bool dirty;
- ::decode(dirty, p);
- if (dirty)
+ unsigned s;
+ ::decode(s, p);
+ state |= (s & MASK_STATE_EXPORTED);
+ if (is_dirty()) {
+ get(PIN_DIRTY);
_mark_dirty(ls);
+ }
+ if (is_dirty_parent()) {
+ get(PIN_DIRTYPARENT);
+ _mark_dirty_parent(ls);
+ }
::decode(pop, ceph_clock_now(g_ceph_context), p);
diff --git a/src/mds/CInode.h b/src/mds/CInode.h
index 7c63593c73c..779bb63f485 100644
--- a/src/mds/CInode.h
+++ b/src/mds/CInode.h
@@ -151,9 +151,16 @@ public:
static const int STATE_NEEDSRECOVER = (1<<11);
static const int STATE_RECOVERING = (1<<12);
static const int STATE_PURGING = (1<<13);
+ static const int STATE_DIRTYPARENT = (1<<14);
static const int STATE_DIRTYRSTAT = (1<<15);
static const int STATE_STRAYPINNED = (1<<16);
static const int STATE_FROZENAUTHPIN = (1<<17);
+ static const int STATE_DIRTYPOOL = (1<<18);
+
+ static const int MASK_STATE_EXPORTED =
+ (STATE_DIRTY|STATE_NEEDSRECOVER|STATE_DIRTYPARENT|STATE_DIRTYPOOL);
+ static const int MASK_STATE_EXPORT_KEPT =
+ (STATE_FROZEN|STATE_AMBIGUOUSAUTH|STATE_EXPORTINGCAPS);
// -- waiters --
static const uint64_t WAIT_DIR = (1<<0);
@@ -364,7 +371,7 @@ public:
protected:
// file capabilities
map<client_t, Capability*> client_caps; // client -> caps
- map<int, int> mds_caps_wanted; // [auth] mds -> caps wanted
+ map<int32_t, int32_t> mds_caps_wanted; // [auth] mds -> caps wanted
int replica_caps_wanted; // [replica] what i've requested from auth
map<int, set<client_t> > client_snap_caps; // [auth] [snap] dirty metadata we still need from the head
@@ -384,6 +391,7 @@ public:
elist<CInode*>::item item_dirty;
elist<CInode*>::item item_caps;
elist<CInode*>::item item_open_file;
+ elist<CInode*>::item item_dirty_parent;
elist<CInode*>::item item_dirty_dirfrag_dir;
elist<CInode*>::item item_dirty_dirfrag_nest;
elist<CInode*>::item item_dirty_dirfrag_dirfragtree;
@@ -424,7 +432,7 @@ private:
parent(0),
inode_auth(CDIR_AUTH_DEFAULT),
replica_caps_wanted(0),
- item_dirty(this), item_caps(this), item_open_file(this),
+ item_dirty(this), item_caps(this), item_open_file(this), item_dirty_parent(this),
item_dirty_dirfrag_dir(this),
item_dirty_dirfrag_nest(this),
item_dirty_dirfrag_dirfragtree(this),
@@ -527,10 +535,13 @@ private:
void fetch(Context *fin);
void _fetched(bufferlist& bl, bufferlist& bl2, Context *fin);
- void fetch_backtrace(inode_backtrace_t *bt, Context *fin);
- void _fetched_backtrace(bufferlist *bl, inode_backtrace_t *bt, Context *fin);
-
- void build_backtrace(int64_t location, inode_backtrace_t* bt);
+ void build_backtrace(int64_t pool, inode_backtrace_t& bt);
+ void store_backtrace(Context *fin);
+ void _stored_backtrace(version_t v, Context *fin);
+ void _mark_dirty_parent(LogSegment *ls, bool dirty_pool=false);
+ void clear_dirty_parent();
+ bool is_dirty_parent() { return state_test(STATE_DIRTYPARENT); }
+ bool is_dirty_pool() { return state_test(STATE_DIRTYPOOL); }
void encode_store(bufferlist& bl);
void decode_store(bufferlist::iterator& bl);
@@ -704,7 +715,7 @@ public:
bool is_any_caps() { return !client_caps.empty(); }
bool is_any_nonstale_caps() { return count_nonstale_caps(); }
- map<int,int>& get_mds_caps_wanted() { return mds_caps_wanted; }
+ map<int32_t,int32_t>& get_mds_caps_wanted() { return mds_caps_wanted; }
map<client_t,Capability*>& get_client_caps() { return client_caps; }
Capability *get_client_cap(client_t client) {
diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc
index 4a23e0bc47f..57154b3d9f6 100644
--- a/src/mds/Locker.cc
+++ b/src/mds/Locker.cc
@@ -327,6 +327,14 @@ bool Locker::acquire_locks(MDRequest *mdr,
p != mustpin_remote.end();
++p) {
dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;
+
+ // wait for active auth
+ if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
+ dout(10) << " mds." << p->first << " is not active" << dendl;
+ if (mdr->more()->waiting_on_slave.empty())
+ mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
+ return false;
+ }
MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
MMDSSlaveRequest::OP_AUTHPIN);
@@ -1332,10 +1340,11 @@ void Locker::remote_wrlock_start(SimpleLock *lock, int target, MDRequest *mut)
{
dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
- // wait for single auth
- if (lock->get_parent()->is_ambiguous_auth()) {
- lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
- new C_MDS_RetryRequest(mdcache, mut));
+ // wait for active target
+ if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
+ dout(7) << " mds." << target << " is not active" << dendl;
+ if (mut->more()->waiting_on_slave.empty())
+ mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
return;
}
@@ -1422,8 +1431,16 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequest *mut)
return false;
}
- // send lock request
+ // wait for active auth
int auth = lock->get_parent()->authority().first;
+ if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
+ dout(7) << " mds." << auth << " is not active" << dendl;
+ if (mut->more()->waiting_on_slave.empty())
+ mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
+ return false;
+ }
+
+ // send lock request
mut->more()->slaves.insert(auth);
mut->start_locking(lock, auth);
MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
@@ -1915,8 +1932,7 @@ void Locker::request_inode_file_caps(CInode *in)
}
int auth = in->authority().first;
- if (in->is_rejoining() &&
- mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
+ if (mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
return;
}
@@ -1937,7 +1953,7 @@ void Locker::request_inode_file_caps(CInode *in)
void Locker::handle_inode_file_caps(MInodeFileCaps *m)
{
// nobody should be talking to us during recovery.
- assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
+ assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
// ok
CInode *in = mdcache->get_inode(m->get_ino());
@@ -2112,7 +2128,7 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
// no cow, here!
CDentry *parent = in->get_projected_parent_dn();
- metablob->add_primary_dentry(parent, true, in);
+ metablob->add_primary_dentry(parent, in, true);
} else {
metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
mdcache->journal_dirty_inode(mut, metablob, in);
@@ -2183,8 +2199,11 @@ void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
}
CInode *cur = cap->get_inode();
- if (!cur->is_auth())
+ if (!cur->is_auth()) {
+ request_inode_file_caps(cur);
return;
+ }
+
if (cap->wanted() == 0) {
if (cur->item_open_file.is_on_list() &&
!cur->is_any_caps_wanted()) {
@@ -2203,7 +2222,6 @@ void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
mds->mdlog->submit_entry(le);
}
}
-
}
@@ -2903,41 +2921,65 @@ void Locker::handle_client_cap_release(MClientCapRelease *m)
return;
}
- for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
- inodeno_t ino((uint64_t)p->ino);
- CInode *in = mdcache->get_inode(ino);
- if (!in) {
- dout(10) << " missing ino " << ino << dendl;
- continue;
- }
- Capability *cap = in->get_client_cap(client);
- if (!cap) {
- dout(10) << " no cap on " << *in << dendl;
- continue;
- }
- if (cap->get_cap_id() != p->cap_id) {
- dout(7) << " ignoring client capid " << p->cap_id << " != my " << cap->get_cap_id() << " on " << *in << dendl;
- continue;
- }
- if (ceph_seq_cmp(p->migrate_seq, cap->get_mseq()) < 0) {
- dout(7) << " mseq " << p->migrate_seq << " < " << cap->get_mseq()
- << " on " << *in << dendl;
- continue;
- }
- if (p->seq != cap->get_last_issue()) {
- dout(10) << " issue_seq " << p->seq << " != " << cap->get_last_issue() << " on " << *in << dendl;
-
- // clean out any old revoke history
- cap->clean_revoke_from(p->seq);
- eval_cap_gather(in);
- continue;
- }
+ for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p)
+ _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
+
+ m->put();
+}
+
+class C_Locker_RetryCapRelease : public Context {
+ Locker *locker;
+ client_t client;
+ inodeno_t ino;
+ uint64_t cap_id;
+ ceph_seq_t migrate_seq;
+ ceph_seq_t issue_seq;
+public:
+ C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
+ ceph_seq_t mseq, ceph_seq_t seq) :
+ locker(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
+ void finish(int r) {
+ locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
+ }
+};
- dout(7) << "removing cap on " << *in << dendl;
- remove_client_cap(in, client);
+void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
+ ceph_seq_t mseq, ceph_seq_t seq)
+{
+ CInode *in = mdcache->get_inode(ino);
+ if (!in) {
+ dout(7) << "_do_cap_release missing ino " << ino << dendl;
+ return;
+ }
+ Capability *cap = in->get_client_cap(client);
+ if (!cap) {
+ dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
+ return;
}
- m->put();
+ dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
+ if (cap->get_cap_id() != cap_id) {
+ dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
+ return;
+ }
+ if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
+ dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
+ return;
+ }
+ if (should_defer_client_cap_frozen(in)) {
+ dout(7) << " freezing|frozen, deferring" << dendl;
+ in->add_waiter(CInode::WAIT_UNFREEZE,
+ new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
+ return;
+ }
+ if (seq != cap->get_last_issue()) {
+ dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
+ // clean out any old revoke history
+ cap->clean_revoke_from(seq);
+ eval_cap_gather(in);
+ return;
+ }
+ remove_client_cap(in, client);
}
/* This function DOES put the passed message before returning */
@@ -4108,6 +4150,10 @@ void Locker::file_eval(ScatterLock *lock, bool *need_issue)
if (lock->get_parent()->is_freezing_or_frozen())
return;
+ // wait for scan
+ if (lock->get_state() == LOCK_SCAN)
+ return;
+
// excl -> *?
if (lock->get_state() == LOCK_EXCL) {
dout(20) << " is excl" << dendl;
diff --git a/src/mds/Locker.h b/src/mds/Locker.h
index f4d9861a384..b97307d6cb2 100644
--- a/src/mds/Locker.h
+++ b/src/mds/Locker.h
@@ -225,6 +225,7 @@ public:
bool _do_cap_update(CInode *in, Capability *cap, int dirty, snapid_t follows, MClientCaps *m,
MClientCaps *ack=0);
void handle_client_cap_release(class MClientCapRelease *m);
+ void _do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id, ceph_seq_t mseq, ceph_seq_t seq);
// local
@@ -284,6 +285,7 @@ private:
friend class C_MDL_CheckMaxSize;
friend class C_MDL_RequestInodeFileCaps;
friend class C_Locker_FileUpdate_finish;
+ friend class C_Locker_RetryCapRelease;
// -- client leases --
diff --git a/src/mds/LogSegment.h b/src/mds/LogSegment.h
index 8cf58a18306..44c79425738 100644
--- a/src/mds/LogSegment.h
+++ b/src/mds/LogSegment.h
@@ -33,19 +33,6 @@ class CDentry;
class MDS;
class MDSlaveUpdate;
-// The backtrace info struct here is used to maintain the backtrace in
-// a queue that we will eventually want to write out (on journal segment
-// expiry).
-class BacktraceInfo {
-public:
- int64_t location;
- int64_t pool;
- struct inode_backtrace_t bt;
- elist<BacktraceInfo*>::item item_logseg;
- BacktraceInfo(int64_t l, CInode *i, LogSegment *ls, int64_t p = -1);
- ~BacktraceInfo();
-};
-
class LogSegment {
public:
uint64_t offset, end;
@@ -58,12 +45,11 @@ class LogSegment {
elist<CDentry*> dirty_dentries;
elist<CInode*> open_files;
+ elist<CInode*> dirty_parent_inodes;
elist<CInode*> dirty_dirfrag_dir;
elist<CInode*> dirty_dirfrag_nest;
elist<CInode*> dirty_dirfrag_dirfragtree;
- elist<BacktraceInfo*> update_backtraces;
-
elist<MDSlaveUpdate*> slave_updates;
set<CInode*> truncating_inodes;
@@ -90,20 +76,13 @@ class LogSegment {
dirty_inodes(member_offset(CInode, item_dirty)),
dirty_dentries(member_offset(CDentry, item_dirty)),
open_files(member_offset(CInode, item_open_file)),
+ dirty_parent_inodes(member_offset(CInode, item_dirty_parent)),
dirty_dirfrag_dir(member_offset(CInode, item_dirty_dirfrag_dir)),
dirty_dirfrag_nest(member_offset(CInode, item_dirty_dirfrag_nest)),
dirty_dirfrag_dirfragtree(member_offset(CInode, item_dirty_dirfrag_dirfragtree)),
- update_backtraces(member_offset(BacktraceInfo, item_logseg)),
slave_updates(0), // passed to begin() manually
inotablev(0), sessionmapv(0)
{ }
-
- // backtrace handling
- void queue_backtrace_update(CInode *in, int64_t location, int64_t pool = -1);
- void remove_pending_backtraces(inodeno_t ino, int64_t pool);
- void store_backtrace_update(MDS *mds, BacktraceInfo *info, Context *fin);
- void _stored_backtrace(BacktraceInfo *info, Context *fin);
- unsigned encode_parent_mutation(ObjectOperation& m, BacktraceInfo *info);
};
#endif
diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc
index f0a2cc2a7f0..0c279b66a91 100644
--- a/src/mds/MDCache.cc
+++ b/src/mds/MDCache.cc
@@ -79,6 +79,9 @@
#include "messages/MMDSFindIno.h"
#include "messages/MMDSFindInoReply.h"
+#include "messages/MMDSOpenIno.h"
+#include "messages/MMDSOpenInoReply.h"
+
#include "messages/MClientRequest.h"
#include "messages/MClientCaps.h"
#include "messages/MClientSnap.h"
@@ -235,6 +238,8 @@ void MDCache::remove_inode(CInode *o)
if (o->is_dirty())
o->mark_clean();
+ if (o->is_dirty_parent())
+ o->clear_dirty_parent();
o->filelock.remove_dirty();
o->nestlock.remove_dirty();
@@ -461,7 +466,7 @@ void MDCache::_create_system_file(CDir *dir, const char *name, CInode *in, Conte
if (!in->is_mdsdir()) {
predirty_journal_parents(mut, &le->metablob, in, dir, PREDIRTY_PRIMARY|PREDIRTY_DIR, 1);
- le->metablob.add_primary_dentry(dn, true, in);
+ le->metablob.add_primary_dentry(dn, in, true);
} else {
predirty_journal_parents(mut, &le->metablob, in, dir, PREDIRTY_DIR, 1);
journal_dirty_inode(mut, &le->metablob, in);
@@ -1552,7 +1557,7 @@ void MDCache::journal_cow_dentry(Mutation *mut, EMetaBlob *metablob, CDentry *dn
CDentry *olddn = dn->dir->add_primary_dentry(dn->name, oldin, oldfirst, follows);
oldin->inode.version = olddn->pre_dirty();
dout(10) << " olddn " << *olddn << dendl;
- metablob->add_primary_dentry(olddn, true, 0);
+ metablob->add_primary_dentry(olddn, 0, true);
mut->add_cow_dentry(olddn);
} else {
assert(dnl->is_remote());
@@ -1585,7 +1590,13 @@ void MDCache::journal_dirty_inode(Mutation *mut, EMetaBlob *metablob, CInode *in
CDentry *dn = in->get_projected_parent_dn();
if (!dn->get_projected_linkage()->is_null()) // no need to cow a null dentry
journal_cow_dentry(mut, metablob, dn, follows);
- metablob->add_primary_dentry(dn, true, in);
+ if (in->get_projected_inode()->is_backtrace_updated()) {
+ bool dirty_pool = in->get_projected_inode()->layout.fl_pg_pool !=
+ in->get_previous_projected_inode()->layout.fl_pg_pool;
+ metablob->add_primary_dentry(dn, in, true, true, dirty_pool);
+ } else {
+ metablob->add_primary_dentry(dn, in, true);
+ }
}
}
@@ -2144,32 +2155,27 @@ void MDCache::predirty_journal_parents(Mutation *mut, EMetaBlob *blob,
struct C_MDC_CommittedMaster : public Context {
MDCache *cache;
metareqid_t reqid;
- LogSegment *ls;
- list<Context*> waiters;
- C_MDC_CommittedMaster(MDCache *s, metareqid_t r, LogSegment *l, list<Context*> &w) :
- cache(s), reqid(r), ls(l) {
- waiters.swap(w);
- }
+ C_MDC_CommittedMaster(MDCache *s, metareqid_t r) : cache(s), reqid(r) {}
void finish(int r) {
- cache->_logged_master_commit(reqid, ls, waiters);
+ cache->_logged_master_commit(reqid);
}
};
void MDCache::log_master_commit(metareqid_t reqid)
{
dout(10) << "log_master_commit " << reqid << dendl;
+ uncommitted_masters[reqid].committing = true;
mds->mdlog->start_submit_entry(new ECommitted(reqid),
- new C_MDC_CommittedMaster(this, reqid,
- uncommitted_masters[reqid].ls,
- uncommitted_masters[reqid].waiters));
- mds->mdcache->uncommitted_masters.erase(reqid);
+ new C_MDC_CommittedMaster(this, reqid));
}
-void MDCache::_logged_master_commit(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters)
+void MDCache::_logged_master_commit(metareqid_t reqid)
{
dout(10) << "_logged_master_commit " << reqid << dendl;
- ls->uncommitted_masters.erase(reqid);
- mds->queue_waiters(waiters);
+ assert(uncommitted_masters.count(reqid));
+ uncommitted_masters[reqid].ls->uncommitted_masters.erase(reqid);
+ mds->queue_waiters(uncommitted_masters[reqid].waiters);
+ uncommitted_masters.erase(reqid);
}
// while active...
@@ -2179,7 +2185,7 @@ void MDCache::committed_master_slave(metareqid_t r, int from)
dout(10) << "committed_master_slave mds." << from << " on " << r << dendl;
assert(uncommitted_masters.count(r));
uncommitted_masters[r].slaves.erase(from);
- if (uncommitted_masters[r].slaves.empty())
+ if (!uncommitted_masters[r].recovering && uncommitted_masters[r].slaves.empty())
log_master_commit(r);
}
@@ -2196,20 +2202,20 @@ void MDCache::logged_master_update(metareqid_t reqid)
}
/*
- * The mds could crash after receiving all slaves' commit acknowledgement,
- * but before journalling the ECommitted.
+ * Master may crash after receiving all slaves' commit acks, but before journalling
+ * the final commit. Slaves may crash after journalling the slave commit, but before
+ * sending commit ack to the master. Commit masters with no uncommitted slave when
+ * resolve finishes.
*/
void MDCache::finish_committed_masters()
{
- map<metareqid_t, umaster>::iterator p = uncommitted_masters.begin();
- while (p != uncommitted_masters.end()) {
- if (p->second.slaves.empty()) {
- metareqid_t reqid = p->first;
- dout(10) << "finish_committed_masters " << reqid << dendl;
- ++p;
- log_master_commit(reqid);
- } else {
- ++p;
+ for (map<metareqid_t, umaster>::iterator p = uncommitted_masters.begin();
+ p != uncommitted_masters.end();
+ ++p) {
+ p->second.recovering = false;
+ if (!p->second.committing && p->second.slaves.empty()) {
+ dout(10) << "finish_committed_masters " << p->first << dendl;
+ log_master_commit(p->first);
}
}
}
@@ -2450,8 +2456,6 @@ void MDCache::resolve_start()
adjust_subtree_auth(rootdir, CDIR_AUTH_UNKNOWN);
}
resolve_gather = recovery_set;
- resolve_gather.erase(mds->get_nodeid());
- rejoin_gather = resolve_gather;
}
void MDCache::send_resolves()
@@ -2705,6 +2709,16 @@ void MDCache::handle_mds_failure(int who)
}
}
+ for (map<metareqid_t, umaster>::iterator p = uncommitted_masters.begin();
+ p != uncommitted_masters.end();
+ ++p) {
+ // The failed MDS may have already committed the slave update
+ if (p->second.slaves.count(who)) {
+ p->second.recovering = true;
+ p->second.slaves.erase(who);
+ }
+ }
+
while (!finish.empty()) {
dout(10) << "cleaning up slave request " << *finish.front() << dendl;
request_finish(finish.front());
@@ -2712,6 +2726,7 @@ void MDCache::handle_mds_failure(int who)
}
kick_find_ino_peers(who);
+ kick_open_ino_peers(who);
show_subtrees();
}
@@ -2771,7 +2786,7 @@ void MDCache::handle_mds_recovery(int who)
}
kick_discovers(who);
-
+ kick_open_ino_peers(who);
kick_find_ino_peers(who);
// queue them up.
@@ -2964,17 +2979,17 @@ void MDCache::maybe_resolve_finish()
dout(10) << "maybe_resolve_finish still waiting for resolves ("
<< resolve_gather << ")" << dendl;
return;
+ }
+
+ dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl;
+ disambiguate_imports();
+ finish_committed_masters();
+ if (mds->is_resolve()) {
+ trim_unlinked_inodes();
+ recalc_auth_bits();
+ mds->resolve_done();
} else {
- dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl;
- disambiguate_imports();
- if (mds->is_resolve()) {
- trim_unlinked_inodes();
- recalc_auth_bits();
- trim_non_auth();
- mds->resolve_done();
- } else {
- maybe_send_pending_rejoins();
- }
+ maybe_send_pending_rejoins();
}
}
@@ -3397,6 +3412,8 @@ void MDCache::recalc_auth_bits()
dnl->get_inode()->state_clear(CInode::STATE_AUTH);
if (dnl->get_inode()->is_dirty())
dnl->get_inode()->mark_clean();
+ if (dnl->get_inode()->is_dirty_parent())
+ dnl->get_inode()->clear_dirty_parent();
// avoid touching scatterlocks for our subtree roots!
if (subtree_inodes.count(dnl->get_inode()) == 0)
dnl->get_inode()->clear_scatter_dirty();
@@ -3451,6 +3468,15 @@ void MDCache::recalc_auth_bits()
* after recovery.
*/
+void MDCache::rejoin_start()
+{
+ dout(10) << "rejoin_start" << dendl;
+
+ rejoin_gather = recovery_set;
+ // need finish opening cap inodes before sending cache rejoins
+ rejoin_gather.insert(mds->get_nodeid());
+ process_imported_caps();
+}
/*
* rejoin phase!
@@ -3467,6 +3493,11 @@ void MDCache::rejoin_send_rejoins()
{
dout(10) << "rejoin_send_rejoins with recovery_set " << recovery_set << dendl;
+ if (rejoin_gather.count(mds->get_nodeid())) {
+ dout(7) << "rejoin_send_rejoins still processing imported caps, delaying" << dendl;
+ rejoins_pending = true;
+ return;
+ }
if (!resolve_gather.empty()) {
dout(7) << "rejoin_send_rejoins still waiting for resolves ("
<< resolve_gather << ")" << dendl;
@@ -3476,12 +3507,6 @@ void MDCache::rejoin_send_rejoins()
map<int, MMDSCacheRejoin*> rejoins;
- // encode cap list once.
- bufferlist cap_export_bl;
- if (mds->is_rejoin()) {
- ::encode(cap_exports, cap_export_bl);
- ::encode(cap_export_paths, cap_export_bl);
- }
// if i am rejoining, send a rejoin to everyone.
// otherwise, just send to others who are rejoining.
@@ -3490,12 +3515,20 @@ void MDCache::rejoin_send_rejoins()
++p) {
if (*p == mds->get_nodeid()) continue; // nothing to myself!
if (rejoin_sent.count(*p)) continue; // already sent a rejoin to this node!
- if (mds->is_rejoin()) {
+ if (mds->is_rejoin())
rejoins[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_WEAK);
- rejoins[*p]->copy_cap_exports(cap_export_bl);
- } else if (mds->mdsmap->is_rejoin(*p))
+ else if (mds->mdsmap->is_rejoin(*p))
rejoins[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_STRONG);
- }
+ }
+
+ if (mds->is_rejoin()) {
+ for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator p = cap_exports.begin();
+ p != cap_exports.end();
+ p++) {
+ assert(cap_export_targets.count(p->first));
+ rejoins[cap_export_targets[p->first]]->cap_exports[p->first] = p->second;
+ }
+ }
assert(!migrator->is_importing());
assert(!migrator->is_exporting());
@@ -3821,7 +3854,7 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
p != weak->cap_exports.end();
++p) {
CInode *in = get_inode(p->first);
- if (!in || !in->is_auth()) continue;
+ assert(!in || in->is_auth());
for (map<client_t,ceph_mds_cap_reconnect>::iterator q = p->second.begin();
q != p->second.end();
++q) {
@@ -3838,16 +3871,7 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
p != weak->cap_exports.end();
++p) {
CInode *in = get_inode(p->first);
- if (in && !in->is_auth())
- continue;
- filepath& path = weak->cap_export_paths[p->first];
- if (!in) {
- if (!path_is_mine(path))
- continue;
- cap_import_paths[p->first] = path;
- dout(10) << " noting cap import " << p->first << " path " << path << dendl;
- }
-
+ assert(in && in->is_auth());
// note
for (map<client_t,ceph_mds_cap_reconnect>::iterator q = p->second.begin();
q != p->second.end();
@@ -4016,6 +4040,7 @@ public:
}
};
+#if 0
/**
* parallel_fetch -- make a pass at fetching a bunch of paths in parallel
*
@@ -4134,9 +4159,7 @@ bool MDCache::parallel_fetch_traverse_dir(inodeno_t ino, filepath& path,
missing.insert(ino);
return true;
}
-
-
-
+#endif
/*
* rejoin_scour_survivor_replica - remove source from replica list on unmentioned objects
@@ -4505,7 +4528,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
int from = ack->get_source().num();
// for sending cache expire message
- list<CInode*> isolated_inodes;
+ set<CInode*> isolated_inodes;
// dirs
for (map<dirfrag_t, MMDSCacheRejoin::dirfrag_strong>::iterator p = ack->strong_dirfrags.begin();
@@ -4521,19 +4544,20 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
diri = new CInode(this, false);
diri->inode.ino = p->first.ino;
diri->inode.mode = S_IFDIR;
- if (MDS_INO_MDSDIR(p->first.ino)) {
+ add_inode(diri);
+ if (MDS_INO_MDSDIR(from) == p->first.ino) {
diri->inode_auth = pair<int,int>(from, CDIR_AUTH_UNKNOWN);
- add_inode(diri);
dout(10) << " add inode " << *diri << dendl;
} else {
- diri->inode_auth = CDIR_AUTH_UNDEF;
- isolated_inodes.push_back(diri);
+ diri->inode_auth = CDIR_AUTH_DEFAULT;
+ isolated_inodes.insert(diri);
dout(10) << " unconnected dirfrag " << p->first << dendl;
}
}
// barebones dirfrag; the full dirfrag loop below will clean up.
dir = diri->add_dirfrag(new CDir(diri, p->first.frag, this, false));
- if (dir->authority().first != from)
+ if (dir->authority() != CDIR_AUTH_UNDEF &&
+ dir->authority().first != from)
adjust_subtree_auth(dir, from);
dout(10) << " add dirfrag " << *dir << dendl;
}
@@ -4598,6 +4622,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
in->get_parent_dir()->unlink_inode(in->get_parent_dn());
}
dn->dir->link_primary_inode(dn, in);
+ isolated_inodes.erase(in);
}
}
@@ -4659,20 +4684,9 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
dout(10) << " got inode locks " << *in << dendl;
}
- // trim unconnected subtree
- if (!isolated_inodes.empty()) {
- map<int, MCacheExpire*> expiremap;
- for (list<CInode*>::iterator p = isolated_inodes.begin();
- p != isolated_inodes.end();
- ++p) {
- list<CDir*> ls;
- (*p)->get_dirfrags(ls);
- trim_dirfrag(*ls.begin(), 0, expiremap);
- assert((*p)->get_num_ref() == 0);
- delete *p;
- }
- send_expire_messages(expiremap);
- }
+ // FIXME: This can happen if entire subtree, together with the inode subtree root
+ // belongs to, were trimmed between sending cache rejoin and receiving rejoin ack.
+ assert(isolated_inodes.empty());
// done?
assert(rejoin_ack_gather.count(from));
@@ -4840,16 +4854,9 @@ void MDCache::rejoin_gather_finish()
if (open_undef_inodes_dirfrags())
return;
- // fetch paths?
- // do this before ack, since some inodes we may have already gotten
- // from surviving MDSs.
- if (!cap_import_paths.empty()) {
- if (parallel_fetch(cap_import_paths, cap_imports_missing)) {
- return;
- }
- }
-
- process_imported_caps();
+ if (process_imported_caps())
+ return;
+
choose_lock_states_and_reconnect_caps();
identify_files_to_recover(rejoin_recover_q, rejoin_check_q);
@@ -4867,34 +4874,123 @@ void MDCache::rejoin_gather_finish()
}
}
-void MDCache::process_imported_caps()
+class C_MDC_RejoinOpenInoFinish: public Context {
+ MDCache *cache;
+ inodeno_t ino;
+public:
+ C_MDC_RejoinOpenInoFinish(MDCache *c, inodeno_t i) : cache(c), ino(i) {}
+ void finish(int r) {
+ cache->rejoin_open_ino_finish(ino, r);
+ }
+};
+
+void MDCache::rejoin_open_ino_finish(inodeno_t ino, int ret)
+{
+ dout(10) << "open_caps_inode_finish ino " << ino << " ret " << ret << dendl;
+
+ if (ret < 0) {
+ cap_imports_missing.insert(ino);
+ } else if (ret == mds->get_nodeid()) {
+ assert(get_inode(ino));
+ } else {
+ map<inodeno_t,map<client_t,map<int,ceph_mds_cap_reconnect> > >::iterator p;
+ p = cap_imports.find(ino);
+ assert(p != cap_imports.end());
+ for (map<client_t,map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q) {
+ assert(q->second.count(-1));
+ assert(q->second.size() == 1);
+ rejoin_export_caps(p->first, q->first, q->second[-1], ret);
+ }
+ cap_imports.erase(p);
+ }
+
+ assert(cap_imports_num_opening > 0);
+ cap_imports_num_opening--;
+
+ if (cap_imports_num_opening == 0) {
+ if (rejoin_gather.count(mds->get_nodeid()))
+ process_imported_caps();
+ else
+ rejoin_gather_finish();
+ }
+}
+
+bool MDCache::process_imported_caps()
{
dout(10) << "process_imported_caps" << dendl;
- // process cap imports
- // ino -> client -> frommds -> capex
- map<inodeno_t,map<client_t, map<int,ceph_mds_cap_reconnect> > >::iterator p = cap_imports.begin();
- while (p != cap_imports.end()) {
+ map<inodeno_t,map<client_t, map<int,ceph_mds_cap_reconnect> > >::iterator p;
+ for (p = cap_imports.begin(); p != cap_imports.end(); ++p) {
CInode *in = get_inode(p->first);
- if (!in) {
- dout(10) << "process_imported_caps still missing " << p->first
- << ", will try again after replayed client requests"
- << dendl;
- ++p;
+ if (in) {
+ assert(in->is_auth());
+ cap_imports_missing.erase(p->first);
continue;
}
- for (map<client_t, map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin();
- q != p->second.end();
- ++q)
- for (map<int,ceph_mds_cap_reconnect>::iterator r = q->second.begin();
+ if (cap_imports_missing.count(p->first) > 0)
+ continue;
+
+ cap_imports_num_opening++;
+ dout(10) << " opening missing ino " << p->first << dendl;
+ open_ino(p->first, (int64_t)-1, new C_MDC_RejoinOpenInoFinish(this, p->first), false);
+ }
+
+ if (cap_imports_num_opening > 0)
+ return true;
+
+ // called by rejoin_gather_finish() ?
+ if (rejoin_gather.count(mds->get_nodeid()) == 0) {
+ // process cap imports
+ // ino -> client -> frommds -> capex
+ p = cap_imports.begin();
+ while (p != cap_imports.end()) {
+ CInode *in = get_inode(p->first);
+ if (!in) {
+ dout(10) << " still missing ino " << p->first
+ << ", will try again after replayed client requests" << dendl;
+ ++p;
+ continue;
+ }
+ assert(in->is_auth());
+ for (map<client_t,map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q)
+ for (map<int,ceph_mds_cap_reconnect>::iterator r = q->second.begin();
+ r != q->second.end();
+ ++r) {
+ dout(20) << " add_reconnected_cap " << in->ino() << " client." << q->first << dendl;
+ add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm));
+ rejoin_import_cap(in, q->first, r->second, r->first);
+ }
+ cap_imports.erase(p++); // remove and move on
+ }
+ } else {
+ for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator q = cap_exports.begin();
+ q != cap_exports.end();
+ q++) {
+ for (map<client_t,ceph_mds_cap_reconnect>::iterator r = q->second.begin();
r != q->second.end();
++r) {
- dout(20) << " add_reconnected_cap " << in->ino() << " client." << q->first << dendl;
- add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm));
- rejoin_import_cap(in, q->first, r->second, r->first);
+ dout(10) << " exporting caps for client." << r->first << " ino " << q->first << dendl;
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(r->first.v));
+ assert(session);
+ // mark client caps stale.
+ MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, q->first, 0, 0, 0);
+ mds->send_message_client_counted(m, session);
}
- cap_imports.erase(p++); // remove and move on
+ }
+
+ trim_non_auth();
+
+ rejoin_gather.erase(mds->get_nodeid());
+ maybe_send_pending_rejoins();
+
+ if (rejoin_gather.empty() && rejoin_ack_gather.count(mds->get_nodeid()))
+ rejoin_gather_finish();
}
+ return false;
}
void MDCache::check_realm_past_parents(SnapRealm *realm)
@@ -5056,9 +5152,12 @@ void MDCache::export_remaining_imported_caps()
{
dout(10) << "export_remaining_imported_caps" << dendl;
+ stringstream warn_str;
+
for (map<inodeno_t,map<client_t,map<int,ceph_mds_cap_reconnect> > >::iterator p = cap_imports.begin();
p != cap_imports.end();
++p) {
+ warn_str << " ino " << p->first << "\n";
for (map<client_t,map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin();
q != p->second.end();
++q) {
@@ -5072,6 +5171,11 @@ void MDCache::export_remaining_imported_caps()
}
cap_imports.clear();
+
+ if (warn_str.peek() != EOF) {
+ mds->clog.warn() << "failed to reconnect caps for missing inodes:" << "\n";
+ mds->clog.warn(warn_str);
+ }
}
void MDCache::try_reconnect_cap(CInode *in, Session *session)
@@ -5216,9 +5320,22 @@ void MDCache::open_snap_parents()
gather.set_finisher(new C_MDC_OpenSnapParents(this));
gather.activate();
} else {
+ if (!reconnected_snaprealms.empty()) {
+ stringstream warn_str;
+ for (map<inodeno_t,map<client_t,snapid_t> >::iterator p = reconnected_snaprealms.begin();
+ p != reconnected_snaprealms.end();
+ ++p) {
+ warn_str << " unconnected snaprealm " << p->first << "\n";
+ for (map<client_t,snapid_t>::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q)
+ warn_str << " client." << q->first << " snapid " << q->second << "\n";
+ }
+ mds->clog.warn() << "open_snap_parents has:" << "\n";
+ mds->clog.warn(warn_str);
+ }
assert(rejoin_waiters.empty());
assert(missing_snap_parents.empty());
- //assert(reconnected_snaprealms.empty()); // FIXME: need to properly address #5031
dout(10) << "open_snap_parents - all open" << dendl;
do_delayed_cap_imports();
@@ -5504,7 +5621,7 @@ void MDCache::queue_file_recover(CInode *in)
}
in->parent->first = in->first;
- le->metablob.add_primary_dentry(in->parent, true, in);
+ le->metablob.add_primary_dentry(in->parent, in, true);
mds->mdlog->submit_entry(le, new C_MDC_QueuedCow(this, in, mut));
mds->mdlog->flush();
}
@@ -5784,7 +5901,7 @@ void MDCache::truncate_inode_finish(CInode *in, LogSegment *ls)
EUpdate *le = new EUpdate(mds->mdlog, "truncate finish");
mds->mdlog->start_entry(le);
le->metablob.add_dir_context(in->get_parent_dir());
- le->metablob.add_primary_dentry(in->get_projected_parent_dn(), true, in);
+ le->metablob.add_primary_dentry(in->get_projected_parent_dn(), in, true);
le->metablob.add_truncate_finish(in->ino(), ls->offset);
journal_dirty_inode(mut, &le->metablob, in);
@@ -6133,7 +6250,6 @@ void MDCache::trim_inode(CDentry *dn, CInode *in, CDir *con, map<int, MCacheExpi
void MDCache::trim_non_auth()
{
dout(7) << "trim_non_auth" << dendl;
- stringstream warn_str_dirs;
// temporarily pin all subtree roots
for (map<CDir*, set<CDir*> >::iterator p = subtrees.begin();
@@ -6167,22 +6283,18 @@ void MDCache::trim_non_auth()
assert(dir);
// unlink the dentry
- dout(15) << "trim_non_auth removing " << *dn << dendl;
+ dout(10) << " removing " << *dn << dendl;
if (dnl->is_remote()) {
dir->unlink_inode(dn);
}
else if (dnl->is_primary()) {
CInode *in = dnl->get_inode();
+ dout(10) << " removing " << *in << dendl;
list<CDir*> ls;
- warn_str_dirs << in->get_parent_dn()->get_name() << "\n";
in->get_dirfrags(ls);
for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
CDir *subdir = *p;
- filepath fp;
- subdir->get_inode()->make_path(fp);
- warn_str_dirs << fp << "\n";
- if (subdir->is_subtree_root())
- remove_subtree(subdir);
+ assert(!subdir->is_subtree_root());
in->close_dirfrag(subdir->dirfrag().frag);
}
dir->unlink_inode(dn);
@@ -6221,18 +6333,13 @@ void MDCache::trim_non_auth()
for (list<CDir*>::iterator p = ls.begin();
p != ls.end();
++p) {
- dout(0) << " ... " << **p << dendl;
- CInode *diri = (*p)->get_inode();
- filepath fp;
- diri->make_path(fp);
- warn_str_dirs << fp << "\n";
+ dout(10) << " removing " << **p << dendl;
assert((*p)->get_num_ref() == 1); // SUBTREE
remove_subtree((*p));
in->close_dirfrag((*p)->dirfrag().frag);
}
- dout(0) << " ... " << *in << dendl;
- if (in->get_parent_dn())
- warn_str_dirs << in->get_parent_dn()->get_name() << "\n";
+ dout(10) << " removing " << *in << dendl;
+ assert(!in->get_parent_dn());
assert(in->get_num_ref() == 0);
remove_inode(in);
}
@@ -6241,10 +6348,6 @@ void MDCache::trim_non_auth()
}
show_subtrees();
- if (warn_str_dirs.peek() != EOF) {
- mds->clog.info() << "trim_non_auth has deleted paths: " << "\n";
- mds->clog.info(warn_str_dirs);
- }
}
/**
@@ -7024,6 +7127,13 @@ void MDCache::dispatch(Message *m)
case MSG_MDS_FINDINOREPLY:
handle_find_ino_reply(static_cast<MMDSFindInoReply *>(m));
break;
+
+ case MSG_MDS_OPENINO:
+ handle_open_ino(static_cast<MMDSOpenIno *>(m));
+ break;
+ case MSG_MDS_OPENINOREPLY:
+ handle_open_ino_reply(static_cast<MMDSOpenInoReply *>(m));
+ break;
default:
dout(7) << "cache unknown message " << m->get_type() << dendl;
@@ -7232,8 +7342,8 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, Context *fin, // wh
} else {
dout(7) << "remote link to " << dnl->get_remote_ino() << ", which i don't have" << dendl;
assert(mdr); // we shouldn't hit non-primary dentries doing a non-mdr traversal!
- open_remote_ino(dnl->get_remote_ino(), _get_waiter(mdr, req, fin),
- (null_okay && depth == path.depth() - 1));
+ open_remote_dentry(dn, true, _get_waiter(mdr, req, fin),
+ (null_okay && depth == path.depth() - 1));
if (mds->logger) mds->logger->inc(l_mds_trino);
return 1;
}
@@ -7390,6 +7500,7 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, Context *fin, // wh
return 0;
}
+#if 0
/**
* Find out if the MDS is auth for a given path.
*
@@ -7422,6 +7533,7 @@ bool MDCache::path_is_mine(filepath& path)
return cur->is_auth();
}
+#endif
CInode *MDCache::cache_traverse(const filepath& fp)
{
@@ -7678,36 +7790,51 @@ void MDCache::open_remote_ino_2(inodeno_t ino, vector<Anchor>& anchortrace, bool
struct C_MDC_OpenRemoteDentry : public Context {
MDCache *mdc;
CDentry *dn;
- bool projected;
+ inodeno_t ino;
Context *onfinish;
- C_MDC_OpenRemoteDentry(MDCache *m, CDentry *d, bool p, Context *f) :
- mdc(m), dn(d), projected(p), onfinish(f) {}
+ bool want_xlocked;
+ int mode;
+ C_MDC_OpenRemoteDentry(MDCache *m, CDentry *d, inodeno_t i, Context *f,
+ bool wx, int md) :
+ mdc(m), dn(d), ino(i), onfinish(f), want_xlocked(wx), mode(md) {}
void finish(int r) {
- mdc->_open_remote_dentry_finish(r, dn, projected, onfinish);
+ mdc->_open_remote_dentry_finish(dn, ino, onfinish, want_xlocked, mode, r);
}
};
-void MDCache::open_remote_dentry(CDentry *dn, bool projected, Context *fin)
+void MDCache::open_remote_dentry(CDentry *dn, bool projected, Context *fin, bool want_xlocked)
{
dout(10) << "open_remote_dentry " << *dn << dendl;
CDentry::linkage_t *dnl = projected ? dn->get_projected_linkage() : dn->get_linkage();
- open_remote_ino(dnl->get_remote_ino(),
- new C_MDC_OpenRemoteDentry(this, dn, projected, fin));
+ inodeno_t ino = dnl->get_remote_ino();
+ int mode = g_conf->mds_open_remote_link_mode;
+ Context *fin2 = new C_MDC_OpenRemoteDentry(this, dn, ino, fin, want_xlocked, mode);
+ if (mode == 0)
+ open_remote_ino(ino, fin2, want_xlocked); // anchor
+ else
+ open_ino(ino, -1, fin2, true, want_xlocked); // backtrace
}
-void MDCache::_open_remote_dentry_finish(int r, CDentry *dn, bool projected, Context *fin)
+void MDCache::_open_remote_dentry_finish(CDentry *dn, inodeno_t ino, Context *fin,
+ bool want_xlocked, int mode, int r)
{
- if (r == -ENOENT) {
- dout(0) << "open_remote_dentry_finish bad remote dentry " << *dn << dendl;
- dn->state_set(CDentry::STATE_BADREMOTEINO);
- } else if (r != 0)
- assert(0);
- fin->finish(r);
- delete fin;
+ if (r < 0) {
+ if (mode == 0) {
+ dout(0) << "open_remote_dentry_finish bad remote dentry " << *dn << dendl;
+ dn->state_set(CDentry::STATE_BADREMOTEINO);
+ } else {
+ dout(7) << "open_remote_dentry_finish failed to open ino " << ino
+ << " for " << *dn << ", retry using anchortable" << dendl;
+ assert(mode == 1);
+ Context *fin2 = new C_MDC_OpenRemoteDentry(this, dn, ino, fin, want_xlocked, 0);
+ open_remote_ino(ino, fin2, want_xlocked);
+ return;
+ }
+ }
+ fin->complete(r < 0 ? r : 0);
}
-
void MDCache::make_trace(vector<CDentry*>& trace, CInode *in)
{
// empty trace if we're a base inode
@@ -7724,6 +7851,443 @@ void MDCache::make_trace(vector<CDentry*>& trace, CInode *in)
}
+// -------------------------------------------------------------------------------
+// Open inode by inode number
+
+class C_MDC_OpenInoBacktraceFetched : public Context {
+ MDCache *cache;
+ inodeno_t ino;
+ public:
+ bufferlist bl;
+ C_MDC_OpenInoBacktraceFetched(MDCache *c, inodeno_t i) :
+ cache(c), ino(i) {}
+ void finish(int r) {
+ cache->_open_ino_backtrace_fetched(ino, bl, r);
+ }
+};
+
+struct C_MDC_OpenInoTraverseDir : public Context {
+ MDCache *cache;
+ inodeno_t ino;
+ public:
+ C_MDC_OpenInoTraverseDir(MDCache *c, inodeno_t i) : cache(c), ino(i) {}
+ void finish(int r) {
+ assert(cache->opening_inodes.count(ino));
+ cache->_open_ino_traverse_dir(ino, cache->opening_inodes[ino], r);
+ }
+};
+
+struct C_MDC_OpenInoParentOpened : public Context {
+ MDCache *cache;
+ inodeno_t ino;
+ public:
+ C_MDC_OpenInoParentOpened(MDCache *c, inodeno_t i) : cache(c), ino(i) {}
+ void finish(int r) {
+ cache->_open_ino_parent_opened(ino, r);
+ }
+};
+
+void MDCache::_open_ino_backtrace_fetched(inodeno_t ino, bufferlist& bl, int err)
+{
+ dout(10) << "_open_ino_backtrace_fetched ino " << ino << " errno " << err << dendl;
+
+ assert(opening_inodes.count(ino));
+ open_ino_info_t& info = opening_inodes[ino];
+
+ CInode *in = get_inode(ino);
+ if (in) {
+ dout(10) << " found cached " << *in << dendl;
+ open_ino_finish(ino, info, in->authority().first);
+ return;
+ }
+
+ inode_backtrace_t backtrace;
+ if (err == 0) {
+ ::decode(backtrace, bl);
+ if (backtrace.pool != info.pool) {
+ dout(10) << " old object in pool " << info.pool
+ << ", retrying pool " << backtrace.pool << dendl;
+ info.pool = backtrace.pool;
+ C_MDC_OpenInoBacktraceFetched *fin = new C_MDC_OpenInoBacktraceFetched(this, ino);
+ fetch_backtrace(ino, info.pool, fin->bl, fin);
+ return;
+ }
+ } else if (err == -ENOENT) {
+ int64_t meta_pool = mds->mdsmap->get_metadata_pool();
+ if (info.pool != meta_pool) {
+ dout(10) << " no object in pool " << info.pool
+ << ", retrying pool " << meta_pool << dendl;
+ info.pool = meta_pool;
+ C_MDC_OpenInoBacktraceFetched *fin = new C_MDC_OpenInoBacktraceFetched(this, ino);
+ fetch_backtrace(ino, info.pool, fin->bl, fin);
+ return;
+ }
+ }
+
+ if (err == 0) {
+ if (backtrace.ancestors.empty()) {
+ dout(10) << " got empty backtrace " << dendl;
+ err = -EIO;
+ } else if (!info.ancestors.empty()) {
+ if (info.ancestors[0] == backtrace.ancestors[0]) {
+ dout(10) << " got same parents " << info.ancestors[0] << " 2 times" << dendl;
+ err = -EINVAL;
+ }
+ }
+ }
+ if (err) {
+ dout(10) << " failed to open ino " << ino << dendl;
+ open_ino_finish(ino, info, err);
+ return;
+ }
+
+ dout(10) << " got backtrace " << backtrace << dendl;
+ info.ancestors = backtrace.ancestors;
+
+ _open_ino_traverse_dir(ino, info, 0);
+}
+
+void MDCache::_open_ino_parent_opened(inodeno_t ino, int ret)
+{
+ dout(10) << "_open_ino_parent_opened ino " << ino << " ret " << ret << dendl;
+
+ assert(opening_inodes.count(ino));
+ open_ino_info_t& info = opening_inodes[ino];
+
+ CInode *in = get_inode(ino);
+ if (in) {
+ dout(10) << " found cached " << *in << dendl;
+ open_ino_finish(ino, info, in->authority().first);
+ return;
+ }
+
+ if (ret == mds->get_nodeid()) {
+ _open_ino_traverse_dir(ino, info, 0);
+ } else {
+ if (ret >= 0) {
+ info.check_peers = true;
+ info.auth_hint = ret;
+ info.checked.erase(ret);
+ }
+ do_open_ino(ino, info, ret);
+ }
+}
+
+Context* MDCache::_open_ino_get_waiter(inodeno_t ino, MMDSOpenIno *m)
+{
+ if (m)
+ return new C_MDS_RetryMessage(mds, m);
+ else
+ return new C_MDC_OpenInoTraverseDir(this, ino);
+}
+
+void MDCache::_open_ino_traverse_dir(inodeno_t ino, open_ino_info_t& info, int ret)
+{
+ dout(10) << "_open_ino_trvserse_dir ino " << ino << " ret " << ret << dendl;
+
+ CInode *in = get_inode(ino);
+ if (in) {
+ dout(10) << " found cached " << *in << dendl;
+ open_ino_finish(ino, info, in->authority().first);
+ return;
+ }
+
+ if (ret) {
+ do_open_ino(ino, info, ret);
+ return;
+ }
+
+ int hint = info.auth_hint;
+ ret = open_ino_traverse_dir(ino, NULL, info.ancestors,
+ info.discover, info.want_xlocked, &hint);
+ if (ret > 0)
+ return;
+ if (hint != mds->get_nodeid())
+ info.auth_hint = hint;
+ do_open_ino(ino, info, ret);
+}
+
+// Walk the backtrace `ancestors` of `ino`, nearest directory first, and try
+// to make progress toward `ino` using what is already in the local cache.
+//
+// ancestors[0] describes the directory expected to contain `ino`; for i > 0,
+// ancestors[i] describes the directory expected to contain
+// ancestors[i-1].dirino.
+//
+// Returns:
+//   1         an asynchronous step (open / discover / fetch / unfreeze wait)
+//             was started with a context from _open_ino_get_waiter(ino, m)
+//   -ENOTDIR  the immediate parent (i == 0) is cached but is not a directory
+//   -ENOENT   the "no ino" case was reached in the immediate parent's auth
+//             dirfrag
+//   0         otherwise
+//
+// If `hint` is non-NULL and the walk stops on the immediate parent (past the
+// is_dir() check), *hint is set to the first authority of its dirfrag, or of
+// the parent inode when no dirfrag is open.
+int MDCache::open_ino_traverse_dir(inodeno_t ino, MMDSOpenIno *m,
+ vector<inode_backpointer_t>& ancestors,
+ bool discover, bool want_xlocked, int *hint)
+{
+ dout(10) << "open_ino_traverse_dir ino " << ino << " " << ancestors << dendl;
+ int err = 0;
+ for (unsigned i = 0; i < ancestors.size(); i++) {
+ CInode *diri = get_inode(ancestors[i].dirino);
+
+ // Ancestor not cached: when discovering, an mdsdir can be opened from its
+ // owner; otherwise fall back to the next, more distant ancestor.
+ if (!diri) {
+ if (discover && MDS_INO_IS_MDSDIR(ancestors[i].dirino)) {
+ open_foreign_mdsdir(ancestors[i].dirino, _open_ino_get_waiter(ino, m));
+ return 1;
+ }
+ continue;
+ }
+
+ // Inodes flagged REJOINUNDEF are skipped; look further up instead.
+ if (diri->state_test(CInode::STATE_REJOINUNDEF))
+ continue;
+
+ if (!diri->is_dir()) {
+ dout(10) << " " << *diri << " is not dir" << dendl;
+ // Only a bad *immediate* parent is reported as an error.
+ if (i == 0)
+ err = -ENOTDIR;
+ break;
+ }
+
+ string &name = ancestors[i].dname;
+ frag_t fg = diri->pick_dirfrag(name);
+ CDir *dir = diri->get_dirfrag(fg);
+ if (!dir) {
+ // Dirfrag not open: open it locally if we are auth for the inode (after
+ // waiting out a freeze), or discover it remotely if allowed.
+ if (diri->is_auth()) {
+ if (diri->is_frozen()) {
+ dout(10) << " " << *diri << " is frozen, waiting " << dendl;
+ diri->add_waiter(CDir::WAIT_UNFREEZE, _open_ino_get_waiter(ino, m));
+ return 1;
+ }
+ dir = diri->get_or_open_dirfrag(this, fg);
+ } else if (discover) {
+ open_remote_dirfrag(diri, fg, _open_ino_get_waiter(ino, m));
+ return 1;
+ }
+ }
+ if (dir) {
+ // The inode we expect to find under this directory.
+ inodeno_t next_ino = i > 0 ? ancestors[i - 1].dirino : ino;
+ if (dir->is_auth()) {
+ CDentry *dn = dir->lookup(name);
+ CDentry::linkage_t *dnl = dn ? dn->get_linkage() : NULL;
+
+ // Dentry links a REJOINUNDEF inode: fetch the dirfrag.
+ if (dnl && dnl->is_primary() &&
+ dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF)) {
+ dout(10) << " fetching undef " << *dnl->get_inode() << dendl;
+ dir->fetch(_open_ino_get_waiter(ino, m));
+ return 1;
+ }
+
+ // No dentry in an incomplete dirfrag: fetch it unless a bloom filter is
+ // present and does not contain the name.
+ if (!dnl && !dir->is_complete() &&
+ (!dir->has_bloom() || dir->is_in_bloom(name))) {
+ dout(10) << " fetching incomplete " << *dir << dendl;
+ dir->fetch(_open_ino_get_waiter(ino, m));
+ return 1;
+ }
+
+ dout(10) << " no ino " << next_ino << " in " << *dir << dendl;
+ if (i == 0)
+ err = -ENOENT;
+ } else if (discover) {
+ // Not auth for the dirfrag: ask its authority for next_ino.  xlocked
+ // access is only requested for the target inode itself (i == 0).
+ discover_ino(dir, next_ino, _open_ino_get_waiter(ino, m),
+ (i == 0 && want_xlocked));
+ return 1;
+ }
+ }
+ if (hint && i == 0)
+ *hint = dir ? dir->authority().first : diri->authority().first;
+ // The first usable cached ancestor ends the walk.
+ break;
+ }
+ return err;
+}
+
+// Complete the open_ino operation for `ino`: drop its entry from
+// opening_inodes and run every queued waiter with result `ret`.
+//
+// `info` must be the opening_inodes entry for `ino`; it is destroyed by this
+// call and must not be used by the caller afterwards.
+void MDCache::open_ino_finish(inodeno_t ino, open_ino_info_t& info, int ret)
+{
+  dout(10) << "open_ino_finish ino " << ino << " ret " << ret << dendl;
+
+  // Detach the waiters and erase the entry *before* running the callbacks.
+  // A waiter may call open_ino() for the same ino again; if the entry were
+  // still present, that call would queue itself on an entry that is erased
+  // right afterwards and would never be completed.
+  list<Context*> waiters;
+  waiters.swap(info.waiters);
+  opening_inodes.erase(ino);   // 'info' is dangling from here on
+
+  finish_contexts(g_ceph_context, waiters, ret);
+}
+
+// Advance the open_ino state machine for `ino` by one step.
+//
+// A negative `err` restarts the search: the checked set is reset to just this
+// MDS, both the peer check and the backtrace fetch are re-armed, and a
+// discover in progress (with its ancestors) is abandoned.
+//
+// The step taken is then the first applicable of:
+//   1. check_peers      -> ask the other MDSs (do_open_ino_peer)
+//   2. fetch_backtrace  -> read the backtrace from info.pool
+//   3. otherwise        -> open the parent directory inode,
+//                          ancestors[0].dirino, from the metadata pool
+void MDCache::do_open_ino(inodeno_t ino, open_ino_info_t& info, int err)
+{
+ if (err < 0) {
+ info.checked.clear();
+ info.checked.insert(mds->get_nodeid());
+ info.checking = -1;
+ info.check_peers = true;
+ info.fetch_backtrace = true;
+ if (info.discover) {
+ info.discover = false;
+ info.ancestors.clear();
+ }
+ }
+
+ if (info.check_peers) {
+ // Clear the flag so the next call moves on to the backtrace fetch.
+ info.check_peers = false;
+ info.checking = -1;
+ do_open_ino_peer(ino, info);
+ } else if (info.fetch_backtrace) {
+ // Re-arm the peer check and reset the checked set, so peers are asked
+ // again on the next call.
+ info.check_peers = true;
+ info.fetch_backtrace = false;
+ info.checking = mds->get_nodeid();
+ info.checked.clear();
+ info.checked.insert(mds->get_nodeid());
+ // The completion owns the buffer the backtrace is read into.
+ C_MDC_OpenInoBacktraceFetched *fin = new C_MDC_OpenInoBacktraceFetched(this, ino);
+ fetch_backtrace(ino, info.pool, fin->bl, fin);
+ } else {
+ // Peers and backtrace are both done; ancestors must be known by now.
+ assert(!info.ancestors.empty());
+ info.checking = mds->get_nodeid();
+ open_ino(info.ancestors[0].dirino, mds->mdsmap->get_metadata_pool(),
+ new C_MDC_OpenInoParentOpened(this, ino), info.want_replica);
+ }
+}
+
+// Choose the next MDS to ask about `ino` and send it an MMDSOpenIno.
+//
+// Candidates are the ranks in the clientreplay..stopping state range, plus
+// the ranks in rejoin when this MDS is itself in rejoin.  A pending auth_hint
+// takes priority over scanning; if the hinted rank is not a candidate, no
+// peer is chosen in this call.
+//
+// When no peer is chosen: if some ranks in the map are not yet candidates and
+// have not all been checked, do nothing and wait (kick_open_ino_peers
+// re-enters here); otherwise hand control back to do_open_ino.
+void MDCache::do_open_ino_peer(inodeno_t ino, open_ino_info_t& info)
+{
+ set<int> all, active;
+ mds->mdsmap->get_mds_set(all);
+ mds->mdsmap->get_clientreplay_or_active_or_stopping_mds_set(active);
+ if (mds->get_state() == MDSMap::STATE_REJOIN)
+ mds->mdsmap->get_mds_set(active, MDSMap::STATE_REJOIN);
+
+ dout(10) << "do_open_ino_peer " << ino << " active " << active
+ << " all " << all << " checked " << info.checked << dendl;
+
+ int peer = -1;
+ if (info.auth_hint >= 0) {
+ // The hint is consumed only once it is actually usable.
+ if (active.count(info.auth_hint)) {
+ peer = info.auth_hint;
+ info.auth_hint = -1;
+ }
+ } else {
+ // First candidate that is not this MDS and has not been checked yet.
+ for (set<int>::iterator p = active.begin(); p != active.end(); ++p)
+ if (*p != mds->get_nodeid() && info.checked.count(*p) == 0) {
+ peer = *p;
+ break;
+ }
+ }
+ if (peer < 0) {
+ if (all.size() > active.size() && all != info.checked) {
+ dout(10) << " waiting for more peers to be active" << dendl;
+ } else {
+ dout(10) << " all MDS peers have been checked " << dendl;
+ do_open_ino(ino, info, 0);
+ }
+ } else {
+ info.checking = peer;
+ mds->send_message_mds(new MMDSOpenIno(info.tid, ino, info.ancestors), peer);
+ }
+}
+
+// Answer a peer's MMDSOpenIno query for m->ino.
+//
+//  - inode cached and auth here: reply with its ancestor chain, built by
+//    walking parent dentries upward as (dir ino, dentry name, inode version)
+//  - inode cached but not auth: reply with the first authority as a hint
+//  - inode not cached: traverse the ancestors carried in the message without
+//    discovering, and reply with the resulting hint and error code
+//
+// If the traversal started an asynchronous step (ret > 0) no reply is sent
+// and `m` is not released here; it was handed to _open_ino_get_waiter.
+// NOTE(review): presumably that waiter re-dispatches `m` -- confirm.
+void MDCache::handle_open_ino(MMDSOpenIno *m)
+{
+ dout(10) << "handle_open_ino " << *m << dendl;
+
+ inodeno_t ino = m->ino;
+ MMDSOpenInoReply *reply;
+ CInode *in = get_inode(ino);
+ if (in) {
+ dout(10) << " have " << *in << dendl;
+ reply = new MMDSOpenInoReply(m->get_tid(), ino, 0);
+ if (in->is_auth()) {
+ touch_inode(in);
+ // Climb from the inode through its parent dentries until an inode with
+ // no parent dentry is reached.
+ while (1) {
+ CDentry *pdn = in->get_parent_dn();
+ if (!pdn)
+ break;
+ CInode *diri = pdn->get_dir()->get_inode();
+ reply->ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name,
+ in->inode.version));
+ in = diri;
+ }
+ } else {
+ reply->hint = in->authority().first;
+ }
+ } else {
+ int hint = -1;
+ int ret = open_ino_traverse_dir(ino, m, m->ancestors, false, false, &hint);
+ if (ret > 0)
+ return;
+ reply = new MMDSOpenInoReply(m->get_tid(), ino, hint, ret);
+ }
+ mds->messenger->send_message(reply, m->get_connection());
+ m->put();
+}
+
+// Process a peer's answer to an MMDSOpenIno query for m->ino.
+//
+// Replies for inodes that are no longer being opened are ignored.  Otherwise
+// the sender is recorded as checked and one of four things happens:
+//  - the inode is now in cache: finish with its first authority
+//  - the peer returned ancestors (it has the inode): finish with the peer's
+//    rank if no replica is wanted, else adopt the ancestors and start a
+//    discovering traversal
+//  - the peer reported an error: restart through do_open_ino(error)
+//  - otherwise: record the peer's auth hint (if any) and ask the next peer
+//
+// The reference on `m` is released exactly once, on every path, by the single
+// m->put() at the end of the function.
+void MDCache::handle_open_ino_reply(MMDSOpenInoReply *m)
+{
+  dout(10) << "handle_open_ino_reply " << *m << dendl;
+
+  inodeno_t ino = m->ino;
+  int from = m->get_source().num();
+  if (opening_inodes.count(ino)) {
+    open_ino_info_t& info = opening_inodes[ino];
+
+    if (info.checking == from)
+      info.checking = -1;
+    info.checked.insert(from);
+
+    CInode *in = get_inode(ino);
+    if (in) {
+      dout(10) << " found cached " << *in << dendl;
+      open_ino_finish(ino, info, in->authority().first);
+    } else if (!m->ancestors.empty()) {
+      dout(10) << " found ino " << ino << " on mds." << from << dendl;
+      if (!info.want_replica) {
+        // The caller only wants to know who has the inode.  Do not return
+        // from here: the message still has to be released below.
+        open_ino_finish(ino, info, from);
+      } else {
+        info.ancestors = m->ancestors;
+        info.auth_hint = from;
+        info.checking = mds->get_nodeid();
+        info.discover = true;
+        _open_ino_traverse_dir(ino, info, 0);
+      }
+    } else if (m->error) {
+      dout(10) << " error " << m->error << " from mds." << from << dendl;
+      do_open_ino(ino, info, m->error);
+    } else {
+      if (m->hint >= 0 && m->hint != mds->get_nodeid()) {
+        // Make the hinted rank eligible again even if it was already asked.
+        info.auth_hint = m->hint;
+        info.checked.erase(m->hint);
+      }
+      do_open_ino_peer(ino, info);
+    }
+  }
+  m->put();
+}
+
+// Re-drive pending open_ino operations after mds.`who` became usable.
+// Operations that were querying `who` are reset and re-sent; operations that
+// were idle (checking == -1) get another chance to pick a peer.
+void MDCache::kick_open_ino_peers(int who)
+{
+  dout(10) << "kick_open_ino_peers mds." << who << dendl;
+
+  for (map<inodeno_t, open_ino_info_t>::iterator it = opening_inodes.begin();
+       it != opening_inodes.end();
+       ++it) {
+    open_ino_info_t& entry = it->second;
+    const bool was_asking_who = (entry.checking == who);
+
+    // Busy with some other MDS: leave it alone.
+    if (!was_asking_who && entry.checking != -1)
+      continue;
+
+    if (was_asking_who) {
+      dout(10) << " kicking ino " << it->first << " who was checking mds." << who << dendl;
+      entry.checking = -1;
+    } else {
+      dout(10) << " kicking ino " << it->first << " who was waiting" << dendl;
+    }
+    do_open_ino_peer(it->first, entry);
+  }
+}
+
+// Start (or join) an attempt to locate inode `ino` and bring it into cache.
+//
+//  pool          pool holding the inode's backtrace; a negative value selects
+//                the first data pool from the MDS map
+//  fin           context queued on the operation's waiter list
+//  want_replica  ask for the inode to be replicated locally
+//  want_xlocked  recorded on the operation; on an already-running operation
+//                it is only raised together with want_replica
+//
+// If an operation for `ino` is already running, the request is merged into it.
+void MDCache::open_ino(inodeno_t ino, int64_t pool, Context* fin,
+                       bool want_replica, bool want_xlocked)
+{
+  dout(10) << "open_ino " << ino << " pool " << pool << " want_replica "
+           << want_replica << dendl;
+
+  map<inodeno_t,open_ino_info_t>::iterator existing = opening_inodes.find(ino);
+  if (existing == opening_inodes.end()) {
+    // First request for this ino: create the entry and kick things off.
+    open_ino_info_t& fresh = opening_inodes[ino];
+    fresh.checked.insert(mds->get_nodeid());
+    fresh.want_replica = want_replica;
+    fresh.want_xlocked = want_xlocked;
+    fresh.tid = ++open_ino_last_tid;
+    if (pool >= 0)
+      fresh.pool = pool;
+    else
+      fresh.pool = mds->mdsmap->get_first_data_pool();
+    fresh.waiters.push_back(fin);
+    do_open_ino(ino, fresh, 0);
+    return;
+  }
+
+  // Already in progress: only ever strengthen the requirements.
+  open_ino_info_t& pending = existing->second;
+  if (want_replica) {
+    pending.want_replica = true;
+    if (want_xlocked)
+      pending.want_xlocked = true;
+  }
+  pending.waiters.push_back(fin);
+}
+
/* ---------------------------- */
/*
@@ -8388,7 +8952,7 @@ void MDCache::snaprealm_create(MDRequest *mdr, CInode *in)
predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
journal_cow_inode(mut, &le->metablob, in);
- le->metablob.add_primary_dentry(in->get_projected_parent_dn(), true, in);
+ le->metablob.add_primary_dentry(in->get_projected_parent_dn(), in, true);
mds->mdlog->submit_entry(le, new C_MDC_snaprealm_create_finish(this, mdr, mut, in));
mds->mdlog->flush();
@@ -8631,6 +9195,20 @@ void MDCache::eval_remote(CDentry *dn)
}
}
+// Issue a read of the "parent" xattr of inode `ino`'s object in `pool` into
+// `bl`, passing `fin` to the objecter as the completion context.
+void MDCache::fetch_backtrace(inodeno_t ino, int64_t pool, bufferlist& bl, Context *fin)
+{
+  object_t oid = CInode::get_object_name(ino, frag_t(), "");
+  object_locator_t oloc(pool);
+  mds->objecter->getxattr(oid, oloc, "parent", CEPH_NOSNAP, &bl, 0, fin);
+}
+
+// Issue a removal of the "parent" xattr from inode `ino`'s object in `pool`,
+// using an empty snap context; `fin` is passed to the objecter as the final
+// completion context.
+void MDCache::remove_backtrace(inodeno_t ino, int64_t pool, Context *fin)
+{
+  object_t oid = CInode::get_object_name(ino, frag_t(), "");
+  object_locator_t oloc(pool);
+  SnapContext snapc;
+  mds->objecter->removexattr(oid, oloc, "parent", snapc,
+                             ceph_clock_now(g_ceph_context), 0, NULL, fin);
+}
+
class C_MDC_PurgeStrayPurged : public Context {
MDCache *cache;
CDentry *dn;
@@ -8645,13 +9223,12 @@ public:
class C_MDC_PurgeForwardingPointers : public Context {
MDCache *cache;
CDentry *dn;
- Context *fin;
public:
- inode_backtrace_t backtrace;
- C_MDC_PurgeForwardingPointers(MDCache *c, CDentry *d, Context *f) :
- cache(c), dn(d), fin(f) {}
+ bufferlist bl;
+ C_MDC_PurgeForwardingPointers(MDCache *c, CDentry *d) :
+ cache(c), dn(d) {}
void finish(int r) {
- cache->_purge_forwarding_pointers(&backtrace, dn, r, fin);
+ cache->_purge_forwarding_pointers(bl, dn, r);
}
};
@@ -8666,18 +9243,22 @@ public:
}
};
-void MDCache::_purge_forwarding_pointers(inode_backtrace_t *backtrace, CDentry *d, int r, Context *fin)
+void MDCache::_purge_forwarding_pointers(bufferlist& bl, CDentry *dn, int r)
{
assert(r == 0 || r == -ENOENT || r == -ENODATA);
+ inode_backtrace_t backtrace;
+ if (r == 0)
+ ::decode(backtrace, bl);
+
// setup gathering context
C_GatherBuilder gather_bld(g_ceph_context);
// remove all the objects with forwarding pointer backtraces (aka sentinels)
- for (set<int64_t>::const_iterator i = backtrace->old_pools.begin();
- i != backtrace->old_pools.end();
+ for (set<int64_t>::const_iterator i = backtrace.old_pools.begin();
+ i != backtrace.old_pools.end();
++i) {
SnapContext snapc;
- object_t oid = CInode::get_object_name(backtrace->ino, frag_t(), "");
+ object_t oid = CInode::get_object_name(backtrace.ino, frag_t(), "");
object_locator_t oloc(*i);
mds->objecter->remove(oid, oloc, snapc, ceph_clock_now(g_ceph_context), 0,
@@ -8685,10 +9266,10 @@ void MDCache::_purge_forwarding_pointers(inode_backtrace_t *backtrace, CDentry *
}
if (gather_bld.has_subs()) {
- gather_bld.set_finisher(fin);
+ gather_bld.set_finisher(new C_MDC_PurgeStray(this, dn));
gather_bld.activate();
} else {
- fin->finish(r);
+ _purge_stray(dn, r);
}
}
@@ -8752,17 +9333,12 @@ void MDCache::purge_stray(CDentry *dn)
if (in->is_dir()) {
dout(10) << "purge_stray dir ... implement me!" << dendl; // FIXME XXX
// remove the backtrace
- SnapContext snapc;
- object_t oid = CInode::get_object_name(in->ino(), frag_t(), "");
- object_locator_t oloc(mds->mdsmap->get_metadata_pool());
-
- mds->objecter->removexattr(oid, oloc, "parent", snapc, ceph_clock_now(g_ceph_context), 0,
- NULL, new C_MDC_PurgeStrayPurged(this, dn));
+ remove_backtrace(in->ino(), mds->mdsmap->get_metadata_pool(),
+ new C_MDC_PurgeStrayPurged(this, dn));
} else if (in->is_file()) {
// get the backtrace before blowing away the object
- C_MDC_PurgeStray *strayfin = new C_MDC_PurgeStray(this, dn);
- C_MDC_PurgeForwardingPointers *fpfin = new C_MDC_PurgeForwardingPointers(this, dn, strayfin);
- in->fetch_backtrace(&fpfin->backtrace, fpfin);
+ C_MDC_PurgeForwardingPointers *fin = new C_MDC_PurgeForwardingPointers(this, dn);
+ fetch_backtrace(in->ino(), in->get_inode().layout.fl_pg_pool, fin->bl, fin);
} else {
// not a dir or file; purged!
_purge_stray_purged(dn);
@@ -8837,7 +9413,7 @@ void MDCache::_purge_stray_purged(CDentry *dn, int r)
pi->version = in->pre_dirty();
le->metablob.add_dir_context(dn->dir);
- le->metablob.add_primary_dentry(dn, true, in);
+ le->metablob.add_primary_dentry(dn, in, true);
mds->mdlog->submit_entry(le, new C_MDC_PurgeStrayLoggedTruncate(this, dn, mds->mdlog->get_current_segment()));
}
@@ -9178,7 +9754,8 @@ void MDCache::handle_discover(MDiscover *dis)
snapid_t snapid = dis->get_snapid();
// get started.
- if (MDS_INO_IS_BASE(dis->get_base_ino())) {
+ if (MDS_INO_IS_BASE(dis->get_base_ino()) &&
+ !dis->wants_base_dir() && dis->get_want().depth() == 0) {
// wants root
dout(7) << "handle_discover from mds." << from
<< " wants base + " << dis->get_want().get_path()
diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h
index d837586a3ac..3da8a36f799 100644
--- a/src/mds/MDCache.h
+++ b/src/mds/MDCache.h
@@ -53,6 +53,8 @@ class MDentryUnlink;
class MLock;
class MMDSFindIno;
class MMDSFindInoReply;
+class MMDSOpenIno;
+class MMDSOpenInoReply;
class Message;
class MClientRequest;
@@ -291,7 +293,7 @@ public:
}
void log_master_commit(metareqid_t reqid);
void logged_master_update(metareqid_t reqid);
- void _logged_master_commit(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters);
+ void _logged_master_commit(metareqid_t reqid);
void committed_master_slave(metareqid_t r, int from);
void finish_committed_masters();
@@ -323,6 +325,9 @@ protected:
LogSegment *ls;
list<Context*> waiters;
bool safe;
+ bool committing;
+ bool recovering;
+ umaster() : committing(false), recovering(false) {}
};
map<metareqid_t, umaster> uncommitted_masters; // master: req -> slave set
@@ -407,11 +412,12 @@ protected:
set<int> rejoin_ack_gather; // nodes from whom i need a rejoin ack
map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> > cap_exports; // ino -> client -> capex
- map<inodeno_t,filepath> cap_export_paths;
+ map<inodeno_t,int> cap_export_targets; // ino -> auth mds
map<inodeno_t,map<client_t,map<int,ceph_mds_cap_reconnect> > > cap_imports; // ino -> client -> frommds -> capex
map<inodeno_t,filepath> cap_import_paths;
set<inodeno_t> cap_imports_missing;
+ int cap_imports_num_opening;
set<CInode*> rejoin_undef_inodes;
set<CInode*> rejoin_potential_updated_scatterlocks;
@@ -426,7 +432,6 @@ protected:
void handle_cache_rejoin_weak(MMDSCacheRejoin *m);
CInode* rejoin_invent_inode(inodeno_t ino, snapid_t last);
CDir* rejoin_invent_dirfrag(dirfrag_t df);
- bool rejoin_fetch_dirfrags(MMDSCacheRejoin *m);
void handle_cache_rejoin_strong(MMDSCacheRejoin *m);
void rejoin_scour_survivor_replicas(int from, MMDSCacheRejoin *ack,
set<SimpleLock *>& gather_locks,
@@ -442,11 +447,13 @@ protected:
rejoin_send_rejoins();
}
public:
+ void rejoin_start();
void rejoin_gather_finish();
void rejoin_send_rejoins();
- void rejoin_export_caps(inodeno_t ino, client_t client, cap_reconnect_t& icr) {
- cap_exports[ino][client] = icr.capinfo;
- cap_export_paths[ino] = filepath(icr.path, (uint64_t)icr.capinfo.pathbase);
+ void rejoin_export_caps(inodeno_t ino, client_t client, ceph_mds_cap_reconnect& capinfo,
+ int target=-1) {
+ cap_exports[ino][client] = capinfo;
+ cap_export_targets[ino] = target;
}
void rejoin_recovered_caps(inodeno_t ino, client_t client, cap_reconnect_t& icr,
int frommds=-1) {
@@ -477,7 +484,10 @@ public:
void add_reconnected_snaprealm(client_t client, inodeno_t ino, snapid_t seq) {
reconnected_snaprealms[ino][client] = seq;
}
- void process_imported_caps();
+
+ friend class C_MDC_RejoinOpenInoFinish;
+ void rejoin_open_ino_finish(inodeno_t ino, int ret);
+ bool process_imported_caps();
void choose_lock_states_and_reconnect_caps();
void prepare_realm_split(SnapRealm *realm, client_t client, inodeno_t ino,
map<client_t,MClientSnap*>& splits);
@@ -744,15 +754,59 @@ public:
void open_remote_ino_2(inodeno_t ino,
vector<Anchor>& anchortrace, bool want_xlocked,
inodeno_t hadino, version_t hadv, Context *onfinish);
- void open_remote_dentry(CDentry *dn, bool projected, Context *fin);
- void _open_remote_dentry_finish(int r, CDentry *dn, bool projected, Context *fin);
bool parallel_fetch(map<inodeno_t,filepath>& pathmap, set<inodeno_t>& missing);
bool parallel_fetch_traverse_dir(inodeno_t ino, filepath& path,
set<CDir*>& fetch_queue, set<inodeno_t>& missing,
C_GatherBuilder &gather_bld);
+ void open_remote_dentry(CDentry *dn, bool projected, Context *fin,
+ bool want_xlocked=false);
+ void _open_remote_dentry_finish(CDentry *dn, inodeno_t ino, Context *fin,
+ bool want_xlocked, int mode, int r);
+
void make_trace(vector<CDentry*>& trace, CInode *in);
+
+protected:
+  // Book-keeping for one in-progress open_ino operation (see
+  // opening_inodes).
+  struct open_ino_info_t {
+    vector<inode_backpointer_t> ancestors;  // backtrace: nearest parent first
+    set<int> checked;        // MDS ranks already asked (includes ourselves)
+    int checking;            // rank currently being asked/working, -1 if idle
+    int auth_hint;           // rank a peer suggested as authority, -1 if none
+    bool check_peers;        // next step: ask the other MDSs
+    bool fetch_backtrace;    // next step: read the backtrace from 'pool'
+    bool discover;           // traversing with remote discovery enabled
+    bool want_replica;       // caller wants the inode replicated locally
+    bool want_xlocked;       // caller wants xlocked access on discover
+    version_t tid;           // id carried in MMDSOpenIno
+    int64_t pool;            // pool holding the backtrace, -1 if unset
+    list<Context*> waiters;  // completed by open_ino_finish
+    // Every scalar member is initialised, so a default-constructed entry
+    // (e.g. one created by map::operator[]) never holds indeterminate values.
+    open_ino_info_t() : checking(-1), auth_hint(-1),
+      check_peers(true), fetch_backtrace(true), discover(false),
+      want_replica(false), want_xlocked(false), tid(0), pool(-1) {}
+  };
+ tid_t open_ino_last_tid;
+ map<inodeno_t,open_ino_info_t> opening_inodes;
+
+ void _open_ino_backtrace_fetched(inodeno_t ino, bufferlist& bl, int err);
+ void _open_ino_parent_opened(inodeno_t ino, int ret);
+ void _open_ino_traverse_dir(inodeno_t ino, open_ino_info_t& info, int err);
+ Context* _open_ino_get_waiter(inodeno_t ino, MMDSOpenIno *m);
+ int open_ino_traverse_dir(inodeno_t ino, MMDSOpenIno *m,
+ vector<inode_backpointer_t>& ancestors,
+ bool discover, bool want_xlocked, int *hint);
+ void open_ino_finish(inodeno_t ino, open_ino_info_t& info, int err);
+ void do_open_ino(inodeno_t ino, open_ino_info_t& info, int err);
+ void do_open_ino_peer(inodeno_t ino, open_ino_info_t& info);
+ void handle_open_ino(MMDSOpenIno *m);
+ void handle_open_ino_reply(MMDSOpenInoReply *m);
+ friend class C_MDC_OpenInoBacktraceFetched;
+ friend class C_MDC_OpenInoTraverseDir;
+ friend class C_MDC_OpenInoParentOpened;
+
+public:
+ void kick_open_ino_peers(int who);
+ void open_ino(inodeno_t ino, int64_t pool, Context *fin,
+ bool want_replica=true, bool want_xlocked=false);
// -- find_ino_peer --
struct find_ino_peer_info_t {
@@ -817,12 +871,15 @@ public:
eval_stray(dn);
}
protected:
- void _purge_forwarding_pointers(inode_backtrace_t *backtrace, CDentry *dn, int r, Context *fin);
+ void fetch_backtrace(inodeno_t ino, int64_t pool, bufferlist& bl, Context *fin);
+ void remove_backtrace(inodeno_t ino, int64_t pool, Context *fin);
+ void _purge_forwarding_pointers(bufferlist& bl, CDentry *dn, int r);
void _purge_stray(CDentry *dn, int r);
void purge_stray(CDentry *dn);
void _purge_stray_purged(CDentry *dn, int r=0);
void _purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls);
void _purge_stray_logged_truncate(CDentry *dn, LogSegment *ls);
+ friend class C_MDC_FetchedBacktrace;
friend class C_MDC_PurgeForwardingPointers;
friend class C_MDC_PurgeStray;
friend class C_MDC_PurgeStrayLogged;
diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc
index 53897432522..c4773131d3c 100644
--- a/src/mds/MDLog.cc
+++ b/src/mds/MDLog.cc
@@ -619,10 +619,10 @@ void MDLog::standby_trim_segments()
seg->dirty_inodes.clear_list();
seg->dirty_dentries.clear_list();
seg->open_files.clear_list();
+ seg->dirty_parent_inodes.clear_list();
seg->dirty_dirfrag_dir.clear_list();
seg->dirty_dirfrag_nest.clear_list();
seg->dirty_dirfrag_dirfragtree.clear_list();
- seg->update_backtraces.clear_list();
remove_oldest_segment();
removed_segment = true;
}
diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc
index d5d6001151e..552f103f126 100644
--- a/src/mds/MDS.cc
+++ b/src/mds/MDS.cc
@@ -976,6 +976,8 @@ void MDS::handle_mds_map(MMDSMap *m)
resolve_start();
} else if (is_reconnect()) {
reconnect_start();
+ } else if (is_rejoin()) {
+ rejoin_start();
} else if (is_clientreplay()) {
clientreplay_start();
} else if (is_creating()) {
@@ -1012,12 +1014,7 @@ void MDS::handle_mds_map(MMDSMap *m)
if (g_conf->mds_dump_cache_after_rejoin &&
oldmap->is_rejoining() && !mdsmap->is_rejoining())
mdcache->dump_cache(); // for DEBUG only
- }
- if (oldmap->is_degraded() && !mdsmap->is_degraded() && state >= MDSMap::STATE_ACTIVE)
- dout(1) << "cluster recovered." << dendl;
- // did someone go active?
- if (is_clientreplay() || is_active() || is_stopping()) {
// ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
set<int> olddis, dis;
oldmap->get_mds_set(olddis, MDSMap::STATE_ACTIVE);
@@ -1028,9 +1025,17 @@ void MDS::handle_mds_map(MMDSMap *m)
mdsmap->get_mds_set(dis, MDSMap::STATE_REJOIN);
for (set<int>::iterator p = dis.begin(); p != dis.end(); ++p)
if (*p != whoami && // not me
- olddis.count(*p) == 0) // newly so?
+ olddis.count(*p) == 0) { // newly so?
mdcache->kick_discovers(*p);
+ mdcache->kick_open_ino_peers(*p);
+ }
+ }
+ if (oldmap->is_degraded() && !mdsmap->is_degraded() && state >= MDSMap::STATE_ACTIVE)
+ dout(1) << "cluster recovered." << dendl;
+
+ // did someone go active?
+ if (is_clientreplay() || is_active() || is_stopping()) {
set<int> oldactive, active;
oldmap->get_mds_set(oldactive, MDSMap::STATE_ACTIVE);
oldmap->get_mds_set(oldactive, MDSMap::STATE_CLIENTREPLAY);
@@ -1461,9 +1466,13 @@ void MDS::reconnect_done()
void MDS::rejoin_joint_start()
{
dout(1) << "rejoin_joint_start" << dendl;
- mdcache->finish_committed_masters();
mdcache->rejoin_send_rejoins();
}
+// Entered when this MDS reaches the rejoin state: log the transition and
+// hand over to the cache.
+void MDS::rejoin_start()
+{
+  dout(1) << "rejoin_start" << dendl;
+
+  mdcache->rejoin_start();
+}
void MDS::rejoin_done()
{
dout(1) << "rejoin_done" << dendl;
diff --git a/src/mds/MDS.h b/src/mds/MDS.h
index 88d9fe2931e..4e69dcaf8f9 100644
--- a/src/mds/MDS.h
+++ b/src/mds/MDS.h
@@ -35,7 +35,7 @@
#include "SessionMap.h"
-#define CEPH_MDS_PROTOCOL 16 /* cluster internal */
+#define CEPH_MDS_PROTOCOL 17 /* cluster internal */
enum {
@@ -376,6 +376,7 @@ class MDS : public Dispatcher {
void reconnect_start();
void reconnect_done();
void rejoin_joint_start();
+ void rejoin_start();
void rejoin_done();
void recovery_done();
void clientreplay_start();
diff --git a/src/mds/MDSMap.h b/src/mds/MDSMap.h
index c5bc1c36460..3e2f67e01de 100644
--- a/src/mds/MDSMap.h
+++ b/src/mds/MDSMap.h
@@ -308,6 +308,13 @@ public:
if (p->second.state >= STATE_REPLAY && p->second.state <= STATE_STOPPING)
s.insert(p->second.rank);
}
+  // Add to `s` the rank of every MDS whose state lies in the inclusive range
+  // STATE_CLIENTREPLAY .. STATE_STOPPING.  `s` is not cleared first.
+  void get_clientreplay_or_active_or_stopping_mds_set(set<int>& s) {
+    map<uint64_t,mds_info_t>::const_iterator it = mds_info.begin();
+    while (it != mds_info.end()) {
+      const mds_info_t& mi = it->second;
+      const bool out_of_range =
+        mi.state < STATE_CLIENTREPLAY || mi.state > STATE_STOPPING;
+      if (!out_of_range)
+        s.insert(mi.rank);
+      ++it;
+    }
+  }
void get_mds_set(set<int>& s, int state) {
for (map<uint64_t,mds_info_t>::const_iterator p = mds_info.begin();
p != mds_info.end();
diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc
index e7f4e27fecb..92962424e46 100644
--- a/src/mds/Migrator.cc
+++ b/src/mds/Migrator.cc
@@ -397,7 +397,7 @@ void Migrator::handle_mds_failure_or_stop(int who)
cache->get_subtree_bounds(dir, bounds);
import_remove_pins(dir, bounds);
- // adjust auth back to me
+ // adjust auth back to the exporter
cache->adjust_subtree_auth(dir, import_peer[df]);
cache->try_subtree_merge(dir); // NOTE: may journal subtree_map as side-effect
@@ -1026,6 +1026,7 @@ void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl,
map<client_t,Capability::Export> cap_map;
in->export_client_caps(cap_map);
::encode(cap_map, bl);
+ ::encode(in->get_mds_caps_wanted(), bl);
in->state_set(CInode::STATE_EXPORTINGCAPS);
in->get(CInode::PIN_EXPORTINGCAPS);
@@ -1067,10 +1068,6 @@ void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& fini
{
dout(12) << "finish_export_inode " << *in << dendl;
- in->finish_export(now);
-
- finish_export_inode_caps(in);
-
// clean
if (in->is_dirty())
in->mark_clean();
@@ -1102,9 +1099,15 @@ void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& fini
in->item_open_file.remove_myself();
+ in->clear_dirty_parent();
+
// waiters
in->take_waiting(CInode::WAIT_ANY_MASK, finished);
+
+ in->finish_export(now);
+ finish_export_inode_caps(in);
+
// *** other state too?
// move to end of LRU so we drop out of cache quickly!
@@ -1219,9 +1222,6 @@ void Migrator::finish_export_dir(CDir *dir, list<Context*>& finished, utime_t no
if (dir->is_dirty())
dir->mark_clean();
-
- // discard most dir state
- dir->state &= CDir::MASK_STATE_EXPORT_KEPT; // i only retain a few things.
// suck up all waiters
dir->take_waiting(CDir::WAIT_ANY_MASK, finished); // all dir waiters
@@ -1587,27 +1587,26 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
dout(7) << "handle_export_discover on " << m->get_path() << dendl;
- if (!mds->mdcache->is_open()) {
- dout(5) << " waiting for root" << dendl;
- mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m));
- return;
- }
-
// note import state
dirfrag_t df = m->get_dirfrag();
-
// only start discovering on this message once.
if (!m->started) {
m->started = true;
+ import_pending_msg[df] = m;
import_state[df] = IMPORT_DISCOVERING;
import_peer[df] = from;
+ } else {
+ // am i retrying after ancient path_traverse results?
+ if (import_pending_msg.count(df) == 0 || import_pending_msg[df] != m) {
+ dout(7) << " dropping obsolete message" << dendl;
+ m->put();
+ return;
+ }
}
- // am i retrying after ancient path_traverse results?
- if (import_state.count(df) == 0 ||
- import_state[df] != IMPORT_DISCOVERING) {
- dout(7) << "hmm import_state is off, i must be obsolete lookup" << dendl;
- m->put();
+ if (!mds->mdcache->is_open()) {
+ dout(5) << " waiting for root" << dendl;
+ mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m));
return;
}
@@ -1633,6 +1632,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
dout(7) << "handle_export_discover have " << df << " inode " << *in << dendl;
import_state[m->get_dirfrag()] = IMPORT_DISCOVERED;
+ import_pending_msg.erase(m->get_dirfrag());
// pin inode in the cache (for now)
assert(in->is_dir());
@@ -1647,6 +1647,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
void Migrator::import_reverse_discovering(dirfrag_t df)
{
+ import_pending_msg.erase(df);
import_state.erase(df);
import_peer.erase(df);
}
@@ -1661,6 +1662,7 @@ void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
void Migrator::import_reverse_prepping(CDir *dir)
{
+ import_pending_msg.erase(dir->dirfrag());
set<CDir*> bounds;
cache->map_dirfrag_set(import_bound_ls[dir], bounds);
import_remove_pins(dir, bounds);
@@ -1685,6 +1687,12 @@ void Migrator::handle_export_cancel(MExportDirCancel *m)
} else if (import_state[df] == IMPORT_PREPPED) {
CDir *dir = mds->mdcache->get_dirfrag(df);
assert(dir);
+ set<CDir*> bounds;
+ cache->get_subtree_bounds(dir, bounds);
+ import_remove_pins(dir, bounds);
+ // adjust auth back to the exporter
+ cache->adjust_subtree_auth(dir, import_peer[df]);
+ cache->try_subtree_merge(dir);
import_reverse_unfreeze(dir);
} else {
assert(0 == "got export_cancel in weird state");
@@ -1698,32 +1706,29 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
int oldauth = m->get_source().num();
assert(oldauth != mds->get_nodeid());
- // make sure we didn't abort
- if (import_state.count(m->get_dirfrag()) == 0 ||
- (import_state[m->get_dirfrag()] != IMPORT_DISCOVERED &&
- import_state[m->get_dirfrag()] != IMPORT_PREPPING) ||
- import_peer[m->get_dirfrag()] != oldauth) {
- dout(10) << "handle_export_prep import has aborted, dropping" << dendl;
- m->put();
- return;
- }
-
- CInode *diri = cache->get_inode(m->get_dirfrag().ino);
- assert(diri);
-
+ CDir *dir;
+ CInode *diri;
list<Context*> finished;
// assimilate root dir.
- CDir *dir;
-
if (!m->did_assim()) {
+ diri = cache->get_inode(m->get_dirfrag().ino);
+ assert(diri);
bufferlist::iterator p = m->basedir.begin();
dir = cache->add_replica_dir(p, diri, oldauth, finished);
dout(7) << "handle_export_prep on " << *dir << " (first pass)" << dendl;
} else {
+ if (import_pending_msg.count(m->get_dirfrag()) == 0 ||
+ import_pending_msg[m->get_dirfrag()] != m) {
+ dout(7) << "handle_export_prep obsolete message, dropping" << dendl;
+ m->put();
+ return;
+ }
+
dir = cache->get_dirfrag(m->get_dirfrag());
assert(dir);
dout(7) << "handle_export_prep on " << *dir << " (subsequent pass)" << dendl;
+ diri = dir->get_inode();
}
assert(dir->is_auth() == false);
@@ -1742,16 +1747,17 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
if (!m->did_assim()) {
dout(7) << "doing assim on " << *dir << dendl;
m->mark_assim(); // only do this the first time!
+ import_pending_msg[dir->dirfrag()] = m;
+
+ // change import state
+ import_state[dir->dirfrag()] = IMPORT_PREPPING;
+ import_bound_ls[dir] = m->get_bounds();
+ assert(g_conf->mds_kill_import_at != 3);
// move pin to dir
diri->put(CInode::PIN_IMPORTING);
dir->get(CDir::PIN_IMPORTING);
dir->state_set(CDir::STATE_IMPORTING);
-
- // change import state
- import_state[dir->dirfrag()] = IMPORT_PREPPING;
- assert(g_conf->mds_kill_import_at != 3);
- import_bound_ls[dir] = m->get_bounds();
// bystander list
import_bystanders[dir] = m->get_bystanders();
@@ -1868,6 +1874,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
// note new state
import_state[dir->dirfrag()] = IMPORT_PREPPED;
+ import_pending_msg.erase(dir->dirfrag());
assert(g_conf->mds_kill_import_at != 4);
// done
m->put();
@@ -2072,6 +2079,8 @@ void Migrator::import_reverse(CDir *dir)
if (!in->has_subtree_root_dirfrag(mds->get_nodeid()))
in->clear_scatter_dirty();
+ in->clear_dirty_parent();
+
in->authlock.clear_gather();
in->linklock.clear_gather();
in->dirfragtreelock.clear_gather();
@@ -2379,7 +2388,8 @@ void Migrator::decode_import_inode_caps(CInode *in,
{
map<client_t,Capability::Export> cap_map;
::decode(cap_map, blp);
- if (!cap_map.empty()) {
+ ::decode(in->get_mds_caps_wanted(), blp);
+ if (!cap_map.empty() || !in->get_mds_caps_wanted().empty()) {
cap_imports[in].swap(cap_map);
in->get(CInode::PIN_IMPORTINGCAPS);
}
@@ -2388,8 +2398,6 @@ void Migrator::decode_import_inode_caps(CInode *in,
void Migrator::finish_import_inode_caps(CInode *in, int from,
map<client_t,Capability::Export> &cap_map)
{
- assert(!cap_map.empty());
-
for (map<client_t,Capability::Export>::iterator it = cap_map.begin();
it != cap_map.end();
++it) {
@@ -2406,6 +2414,7 @@ void Migrator::finish_import_inode_caps(CInode *in, int from,
mds->mdcache->do_cap_import(session, in, cap);
}
+ in->replica_caps_wanted = 0;
in->put(CInode::PIN_IMPORTINGCAPS);
}
@@ -2514,7 +2523,7 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp,
// add dentry to journal entry
if (le)
- le->metablob.add_dentry(dn, dn->is_dirty());
+ le->metablob.add_import_dentry(dn);
}
#ifdef MDS_VERIFY_FRAGSTAT
diff --git a/src/mds/Migrator.h b/src/mds/Migrator.h
index f395bc1d237..70b59bc0f97 100644
--- a/src/mds/Migrator.h
+++ b/src/mds/Migrator.h
@@ -116,6 +116,7 @@ public:
protected:
map<dirfrag_t,int> import_state; // FIXME make these dirfrags
map<dirfrag_t,int> import_peer;
+ map<dirfrag_t,Message*> import_pending_msg;
map<CDir*,set<int> > import_bystanders;
map<CDir*,list<dirfrag_t> > import_bound_ls;
map<CDir*,list<ScatterLock*> > import_updated_scatterlocks;
diff --git a/src/mds/Mutation.cc b/src/mds/Mutation.cc
index 4e4f69cf31e..3916b2a1a33 100644
--- a/src/mds/Mutation.cc
+++ b/src/mds/Mutation.cc
@@ -30,6 +30,13 @@ void Mutation::pin(MDSCacheObject *o)
}
}
+// Release the request pin this mutation holds on `o` and forget the object.
+// `o` must currently be in the pin set.
+void Mutation::unpin(MDSCacheObject *o)
+{
+  const bool is_pinned = pins.count(o);
+  assert(is_pinned);
+
+  o->put(MDSCacheObject::PIN_REQUEST);
+  pins.erase(o);
+}
+
void Mutation::set_stickydirs(CInode *in)
{
if (stickydirs.count(in) == 0) {
diff --git a/src/mds/Mutation.h b/src/mds/Mutation.h
index de122a57552..c0bea19d16e 100644
--- a/src/mds/Mutation.h
+++ b/src/mds/Mutation.h
@@ -113,6 +113,7 @@ struct Mutation {
// pin items in cache
void pin(MDSCacheObject *o);
+ void unpin(MDSCacheObject *o);
void set_stickydirs(CInode *in);
void drop_pins();
diff --git a/src/mds/Server.cc b/src/mds/Server.cc
index 7889a2cb73c..98dafc3e285 100644
--- a/src/mds/Server.cc
+++ b/src/mds/Server.cc
@@ -635,25 +635,16 @@ void Server::handle_client_reconnect(MClientReconnect *m)
continue;
}
- filepath path(p->second.path, (uint64_t)p->second.capinfo.pathbase);
if (in && !in->is_auth()) {
// not mine.
- dout(0) << "non-auth " << p->first << " " << path
- << ", will pass off to authority" << dendl;
-
- // mark client caps stale.
- MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, 0, 0);
- //stale->head.migrate_seq = 0; // FIXME ******
- mds->send_message_client_counted(stale, session);
-
+ dout(10) << "non-auth " << *in << ", will pass off to authority" << dendl;
// add to cap export list.
- mdcache->rejoin_export_caps(p->first, from, p->second);
+ mdcache->rejoin_export_caps(p->first, from, p->second.capinfo,
+ in->authority().first);
} else {
// don't know if the inode is mine
- dout(0) << "missing " << p->first << " " << path
- << " will load or export later" << dendl;
+ dout(10) << "missing ino " << p->first << ", will load later" << dendl;
mdcache->rejoin_recovered_caps(p->first, from, p->second, -1);
- mdcache->rejoin_export_caps(p->first, from, p->second);
}
}
@@ -1797,6 +1788,24 @@ CDentry* Server::prepare_null_dentry(MDRequest *mdr, CDir *dir, const string& dn
return dn;
}
+// Return the stray dentry to use for `in` in this request, pinned by `mdr`.
+//
+// A stray dentry already attached to the request is reused when its name
+// matches the stray name of `in`.  Otherwise it is unpinned (asserted to
+// happen only before locking is done) and replaced by the dentry returned
+// from get_or_create_stray_dentry().
+CDentry* Server::prepare_stray_dentry(MDRequest *mdr, CInode *in)
+{
+  if (CDentry *current = mdr->straydn) {
+    string wanted;
+    in->name_stray_dentry(wanted);
+    if (current->get_name() == wanted)
+      return current;
+
+    // Wrong stray for this inode: release it before picking another.
+    assert(!mdr->done_locking);
+    mdr->unpin(current);
+  }
+
+  CDentry *replacement = mdcache->get_or_create_stray_dentry(in);
+  mdr->straydn = replacement;
+  mdr->pin(replacement);
+  return replacement;
+}
/** prepare_new_inode
*
@@ -2670,6 +2679,7 @@ public:
// dirty inode, dn, dir
newi->inode.version--; // a bit hacky, see C_MDS_mknod_finish
newi->mark_dirty(newi->inode.version+1, mdr->ls);
+ newi->_mark_dirty_parent(mdr->ls);
mdr->apply();
@@ -2679,8 +2689,6 @@ public:
mds->balancer->hit_inode(mdr->now, newi, META_POP_IWR);
- mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool);
-
MClientReply *reply = new MClientReply(mdr->client_request, 0);
reply->set_extra_bl(mdr->reply_extra_bl);
mds->server->reply_request(mdr, reply);
@@ -2803,6 +2811,7 @@ void Server::handle_client_openc(MDRequest *mdr)
dn->push_projected_linkage(in);
in->inode.version = dn->pre_dirty();
+ in->inode.update_backtrace();
if (cmode & CEPH_FILE_MODE_WR) {
in->inode.client_ranges[client].range.first = 0;
in->inode.client_ranges[client].range.last = in->inode.get_layout_size_increment();
@@ -2821,7 +2830,7 @@ void Server::handle_client_openc(MDRequest *mdr)
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
journal_allocated_inos(mdr, &le->metablob);
mdcache->predirty_journal_parents(mdr, &le->metablob, in, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1);
- le->metablob.add_primary_dentry(dn, true, in);
+ le->metablob.add_primary_dentry(dn, in, true, true);
// do the open
mds->locker->issue_new_caps(in, cmode, mdr->session, realm, req->is_replay());
@@ -3086,8 +3095,6 @@ public:
void finish(int r) {
assert(r == 0);
- int64_t old_pool = in->inode.layout.fl_pg_pool;
-
// apply
in->pop_and_dirty_projected_inode(mdr->ls);
mdr->apply();
@@ -3104,16 +3111,6 @@ public:
if (changed_ranges)
mds->locker->share_inode_max_size(in);
-
- // if pool changed, queue a new backtrace and set forward pointer on old
- if (old_pool != in->inode.layout.fl_pg_pool) {
- mdr->ls->remove_pending_backtraces(in->ino(), in->inode.layout.fl_pg_pool);
- mdr->ls->queue_backtrace_update(in, in->inode.layout.fl_pg_pool);
-
- // set forwarding pointer on old backtrace
- mdr->ls->remove_pending_backtraces(in->ino(), old_pool);
- mdr->ls->queue_backtrace_update(in, old_pool, in->inode.layout.fl_pg_pool);
- }
}
};
@@ -3494,8 +3491,6 @@ void Server::handle_client_setlayout(MDRequest *mdr)
EUpdate *le = new EUpdate(mdlog, "setlayout");
mdlog->start_entry(le);
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
- // add the old pool to the metablob to indicate the pool changed with this event
- le->metablob.add_old_pool(old_pool);
mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY, false);
mdcache->journal_dirty_inode(mdr, &le->metablob, cur);
@@ -3753,16 +3748,14 @@ void Server::handle_set_vxattr(MDRequest *mdr, CInode *cur,
}
pi->version = cur->pre_dirty();
+ if (cur->is_file())
+ pi->update_backtrace();
// log + wait
mdr->ls = mdlog->get_current_segment();
EUpdate *le = new EUpdate(mdlog, "set vxattr layout");
mdlog->start_entry(le);
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
- if (cur->is_file()) {
- assert(old_pool != -1);
- le->metablob.add_old_pool(old_pool);
- }
mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY, false);
mdcache->journal_dirty_inode(mdr, &le->metablob, cur);
@@ -3995,6 +3988,7 @@ public:
// a new version of hte inode since it's just been created)
newi->inode.version--;
newi->mark_dirty(newi->inode.version + 1, mdr->ls);
+ newi->_mark_dirty_parent(mdr->ls);
// mkdir?
if (newi->inode.is_dir()) {
@@ -4014,15 +4008,6 @@ public:
// hit pop
mds->balancer->hit_inode(mdr->now, newi, META_POP_IWR);
- // store the backtrace on the 'parent' xattr
- if (newi->inode.is_dir()) {
- // if its a dir, put it in the metadata pool
- mdr->ls->queue_backtrace_update(newi, mds->mdsmap->get_metadata_pool());
- } else {
- // if its a file, put it in the data pool for that file
- mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool);
- }
-
// reply
MClientReply *reply = new MClientReply(mdr->client_request, 0);
reply->set_result(0);
@@ -4077,6 +4062,7 @@ void Server::handle_client_mknod(MDRequest *mdr)
newi->inode.mode |= S_IFREG;
newi->inode.version = dn->pre_dirty();
newi->inode.rstat.rfiles = 1;
+ newi->inode.update_backtrace();
// if the client created a _regular_ file via MKNOD, it's highly likely they'll
// want to write to it (e.g., if they are reexporting NFS)
@@ -4117,7 +4103,7 @@ void Server::handle_client_mknod(MDRequest *mdr)
mdcache->predirty_journal_parents(mdr, &le->metablob, newi, dn->get_dir(),
PREDIRTY_PRIMARY|PREDIRTY_DIR, 1);
- le->metablob.add_primary_dentry(dn, true, newi);
+ le->metablob.add_primary_dentry(dn, newi, true, true);
journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(mds, mdr, dn, newi, follows));
}
@@ -4157,6 +4143,7 @@ void Server::handle_client_mkdir(MDRequest *mdr)
newi->inode.version = dn->pre_dirty();
newi->inode.rstat.rsubdirs = 1;
+ newi->inode.update_backtrace();
dout(12) << " follows " << follows << dendl;
if (follows >= dn->first)
@@ -4175,7 +4162,7 @@ void Server::handle_client_mkdir(MDRequest *mdr)
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
journal_allocated_inos(mdr, &le->metablob);
mdcache->predirty_journal_parents(mdr, &le->metablob, newi, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1);
- le->metablob.add_primary_dentry(dn, true, newi);
+ le->metablob.add_primary_dentry(dn, newi, true, true);
le->metablob.add_new_dir(newdir); // dirty AND complete AND new
// issue a cap on the directory
@@ -4233,6 +4220,7 @@ void Server::handle_client_symlink(MDRequest *mdr)
newi->inode.rstat.rbytes = newi->inode.size;
newi->inode.rstat.rfiles = 1;
newi->inode.version = dn->pre_dirty();
+ newi->inode.update_backtrace();
if (follows >= dn->first)
dn->first = follows + 1;
@@ -4245,7 +4233,7 @@ void Server::handle_client_symlink(MDRequest *mdr)
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
journal_allocated_inos(mdr, &le->metablob);
mdcache->predirty_journal_parents(mdr, &le->metablob, newi, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1);
- le->metablob.add_primary_dentry(dn, true, newi);
+ le->metablob.add_primary_dentry(dn, newi, true, true);
journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(mds, mdr, dn, newi, follows));
}
@@ -4435,8 +4423,14 @@ void Server::_link_remote(MDRequest *mdr, bool inc, CDentry *dn, CInode *targeti
// 1. send LinkPrepare to dest (journal nlink++ prepare)
int linkauth = targeti->authority().first;
if (mdr->more()->witnessed.count(linkauth) == 0) {
- dout(10) << " targeti auth must prepare nlink++/--" << dendl;
+ if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(linkauth)) {
+ dout(10) << " targeti auth mds." << linkauth << " is not active" << dendl;
+ if (mdr->more()->waiting_on_slave.empty())
+ mds->wait_for_active_peer(linkauth, new C_MDS_RetryRequest(mdcache, mdr));
+ return;
+ }
+ dout(10) << " targeti auth must prepare nlink++/--" << dendl;
int op;
if (inc)
op = MMDSSlaveRequest::OP_LINKPREP;
@@ -4777,7 +4771,7 @@ void Server::do_link_rollback(bufferlist &rbl, int master, MDRequest *mdr)
mdlog->start_entry(le);
le->commit.add_dir_context(parent);
le->commit.add_dir(parent, true);
- le->commit.add_primary_dentry(in->get_projected_parent_dn(), true, 0);
+ le->commit.add_primary_dentry(in->get_projected_parent_dn(), 0, true);
mdlog->submit_entry(le, new C_MDS_LoggedLinkRollback(this, mut, mdr));
mdlog->flush();
@@ -4899,18 +4893,14 @@ void Server::handle_client_unlink(MDRequest *mdr)
}
// -- create stray dentry? --
- CDentry *straydn = mdr->straydn;
+ CDentry *straydn = NULL;
if (dnl->is_primary()) {
- if (!straydn) {
- straydn = mdcache->get_or_create_stray_dentry(dnl->get_inode());
- mdr->pin(straydn);
- mdr->straydn = straydn;
- }
- } else if (straydn)
- straydn = NULL;
- if (straydn)
+ straydn = prepare_stray_dentry(mdr, dnl->get_inode());
dout(10) << " straydn is " << *straydn << dendl;
-
+ } else if (mdr->straydn) {
+ mdr->unpin(mdr->straydn);
+ mdr->straydn = NULL;
+ }
// lock
set<SimpleLock*> rdlocks, wrlocks, xlocks;
@@ -4996,7 +4986,8 @@ void Server::handle_client_unlink(MDRequest *mdr)
} else if (mdr->more()->waiting_on_slave.count(*p)) {
dout(10) << " already waiting on witness mds." << *p << dendl;
} else {
- _rmdir_prepare_witness(mdr, *p, dn, straydn);
+ if (!_rmdir_prepare_witness(mdr, *p, dn, straydn))
+ return;
}
}
if (!mdr->more()->waiting_on_slave.empty())
@@ -5075,7 +5066,8 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn, CDentry *straydn)
if (in->snaprealm || follows + 1 > dn->first)
in->project_past_snaprealm_parent(straydn->get_dir()->inode->find_snaprealm());
- le->metablob.add_primary_dentry(straydn, true, in);
+ pi->update_backtrace();
+ le->metablob.add_primary_dentry(straydn, in, true, true);
} else {
// remote link. update remote inode.
mdcache->predirty_journal_parents(mdr, &le->metablob, in, dn->get_dir(), PREDIRTY_DIR, -1);
@@ -5158,10 +5150,16 @@ void Server::_unlink_local_finish(MDRequest *mdr,
dn->get_dir()->try_remove_unlinked_dn(dn);
}
-void Server::_rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentry *straydn)
+bool Server::_rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentry *straydn)
{
- dout(10) << "_rmdir_prepare_witness mds." << who << " for " << *mdr << dendl;
+ if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(who)) {
+ dout(10) << "_rmdir_prepare_witness mds." << who << " is not active" << dendl;
+ if (mdr->more()->waiting_on_slave.empty())
+ mds->wait_for_active_peer(who, new C_MDS_RetryRequest(mdcache, mdr));
+ return false;
+ }
+ dout(10) << "_rmdir_prepare_witness mds." << who << dendl;
MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
MMDSSlaveRequest::OP_RMDIRPREP);
dn->make_path(req->srcdnpath);
@@ -5174,6 +5172,7 @@ void Server::_rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentr
assert(mdr->more()->waiting_on_slave.count(who) == 0);
mdr->more()->waiting_on_slave.insert(who);
+ return true;
}
struct C_MDS_SlaveRmdirPrep : public Context {
@@ -5228,7 +5227,7 @@ void Server::handle_slave_rmdir_prep(MDRequest *mdr)
le->rollback = mdr->more()->rollback_bl;
le->commit.add_dir_context(straydn->get_dir());
- le->commit.add_primary_dentry(straydn, true, in);
+ le->commit.add_primary_dentry(straydn, in, true);
// slave: no need to journal original dentry
dout(10) << " noting renamed (unlinked) dir ino " << in->ino() << " in metablob" << dendl;
@@ -5362,7 +5361,7 @@ void Server::do_rmdir_rollback(bufferlist &rbl, int master, MDRequest *mdr)
mdlog->start_entry(le);
le->commit.add_dir_context(dn->get_dir());
- le->commit.add_primary_dentry(dn, true, in);
+ le->commit.add_primary_dentry(dn, in, true);
// slave: no need to journal straydn
dout(10) << " noting renamed (unlinked) dir ino " << in->ino() << " in metablob" << dendl;
@@ -5654,17 +5653,14 @@ void Server::handle_client_rename(MDRequest *mdr)
dout(10) << " this is a link merge" << dendl;
// -- create stray dentry? --
- CDentry *straydn = mdr->straydn;
+ CDentry *straydn = NULL;
if (destdnl->is_primary() && !linkmerge) {
- if (!straydn) {
- straydn = mdcache->get_or_create_stray_dentry(destdnl->get_inode());
- mdr->pin(straydn);
- mdr->straydn = straydn;
- }
- } else if (straydn)
- straydn = NULL;
- if (straydn)
+ straydn = prepare_stray_dentry(mdr, destdnl->get_inode());
dout(10) << " straydn is " << *straydn << dendl;
+ } else if (mdr->straydn) {
+ mdr->unpin(mdr->straydn);
+ mdr->straydn = NULL;
+ }
// -- prepare witness list --
/*
@@ -5873,7 +5869,8 @@ void Server::handle_client_rename(MDRequest *mdr)
} else if (mdr->more()->waiting_on_slave.count(*p)) {
dout(10) << " already waiting on witness mds." << *p << dendl;
} else {
- _rename_prepare_witness(mdr, *p, witnesses, srcdn, destdn, straydn);
+ if (!_rename_prepare_witness(mdr, *p, witnesses, srcdn, destdn, straydn))
+ return;
}
}
if (!mdr->more()->waiting_on_slave.empty())
@@ -5951,20 +5948,6 @@ void Server::_rename_finish(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDe
// did we import srci? if so, explicitly ack that import that, before we unlock and reply.
assert(g_conf->mds_kill_rename_at != 7);
- // backtrace
- if (destdnl->inode->is_dir()) {
- // replace previous backtrace on this inode with myself
- mdr->ls->remove_pending_backtraces(destdnl->inode->ino(), mds->mdsmap->get_metadata_pool());
- // queue an updated backtrace
- mdr->ls->queue_backtrace_update(destdnl->inode, mds->mdsmap->get_metadata_pool());
-
- } else {
- // remove all pending backtraces going to the same pool
- mdr->ls->remove_pending_backtraces(destdnl->inode->ino(), destdnl->inode->inode.layout.fl_pg_pool);
- // queue an updated backtrace
- mdr->ls->queue_backtrace_update(destdnl->inode, destdnl->inode->inode.layout.fl_pg_pool);
- }
- assert(g_conf->mds_kill_rename_at != 8);
// reply
MClientReply *reply = new MClientReply(mdr->client_request, 0);
@@ -5979,9 +5962,16 @@ void Server::_rename_finish(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDe
// helpers
-void Server::_rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse,
+bool Server::_rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse,
CDentry *srcdn, CDentry *destdn, CDentry *straydn)
{
+ if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(who)) {
+ dout(10) << "_rename_prepare_witness mds." << who << " is not active" << dendl;
+ if (mdr->more()->waiting_on_slave.empty())
+ mds->wait_for_active_peer(who, new C_MDS_RetryRequest(mdcache, mdr));
+ return false;
+ }
+
dout(10) << "_rename_prepare_witness mds." << who << dendl;
MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
MMDSSlaveRequest::OP_RENAMEPREP);
@@ -5999,6 +5989,7 @@ void Server::_rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse
assert(mdr->more()->waiting_on_slave.count(who) == 0);
mdr->more()->waiting_on_slave.insert(who);
+ return true;
}
version_t Server::_rename_prepare_import(MDRequest *mdr, CDentry *srcdn, bufferlist *client_map_bl)
@@ -6133,6 +6124,7 @@ void Server::_rename_prepare(MDRequest *mdr,
if (destdn->is_auth()) {
tpi = oldin->project_inode(); //project_snaprealm
tpi->version = straydn->pre_dirty(tpi->version);
+ tpi->update_backtrace();
}
straydn->push_projected_linkage(oldin);
} else if (destdnl->is_remote()) {
@@ -6187,6 +6179,7 @@ void Server::_rename_prepare(MDRequest *mdr,
pi = srci->project_inode(); // project snaprealm if srcdnl->is_primary
// & srcdnl->snaprealm
pi->version = mdr->more()->pvmap[destdn] = destdn->pre_dirty(oldpv);
+ pi->update_backtrace();
}
destdn->push_projected_linkage(srci);
}
@@ -6198,7 +6191,6 @@ void Server::_rename_prepare(MDRequest *mdr,
if (!silent) {
if (pi) {
- pi->last_renamed_version = pi->version;
pi->ctime = mdr->now;
if (linkmerge)
pi->nlink--;
@@ -6252,11 +6244,11 @@ void Server::_rename_prepare(MDRequest *mdr,
if (oldin->snaprealm || src_realm->get_newest_seq() + 1 > srcdn->first)
oldin->project_past_snaprealm_parent(straydn->get_dir()->inode->find_snaprealm());
straydn->first = MAX(oldin->first, next_dest_snap);
- metablob->add_primary_dentry(straydn, true, oldin);
+ metablob->add_primary_dentry(straydn, oldin, true, true);
} else if (force_journal_stray) {
dout(10) << " forced journaling straydn " << *straydn << dendl;
metablob->add_dir_context(straydn->get_dir());
- metablob->add_primary_dentry(straydn, true, oldin);
+ metablob->add_primary_dentry(straydn, oldin, true);
}
} else if (destdnl->is_remote()) {
if (oldin->is_auth()) {
@@ -6264,7 +6256,7 @@ void Server::_rename_prepare(MDRequest *mdr,
metablob->add_dir_context(oldin->get_projected_parent_dir());
mdcache->journal_cow_dentry(mdr, metablob, oldin->get_projected_parent_dn(),
CEPH_NOSNAP, 0, destdnl);
- metablob->add_primary_dentry(oldin->get_projected_parent_dn(), true, oldin);
+ metablob->add_primary_dentry(oldin->get_projected_parent_dn(), oldin, true);
}
}
}
@@ -6282,7 +6274,7 @@ void Server::_rename_prepare(MDRequest *mdr,
if (srci->get_projected_parent_dn()->is_auth()) { // it's remote
metablob->add_dir_context(srci->get_projected_parent_dir());
mdcache->journal_cow_dentry(mdr, metablob, srci->get_projected_parent_dn(), CEPH_NOSNAP, 0, srcdnl);
- metablob->add_primary_dentry(srci->get_projected_parent_dn(), true, srci);
+ metablob->add_primary_dentry(srci->get_projected_parent_dn(), srci, true);
}
} else {
if (destdn->is_auth() && !destdnl->is_null())
@@ -6291,7 +6283,7 @@ void Server::_rename_prepare(MDRequest *mdr,
destdn->first = MAX(destdn->first, next_dest_snap);
if (destdn->is_auth())
- metablob->add_primary_dentry(destdn, true, destdnl->get_inode());
+ metablob->add_primary_dentry(destdn, destdnl->get_inode(), true, true);
}
} else if (srcdnl->is_primary()) {
// project snap parent update?
@@ -6305,11 +6297,21 @@ void Server::_rename_prepare(MDRequest *mdr,
destdn->first = MAX(destdn->first, next_dest_snap);
if (destdn->is_auth())
- metablob->add_primary_dentry(destdn, true, srci);
+ metablob->add_primary_dentry(destdn, srci, true, true);
else if (force_journal_dest) {
dout(10) << " forced journaling destdn " << *destdn << dendl;
metablob->add_dir_context(destdn->get_dir());
- metablob->add_primary_dentry(destdn, true, srci);
+ metablob->add_primary_dentry(destdn, srci, true);
+ if (srcdn->is_auth() && srci->is_dir()) {
+ // journal new subtrees root dirfrags
+ list<CDir*> ls;
+ srci->get_dirfrags(ls);
+ for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
+ CDir *dir = *p;
+ if (dir->is_auth())
+ metablob->add_dir(dir, true);
+ }
+ }
}
}
@@ -6321,7 +6323,7 @@ void Server::_rename_prepare(MDRequest *mdr,
// both primary and NULL dentries. Because during journal replay, null dentry is
// processed after primary dentry.
if (srcdnl->is_primary() && !srci->is_dir() && !destdn->is_auth())
- metablob->add_primary_dentry(srcdn, true, srci);
+ metablob->add_primary_dentry(srcdn, srci, true);
metablob->add_null_dentry(srcdn, true);
} else
dout(10) << " NOT journaling srcdn " << *srcdn << dendl;
@@ -6341,8 +6343,6 @@ void Server::_rename_prepare(MDRequest *mdr,
if (srci->is_dir())
mdcache->project_subtree_rename(srci, srcdn->get_dir(), destdn->get_dir());
- // always update the backtrace
- metablob->update_backtrace();
}
@@ -6789,23 +6789,10 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r,
mdlog->flush();
} else {
if (srcdn->is_auth() && destdnl->is_primary()) {
-
dout(10) << " reversing inode export of " << *destdnl->get_inode() << dendl;
destdnl->get_inode()->abort_export();
-
- // unfreeze
- assert(destdnl->get_inode()->is_frozen_inode());
- destdnl->get_inode()->unfreeze_inode(finished);
}
- // singleauth
- if (mdr->more()->is_ambiguous_auth) {
- mdr->more()->rename_inode->clear_ambiguous_auth(finished);
- mdr->more()->is_ambiguous_auth = false;
- }
-
- mds->queue_waiters(finished);
-
// abort
// rollback_bl may be empty if we froze the inode but had to provide an expanded
// witness list from the master, and they failed before we tried prep again.
@@ -6813,11 +6800,20 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r,
if (mdcache->is_ambiguous_slave_update(mdr->reqid, mdr->slave_to_mds)) {
mdcache->remove_ambiguous_slave_update(mdr->reqid, mdr->slave_to_mds);
// rollback but preserve the slave request
- do_rename_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, NULL);
+ do_rename_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr, false);
} else
- do_rename_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr);
+ do_rename_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr, true);
} else {
dout(10) << " rollback_bl empty, not rollback back rename (master failed after getting extra witnesses?)" << dendl;
+ // singleauth
+ if (mdr->more()->is_ambiguous_auth) {
+ if (srcdn->is_auth())
+ mdr->more()->rename_inode->unfreeze_inode(finished);
+
+ mdr->more()->rename_inode->clear_ambiguous_auth(finished);
+ mdr->more()->is_ambiguous_auth = false;
+ }
+ mds->queue_waiters(finished);
mds->mdcache->request_finish(mdr);
}
}
@@ -6862,15 +6858,20 @@ struct C_MDS_LoggedRenameRollback : public Context {
version_t srcdnpv;
CDentry *destdn;
CDentry *straydn;
+ bool finish_mdr;
C_MDS_LoggedRenameRollback(Server *s, Mutation *m, MDRequest *r,
- CDentry *sd, version_t pv, CDentry *dd, CDentry *st) :
- server(s), mut(m), mdr(r), srcdn(sd), srcdnpv(pv), destdn(dd), straydn(st) {}
+ CDentry *sd, version_t pv, CDentry *dd,
+ CDentry *st, bool f) :
+ server(s), mut(m), mdr(r), srcdn(sd), srcdnpv(pv), destdn(dd),
+ straydn(st), finish_mdr(f) {}
void finish(int r) {
- server->_rename_rollback_finish(mut, mdr, srcdn, srcdnpv, destdn, straydn);
+ server->_rename_rollback_finish(mut, mdr, srcdn, srcdnpv,
+ destdn, straydn, finish_mdr);
}
};
-void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr)
+void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr,
+ bool finish_mdr)
{
rename_rollback rollback;
bufferlist::iterator p = rbl.begin();
@@ -7000,7 +7001,7 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr)
}
if (straydn)
- destdn->push_projected_linkage();
+ straydn->push_projected_linkage();
if (target) {
inode_t *ti = NULL;
@@ -7032,7 +7033,7 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr)
if (srcdn && (srcdn->authority().first == whoami || force_journal_src)) {
le->commit.add_dir_context(srcdir);
if (rollback.orig_src.ino)
- le->commit.add_primary_dentry(srcdn, true);
+ le->commit.add_primary_dentry(srcdn, 0, true);
else
le->commit.add_remote_dentry(srcdn, true);
}
@@ -7040,7 +7041,7 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr)
if (force_journal_dest) {
assert(rollback.orig_dest.ino);
le->commit.add_dir_context(destdir);
- le->commit.add_primary_dentry(destdn, true);
+ le->commit.add_primary_dentry(destdn, 0, true);
}
// slave: no need to journal straydn
@@ -7048,7 +7049,7 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr)
if (target && target->authority().first == whoami) {
assert(rollback.orig_dest.remote_ino);
le->commit.add_dir_context(target->get_projected_parent_dir());
- le->commit.add_primary_dentry(target->get_projected_parent_dn(), true, target);
+ le->commit.add_primary_dentry(target->get_projected_parent_dn(), target, true);
}
if (force_journal_dest) {
@@ -7069,15 +7070,16 @@ void Server::do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr)
mdcache->project_subtree_rename(in, destdir, srcdir);
}
- mdlog->submit_entry(le, new C_MDS_LoggedRenameRollback(this, mut, mdr,
- srcdn, srcdnpv, destdn, straydn));
+ mdlog->submit_entry(le, new C_MDS_LoggedRenameRollback(this, mut, mdr, srcdn, srcdnpv,
+ destdn, straydn, finish_mdr));
mdlog->flush();
}
void Server::_rename_rollback_finish(Mutation *mut, MDRequest *mdr, CDentry *srcdn,
- version_t srcdnpv, CDentry *destdn, CDentry *straydn)
+ version_t srcdnpv, CDentry *destdn,
+ CDentry *straydn, bool finish_mdr)
{
- dout(10) << "_rename_rollback_finish" << mut->reqid << dendl;
+ dout(10) << "_rename_rollback_finish " << mut->reqid << dendl;
if (straydn) {
straydn->get_dir()->unlink_inode(straydn);
@@ -7123,8 +7125,19 @@ void Server::_rename_rollback_finish(Mutation *mut, MDRequest *mdr, CDentry *src
mdcache->try_trim_non_auth_subtree(root);
}
- if (mdr)
- mds->mdcache->request_finish(mdr);
+ if (mdr) {
+ list<Context*> finished;
+ if (mdr->more()->is_ambiguous_auth) {
+ if (srcdn->is_auth())
+ mdr->more()->rename_inode->unfreeze_inode(finished);
+
+ mdr->more()->rename_inode->clear_ambiguous_auth(finished);
+ mdr->more()->is_ambiguous_auth = false;
+ }
+ mds->queue_waiters(finished);
+ if (finish_mdr)
+ mds->mdcache->request_finish(mdr);
+ }
mds->mdcache->finish_rollback(mut->reqid);
diff --git a/src/mds/Server.h b/src/mds/Server.h
index 15c8077c984..35a405b58eb 100644
--- a/src/mds/Server.h
+++ b/src/mds/Server.h
@@ -120,6 +120,7 @@ public:
CDir *validate_dentry_dir(MDRequest *mdr, CInode *diri, const string& dname);
CDir *traverse_to_auth_dir(MDRequest *mdr, vector<CDentry*> &trace, filepath refpath);
CDentry *prepare_null_dentry(MDRequest *mdr, CDir *dir, const string& dname, bool okexist=false);
+ CDentry *prepare_stray_dentry(MDRequest *mdr, CInode *in);
CInode* prepare_new_inode(MDRequest *mdr, CDir *dir, inodeno_t useino, unsigned mode,
ceph_file_layout *layout=NULL);
void journal_allocated_inos(MDRequest *mdr, EMetaBlob *blob);
@@ -206,7 +207,7 @@ public:
void _unlink_local_finish(MDRequest *mdr,
CDentry *dn, CDentry *straydn,
version_t);
- void _rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentry *straydn);
+ bool _rmdir_prepare_witness(MDRequest *mdr, int who, CDentry *dn, CDentry *straydn);
void handle_slave_rmdir_prep(MDRequest *mdr);
void _logged_slave_rmdir(MDRequest *mdr, CDentry *srcdn, CDentry *straydn);
void _commit_slave_rmdir(MDRequest *mdr, int r);
@@ -226,7 +227,7 @@ public:
void _rmsnap_finish(MDRequest *mdr, CInode *diri, snapid_t snapid);
// helpers
- void _rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse,
+ bool _rename_prepare_witness(MDRequest *mdr, int who, set<int> &witnesse,
CDentry *srcdn, CDentry *destdn, CDentry *straydn);
version_t _rename_prepare_import(MDRequest *mdr, CDentry *srcdn, bufferlist *client_map_bl);
bool _need_force_journal(CInode *diri, bool empty);
@@ -243,9 +244,9 @@ public:
void handle_slave_rename_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m);
void _logged_slave_rename(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
void _commit_slave_rename(MDRequest *mdr, int r, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
- void do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr);
- void _rename_rollback_finish(Mutation *mut, MDRequest *mdr, CDentry *srcdn,
- version_t srcdnpv, CDentry *destdn, CDentry *staydn);
+ void do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr, bool finish_mdr=false); // finish_mdr is forwarded to _rename_rollback_finish
+ void _rename_rollback_finish(Mutation *mut, MDRequest *mdr, CDentry *srcdn, version_t srcdnpv,
+ CDentry *destdn, CDentry *straydn, bool finish_mdr); // finish_mdr: also call request_finish(mdr) once rollback is logged
};
diff --git a/src/mds/events/EMetaBlob.h b/src/mds/events/EMetaBlob.h
index 439bd78bc8f..b91303a1328 100644
--- a/src/mds/events/EMetaBlob.h
+++ b/src/mds/events/EMetaBlob.h
@@ -59,6 +59,9 @@ public:
* the struct_v in the encode function!
*/
struct fullbit {
+ static const int STATE_DIRTY = (1<<0);
+ static const int STATE_DIRTYPARENT = (1<<1);
+ static const int STATE_DIRTYPOOL = (1<<2);
string dn; // dentry
snapid_t dnfirst, dnlast;
version_t dnv;
@@ -67,7 +70,7 @@ public:
map<string,bufferptr> xattrs;
string symlink;
bufferlist snapbl;
- bool dirty;
+ __u8 state;
typedef map<snapid_t, old_inode_t> old_inodes_t;
old_inodes_t old_inodes;
@@ -79,7 +82,7 @@ public:
fullbit(const string& d, snapid_t df, snapid_t dl,
version_t v, const inode_t& i, const fragtree_t &dft,
const map<string,bufferptr> &xa, const string& sym,
- const bufferlist &sbl, bool dr,
+ const bufferlist &sbl, __u8 st,
const old_inodes_t *oi = NULL) :
//dn(d), dnfirst(df), dnlast(dl), dnv(v),
//inode(i), dirfragtree(dft), xattrs(xa), symlink(sym), snapbl(sbl), dirty(dr)
@@ -97,7 +100,7 @@ public:
::encode(dft, _enc);
::encode(sbl, _enc);
}
- ::encode(dr, _enc);
+ ::encode(st, _enc);
::encode(oi ? true : false, _enc);
if (oi)
::encode(*oi, _enc);
@@ -114,11 +117,28 @@ public:
static void generate_test_instances(list<EMetaBlob::fullbit*>& ls);
void update_inode(MDS *mds, CInode *in);
+ bool is_dirty() const { return (state & STATE_DIRTY); }
+ bool is_dirty_parent() const { return (state & STATE_DIRTYPARENT); }
+ bool is_dirty_pool() const { return (state & STATE_DIRTYPOOL); }
void print(ostream& out) const {
out << " fullbit dn " << dn << " [" << dnfirst << "," << dnlast << "] dnv " << dnv
<< " inode " << inode.ino
- << " dirty=" << dirty << std::endl;
+ << " state=" << state_string() << std::endl; // state is a __u8; streaming it directly would emit a raw character, so print the decoded flags
+ }
+ string state_string() const { // renders the state bits as text, e.g. "dirty+dirty_parent+dirty_pool"
+ string state_string;
+ bool marked_already = false; // true once a flag name has been appended, so the next one gets a '+' separator
+ if (is_dirty()) {
+ state_string.append("dirty");
+ marked_already = true;
+ }
+ if (is_dirty_parent()) { // note: dirty_pool is only reported inside this branch, i.e. when dirty_parent is also set
+ state_string.append(marked_already ? "+dirty_parent" : "dirty_parent");
+ if (is_dirty_pool())
+ state_string.append("+dirty_pool");
+ }
+ return state_string; // empty when neither dirty nor dirty_parent is set
}
};
WRITE_CLASS_ENCODER(fullbit)
@@ -318,9 +338,6 @@ private:
// idempotent op(s)
list<pair<metareqid_t,uint64_t> > client_reqs;
- int64_t old_pool;
- bool update_bt;
-
public:
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& bl);
@@ -414,11 +431,15 @@ private:
}
// return remote pointer to to-be-journaled inode
- void add_primary_dentry(CDentry *dn, bool dirty, CInode *in=0) {
- add_primary_dentry(add_dir(dn->get_dir(), false),
- dn, dirty, in);
+ void add_primary_dentry(CDentry *dn, CInode *in, bool dirty,
+ bool dirty_parent=false, bool dirty_pool=false) {
+ __u8 state = 0;
+ if (dirty) state |= fullbit::STATE_DIRTY;
+ if (dirty_parent) state |= fullbit::STATE_DIRTYPARENT;
+ if (dirty_pool) state |= fullbit::STATE_DIRTYPOOL;
+ add_primary_dentry(add_dir(dn->get_dir(), false), dn, in, state);
}
- void add_primary_dentry(dirlump& lump, CDentry *dn, bool dirty, CInode *in=0) {
+ void add_primary_dentry(dirlump& lump, CDentry *dn, CInode *in, __u8 state) {
if (!in)
in = dn->get_projected_linkage()->get_inode();
@@ -439,16 +460,26 @@ private:
*pi, in->dirfragtree,
*in->get_projected_xattrs(),
in->symlink, snapbl,
- dirty,
+ state,
&in->old_inodes)));
}
// convenience: primary or remote? figure it out.
void add_dentry(CDentry *dn, bool dirty) {
dirlump& lump = add_dir(dn->get_dir(), false);
- add_dentry(lump, dn, dirty);
+ add_dentry(lump, dn, dirty, false, false);
+ }
+ void add_import_dentry(CDentry *dn) { // journals dn, carrying over the dentry's dirty flag and the linked inode's dirty_parent/dirty_pool flags
+ bool dirty_parent = false;
+ bool dirty_pool = false;
+ if (dn->get_linkage()->is_primary()) { // the flags are read from the linked inode only for a primary linkage; otherwise both stay false
+ dirty_parent = dn->get_linkage()->get_inode()->is_dirty_parent();
+ dirty_pool = dn->get_linkage()->get_inode()->is_dirty_pool();
+ }
+ dirlump& lump = add_dir(dn->get_dir(), false);
+ add_dentry(lump, dn, dn->is_dirty(), dirty_parent, dirty_pool); // add_dentry picks the remote or primary form from the projected linkage
}
- void add_dentry(dirlump& lump, CDentry *dn, bool dirty) {
+ void add_dentry(dirlump& lump, CDentry *dn, bool dirty, bool dirty_parent, bool dirty_pool) {
// primary or remote
if (dn->get_projected_linkage()->is_remote()) {
add_remote_dentry(dn, dirty);
@@ -458,7 +489,7 @@ private:
return;
}
assert(dn->get_projected_linkage()->is_primary());
- add_primary_dentry(dn, dirty);
+ add_primary_dentry(dn, 0, dirty, dirty_parent, dirty_pool);
}
void add_root(bool dirty, CInode *in, inode_t *pi=0, fragtree_t *pdft=0, bufferlist *psnapbl=0,
@@ -484,9 +515,9 @@ private:
}
string empty;
- roots.push_back(std::tr1::shared_ptr<fullbit>(new fullbit(empty, in->first, in->last,
- 0, *pi, *pdft, *px, in->symlink,
- snapbl, dirty,
+ roots.push_back(std::tr1::shared_ptr<fullbit>(new fullbit(empty, in->first, in->last, 0, *pi,
+ *pdft, *px, in->symlink, snapbl,
+ dirty ? fullbit::STATE_DIRTY : 0,
&in->old_inodes)));
}
@@ -522,13 +553,6 @@ private:
static const int TO_ROOT = 1;
void add_dir_context(CDir *dir, int mode = TO_AUTH_SUBTREE_ROOT);
-
- void add_old_pool(int64_t pool) {
- old_pool = pool;
- }
- void update_backtrace() {
- update_bt = true;
- }
void print(ostream& out) const {
out << "[metablob";
diff --git a/src/mds/events/EOpen.h b/src/mds/events/EOpen.h
index 792540ef5da..1267cf0af72 100644
--- a/src/mds/events/EOpen.h
+++ b/src/mds/events/EOpen.h
@@ -34,7 +34,7 @@ public:
void add_clean_inode(CInode *in) {
if (!in->is_base()) {
metablob.add_dir_context(in->get_projected_parent_dn()->get_dir());
- metablob.add_primary_dentry(in->get_projected_parent_dn(), false, 0);
+ metablob.add_primary_dentry(in->get_projected_parent_dn(), 0, false);
inos.push_back(in->ino());
}
}
diff --git a/src/mds/inode_backtrace.h b/src/mds/inode_backtrace.h
index d223f724a99..2d80ae3efad 100644
--- a/src/mds/inode_backtrace.h
+++ b/src/mds/inode_backtrace.h
@@ -35,6 +35,10 @@ struct inode_backpointer_t {
};
WRITE_CLASS_ENCODER(inode_backpointer_t)
+inline bool operator==(const inode_backpointer_t& l, const inode_backpointer_t& r) {
+ return l.dirino == r.dirino && l.version == r.version && l.dname == r.dname;
+}
+
inline ostream& operator<<(ostream& out, const inode_backpointer_t& ib) {
return out << "<" << ib.dirino << "/" << ib.dname << " v" << ib.version << ">";
}
diff --git a/src/mds/journal.cc b/src/mds/journal.cc
index b8139e3a05b..9eb0e73feba 100644
--- a/src/mds/journal.cc
+++ b/src/mds/journal.cc
@@ -185,9 +185,16 @@ void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld)
assert(g_conf->mds_kill_journal_expire_at != 3);
// backtraces to be stored/updated
- for (elist<BacktraceInfo*>::iterator p = update_backtraces.begin(); !p.end(); ++p) {
- BacktraceInfo *btinfo = *p;
- store_backtrace_update(mds, btinfo, gather_bld.new_sub());
+ for (elist<CInode*>::iterator p = dirty_parent_inodes.begin(); !p.end(); ++p) {
+ CInode *in = *p;
+ assert(in->is_auth());
+ if (in->can_auth_pin()) {
+ dout(15) << "try_to_expire waiting for storing backtrace on " << *in << dendl;
+ in->store_backtrace(gather_bld.new_sub());
+ } else {
+ dout(15) << "try_to_expire waiting for unfreeze on " << *in << dendl;
+ in->add_waiter(CInode::WAIT_UNFREEZE, gather_bld.new_sub());
+ }
}
assert(g_conf->mds_kill_journal_expire_at != 4);
@@ -267,101 +274,6 @@ void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld)
}
}
-// ----------------------------
-// backtrace handling
-
-// BacktraceInfo is used for keeping the
-// current state of the backtrace to be stored later on
-// logsegment expire. Constructing a BacktraceInfo
-// automatically puts it on the LogSegment list that is passed in,
-// after building the backtrace based on the current state of the inode. We
-// construct the backtrace here to avoid keeping a ref to the inode.
-BacktraceInfo::BacktraceInfo(
- int64_t l, CInode *i, LogSegment *ls, int64_t p) :
- location(l), pool(p) {
-
- // on setlayout cases, forward pointers mean
- // pool != location, but for all others it does
- if (pool == -1) pool = location;
-
- bt.pool = pool;
- i->build_backtrace(l, &bt);
- ls->update_backtraces.push_back(&item_logseg);
-}
-
-// When the info_t is destroyed, it just needs to remove itself
-// from the LogSegment list
-BacktraceInfo::~BacktraceInfo() {
- item_logseg.remove_myself();
-}
-
-// Queue a backtrace for later
-void LogSegment::queue_backtrace_update(CInode *inode, int64_t location, int64_t pool) {
- // allocating a pointer here and not setting it to anything
- // might look strange, but the constructor adds itself to the backtraces
- // list of this LogSegment, which is how we keep track of it
- new BacktraceInfo(location, inode, this, pool);
-}
-
-void LogSegment::remove_pending_backtraces(inodeno_t ino, int64_t pool) {
- elist<BacktraceInfo*>::iterator i = update_backtraces.begin();
- while(!i.end()) {
- ++i;
- if((*i)->bt.ino == ino && (*i)->location == pool) {
- delete (*i);
- }
- }
-}
-
-unsigned LogSegment::encode_parent_mutation(ObjectOperation& m, BacktraceInfo *info)
-{
- bufferlist parent;
- ::encode(info->bt, parent);
- m.setxattr("parent", parent);
- return parent.length();
-}
-
-struct C_LogSegment_StoredBacktrace : public Context {
- LogSegment *ls;
- BacktraceInfo *info;
- Context *fin;
- C_LogSegment_StoredBacktrace(LogSegment *l, BacktraceInfo *c,
- Context *f) : ls(l), info(c), fin(f) {}
- void finish(int r) {
- ls->_stored_backtrace(info, fin);
- }
-};
-
-void LogSegment::store_backtrace_update(MDS *mds, BacktraceInfo *info, Context *fin)
-{
- ObjectOperation m;
- // prev_pool will be the target pool on create,mkdir,etc.
- encode_parent_mutation(m, info);
-
- // write it.
- SnapContext snapc;
-
- object_t oid = CInode::get_object_name(info->bt.ino, frag_t(), "");
-
- dout(10) << "store_parent for oid " << oid << " location " << info->location << " pool " << info->pool << dendl;
-
- // store the backtrace in the specified pool
- object_locator_t oloc(info->location);
-
- mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0,
- NULL, new C_LogSegment_StoredBacktrace(this, info, fin) );
-
-}
-
-void LogSegment::_stored_backtrace(BacktraceInfo *info, Context *fin)
-{
- delete info;
- if (fin) {
- fin->finish(0);
- delete fin;
- }
-}
-
#undef DOUT_COND
#define DOUT_COND(cct, l) (l<=cct->_conf->debug_mds || l <= cct->_conf->debug_mds_log)
@@ -372,8 +284,6 @@ void LogSegment::_stored_backtrace(BacktraceInfo *info, Context *fin)
EMetaBlob::EMetaBlob(MDLog *mdlog) : opened_ino(0), renamed_dirino(0),
inotablev(0), sessionmapv(0),
allocated_ino(0),
- old_pool(-1),
- update_bt(false),
last_subtree_map(mdlog ? mdlog->get_last_segment_offset() : 0),
my_offset(mdlog ? mdlog->get_write_pos() : 0) //, _segment(0)
{ }
@@ -406,7 +316,7 @@ void EMetaBlob::add_dir_context(CDir *dir, int mode)
if (mode == TO_AUTH_SUBTREE_ROOT) {
// subtree root?
- if (dir->is_subtree_root()) {
+ if (dir->is_subtree_root() && !dir->state_test(CDir::STATE_EXPORTBOUND)) {
if (dir->is_auth() && !dir->is_ambiguous_auth()) {
// it's an auth subtree, we don't need maybe (if any), and we're done.
dout(20) << "EMetaBlob::add_dir_context(" << dir << ") reached unambig auth subtree, don't need " << maybe
@@ -485,10 +395,10 @@ void EMetaBlob::update_segment(LogSegment *ls)
// EMetaBlob::fullbit
void EMetaBlob::fullbit::encode(bufferlist& bl) const {
- ENCODE_START(5, 5, bl);
+ ENCODE_START(6, 5, bl);
if (!_enc.length()) {
fullbit copy(dn, dnfirst, dnlast, dnv, inode, dirfragtree, xattrs, symlink,
- snapbl, dirty, &old_inodes);
+ snapbl, state, &old_inodes);
bl.append(copy._enc);
} else {
bl.append(_enc);
@@ -497,7 +407,7 @@ void EMetaBlob::fullbit::encode(bufferlist& bl) const {
}
void EMetaBlob::fullbit::decode(bufferlist::iterator &bl) {
- DECODE_START_LEGACY_COMPAT_LEN(5, 5, 5, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(6, 5, 5, bl);
::decode(dn, bl);
::decode(dnfirst, bl);
::decode(dnlast, bl);
@@ -519,7 +429,14 @@ void EMetaBlob::fullbit::decode(bufferlist::iterator &bl) {
}
}
}
- ::decode(dirty, bl);
+ if (struct_v >= 6) {
+ ::decode(state, bl);
+ } else {
+ bool dirty;
+ ::decode(dirty, bl);
+ state = dirty ? EMetaBlob::fullbit::STATE_DIRTY : 0;
+ }
+
if (struct_v >= 3) {
bool old_inodes_present;
::decode(old_inodes_present, bl);
@@ -571,7 +488,7 @@ void EMetaBlob::fullbit::dump(Formatter *f) const
f->close_section(); // file layout policy
}
}
- f->dump_string("dirty", dirty ? "true" : "false");
+ f->dump_string("state", state_string());
if (!old_inodes.empty()) {
f->open_array_section("old inodes");
for (old_inodes_t::const_iterator iter = old_inodes.begin();
@@ -824,7 +741,7 @@ void EMetaBlob::dirlump::generate_test_instances(list<dirlump*>& ls)
*/
void EMetaBlob::encode(bufferlist& bl) const
{
- ENCODE_START(6, 5, bl);
+ ENCODE_START(7, 5, bl);
::encode(lump_order, bl);
::encode(lump_map, bl);
::encode(roots, bl);
@@ -842,13 +759,18 @@ void EMetaBlob::encode(bufferlist& bl) const
::encode(client_reqs, bl);
::encode(renamed_dirino, bl);
::encode(renamed_dir_frags, bl);
- ::encode(old_pool, bl);
- ::encode(update_bt, bl);
+ {
+ // keep MDSes that still use the v6 format happy: encode placeholders for the removed old_pool/update_bt fields
+ int64_t i = -1;
+ bool b = false;
+ ::encode(i, bl);
+ ::encode(b, bl);
+ }
ENCODE_FINISH(bl);
}
void EMetaBlob::decode(bufferlist::iterator &bl)
{
- DECODE_START_LEGACY_COMPAT_LEN(6, 5, 5, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(7, 5, 5, bl);
::decode(lump_order, bl);
::decode(lump_map, bl);
if (struct_v >= 4) {
@@ -887,8 +809,11 @@ void EMetaBlob::decode(bufferlist::iterator &bl)
::decode(renamed_dir_frags, bl);
}
if (struct_v >= 6) {
- ::decode(old_pool, bl);
- ::decode(update_bt, bl);
+ // read and discard the two v6 fields (formerly old_pool and update_bt)
+ int64_t i;
+ bool b;
+ ::decode(i, bl);
+ ::decode(b, bl);
}
DECODE_FINISH(bl);
}
@@ -1004,7 +929,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
if (isnew)
mds->mdcache->add_inode(in);
- if ((*p)->dirty) in->_mark_dirty(logseg);
+ if ((*p)->is_dirty()) in->_mark_dirty(logseg);
dout(10) << "EMetaBlob.replay " << (isnew ? " added root ":" updated root ") << *in << dendl;
}
@@ -1106,11 +1031,11 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
if (!dn) {
dn = dir->add_null_dentry(p->dn, p->dnfirst, p->dnlast);
dn->set_version(p->dnv);
- if (p->dirty) dn->_mark_dirty(logseg);
+ if (p->is_dirty()) dn->_mark_dirty(logseg);
dout(10) << "EMetaBlob.replay added " << *dn << dendl;
} else {
dn->set_version(p->dnv);
- if (p->dirty) dn->_mark_dirty(logseg);
+ if (p->is_dirty()) dn->_mark_dirty(logseg);
dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *dn << dendl;
dn->first = p->dnfirst;
assert(dn->last == p->dnlast);
@@ -1135,7 +1060,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
if (unlinked.count(in))
linked.insert(in);
dir->link_primary_inode(dn, in);
- if (p->dirty) in->_mark_dirty(logseg);
+ if (p->is_dirty()) in->_mark_dirty(logseg);
dout(10) << "EMetaBlob.replay added " << *in << dendl;
} else {
if (dn->get_linkage()->get_inode() != in && in->get_parent_dn()) {
@@ -1146,7 +1071,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
if (in->get_parent_dn() && in->inode.anchored != p->inode.anchored)
in->get_parent_dn()->adjust_nested_anchors( (int)p->inode.anchored - (int)in->inode.anchored );
p->update_inode(mds, in);
- if (p->dirty) in->_mark_dirty(logseg);
+ if (p->is_dirty()) in->_mark_dirty(logseg);
if (dn->get_linkage()->get_inode() != in) {
if (!dn->get_linkage()->is_null()) { // note: might be remote. as with stray reintegration.
if (dn->get_linkage()->is_primary()) {
@@ -1171,35 +1096,8 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
}
assert(g_conf->mds_kill_journal_replay_at != 2);
-
- // store backtrace for allocated inos (create, mkdir, symlink, mknod)
- if (allocated_ino || used_preallocated_ino) {
- if (in->inode.is_dir()) {
- logseg->queue_backtrace_update(in, mds->mdsmap->get_metadata_pool());
- } else {
- logseg->queue_backtrace_update(in, in->inode.layout.fl_pg_pool);
- }
- }
- // handle change of pool with backtrace update
- if (old_pool != -1 && old_pool != in->inode.layout.fl_pg_pool) {
- // update backtrace on new data pool
- logseg->queue_backtrace_update(in, in->inode.layout.fl_pg_pool);
-
- // set forwarding pointer on old backtrace
- logseg->queue_backtrace_update(in, old_pool, in->inode.layout.fl_pg_pool);
- }
- // handle backtrace update if specified (used by rename)
- if (update_bt) {
- if (in->is_dir()) {
- // replace previous backtrace on this inode with myself
- logseg->remove_pending_backtraces(in->ino(), mds->mdsmap->get_metadata_pool());
- logseg->queue_backtrace_update(in, mds->mdsmap->get_metadata_pool());
- } else {
- // remove all pending backtraces going to the same pool
- logseg->remove_pending_backtraces(in->ino(), in->inode.layout.fl_pg_pool);
- logseg->queue_backtrace_update(in, in->inode.layout.fl_pg_pool);
- }
- }
+ if (p->is_dirty_parent())
+ in->_mark_dirty_parent(logseg, p->is_dirty_pool());
}
// remote dentries
@@ -1280,7 +1178,8 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
list<frag_t> leaves;
renamed_diri->dirfragtree.get_leaves(leaves);
for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p) {
- CDir *dir = renamed_diri->get_or_open_dirfrag(mds->mdcache, *p);
+ CDir *dir = renamed_diri->get_dirfrag(*p);
+ assert(dir);
// preserve subtree bound until slave commit
if (dir->get_dir_auth() == CDIR_AUTH_UNDEF)
slaveup->olddirs.insert(dir);
diff --git a/src/mds/locks.c b/src/mds/locks.c
index c7dd5bec0ee..90310874411 100644
--- a/src/mds/locks.c
+++ b/src/mds/locks.c
@@ -97,8 +97,8 @@ const struct sm_state_t filelock[LOCK_MAX] = {
[LOCK_XSYN_SYNC] = { LOCK_SYNC, true, LOCK_LOCK, AUTH, 0, AUTH,0, 0, 0, 0, 0,CEPH_CAP_GCACHE,0,0 },
[LOCK_LOCK] = { 0, false, LOCK_LOCK, AUTH, 0, REQ, AUTH,0, 0, 0, CEPH_CAP_GCACHE|CEPH_CAP_GBUFFER,0,0,0 },
- [LOCK_SYNC_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, AUTH, 0, REQ, 0, 0, 0, 0, CEPH_CAP_GCACHE,0,0,CEPH_CAP_GCACHE },
- [LOCK_EXCL_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, 0, 0, 0, 0, XCL, 0, 0, CEPH_CAP_GCACHE|CEPH_CAP_GBUFFER,0,0,CEPH_CAP_GCACHE },
+ [LOCK_SYNC_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, AUTH, 0, REQ, 0, 0, 0, 0, CEPH_CAP_GCACHE,0,0,0 },
+ [LOCK_EXCL_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, 0, 0, 0, 0, XCL, 0, 0, CEPH_CAP_GCACHE|CEPH_CAP_GBUFFER,0,0,0 },
[LOCK_MIX_LOCK] = { LOCK_LOCK, false, LOCK_MIX, AUTH, 0, REQ, 0, 0, 0, 0, 0,0,0,0 },
[LOCK_MIX_LOCK2] = { LOCK_LOCK, false, LOCK_LOCK, AUTH, 0, REQ, 0, 0, 0, 0, 0,0,0,0 },
diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc
index b1ce640a539..6886786f27e 100644
--- a/src/mds/mdstypes.cc
+++ b/src/mds/mdstypes.cc
@@ -236,7 +236,7 @@ void inode_t::encode(bufferlist &bl) const
::encode(version, bl);
::encode(file_data_version, bl);
::encode(xattr_version, bl);
- ::encode(last_renamed_version, bl);
+ ::encode(backtrace_version, bl);
::encode(old_pools, bl);
ENCODE_FINISH(bl);
@@ -291,7 +291,7 @@ void inode_t::decode(bufferlist::iterator &p)
::decode(file_data_version, p);
::decode(xattr_version, p);
if (struct_v >= 2)
- ::decode(last_renamed_version, p);
+ ::decode(backtrace_version, p);
if (struct_v >= 7)
::decode(old_pools, p);
@@ -357,7 +357,7 @@ void inode_t::dump(Formatter *f) const
f->dump_unsigned("version", version);
f->dump_unsigned("file_data_version", file_data_version);
f->dump_unsigned("xattr_version", xattr_version);
- f->dump_unsigned("last_renamed_version", last_renamed_version);
+ f->dump_unsigned("backtrace_version", backtrace_version);
}
void inode_t::generate_test_instances(list<inode_t*>& ls)
diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h
index aa9d165b53d..5537407a75d 100644
--- a/src/mds/mdstypes.h
+++ b/src/mds/mdstypes.h
@@ -347,7 +347,7 @@ struct inode_t {
version_t file_data_version; // auth only
version_t xattr_version;
- version_t last_renamed_version; // when i was last renamed
+ version_t backtrace_version;
inode_t() : ino(0), rdev(0),
mode(0), uid(0), gid(0),
@@ -355,7 +355,7 @@ struct inode_t {
size(0), truncate_seq(0), truncate_size(0), truncate_from(0),
truncate_pending(0),
time_warp_seq(0),
- version(0), file_data_version(0), xattr_version(0), last_renamed_version(0) {
+ version(0), file_data_version(0), xattr_version(0), backtrace_version(0) {
clear_layout();
memset(&dir_layout, 0, sizeof(dir_layout));
}
@@ -425,7 +425,15 @@ struct inode_t {
}
}
+ bool is_backtrace_updated() {
+ return backtrace_version == version;
+ }
+ void update_backtrace() {
+ backtrace_version = version;
+ }
+
void add_old_pool(int64_t l) {
+ backtrace_version = version;
old_pools.push_back(l);
}
diff --git a/src/messages/MMDSCacheRejoin.h b/src/messages/MMDSCacheRejoin.h
index dc8a1afe114..3ae83553dad 100644
--- a/src/messages/MMDSCacheRejoin.h
+++ b/src/messages/MMDSCacheRejoin.h
@@ -167,9 +167,7 @@ class MMDSCacheRejoin : public Message {
map<vinodeno_t, inode_strong> strong_inodes;
// open
- bufferlist cap_export_bl;
map<inodeno_t,map<client_t, ceph_mds_cap_reconnect> > cap_exports;
- map<inodeno_t,filepath> cap_export_paths;
// full
bufferlist inode_base;
@@ -258,10 +256,6 @@ public:
in->encode_lock_state(CEPH_LOCK_IDFT, inode_scatterlocks[in->ino()].dft);
}
- void copy_cap_exports(bufferlist &bl) {
- cap_export_bl = bl;
- }
-
// dirfrags
void add_strong_dirfrag(dirfrag_t df, int n, int dr) {
strong_dirfrags[df] = dirfrag_strong(n, dr);
@@ -304,7 +298,7 @@ public:
::encode(frozen_authpin_inodes, payload);
::encode(xlocked_inodes, payload);
::encode(wrlocked_inodes, payload);
- ::encode(cap_export_bl, payload);
+ ::encode(cap_exports, payload);
::encode(strong_dirfrags, payload);
::encode(dirfrag_bases, payload);
::encode(weak, payload);
@@ -325,12 +319,7 @@ public:
::decode(frozen_authpin_inodes, p);
::decode(xlocked_inodes, p);
::decode(wrlocked_inodes, p);
- ::decode(cap_export_bl, p);
- if (cap_export_bl.length()) {
- bufferlist::iterator q = cap_export_bl.begin();
- ::decode(cap_exports, q);
- ::decode(cap_export_paths, q);
- }
+ ::decode(cap_exports, p);
::decode(strong_dirfrags, p);
::decode(dirfrag_bases, p);
::decode(weak, p);
diff --git a/src/messages/MMDSOpenIno.h b/src/messages/MMDSOpenIno.h
new file mode 100644
index 00000000000..0918e87e0d9
--- /dev/null
+++ b/src/messages/MMDSOpenIno.h
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2011 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MDSOPENINO_H
+#define CEPH_MDSOPENINO_H
+
+#include "msg/Message.h"
+
+struct MMDSOpenIno : public Message {
+ inodeno_t ino;
+ vector<inode_backpointer_t> ancestors;
+
+ MMDSOpenIno() : Message(MSG_MDS_OPENINO) {}
+ MMDSOpenIno(tid_t t, inodeno_t i, vector<inode_backpointer_t>& a) :
+ Message(MSG_MDS_OPENINO), ino(i), ancestors(a) {
+ header.tid = t;
+ }
+
+ const char *get_type_name() const { return "openino"; }
+ void print(ostream &out) const {
+ out << "openino(" << header.tid << " " << ino << " " << ancestors << ")";
+ }
+
+ void encode_payload(uint64_t features) {
+ ::encode(ino, payload);
+ ::encode(ancestors, payload);
+ }
+ void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ ::decode(ino, p);
+ ::decode(ancestors, p);
+ }
+};
+
+#endif
diff --git a/src/messages/MMDSOpenInoReply.h b/src/messages/MMDSOpenInoReply.h
new file mode 100644
index 00000000000..245027f11f3
--- /dev/null
+++ b/src/messages/MMDSOpenInoReply.h
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2011 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MDSOPENINOREPLY_H
+#define CEPH_MDSOPENINOREPLY_H
+
+#include "msg/Message.h"
+
+struct MMDSOpenInoReply : public Message {
+ inodeno_t ino;
+ vector<inode_backpointer_t> ancestors;
+ int32_t hint;
+ int32_t error;
+
+ MMDSOpenInoReply() : Message(MSG_MDS_OPENINOREPLY) {}
+ MMDSOpenInoReply(tid_t t, inodeno_t i, int h=-1, int e=0) :
+ Message(MSG_MDS_OPENINOREPLY), ino(i), hint(h), error(e) {
+ header.tid = t;
+ }
+
+ const char *get_type_name() const { return "openinoreply"; }
+ void print(ostream &out) const {
+ out << "openinoreply(" << header.tid << " "
+ << ino << " " << hint << " " << ancestors << ")";
+ }
+
+ void encode_payload(uint64_t features) {
+ ::encode(ino, payload);
+ ::encode(ancestors, payload);
+ ::encode(hint, payload);
+ ::encode(error, payload);
+ }
+ void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ ::decode(ino, p);
+ ::decode(ancestors, p);
+ ::decode(hint, p);
+ ::decode(error, p);
+ }
+};
+
+#endif
diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index 39e3fe9bbe0..338b5195af2 100644
--- a/src/mon/OSDMonitor.cc
+++ b/src/mon/OSDMonitor.cc
@@ -926,7 +926,8 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
<< dendl;
// already pending failure?
- if (pending_inc.new_state[target_osd] & CEPH_OSD_UP) {
+ if (pending_inc.new_state.count(target_osd) &&
+ pending_inc.new_state[target_osd] & CEPH_OSD_UP) {
dout(10) << " already pending failure" << dendl;
return true;
}
@@ -3174,6 +3175,8 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
done:
dout(10) << " creating osd." << i << dendl;
+ if (pending_inc.new_state.count(i) == 0)
+ pending_inc.new_state[i] = 0;
pending_inc.new_state[i] |= CEPH_OSD_EXISTS | CEPH_OSD_NEW;
if (!uuid.is_zero())
pending_inc.new_uuid[i] = uuid;
diff --git a/src/msg/Message.cc b/src/msg/Message.cc
index 77be03a590b..a6889d39fdf 100644
--- a/src/msg/Message.cc
+++ b/src/msg/Message.cc
@@ -112,6 +112,8 @@ using namespace std;
#include "messages/MMDSCacheRejoin.h"
#include "messages/MMDSFindIno.h"
#include "messages/MMDSFindInoReply.h"
+#include "messages/MMDSOpenIno.h"
+#include "messages/MMDSOpenInoReply.h"
#include "messages/MDirUpdate.h"
#include "messages/MDiscover.h"
@@ -533,6 +535,13 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot
m = new MMDSFindInoReply;
break;
+ case MSG_MDS_OPENINO:
+ m = new MMDSOpenIno;
+ break;
+ case MSG_MDS_OPENINOREPLY:
+ m = new MMDSOpenInoReply;
+ break;
+
case MSG_MDS_FRAGMENTNOTIFY:
m = new MMDSFragmentNotify;
break;
diff --git a/src/msg/Message.h b/src/msg/Message.h
index 18a64c1d02e..aca91184141 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -124,6 +124,8 @@
#define MSG_MDS_DENTRYLINK 0x20c
#define MSG_MDS_FINDINO 0x20d
#define MSG_MDS_FINDINOREPLY 0x20e
+#define MSG_MDS_OPENINO 0x20f
+#define MSG_MDS_OPENINOREPLY 0x210
#define MSG_MDS_LOCK 0x300
#define MSG_MDS_INODEFILECAPS 0x301
diff --git a/src/os/FDCache.h b/src/os/FDCache.h
new file mode 100644
index 00000000000..cf07f860aa5
--- /dev/null
+++ b/src/os/FDCache.h
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_FDCACHE_H
+#define CEPH_FDCACHE_H
+
+#include <memory>
+#include <errno.h>
+#include <cstdio>
+#include "hobject.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/shared_cache.hpp"
+#include "include/compat.h"
+
+/**
+ * FD Cache: a SharedLRU of open file descriptors keyed by hobject_t.
+ */
+class FDCache : public md_config_obs_t {
+ /**
+ * FD
+ *
+ * Wrapper for an fd. Destructor closes the fd.
+ */
+ class FD {
+ public:
+ const int fd;
+ FD(int _fd) : fd(_fd) {
+ assert(_fd >= 0);
+ }
+ int operator*() const {
+ return fd;
+ }
+ ~FD() {
+ TEMP_FAILURE_RETRY(::close(fd));
+ }
+ };
+
+ SharedLRU<hobject_t, FD> registry;
+ CephContext *cct;
+public:
+ FDCache(CephContext *cct) : cct(cct) {
+ assert(cct);
+ cct->_conf->add_observer(this);
+ registry.set_size(cct->_conf->filestore_fd_cache_size);
+ }
+ ~FDCache() {
+ cct->_conf->remove_observer(this);
+ }
+ typedef std::tr1::shared_ptr<FD> FDRef;
+
+ FDRef lookup(const hobject_t &hoid) {
+ return registry.lookup(hoid);
+ }
+
+ FDRef add(const hobject_t &hoid, int fd) {
+ return registry.add(hoid, new FD(fd));
+ }
+
+ /// Clear the cached fd for hoid; subsequent lookups will return an empty FDRef.
+ void clear(const hobject_t &hoid) {
+ registry.clear(hoid);
+ assert(!registry.lookup(hoid));
+ }
+
+ /// md_config_obs_t
+ const char** get_tracked_conf_keys() const {
+ static const char* KEYS[] = {
+ "filestore_fd_cache_size",
+ NULL
+ };
+ return KEYS;
+ }
+ void handle_conf_change(const md_config_t *conf,
+ const std::set<std::string> &changed) {
+ if (changed.count("filestore_fd_cache_size")) {
+ registry.set_size(conf->filestore_fd_cache_size);
+ }
+ }
+
+};
+typedef FDCache::FDRef FDRef;
+
+#endif
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index b32f2875f71..dca8a1bbfea 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -196,10 +196,22 @@ int FileStore::lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf)
return r;
}
-int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode,
+int FileStore::lfn_open(coll_t cid,
+ const hobject_t& oid,
+ bool create,
+ FDRef *outfd,
IndexedPath *path,
Index *index)
{
+ assert(outfd);
+ int flags = O_RDWR;
+ if (create)
+ flags |= O_CREAT;
+ Mutex::Locker l(fdcache_lock);
+ *outfd = fdcache.lookup(oid);
+ if (*outfd) {
+ return 0;
+ }
Index index2;
IndexedPath path2;
if (!path)
@@ -224,16 +236,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode
goto fail;
}
- r = ::open((*path)->path(), flags, mode);
+ r = ::open((*path)->path(), flags, 0644);
if (r < 0) {
r = -errno;
dout(10) << "error opening file " << (*path)->path() << " with flags="
- << flags << " and mode=" << mode << ": " << cpp_strerror(-r) << dendl;
+ << flags << ": " << cpp_strerror(-r) << dendl;
goto fail;
}
fd = r;
- if ((flags & O_CREAT) && (!exist)) {
+ if (create && (!exist)) {
r = (*index)->created(oid, (*path)->path());
if (r < 0) {
TEMP_FAILURE_RETRY(::close(fd));
@@ -242,31 +254,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode
goto fail;
}
}
- return fd;
+ *outfd = fdcache.add(oid, fd);
+ return 0;
fail:
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
-int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, IndexedPath *path)
+void FileStore::lfn_close(FDRef fd)
{
- return lfn_open(cid, oid, flags, mode, path, 0);
-}
-
-int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode)
-{
- return lfn_open(cid, oid, flags, mode, 0, 0);
-}
-
-int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags)
-{
- return lfn_open(cid, oid, flags, 0);
-}
-
-void FileStore::lfn_close(int fd)
-{
- TEMP_FAILURE_RETRY(::close(fd));
}
int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
@@ -324,6 +321,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
const SequencerPosition &spos)
{
+ Mutex::Locker l(fdcache_lock);
Index index;
int r = get_index(cid, &index);
if (r < 0)
@@ -355,6 +353,8 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
if (g_conf->filestore_debug_inject_read_err) {
debug_obj_on_delete(o);
}
+ wbthrottle.clear_object(o); // should be only non-cache ref
+ fdcache.clear(o);
} else {
/* Ensure that replay of this op doesn't result in the object_map
* going away.
@@ -387,6 +387,9 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha
sync_entry_timeo_lock("sync_entry_timeo_lock"),
timer(g_ceph_context, sync_entry_timeo_lock),
stop(false), sync_thread(this),
+ fdcache_lock("fdcache_lock"),
+ fdcache(g_ceph_context),
+ wbthrottle(g_ceph_context),
default_osr("default"),
op_queue_len(0), op_queue_bytes(0),
op_throttle_lock("FileStore::op_throttle_lock"),
@@ -394,22 +397,17 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha
op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
op_wq(this, g_conf->filestore_op_thread_timeout,
g_conf->filestore_op_thread_suicide_timeout, &op_tp),
- flusher_queue_len(0), flusher_thread(this),
logger(NULL),
read_error_lock("FileStore::read_error_lock"),
m_filestore_btrfs_clone_range(g_conf->filestore_btrfs_clone_range),
m_filestore_btrfs_snap (g_conf->filestore_btrfs_snap ),
m_filestore_commit_timeout(g_conf->filestore_commit_timeout),
m_filestore_fiemap(g_conf->filestore_fiemap),
- m_filestore_flusher (g_conf->filestore_flusher ),
m_filestore_fsync_flushes_journal_data(g_conf->filestore_fsync_flushes_journal_data),
m_filestore_journal_parallel(g_conf->filestore_journal_parallel ),
m_filestore_journal_trailing(g_conf->filestore_journal_trailing),
m_filestore_journal_writeahead(g_conf->filestore_journal_writeahead),
m_filestore_fiemap_threshold(g_conf->filestore_fiemap_threshold),
- m_filestore_sync_flush(g_conf->filestore_sync_flush),
- m_filestore_flusher_max_fds(g_conf->filestore_flusher_max_fds),
- m_filestore_flush_min(g_conf->filestore_flush_min),
m_filestore_max_sync_interval(g_conf->filestore_max_sync_interval),
m_filestore_min_sync_interval(g_conf->filestore_min_sync_interval),
m_filestore_fail_eio(g_conf->filestore_fail_eio),
@@ -1067,6 +1065,7 @@ int FileStore::_detect_fs()
#if defined(__linux__)
if (st.f_type == BTRFS_SUPER_MAGIC) {
dout(0) << "mount detected btrfs" << dendl;
+ wbthrottle.set_fs(WBThrottle::BTRFS);
btrfs = true;
btrfs_stable_commits = btrfs && m_filestore_btrfs_snap;
@@ -1778,7 +1777,6 @@ int FileStore::mount()
journal_start();
op_tp.start();
- flusher_thread.create();
op_finisher.start();
ondisk_finisher.start();
@@ -1816,11 +1814,9 @@ int FileStore::umount()
lock.Lock();
stop = true;
sync_cond.Signal();
- flusher_cond.Signal();
lock.Unlock();
sync_thread.join();
op_tp.stop();
- flusher_thread.join();
journal_stop();
@@ -1968,6 +1964,7 @@ void FileStore::op_queue_release_throttle(Op *o)
void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
{
+ wbthrottle.throttle();
// inject a stall?
if (g_conf->filestore_inject_stall) {
int orig = g_conf->filestore_inject_stall;
@@ -2263,12 +2260,13 @@ int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPos
if (!replaying || btrfs_stable_commits)
return 1;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
dout(10) << "_check_replay_guard " << cid << " " << oid << " dne" << dendl;
return 1; // if file does not exist, there is no guard, and we can replay.
}
- int ret = _check_replay_guard(fd, spos);
+ int ret = _check_replay_guard(**fd, spos);
lfn_close(fd);
return ret;
}
@@ -2762,22 +2760,24 @@ int FileStore::read(
dout(15) << "read " << cid << "/" << oid << " " << offset << "~" << len << dendl;
- int fd = lfn_open(cid, oid, O_RDONLY);
- if (fd < 0) {
- dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: " << cpp_strerror(fd) << dendl;
- return fd;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
+ dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: "
+ << cpp_strerror(r) << dendl;
+ return r;
}
if (len == 0) {
struct stat st;
memset(&st, 0, sizeof(struct stat));
- int r = ::fstat(fd, &st);
+ int r = ::fstat(**fd, &st);
assert(r == 0);
len = st.st_size;
}
bufferptr bptr(len); // prealloc space for entire read
- got = safe_pread(fd, bptr.c_str(), len, offset);
+ got = safe_pread(**fd, bptr.c_str(), len, offset);
if (got < 0) {
dout(10) << "FileStore::read(" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
lfn_close(fd);
@@ -2815,15 +2815,14 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid,
dout(15) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << dendl;
- int r;
- int fd = lfn_open(cid, oid, O_RDONLY);
- if (fd < 0) {
- r = fd;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
} else {
uint64_t i;
- r = do_fiemap(fd, offset, len, &fiemap);
+ r = do_fiemap(**fd, offset, len, &fiemap);
if (r < 0)
goto done;
@@ -2865,10 +2864,10 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid,
}
done:
- if (fd >= 0)
+ if (r >= 0) {
lfn_close(fd);
- if (r >= 0)
::encode(exomap, bl);
+ }
dout(10) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << exomap.size() << " " << exomap << dendl;
free(fiemap);
@@ -2899,14 +2898,13 @@ int FileStore::_touch(coll_t cid, const hobject_t& oid)
{
dout(15) << "touch " << cid << "/" << oid << dendl;
- int flags = O_WRONLY|O_CREAT;
- int fd = lfn_open(cid, oid, flags, 0644);
- int r;
- if (fd >= 0) {
+ FDRef fd;
+ int r = lfn_open(cid, oid, true, &fd);
+ if (r < 0) {
+ return r;
+ } else {
lfn_close(fd);
- r = 0;
- } else
- r = fd;
+ }
dout(10) << "touch " << cid << "/" << oid << " = " << r << dendl;
return r;
}
@@ -2920,17 +2918,17 @@ int FileStore::_write(coll_t cid, const hobject_t& oid,
int64_t actual;
- int flags = O_WRONLY|O_CREAT;
- int fd = lfn_open(cid, oid, flags, 0644);
- if (fd < 0) {
- r = fd;
- dout(0) << "write couldn't open " << cid << "/" << oid << " flags " << flags << ": "
+ FDRef fd;
+ r = lfn_open(cid, oid, true, &fd);
+ if (r < 0) {
+ dout(0) << "write couldn't open " << cid << "/"
+ << oid << ": "
<< cpp_strerror(r) << dendl;
goto out;
}
// seek
- actual = ::lseek64(fd, offset, SEEK_SET);
+ actual = ::lseek64(**fd, offset, SEEK_SET);
if (actual < 0) {
r = -errno;
dout(0) << "write lseek64 to " << offset << " failed: " << cpp_strerror(r) << dendl;
@@ -2945,43 +2943,13 @@ int FileStore::_write(coll_t cid, const hobject_t& oid,
}
// write
- r = bl.write_fd(fd);
+ r = bl.write_fd(**fd);
if (r == 0)
r = bl.length();
// flush?
- {
- bool should_flush = (ssize_t)len >= m_filestore_flush_min;
- bool local_flush = false;
-#ifdef HAVE_SYNC_FILE_RANGE
- bool async_done = false;
- if (!should_flush ||
- !m_filestore_flusher ||
- !(async_done = queue_flusher(fd, offset, len, replica))) {
- if (should_flush && m_filestore_sync_flush) {
- ::sync_file_range(fd, offset, len, SYNC_FILE_RANGE_WRITE);
- local_flush = true;
- }
- }
- //Both lfn_close() and possible posix_fadvise() done by flusher
- if (async_done) fd = -1;
-#else
- // no sync_file_range; (maybe) flush inline and close.
- if (should_flush && m_filestore_sync_flush) {
- ::fdatasync(fd);
- local_flush = true;
- }
-#endif
- if (local_flush && replica && m_filestore_replica_fadvise) {
- int fa_r = posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED);
- if (fa_r) {
- dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl;
- } else {
- dout(10) << "posix_fadvise performed after local flush" << dendl;
- }
- }
- }
- if (fd >= 0) lfn_close(fd);
+ wbthrottle.queue_wb(fd, oid, offset, len, replica);
+ lfn_close(fd);
out:
dout(10) << "write " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
@@ -2996,14 +2964,14 @@ int FileStore::_zero(coll_t cid, const hobject_t& oid, uint64_t offset, size_t l
#ifdef CEPH_HAVE_FALLOCATE
# if !defined(DARWIN) && !defined(__FreeBSD__)
// first try to punch a hole.
- int fd = lfn_open(cid, oid, O_RDONLY);
- if (fd < 0) {
- ret = -errno;
+ FDRef fd;
+ ret = lfn_open(cid, oid, false, &fd);
+ if (ret < 0) {
goto out;
}
// first try fallocate
- ret = fallocate(fd, FALLOC_FL_PUNCH_HOLE, offset, len);
+ ret = fallocate(**fd, FALLOC_FL_PUNCH_HOLE, offset, len);
if (ret < 0)
ret = -errno;
lfn_close(fd);
@@ -3039,23 +3007,26 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo
if (_check_replay_guard(cid, newoid, spos) < 0)
return 0;
- int o, n, r;
+ int r;
+ FDRef o, n;
{
Index index;
IndexedPath from, to;
- o = lfn_open(cid, oldoid, O_RDONLY, 0, &from, &index);
- if (o < 0) {
- r = o;
+ r = lfn_open(cid, oldoid, false, &o, &from, &index);
+ if (r < 0) {
goto out2;
}
- n = lfn_open(cid, newoid, O_CREAT|O_TRUNC|O_WRONLY, 0644, &to, &index);
- if (n < 0) {
- r = n;
+ r = lfn_open(cid, newoid, true, &n, &to, &index);
+ if (r < 0) {
+ goto out;
+ }
+ r = ::ftruncate(**n, 0);
+ if (r < 0) {
goto out;
}
struct stat st;
- ::fstat(o, &st);
- r = _do_clone_range(o, n, 0, st.st_size, 0);
+ ::fstat(**o, &st);
+ r = _do_clone_range(**o, **n, 0, st.st_size, 0);
if (r < 0) {
r = -errno;
goto out3;
@@ -3068,17 +3039,17 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo
{
map<string, bufferptr> aset;
- r = _fgetattrs(o, aset, false);
+ r = _fgetattrs(**o, aset, false);
if (r < 0)
goto out3;
- r = _fsetattrs(n, aset);
+ r = _fsetattrs(**n, aset);
if (r < 0)
goto out3;
}
// clone is non-idempotent; record our work.
- _set_replay_guard(n, spos, &newoid);
+ _set_replay_guard(**n, spos, &newoid);
out3:
lfn_close(n);
@@ -3248,21 +3219,19 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t
return 0;
int r;
- int o, n;
- o = lfn_open(cid, oldoid, O_RDONLY);
- if (o < 0) {
- r = o;
+ FDRef o, n;
+ r = lfn_open(cid, oldoid, false, &o);
+ if (r < 0) {
goto out2;
}
- n = lfn_open(cid, newoid, O_CREAT|O_WRONLY, 0644);
- if (n < 0) {
- r = n;
+ r = lfn_open(cid, newoid, true, &n);
+ if (r < 0) {
goto out;
}
- r = _do_clone_range(o, n, srcoff, len, dstoff);
+ r = _do_clone_range(**o, **n, srcoff, len, dstoff);
// clone is non-idempotent; record our work.
- _set_replay_guard(n, spos, &newoid);
+ _set_replay_guard(**n, spos, &newoid);
lfn_close(n);
out:
@@ -3273,89 +3242,6 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t
return r;
}
-
-bool FileStore::queue_flusher(int fd, uint64_t off, uint64_t len, bool replica)
-{
- bool queued;
- lock.Lock();
- if (flusher_queue_len < m_filestore_flusher_max_fds) {
- flusher_queue.push_back(sync_epoch);
- flusher_queue.push_back(fd);
- flusher_queue.push_back(off);
- flusher_queue.push_back(len);
- flusher_queue.push_back(replica);
- flusher_queue_len++;
- flusher_cond.Signal();
- dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
- << " qlen " << flusher_queue_len
- << dendl;
- queued = true;
- } else {
- dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
- << " qlen " << flusher_queue_len
- << " hit flusher_max_fds " << m_filestore_flusher_max_fds
- << ", skipping async flush" << dendl;
- queued = false;
- }
- lock.Unlock();
- return queued;
-}
-
-void FileStore::flusher_entry()
-{
- lock.Lock();
- dout(20) << "flusher_entry start" << dendl;
- while (true) {
- if (!flusher_queue.empty()) {
-#ifdef HAVE_SYNC_FILE_RANGE
- list<uint64_t> q;
- q.swap(flusher_queue);
-
- int num = flusher_queue_len; // see how many we're taking, here
-
- lock.Unlock();
- while (!q.empty()) {
- uint64_t ep = q.front();
- q.pop_front();
- int fd = q.front();
- q.pop_front();
- uint64_t off = q.front();
- q.pop_front();
- uint64_t len = q.front();
- q.pop_front();
- bool replica = q.front();
- q.pop_front();
- if (!stop && ep == sync_epoch) {
- dout(10) << "flusher_entry flushing+closing " << fd << " ep " << ep << dendl;
- ::sync_file_range(fd, off, len, SYNC_FILE_RANGE_WRITE);
- if (replica && m_filestore_replica_fadvise) {
- int fa_r = posix_fadvise(fd, off, len, POSIX_FADV_DONTNEED);
- if (fa_r) {
- dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl;
- } else {
- dout(10) << "posix_fadvise performed after local flush" << dendl;
- }
- }
- } else
- dout(10) << "flusher_entry JUST closing " << fd << " (stop=" << stop << ", ep=" << ep
- << ", sync_epoch=" << sync_epoch << ")" << dendl;
- lfn_close(fd);
- }
- lock.Lock();
- flusher_queue_len -= num; // they're definitely closed, forget
-#endif
- } else {
- if (stop)
- break;
- dout(20) << "flusher_entry sleeping" << dendl;
- flusher_cond.Wait(lock);
- dout(20) << "flusher_entry awoke" << dendl;
- }
- }
- dout(20) << "flusher_entry finish" << dendl;
- lock.Unlock();
-}
-
class SyncEntryTimeout : public Context {
public:
SyncEntryTimeout(int commit_timeo)
@@ -3548,6 +3434,7 @@ void FileStore::sync_entry()
logger->tinc(l_os_commit_len, dur);
apply_manager.commit_finish();
+ wbthrottle.clear();
logger->set(l_os_committing, 0);
@@ -3856,15 +3743,14 @@ bool FileStore::debug_mdata_eio(const hobject_t &oid) {
int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, bufferptr &bp)
{
dout(15) << "getattr " << cid << "/" << oid << " '" << name << "'" << dendl;
- int r;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
char n[CHAIN_XATTR_MAX_NAME_LEN];
get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
- r = _fgetattr(fd, n, bp);
+ r = _fgetattr(**fd, n, bp);
lfn_close(fd);
if (r == -ENODATA && g_conf->filestore_xattr_use_omap) {
map<string, bufferlist> got;
@@ -3903,13 +3789,12 @@ int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, buffe
int FileStore::getattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>& aset, bool user_only)
{
dout(15) << "getattrs " << cid << "/" << oid << dendl;
- int r;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
- r = _fgetattrs(fd, aset, user_only);
+ r = _fgetattrs(**fd, aset, user_only);
lfn_close(fd);
if (g_conf->filestore_xattr_use_omap) {
set<string> omap_attrs;
@@ -3967,14 +3852,13 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
set<string> omap_remove;
map<string, bufferptr> inline_set;
map<string, bufferptr> inline_to_set;
- int r = 0;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
if (g_conf->filestore_xattr_use_omap) {
- r = _fgetattrs(fd, inline_set, false);
+ r = _fgetattrs(**fd, inline_set, false);
assert(!m_filestore_fail_eio || r != -EIO);
}
dout(15) << "setattrs " << cid << "/" << oid << dendl;
@@ -3988,7 +3872,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
if (p->second.length() > g_conf->filestore_max_inline_xattr_size) {
if (inline_set.count(p->first)) {
inline_set.erase(p->first);
- r = chain_fremovexattr(fd, n);
+ r = chain_fremovexattr(**fd, n);
if (r < 0)
goto out_close;
}
@@ -4000,7 +3884,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
inline_set.size() >= g_conf->filestore_max_inline_xattrs) {
if (inline_set.count(p->first)) {
inline_set.erase(p->first);
- r = chain_fremovexattr(fd, n);
+ r = chain_fremovexattr(**fd, n);
if (r < 0)
goto out_close;
}
@@ -4015,7 +3899,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
}
- r = _fsetattrs(fd, inline_to_set);
+ r = _fsetattrs(**fd, inline_to_set);
if (r < 0)
goto out_close;
@@ -4050,15 +3934,14 @@ int FileStore::_rmattr(coll_t cid, const hobject_t& oid, const char *name,
const SequencerPosition &spos)
{
dout(15) << "rmattr " << cid << "/" << oid << " '" << name << "'" << dendl;
- int r = 0;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
char n[CHAIN_XATTR_MAX_NAME_LEN];
get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
- r = chain_fremovexattr(fd, n);
+ r = chain_fremovexattr(**fd, n);
if (r == -ENODATA && g_conf->filestore_xattr_use_omap) {
Index index;
r = get_index(cid, &index);
@@ -4088,18 +3971,17 @@ int FileStore::_rmattrs(coll_t cid, const hobject_t& oid,
dout(15) << "rmattrs " << cid << "/" << oid << dendl;
map<string,bufferptr> aset;
- int r = 0;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
- r = _fgetattrs(fd, aset, false);
+ r = _fgetattrs(**fd, aset, false);
if (r >= 0) {
for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
char n[CHAIN_XATTR_MAX_NAME_LEN];
get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
- r = chain_fremovexattr(fd, n);
+ r = chain_fremovexattr(**fd, n);
if (r < 0)
break;
}
@@ -4687,21 +4569,21 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
// open guard on object so we don't any previous operations on the
// new name that will modify the source inode.
- int fd = lfn_open(oldcid, o, 0);
- if (fd < 0) {
+ FDRef fd;
+ int r = lfn_open(oldcid, o, 0, &fd);
+ if (r < 0) {
// the source collection/object does not exist. If we are replaying, we
// should be safe, so just return 0 and move on.
assert(replaying);
dout(10) << "collection_add " << c << "/" << o << " from "
- << oldcid << "/" << o << " (dne, continue replay) " << dendl;
+ << oldcid << "/" << o << " (dne, continue replay) " << dendl;
return 0;
}
- assert(fd >= 0);
if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
- _set_replay_guard(fd, spos, &o, true);
+ _set_replay_guard(**fd, spos, &o, true);
}
- int r = lfn_link(oldcid, c, o);
+ r = lfn_link(oldcid, c, o);
if (replaying && !btrfs_stable_commits &&
r == -EEXIST) // crashed between link() and set_replay_guard()
r = 0;
@@ -4710,7 +4592,7 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
// close guard on object so we don't do this again
if (r == 0) {
- _close_replay_guard(fd, spos);
+ _close_replay_guard(**fd, spos);
}
lfn_close(fd);
@@ -4919,9 +4801,6 @@ const char** FileStore::get_tracked_conf_keys() const
"filestore_queue_max_bytes",
"filestore_queue_committing_max_ops",
"filestore_queue_committing_max_bytes",
- "filestore_flusher",
- "filestore_flusher_max_fds",
- "filestore_sync_flush",
"filestore_commit_timeout",
"filestore_dump_file",
"filestore_kill_at",
@@ -4941,9 +4820,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
changed.count("filestore_queue_max_bytes") ||
changed.count("filestore_queue_committing_max_ops") ||
changed.count("filestore_queue_committing_max_bytes") ||
- changed.count("filestore_flusher") ||
- changed.count("filestore_flusher_max_fds") ||
- changed.count("filestore_flush_min") ||
changed.count("filestore_kill_at") ||
changed.count("filestore_fail_eio") ||
changed.count("filestore_replica_fadvise")) {
@@ -4954,10 +4830,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes;
m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops;
m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes;
- m_filestore_flusher = conf->filestore_flusher;
- m_filestore_flusher_max_fds = conf->filestore_flusher_max_fds;
- m_filestore_flush_min = conf->filestore_flush_min;
- m_filestore_sync_flush = conf->filestore_sync_flush;
m_filestore_kill_at.set(conf->filestore_kill_at);
m_filestore_fail_eio = conf->filestore_fail_eio;
m_filestore_replica_fadvise = conf->filestore_replica_fadvise;
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index d5ca2a4c237..78668dd92a4 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -40,6 +40,8 @@ using namespace __gnu_cxx;
#include "IndexManager.h"
#include "ObjectMap.h"
#include "SequencerPosition.h"
+#include "FDCache.h"
+#include "WBThrottle.h"
#include "include/uuid.h"
@@ -198,6 +200,10 @@ private:
friend ostream& operator<<(ostream& out, const OpSequencer& s);
+ Mutex fdcache_lock;
+ FDCache fdcache;
+ WBThrottle wbthrottle;
+
Sequencer default_osr;
deque<OpSequencer*> op_queue;
uint64_t op_queue_len, op_queue_bytes;
@@ -250,37 +256,22 @@ private:
void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
friend class C_JournaledAhead;
- // flusher thread
- Cond flusher_cond;
- list<uint64_t> flusher_queue;
- int flusher_queue_len;
- void flusher_entry();
- struct FlusherThread : public Thread {
- FileStore *fs;
- FlusherThread(FileStore *f) : fs(f) {}
- void *entry() {
- fs->flusher_entry();
- return 0;
- }
- } flusher_thread;
- bool queue_flusher(int fd, uint64_t off, uint64_t len, bool replica);
-
int open_journal();
-
PerfCounters *logger;
public:
int lfn_find(coll_t cid, const hobject_t& oid, IndexedPath *path);
int lfn_truncate(coll_t cid, const hobject_t& oid, off_t length);
int lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf);
- int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode,
- IndexedPath *path);
- int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode,
- IndexedPath *path, Index *index);
- int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode);
- int lfn_open(coll_t cid, const hobject_t& oid, int flags);
- void lfn_close(int fd);
+ int lfn_open(
+ coll_t cid,
+ const hobject_t& oid,
+ bool create,
+ FDRef *outfd,
+ IndexedPath *path = 0,
+ Index *index = 0);
+ void lfn_close(FDRef fd);
int lfn_link(coll_t c, coll_t cid, const hobject_t& o) ;
int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos);
@@ -510,15 +501,11 @@ private:
bool m_filestore_btrfs_snap;
float m_filestore_commit_timeout;
bool m_filestore_fiemap;
- bool m_filestore_flusher;
bool m_filestore_fsync_flushes_journal_data;
bool m_filestore_journal_parallel;
bool m_filestore_journal_trailing;
bool m_filestore_journal_writeahead;
int m_filestore_fiemap_threshold;
- bool m_filestore_sync_flush;
- int m_filestore_flusher_max_fds;
- int m_filestore_flush_min;
double m_filestore_max_sync_interval;
double m_filestore_min_sync_interval;
bool m_filestore_fail_eio;
diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc
new file mode 100644
index 00000000000..4673488f833
--- /dev/null
+++ b/src/os/WBThrottle.cc
@@ -0,0 +1,239 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "os/WBThrottle.h"
+#include "common/perf_counters.h"
+
+/**
+ * Build a throttle bound to @p cct.
+ *
+ * Loads the limits for the default filesystem type (XFS), registers the
+ * "WBThrottle" perf counters with every counter set to 0, subscribes to
+ * configuration changes and finally calls create() to start the thread
+ * that runs entry().
+ *
+ * @param cct context supplying the configuration and the perf counter
+ *            collection; must not be NULL
+ */
+WBThrottle::WBThrottle(CephContext *cct) :
+  cur_ios(0), cur_size(0),
+  cct(cct),
+  logger(NULL),
+  stopping(false),
+  lock("WBThrottle::lock", false, true, false, cct),
+  fs(XFS)
+{
+  // Validate before set_from_conf() dereferences cct->_conf below.
+  assert(cct);
+  {
+    // set_from_conf() asserts that the lock is held.
+    Mutex::Locker l(lock);
+    set_from_conf();
+  }
+  PerfCountersBuilder b(
+    cct, string("WBThrottle"),
+    l_wbthrottle_first, l_wbthrottle_last);
+  b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied");
+  b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb");
+  b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied");
+  b.add_u64(l_wbthrottle_ios_wb, "ios_wb");
+  b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied");
+  b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb");
+  logger = b.create_perf_counters();
+  cct->get_perfcounters_collection()->add(logger);
+  // first/last are range sentinels, so only the values between them are set.
+  for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i)
+    logger->set(i, 0);
+
+  cct->_conf->add_observer(this);
+  create();
+}
+
+/**
+ * Ask the flusher thread to stop, wait for it, then unregister the perf
+ * counters and the config observer.
+ */
+WBThrottle::~WBThrottle()
+{
+  assert(cct);
+
+  // Raise the stop flag under the lock and wake a waiter on cond.
+  lock.Lock();
+  stopping = true;
+  cond.Signal();
+  lock.Unlock();
+
+  join();
+
+  cct->get_perfcounters_collection()->remove(logger);
+  delete logger;
+  cct->_conf->remove_observer(this);
+}
+
+/**
+ * md_config_obs_t hook.
+ *
+ * @return NULL-terminated array of the option names whose change makes
+ *         handle_conf_change() reload the limits
+ */
+const char** WBThrottle::get_tracked_conf_keys() const
+{
+  static const char* tracked_keys[] = {
+    // limits applied when fs == BTRFS
+    "filestore_wbthrottle_btrfs_bytes_start_flusher",
+    "filestore_wbthrottle_btrfs_bytes_hard_limit",
+    "filestore_wbthrottle_btrfs_ios_start_flusher",
+    "filestore_wbthrottle_btrfs_ios_hard_limit",
+    "filestore_wbthrottle_btrfs_inodes_start_flusher",
+    "filestore_wbthrottle_btrfs_inodes_hard_limit",
+    // limits applied when fs == XFS
+    "filestore_wbthrottle_xfs_bytes_start_flusher",
+    "filestore_wbthrottle_xfs_bytes_hard_limit",
+    "filestore_wbthrottle_xfs_ios_start_flusher",
+    "filestore_wbthrottle_xfs_ios_hard_limit",
+    "filestore_wbthrottle_xfs_inodes_start_flusher",
+    "filestore_wbthrottle_xfs_inodes_hard_limit",
+    NULL
+  };
+  return tracked_keys;
+}
+
+/**
+ * Reload the byte, io and inode limits for the current filesystem type
+ * from the configuration, then signal cond.
+ *
+ * The caller must hold the lock.  In each pair, .first is the
+ * start_flusher threshold and .second is the hard limit.
+ */
+void WBThrottle::set_from_conf()
+{
+  assert(lock.is_locked());
+  const md_config_t *conf = cct->_conf;
+  switch (fs) {
+  case BTRFS:
+    size_limits.first = conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
+    size_limits.second = conf->filestore_wbthrottle_btrfs_bytes_hard_limit;
+    io_limits.first = conf->filestore_wbthrottle_btrfs_ios_start_flusher;
+    io_limits.second = conf->filestore_wbthrottle_btrfs_ios_hard_limit;
+    fd_limits.first = conf->filestore_wbthrottle_btrfs_inodes_start_flusher;
+    fd_limits.second = conf->filestore_wbthrottle_btrfs_inodes_hard_limit;
+    break;
+  case XFS:
+    size_limits.first = conf->filestore_wbthrottle_xfs_bytes_start_flusher;
+    size_limits.second = conf->filestore_wbthrottle_xfs_bytes_hard_limit;
+    io_limits.first = conf->filestore_wbthrottle_xfs_ios_start_flusher;
+    io_limits.second = conf->filestore_wbthrottle_xfs_ios_hard_limit;
+    fd_limits.first = conf->filestore_wbthrottle_xfs_inodes_start_flusher;
+    fd_limits.second = conf->filestore_wbthrottle_xfs_inodes_hard_limit;
+    break;
+  default:
+    assert(0 == "invalid value for fs");
+  }
+  // The thresholds may have moved; let a waiter re-evaluate them.
+  cond.Signal();
+}
+
+/**
+ * md_config_obs_t hook: reload the limits once if any tracked option is
+ * among the changed ones.
+ *
+ * @param conf    configuration that changed (unused; limits are re-read
+ *                through cct)
+ * @param changed names of the options that changed
+ */
+void WBThrottle::handle_conf_change(const md_config_t *conf,
+				    const std::set<std::string> &changed)
+{
+  Mutex::Locker l(lock);
+  // Advance to the first tracked key that changed, or to the NULL terminator.
+  const char** key = get_tracked_conf_keys();
+  while (*key && !changed.count(*key))
+    ++key;
+  if (*key)
+    set_from_conf();
+}
+
+/**
+ * Wait until a start_flusher threshold is reached, then hand out the
+ * object at the front of the lru list and drop it from pending_wbs.
+ *
+ * The caller must hold the lock; it is released while waiting on cond.
+ *
+ * @param next [out] object, its FDRef and its pending totals
+ * @return false if we are shutting down, true when *next was filled in
+ */
+bool WBThrottle::get_next_should_flush(
+  boost::tuple<hobject_t, FDRef, PendingWB> *next)
+{
+  assert(lock.is_locked());
+  assert(next);
+  for (;;) {
+    if (stopping)
+      return false;
+    const bool below_start_flusher =
+      cur_ios < io_limits.first &&
+      pending_wbs.size() < fd_limits.first &&
+      cur_size < size_limits.first;
+    if (!below_start_flusher)
+      break;
+    cond.Wait(lock);
+  }
+  assert(!pending_wbs.empty());
+
+  hobject_t oldest(pop_object());
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator entry =
+    pending_wbs.find(oldest);
+  *next = boost::make_tuple(oldest, entry->second.second, entry->second.first);
+  pending_wbs.erase(entry);
+  return true;
+}
+
+
+
+/**
+ * Flusher thread body.
+ *
+ * Repeatedly takes the next object chosen by get_next_should_flush(),
+ * fsyncs its fd with the lock released and, when every write queued for
+ * the object was nocache, issues POSIX_FADV_DONTNEED on the whole file.
+ * Returns once get_next_should_flush() reports shutdown.
+ *
+ * @return always 0
+ */
+void *WBThrottle::entry()
+{
+  Mutex::Locker l(lock);
+  boost::tuple<hobject_t, FDRef, PendingWB> wb;
+  while (get_next_should_flush(&wb)) {
+    const PendingWB &pending = wb.get<2>();
+    clearing = wb.get<0>();
+
+    // The object has already left pending_wbs, so give back its share of
+    // the totals before dropping the lock.  Doing it after the flush lets a
+    // concurrent clear() find totals that pending_wbs cannot account for
+    // and trip its cur_ios/cur_size == 0 asserts.
+    cur_ios -= pending.ios;
+    logger->dec(l_wbthrottle_ios_dirtied, pending.ios);
+    logger->inc(l_wbthrottle_ios_wb, pending.ios);
+    cur_size -= pending.size;
+    logger->dec(l_wbthrottle_bytes_dirtied, pending.size);
+    logger->inc(l_wbthrottle_bytes_wb, pending.size);
+    logger->dec(l_wbthrottle_inodes_dirtied);
+    logger->inc(l_wbthrottle_inodes_wb);
+
+    // Flush without the lock so queue_wb() callers are not stalled.
+    lock.Unlock();
+    ::fsync(**wb.get<1>());
+    if (pending.nocache)
+      posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
+    lock.Lock();
+
+    // Flush finished: clear_object() waiters on this object may proceed.
+    clearing = hobject_t();
+    cond.Signal();
+    // Drop the FDRef held by the tuple before the next wait.
+    wb = boost::tuple<hobject_t, FDRef, PendingWB>();
+  }
+  return 0;
+}
+
+/**
+ * Record a write against @p hoid and move the object to the back of the
+ * lru list.  Does not block on the limits; see throttle().
+ *
+ * @param fd      FDRef stored with the object the first time it is queued
+ * @param hoid    object written
+ * @param offset  offset written (not used here)
+ * @param len     length written, added to the unflushed byte total
+ * @param nocache try to clear out of cache after write
+ */
+void WBThrottle::queue_wb(
+  FDRef fd, const hobject_t &hoid, uint64_t offset, uint64_t len,
+  bool nocache)
+{
+  Mutex::Locker l(lock);
+
+  // insert() leaves an existing entry untouched and reports it via .second.
+  pair<map<hobject_t, pair<PendingWB, FDRef> >::iterator, bool> slot =
+    pending_wbs.insert(make_pair(hoid, make_pair(PendingWB(), fd)));
+  if (slot.second) {
+    logger->inc(l_wbthrottle_inodes_dirtied);
+  } else {
+    // Already pending: take it out of the lru so it is re-queued below.
+    remove_object(hoid);
+  }
+
+  cur_ios++;
+  logger->inc(l_wbthrottle_ios_dirtied);
+  cur_size += len;
+  logger->inc(l_wbthrottle_bytes_dirtied, len);
+
+  slot.first->second.first.add(nocache, len, 1);
+  insert_object(hoid);
+  cond.Signal();
+}
+
+/**
+ * Forget every pending writeback: subtract each object's totals from the
+ * counters, empty pending_wbs and the lru structures, and signal cond.
+ */
+void WBThrottle::clear()
+{
+  typedef map<hobject_t, pair<PendingWB, FDRef> >::iterator wb_iter;
+
+  Mutex::Locker l(lock);
+  wb_iter p = pending_wbs.begin();
+  while (p != pending_wbs.end()) {
+    const PendingWB &pending = p->second.first;
+    cur_ios -= pending.ios;
+    logger->dec(l_wbthrottle_ios_dirtied, pending.ios);
+    cur_size -= pending.size;
+    logger->dec(l_wbthrottle_bytes_dirtied, pending.size);
+    logger->dec(l_wbthrottle_inodes_dirtied);
+    ++p;
+  }
+  pending_wbs.clear();
+  lru.clear();
+  rev_lru.clear();
+  cond.Signal();
+  // Nothing is pending any more, so the totals are expected to be zero.
+  assert(cur_ios == 0);
+  assert(cur_size == 0);
+}
+
+/**
+ * Drop the pending writeback for a single object.
+ *
+ * Waits while entry() is flushing @p hoid.  If the object is still
+ * pending afterwards, its totals are subtracted from the counters and it
+ * is removed from pending_wbs and from the lru list.
+ *
+ * @param hoid object to forget
+ */
+void WBThrottle::clear_object(const hobject_t &hoid)
+{
+  Mutex::Locker l(lock);
+  while (clearing == hoid)
+    cond.Wait(lock);
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator i =
+    pending_wbs.find(hoid);
+  if (i == pending_wbs.end())
+    return;
+
+  // Keep the perf counters in step with cur_ios/cur_size, as clear() does.
+  cur_ios -= i->second.first.ios;
+  logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios);
+  cur_size -= i->second.first.size;
+  logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size);
+  logger->dec(l_wbthrottle_inodes_dirtied);
+
+  pending_wbs.erase(i);
+  remove_object(hoid);
+  // The totals dropped: wake a waiter blocked in throttle().
+  cond.Signal();
+}
+
+/**
+ * Block until unflushed ios, objects and bytes are all below their hard
+ * limits, or until the throttle is stopping.
+ */
+void WBThrottle::throttle()
+{
+  Mutex::Locker l(lock);
+  while (!stopping &&
+	 (cur_ios >= io_limits.second ||
+	  pending_wbs.size() >= fd_limits.second ||
+	  cur_size >= size_limits.second)) {
+    cond.Wait(lock);
+  }
+}
diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h
new file mode 100644
index 00000000000..070de08e123
--- /dev/null
+++ b/src/os/WBThrottle.h
@@ -0,0 +1,171 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef WBTHROTTLE_H
+#define WBTHROTTLE_H
+
+#include <map>
+#include <boost/tuple/tuple.hpp>
+#include <tr1/memory>
+#include "include/buffer.h"
+#include "common/Formatter.h"
+#include "os/hobject.h"
+#include "include/interval_set.h"
+#include "FDCache.h"
+#include "common/Thread.h"
+#include "common/ceph_context.h"
+
+class PerfCounters;
+enum { // perf counter indices for the "WBThrottle" counter set
+ l_wbthrottle_first = 999090, // lower bound given to PerfCountersBuilder; not a counter itself
+ l_wbthrottle_bytes_dirtied, // "bytes_dirtied": raised by len in queue_wb(), lowered when flushed or cleared
+ l_wbthrottle_bytes_wb, // "bytes_wb": presumably bytes written back by the flusher - TODO confirm
+ l_wbthrottle_ios_dirtied, // "ios_dirtied": raised once per queue_wb() call, lowered when flushed or cleared
+ l_wbthrottle_ios_wb, // "ios_wb": presumably ios written back by the flusher - TODO confirm
+ l_wbthrottle_inodes_dirtied, // "inodes_dirtied": raised when an object first becomes pending
+ l_wbthrottle_inodes_wb, // "inodes_wb": presumably objects written back by the flusher - TODO confirm
+ l_wbthrottle_last // upper bound given to PerfCountersBuilder; not a counter itself
+};
+
+/**
+ * WBThrottle
+ *
+ * Tracks, throttles, and flushes outstanding IO
+ *
+ * Writers report dirtied ranges with queue_wb() and call throttle() to
+ * wait while a hard limit is exceeded.  A thread running entry() flushes
+ * pending objects in lru order once a start_flusher limit is reached.
+ * All mutable state below is accessed with `lock` held.
+ */
+class WBThrottle : Thread, public md_config_obs_t {
+  /// Object being flushed by entry(); clear_object() waits while it matches
+  hobject_t clearing;
+
+  /* *_limits.first is the start_flusher limit and
+   * *_limits.second is the hard limit
+   */
+
+  /// Limits on unflushed bytes
+  pair<uint64_t, uint64_t> size_limits;
+
+  /// Limits on unflushed ios
+  pair<uint64_t, uint64_t> io_limits;
+
+  /// Limits on unflushed objects
+  pair<uint64_t, uint64_t> fd_limits;
+
+  uint64_t cur_ios;  ///< Currently unflushed IOs
+  uint64_t cur_size; ///< Currently unflushed bytes
+
+  /**
+   * PendingWB tracks the ios pending on an object.
+   */
+  class PendingWB {
+  public:
+    bool nocache;  ///< true only while every queued write was nocache
+    uint64_t size; ///< total bytes queued for the object
+    uint64_t ios;  ///< number of ios queued for the object
+    PendingWB() : nocache(true), size(0), ios(0) {}
+    /// Fold one more batch of writes into the totals
+    void add(bool _nocache, uint64_t _size, uint64_t _ios) {
+      if (!_nocache)
+	nocache = false; // only nocache if all writes are nocache
+      size += _size;
+      ios += _ios;
+    }
+  };
+
+  CephContext *cct;
+  PerfCounters *logger;
+  bool stopping; ///< set by the destructor to make entry() return
+  Mutex lock;
+  Cond cond;
+
+
+  /**
+   * Flush objects in lru order
+   *
+   * lru holds the pending objects, oldest at the front; rev_lru maps each
+   * object to its position in lru.
+   */
+  list<hobject_t> lru;
+  map<hobject_t, list<hobject_t>::iterator> rev_lru;
+
+  /// Remove hoid from the lru structures; no-op if it is not there
+  void remove_object(const hobject_t &hoid) {
+    assert(lock.is_locked());
+    map<hobject_t, list<hobject_t>::iterator>::iterator iter =
+      rev_lru.find(hoid);
+    if (iter == rev_lru.end())
+      return;
+
+    lru.erase(iter->second);
+    rev_lru.erase(iter);
+  }
+
+  /// Remove and return the object at the front of lru (must be non-empty)
+  hobject_t pop_object() {
+    assert(lock.is_locked()); // same locking rule as remove_object()
+    assert(!lru.empty());
+    hobject_t hoid(lru.front());
+    lru.pop_front();
+    rev_lru.erase(hoid);
+    return hoid;
+  }
+
+  /// Append hoid to the back of lru; it must not already be present
+  void insert_object(const hobject_t &hoid) {
+    assert(lock.is_locked()); // same locking rule as remove_object()
+    assert(rev_lru.find(hoid) == rev_lru.end());
+    lru.push_back(hoid);
+    rev_lru.insert(make_pair(hoid, --lru.end()));
+  }
+
+  /// Pending totals and fd for every object with unflushed writes
+  map<hobject_t, pair<PendingWB, FDRef> > pending_wbs;
+
+  /// get next flush to perform
+  bool get_next_should_flush(
+    boost::tuple<hobject_t, FDRef, PendingWB> *next ///< [out] next to flush
+    ); ///< @return false if we are shutting down
+public:
+  enum FS {
+    BTRFS,
+    XFS
+  };
+
+private:
+  FS fs; ///< selects which set of config limits set_from_conf() loads
+
+  /// Reload the limits for fs from the configuration (lock must be held)
+  void set_from_conf();
+public:
+  WBThrottle(CephContext *cct);
+  ~WBThrottle();
+
+  /// Set fs as XFS or BTRFS
+  void set_fs(FS new_fs) {
+    Mutex::Locker l(lock);
+    fs = new_fs;
+    set_from_conf();
+  }
+
+  /// Queue wb on hoid, fd taking throttle (does not block)
+  void queue_wb(
+    FDRef fd,              ///< [in] FDRef to hoid
+    const hobject_t &hoid, ///< [in] object
+    uint64_t offset,       ///< [in] offset written
+    uint64_t len,          ///< [in] length written
+    bool nocache           ///< [in] try to clear out of cache after write
+    );
+
+  /// Clear all wb (probably due to sync)
+  void clear();
+
+  /// Clear object
+  void clear_object(const hobject_t &hoid);
+
+  /// Block until there is throttle available
+  void throttle();
+
+  /// md_config_obs_t
+  const char** get_tracked_conf_keys() const;
+  void handle_conf_change(const md_config_t *conf,
+			  const std::set<std::string> &changed);
+
+  /// Thread
+  void *entry();
+};
+
+#endif
diff --git a/src/os/hobject.h b/src/os/hobject.h
index 28e0da0d82a..47fcb3dda39 100644
--- a/src/os/hobject.h
+++ b/src/os/hobject.h
@@ -15,6 +15,9 @@
#ifndef __CEPH_OS_HOBJECT_H
#define __CEPH_OS_HOBJECT_H
+#include <string.h>
+#include "include/types.h"
+#include "include/rados.h"
#include "include/object.h"
#include "include/cmp.h"
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 761a77cd69c..de60a6a9205 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -4383,6 +4383,11 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
if (oid.snap == CEPH_SNAPDIR) {
// return head or snapdir, whichever exists.
ObjectContext *obc = get_object_context(head, oloc, can_create);
+ if (obc && !obc->obs.exists) {
+ // ignore it if the obc exists but the object doesn't
+ put_object_context(obc);
+ obc = NULL;
+ }
if (!obc) {
obc = get_object_context(snapdir, oloc, can_create);
}
@@ -5288,7 +5293,6 @@ void ReplicatedPG::submit_push_data(
void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t)
{
- remove_snap_mapped_object(*t, recovery_info.soid);
t->collection_move(coll, get_temp_coll(t), recovery_info.soid);
for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
recovery_info.clone_subset.begin();