diff options
author | Josh Durgin <josh.durgin@inktank.com> | 2012-12-05 15:55:35 -0800 |
---|---|---|
committer | Josh Durgin <josh.durgin@inktank.com> | 2012-12-05 15:55:35 -0800 |
commit | 930bb55006aff45f717462b95690d6d0d0fb8150 (patch) | |
tree | bcb13d491bb779c99f9a5c9eeda454756c0b8775 | |
parent | 27793255963f8d3ce329ce70326316175b2a101e (diff) | |
parent | 2a5549cc0cc1e99a8f7eb5db092674425ccdb075 (diff) | |
download | ceph-930bb55006aff45f717462b95690d6d0d0fb8150.tar.gz |
Merge branch 'next'
-rw-r--r-- | qa/run_xfstests_qemu.sh | 7 | ||||
-rw-r--r-- | src/common/config_opts.h | 3 | ||||
-rw-r--r-- | src/logrotate.conf | 6 | ||||
-rw-r--r-- | src/mds/CDir.cc | 6 | ||||
-rw-r--r-- | src/mds/CInode.cc | 36 | ||||
-rw-r--r-- | src/mds/CInode.h | 5 | ||||
-rw-r--r-- | src/mds/Capability.h | 6 | ||||
-rw-r--r-- | src/mds/Locker.cc | 27 | ||||
-rw-r--r-- | src/mds/Locker.h | 3 | ||||
-rw-r--r-- | src/mds/MDCache.cc | 87 | ||||
-rw-r--r-- | src/mds/MDCache.h | 8 | ||||
-rw-r--r-- | src/mds/Migrator.cc | 1 | ||||
-rw-r--r-- | src/mds/Mutation.cc | 31 | ||||
-rw-r--r-- | src/mds/Mutation.h | 6 | ||||
-rw-r--r-- | src/mds/Server.cc | 94 | ||||
-rw-r--r-- | src/mds/mdstypes.h | 7 | ||||
-rw-r--r-- | src/messages/MMDSSlaveRequest.h | 1 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 92 | ||||
-rw-r--r-- | src/msg/Pipe.h | 58 | ||||
-rw-r--r-- | src/os/FileStore.cc | 13 | ||||
-rw-r--r-- | src/os/JournalingObjectStore.cc | 33 | ||||
-rw-r--r-- | src/os/JournalingObjectStore.h | 8 | ||||
-rw-r--r-- | src/osd/OSD.cc | 3 | ||||
-rw-r--r-- | src/osd/PG.cc | 4 |
24 files changed, 405 insertions, 140 deletions
diff --git a/qa/run_xfstests_qemu.sh b/qa/run_xfstests_qemu.sh new file mode 100644 index 00000000000..0b5b86de090 --- /dev/null +++ b/qa/run_xfstests_qemu.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +mkdir /tmp/cephtest +wget https://raw.github.com/ceph/ceph/master/qa/run_xfstests.sh +chmod +x run_xfstests.sh +# tests excluded require extra packages for advanced acl and quota support +./run_xfstests.sh -c 1 -f xfs -t /dev/vdb -s /dev/vdc 1-26 28-49 51-63 65-83 85-233 235-291 diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 0cdb16c3ebf..eedb6d6a2f5 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -106,6 +106,9 @@ OPTION(ms_bind_port_max, OPT_INT, 7100) OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10) OPTION(ms_tcp_read_timeout, OPT_U64, 900) OPTION(ms_inject_socket_failures, OPT_U64, 0) +OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed +OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds +OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id") OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster diff --git a/src/logrotate.conf b/src/logrotate.conf index c4857f00c10..9af310413d9 100644 --- a/src/logrotate.conf +++ b/src/logrotate.conf @@ -4,13 +4,13 @@ compress sharedscripts postrotate - if which invoke-rc.d && [ -x `which invoke-rc.d` ]; then + if which invoke-rc.d > /dev/null && [ -x `which invoke-rc.d` ]; then invoke-rc.d ceph reload >/dev/null - elif which service && [ -x `which service` ]; then + elif which service > /dev/null && [ -x `which service` ]; then service ceph reload >/dev/null fi # Possibly reload twice, but depending on ceph.conf the reload above may be a no-op - if which initctl && [ -x `which initctl` ]; then + if which initctl > /dev/null && [ -x `which initctl` ]; then # upstart reload isn't very helpful here: # https://bugs.launchpad.net/upstart/+bug/1012938 for type in mon osd mds; do diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc index 55b76d3a298..4b1d3ef76bc 100644 --- a/src/mds/CDir.cc +++ b/src/mds/CDir.cc @@ -615,8 +615,12 @@ void CDir::unlink_inode_work( CDentry *dn ) void CDir::add_to_bloom(CDentry *dn) { - if (!bloom) + if (!bloom) { + /* not create bloom filter for incomplete dir that was added by log replay */ + if (!is_complete()) + return; bloom = new bloom_filter(100, 0.05, 0); + } /* This size and false positive probability is completely random.*/ bloom->insert(dn->name.c_str(), dn->name.size()); } diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index c12930837df..af70b681ffc 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -130,6 +130,7 @@ ostream& operator<<(ostream& out, CInode& in) if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent"; if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance; if (in.is_frozen_inode()) out << " FROZEN"; + if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN"; inode_t *pi = in.get_projected_inode(); if (pi->is_truncating()) @@ -1862,7 +1863,8 @@ void CInode::add_waiter(uint64_t tag, Context *c) // wait on the directory? // make sure its not the inode that is explicitly ambiguous|freezing|frozen if (((tag & WAIT_SINGLEAUTH) && !state_test(STATE_AMBIGUOUSAUTH)) || - ((tag & WAIT_UNFREEZE) && !is_frozen_inode() && !is_freezing_inode())) { + ((tag & WAIT_UNFREEZE) && + !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) { dout(15) << "passing waiter up tree" << dendl; parent->dir->add_waiter(tag, c); return; @@ -1885,8 +1887,10 @@ bool CInode::freeze_inode(int auth_pin_allowance) dout(10) << "freeze_inode - frozen" << dendl; assert(auth_pins == auth_pin_allowance); - get(PIN_FROZEN); - state_set(STATE_FROZEN); + if (!state_test(STATE_FROZEN)) { + get(PIN_FROZEN); + state_set(STATE_FROZEN); + } return true; } @@ -1904,10 +1908,34 @@ void CInode::unfreeze_inode(list<Context*>& finished) take_waiting(WAIT_UNFREEZE, finished); } +void CInode::unfreeze_inode() +{ + list<Context*> finished; + unfreeze_inode(finished); + mdcache->mds->queue_waiters(finished); +} + +void CInode::freeze_auth_pin() +{ + assert(state_test(CInode::STATE_FROZEN)); + state_set(CInode::STATE_FROZENAUTHPIN); +} + +void CInode::unfreeze_auth_pin() +{ + assert(state_test(CInode::STATE_FROZENAUTHPIN)); + state_clear(CInode::STATE_FROZENAUTHPIN); + if (!state_test(STATE_FREEZING|STATE_FROZEN)) { + list<Context*> finished; + take_waiting(WAIT_UNFREEZE, finished); + mdcache->mds->queue_waiters(finished); + } +} // auth_pins bool CInode::can_auth_pin() { - if (is_freezing_inode() || is_frozen_inode()) return false; + if (is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin()) + return false; if (parent) return parent->can_auth_pin(); return true; diff --git a/src/mds/CInode.h b/src/mds/CInode.h index b76b52414c9..e43ecf50fa3 100644 --- a/src/mds/CInode.h +++ b/src/mds/CInode.h @@ -181,6 +181,7 @@ public: static const int STATE_DIRTYPARENT = (1<<14); static const int STATE_DIRTYRSTAT = (1<<15); static const int STATE_STRAYPINNED = (1<<16); + static const int STATE_FROZENAUTHPIN = (1<<17); // -- waiters -- static const uint64_t WAIT_DIR = (1<<0); @@ -856,6 +857,7 @@ public: // -- freeze -- bool is_freezing_inode() { return state_test(STATE_FREEZING); } bool is_frozen_inode() { return state_test(STATE_FROZEN); } + bool is_frozen_auth_pin() { return state_test(STATE_FROZENAUTHPIN); } bool is_frozen(); bool is_frozen_dir(); bool is_freezing(); @@ -864,7 +866,10 @@ public: * auth_pins it is itself holding/responsible for. */ bool freeze_inode(int auth_pin_allowance=0); void unfreeze_inode(list<Context*>& finished); + void unfreeze_inode(); + void freeze_auth_pin(); + void unfreeze_auth_pin(); // -- reference counting -- void bad_put(int by) { diff --git a/src/mds/Capability.h b/src/mds/Capability.h index f3743281c90..6fe67f45b1d 100644 --- a/src/mds/Capability.h +++ b/src/mds/Capability.h @@ -297,7 +297,8 @@ public: int newpending = other.pending | pending(); if (other.issued & ~newpending) issue(other.issued | newpending); - issue(newpending); + else + issue(newpending); last_issue_stamp = other.last_issue_stamp; client_follows = other.client_follows; @@ -311,7 +312,8 @@ public: int newpending = pending(); if (otherissued & ~newpending) issue(otherissued | newpending); - issue(newpending); + else + issue(newpending); // wanted _wanted = _wanted | otherwanted; diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index c29ac34ec85..ee4799e18f8 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -174,7 +174,8 @@ bool Locker::acquire_locks(MDRequest *mdr, set<SimpleLock*> &rdlocks, set<SimpleLock*> &wrlocks, set<SimpleLock*> &xlocks, - map<SimpleLock*,int> *remote_wrlocks) + map<SimpleLock*,int> *remote_wrlocks, + CInode *auth_pin_freeze) { if (mdr->done_locking && !mdr->is_slave()) { // not on slaves! master requests locks piecemeal. @@ -196,6 +197,8 @@ bool Locker::acquire_locks(MDRequest *mdr, // augment xlock with a versionlock? if ((*p)->get_type() == CEPH_LOCK_DN) { CDentry *dn = (CDentry*)(*p)->get_parent(); + if (!dn->is_auth()) + continue; if (xlocks.count(&dn->versionlock)) continue; // we're xlocking the versionlock too; don't wrlock it! @@ -213,6 +216,8 @@ bool Locker::acquire_locks(MDRequest *mdr, if ((*p)->get_type() > CEPH_LOCK_IVERSION) { // inode version lock? CInode *in = (CInode*)(*p)->get_parent(); + if (!in->is_auth()) + continue; if (mdr->is_master()) { // master. wrlock versionlock so we can pipeline inode updates to journal. wrlocks.insert(&in->versionlock); @@ -282,11 +287,12 @@ bool Locker::acquire_locks(MDRequest *mdr, continue; if (!object->is_auth()) { + if (!mdr->locks.empty()) + mds->locker->drop_locks(mdr); if (object->is_ambiguous_auth()) { // wait dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl; object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr)); - mds->locker->drop_locks(mdr); mdr->drop_local_auth_pins(); return false; } @@ -331,7 +337,9 @@ bool Locker::acquire_locks(MDRequest *mdr, dout(10) << " req remote auth_pin of " << **q << dendl; MDSCacheObjectInfo info; (*q)->set_object_info(info); - req->get_authpins().push_back(info); + req->get_authpins().push_back(info); + if (*q == auth_pin_freeze) + (*q)->set_object_info(req->get_authpin_freeze()); mdr->pin(*q); } mds->send_message_mds(req, p->first); @@ -845,8 +853,8 @@ void Locker::try_eval(MDSCacheObject *p, int mask) return; } - if (p->is_auth() && !p->can_auth_pin()) { - dout(7) << "try_eval can't auth_pin, waiting on " << *p << dendl; + if (p->is_auth() && p->is_frozen()) { + dout(7) << "try_eval frozen, waiting on " << *p << dendl; p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask)); return; } @@ -3920,6 +3928,7 @@ void Locker::local_wrlock_grab(LocalLock *lock, Mutation *mut) dout(7) << "local_wrlock_grab on " << *lock << " on " << *lock->get_parent() << dendl; + assert(lock->get_parent()->is_auth()); assert(lock->can_wrlock()); assert(!mut->wrlocks.count(lock)); lock->get_wrlock(mut->get_client()); @@ -3932,6 +3941,7 @@ bool Locker::local_wrlock_start(LocalLock *lock, MDRequest *mut) dout(7) << "local_wrlock_start on " << *lock << " on " << *lock->get_parent() << dendl; + assert(lock->get_parent()->is_auth()); if (lock->can_wrlock()) { assert(!mut->wrlocks.count(lock)); lock->get_wrlock(mut->get_client()); @@ -3963,6 +3973,7 @@ bool Locker::local_xlock_start(LocalLock *lock, MDRequest *mut) dout(7) << "local_xlock_start on " << *lock << " on " << *lock->get_parent() << dendl; + assert(lock->get_parent()->is_auth()); if (!lock->can_xlock_local()) { lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); return false; @@ -4397,8 +4408,12 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m) if (lock->get_state() == LOCK_MIX_LOCK || lock->get_state() == LOCK_MIX_LOCK2 || lock->get_state() == LOCK_MIX_EXCL || - lock->get_state() == LOCK_MIX_TSYN) + lock->get_state() == LOCK_MIX_TSYN) { lock->decode_locked_state(m->get_data()); + // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not + // delay calling scatter_writebehind(). + lock->clear_flushed(); + } if (lock->is_gathering()) { dout(7) << "handle_file_lock " << *in << " from " << from diff --git a/src/mds/Locker.h b/src/mds/Locker.h index a1cf59e3185..b3b9919e7fd 100644 --- a/src/mds/Locker.h +++ b/src/mds/Locker.h @@ -88,7 +88,8 @@ public: set<SimpleLock*> &rdlocks, set<SimpleLock*> &wrlocks, set<SimpleLock*> &xlocks, - map<SimpleLock*,int> *remote_wrlocks=NULL); + map<SimpleLock*,int> *remote_wrlocks=NULL, + CInode *auth_pin_freeze=NULL); void cancel_locking(Mutation *mut, set<CInode*> *pneed_issue); void drop_locks(Mutation *mut, set<CInode*> *pneed_issue=0); diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 5f0ba16defd..58a8b8a2a34 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -5498,9 +5498,10 @@ void MDCache::trim_dentry(CDentry *dn, map<int, MCacheExpire*>& expiremap) // adjust the dir state // NOTE: we can safely remove a clean, null dentry without effecting // directory completeness. - // (do this _before_ we unlink the inode, below!) + // (check this _before_ we unlink the inode, below!) + bool clear_complete = false; if (!(dnl->is_null() && dn->is_clean())) - dir->state_clear(CDir::STATE_COMPLETE); + clear_complete = true; // unlink the dentry if (dnl->is_remote()) { @@ -5520,6 +5521,9 @@ void MDCache::trim_dentry(CDentry *dn, map<int, MCacheExpire*>& expiremap) // remove dentry dir->add_to_bloom(dn); dir->remove_dentry(dn); + + if (clear_complete) + dir->state_clear(CDir::STATE_COMPLETE); // reexport? if (dir->get_num_head_items() == 0 && dir->is_subtree_root()) @@ -5708,9 +5712,8 @@ void MDCache::trim_non_auth() else { assert(dnl->is_null()); } - dir->add_to_bloom(dn); + dir->remove_dentry(dn); - // adjust the dir state dir->state_clear(CDir::STATE_COMPLETE); // dir incomplete! } @@ -5811,7 +5814,6 @@ bool MDCache::trim_non_auth_subtree(CDir *dir) dout(20) << "trim_non_auth_subtree(" << dir << ") removing inode " << in << " with dentry" << dn << dendl; dir->unlink_inode(dn); remove_inode(in); - dir->add_to_bloom(dn); dir->remove_dentry(dn); } else { dout(20) << "trim_non_auth_subtree(" << dir << ") keeping inode " << in << " with dentry " << dn <<dendl; @@ -5928,6 +5930,7 @@ void MDCache::handle_cache_expire(MCacheExpire *m) continue; } assert(!(parent_dir->is_auth() && parent_dir->is_exporting()) || + migrator->get_export_state(parent_dir) <= Migrator::EXPORT_PREPPING || (migrator->get_export_state(parent_dir) == Migrator::EXPORT_WARNING && !migrator->export_has_warned(parent_dir, from))); @@ -6706,11 +6709,11 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, Context *fin, // wh // can we conclude ENOENT? if (dnl && dnl->is_null()) { - if (dn->lock.can_read(client)) { - dout(12) << "traverse: miss on null+readable dentry " << path[depth] << " " << *dn << dendl; + if (mds->locker->rdlock_try(&dn->lock, client, NULL)) { + dout(10) << "traverse: miss on null+readable dentry " << path[depth] << " " << *dn << dendl; return -ENOENT; } else { - dout(12) << "miss on dentry " << *dn << ", can't read due to lock" << dendl; + dout(10) << "miss on dentry " << *dn << ", can't read due to lock" << dendl; dn->lock.add_waiter(SimpleLock::WAIT_RD, _get_waiter(mdr, req, fin)); return 1; } @@ -6730,7 +6733,8 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, Context *fin, // wh } else { dout(7) << "remote link to " << dnl->get_remote_ino() << ", which i don't have" << dendl; assert(mdr); // we shouldn't hit non-primary dentries doing a non-mdr traversal! - open_remote_ino(dnl->get_remote_ino(), _get_waiter(mdr, req, fin)); + open_remote_ino(dnl->get_remote_ino(), _get_waiter(mdr, req, fin), + (null_okay && depth == path.depth() - 1)); if (mds->logger) mds->logger->inc(l_mds_trino); return 1; } @@ -7016,12 +7020,13 @@ CInode *MDCache::get_dentry_inode(CDentry *dn, MDRequest *mdr, bool projected) class C_MDC_RetryOpenRemoteIno : public Context { MDCache *mdcache; inodeno_t ino; + bool want_xlocked; Context *onfinish; public: - C_MDC_RetryOpenRemoteIno(MDCache *mdc, inodeno_t i, Context *c) : - mdcache(mdc), ino(i), onfinish(c) {} + C_MDC_RetryOpenRemoteIno(MDCache *mdc, inodeno_t i, Context *c, bool wx) : + mdcache(mdc), ino(i), want_xlocked(wx), onfinish(c) {} void finish(int r) { - mdcache->open_remote_ino(ino, onfinish); + mdcache->open_remote_ino(ino, onfinish, want_xlocked); } }; @@ -7031,19 +7036,20 @@ class C_MDC_OpenRemoteIno : public Context { inodeno_t ino; inodeno_t hadino; version_t hadv; + bool want_xlocked; Context *onfinish; public: vector<Anchor> anchortrace; - C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, inodeno_t hi, version_t hv, Context *c) : - mdcache(mdc), ino(i), hadino(hi), hadv(hv), onfinish(c) {} - C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, vector<Anchor>& at, Context *c) : - mdcache(mdc), ino(i), hadino(0), hadv(0), onfinish(c), anchortrace(at) {} + C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, bool wx, inodeno_t hi, version_t hv, Context *c) : + mdcache(mdc), ino(i), hadino(hi), hadv(hv), want_xlocked(wx), onfinish(c) {} + C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, bool wx, vector<Anchor>& at, Context *c) : + mdcache(mdc), ino(i), hadino(0), hadv(0), want_xlocked(wx), onfinish(c), anchortrace(at) {} void finish(int r) { assert(r == 0); if (r == 0) - mdcache->open_remote_ino_2(ino, anchortrace, hadino, hadv, onfinish); + mdcache->open_remote_ino_2(ino, anchortrace, want_xlocked, hadino, hadv, onfinish); else { onfinish->finish(r); delete onfinish; @@ -7051,18 +7057,18 @@ public: } }; -void MDCache::open_remote_ino(inodeno_t ino, Context *onfinish, inodeno_t hadino, version_t hadv) +void MDCache::open_remote_ino(inodeno_t ino, Context *onfinish, bool want_xlocked, + inodeno_t hadino, version_t hadv) { - dout(7) << "open_remote_ino on " << ino << dendl; + dout(7) << "open_remote_ino on " << ino << (want_xlocked ? " want_xlocked":"") << dendl; - C_MDC_OpenRemoteIno *c = new C_MDC_OpenRemoteIno(this, ino, hadino, hadv, onfinish); + C_MDC_OpenRemoteIno *c = new C_MDC_OpenRemoteIno(this, ino, want_xlocked, + hadino, hadv, onfinish); mds->anchorclient->lookup(ino, c->anchortrace, c); } -void MDCache::open_remote_ino_2(inodeno_t ino, - vector<Anchor>& anchortrace, - inodeno_t hadino, version_t hadv, - Context *onfinish) +void MDCache::open_remote_ino_2(inodeno_t ino, vector<Anchor>& anchortrace, bool want_xlocked, + inodeno_t hadino, version_t hadv, Context *onfinish) { dout(7) << "open_remote_ino_2 on " << ino << ", trace depth is " << anchortrace.size() << dendl; @@ -7105,7 +7111,7 @@ void MDCache::open_remote_ino_2(inodeno_t ino, if (!in->dirfragtree.contains(frag)) { dout(10) << "frag " << frag << " not valid, requerying anchortable" << dendl; - open_remote_ino(ino, onfinish); + open_remote_ino(ino, onfinish, want_xlocked); return; } @@ -7115,14 +7121,15 @@ void MDCache::open_remote_ino_2(inodeno_t ino, dout(10) << "opening remote dirfrag " << frag << " under " << *in << dendl; /* we re-query the anchortable just to avoid a fragtree update race */ open_remote_dirfrag(in, frag, - new C_MDC_RetryOpenRemoteIno(this, ino, onfinish)); + new C_MDC_RetryOpenRemoteIno(this, ino, onfinish, want_xlocked)); return; } if (!dir && in->is_auth()) { if (in->is_frozen_dir()) { dout(7) << "traverse: " << *in << " is frozen_dir, waiting" << dendl; - in->parent->dir->add_waiter(CDir::WAIT_UNFREEZE, onfinish); + in->parent->dir->add_waiter(CDir::WAIT_UNFREEZE, + new C_MDC_RetryOpenRemoteIno(this, ino, onfinish, want_xlocked)); return; } dir = in->get_or_open_dirfrag(this, frag); @@ -7144,20 +7151,22 @@ void MDCache::open_remote_ino_2(inodeno_t ino, << " in complete dir " << *dir << ", requerying anchortable" << dendl; - open_remote_ino(ino, onfinish, anchortrace[i].ino, anchortrace[i].updated); + open_remote_ino(ino, onfinish, want_xlocked, + anchortrace[i].ino, anchortrace[i].updated); } } else { dout(10) << "need ino " << anchortrace[i].ino << ", fetching incomplete dir " << *dir << dendl; - dir->fetch(new C_MDC_OpenRemoteIno(this, ino, anchortrace, onfinish)); + dir->fetch(new C_MDC_OpenRemoteIno(this, ino, want_xlocked, anchortrace, onfinish)); } } else { // hmm, discover. dout(10) << "have remote dirfrag " << *dir << ", discovering " << anchortrace[i].ino << dendl; - discover_ino(dir, anchortrace[i].ino, - new C_MDC_OpenRemoteIno(this, ino, anchortrace, onfinish)); + discover_ino(dir, anchortrace[i].ino, + new C_MDC_RetryOpenRemoteIno(this, ino, onfinish, want_xlocked), + (want_xlocked && i == anchortrace.size() - 1)); } } @@ -7476,13 +7485,17 @@ void MDCache::request_finish(MDRequest *mdr) void MDCache::request_forward(MDRequest *mdr, int who, int port) { - dout(7) << "request_forward " << *mdr << " to mds." << who << " req " << *mdr << dendl; - - mds->forward_message_mds(mdr->client_request, who); - mdr->client_request = 0; + if (mdr->client_request->get_source().is_client()) { + dout(7) << "request_forward " << *mdr << " to mds." << who << " req " + << *mdr->client_request << dendl; + mds->forward_message_mds(mdr->client_request, who); + mdr->client_request = 0; + if (mds->logger) mds->logger->inc(l_mds_fw); + } else { + dout(7) << "request_forward drop " << *mdr << " req " << *mdr->client_request + << " was from mds" << dendl; + } request_cleanup(mdr); - - if (mds->logger) mds->logger->inc(l_mds_fw); } diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 64290aa97b9..31c7467bf41 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -701,11 +701,11 @@ public: void open_remote_dirfrag(CInode *diri, frag_t fg, Context *fin); CInode *get_dentry_inode(CDentry *dn, MDRequest *mdr, bool projected=false); - void open_remote_ino(inodeno_t ino, Context *fin, inodeno_t hadino=0, version_t hadv=0); + void open_remote_ino(inodeno_t ino, Context *fin, bool want_xlocked=false, + inodeno_t hadino=0, version_t hadv=0); void open_remote_ino_2(inodeno_t ino, - vector<Anchor>& anchortrace, - inodeno_t hadino, version_t hadv, - Context *onfinish); + vector<Anchor>& anchortrace, bool want_xlocked, + inodeno_t hadino, version_t hadv, Context *onfinish); void open_remote_dentry(CDentry *dn, bool projected, Context *fin); void _open_remote_dentry_finish(int r, CDentry *dn, bool projected, Context *fin); diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index b66b54cbc83..a804eab7731 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -1051,6 +1051,7 @@ void Migrator::finish_export_inode_caps(CInode *in) mds->send_message_client_counted(m, it->first); } in->clear_client_caps_after_export(); + mds->locker->eval(in, CEPH_CAP_LOCKS); } void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& finished) diff --git a/src/mds/Mutation.cc b/src/mds/Mutation.cc index 6321ffc160a..a9c35134bc8 100644 --- a/src/mds/Mutation.cc +++ b/src/mds/Mutation.cc @@ -82,8 +82,39 @@ void Mutation::auth_unpin(MDSCacheObject *object) auth_pins.erase(object); } +bool Mutation::freeze_auth_pin(CInode *inode) +{ + assert(!auth_pin_freeze || auth_pin_freeze == inode); + auth_pin_freeze = inode; + auth_pin(inode); + if (!inode->freeze_inode(1)) + return false; + + inode->freeze_auth_pin(); + inode->unfreeze_inode(); + return true; +} + +void Mutation::unfreeze_auth_pin(CInode *inode) +{ + assert(auth_pin_freeze == inode); + assert(is_auth_pinned(inode)); + if (inode->is_frozen_auth_pin()) + inode->unfreeze_auth_pin(); + else + inode->unfreeze_inode(); + auth_pin_freeze = NULL; +} + +bool Mutation::can_auth_pin(MDSCacheObject *object) +{ + return object->can_auth_pin() || (is_auth_pinned(object) && object == auth_pin_freeze); +} + void Mutation::drop_local_auth_pins() { + if (auth_pin_freeze) + unfreeze_auth_pin(auth_pin_freeze); for (set<MDSCacheObject*>::iterator it = auth_pins.begin(); it != auth_pins.end(); it++) { diff --git a/src/mds/Mutation.h b/src/mds/Mutation.h index cba6223864e..37cc764254d 100644 --- a/src/mds/Mutation.h +++ b/src/mds/Mutation.h @@ -50,6 +50,7 @@ struct Mutation { // auth pins set< MDSCacheObject* > remote_auth_pins; set< MDSCacheObject* > auth_pins; + CInode *auth_pin_freeze; // held locks set< SimpleLock* > rdlocks; // always local. @@ -81,12 +82,14 @@ struct Mutation { : attempt(0), ls(0), slave_to_mds(-1), + auth_pin_freeze(NULL), locking(NULL), done_locking(false), committing(false), aborted(false), killed(false) { } Mutation(metareqid_t ri, __u32 att=0, int slave_to=-1) : reqid(ri), attempt(att), ls(0), slave_to_mds(slave_to), + auth_pin_freeze(NULL), locking(NULL), done_locking(false), committing(false), aborted(false), killed(false) { } virtual ~Mutation() { @@ -120,6 +123,9 @@ struct Mutation { bool is_auth_pinned(MDSCacheObject *object); void auth_pin(MDSCacheObject *object); void auth_unpin(MDSCacheObject *object); + bool freeze_auth_pin(CInode *inode); + void unfreeze_auth_pin(CInode *inode); + bool can_auth_pin(MDSCacheObject *object); void drop_local_auth_pins(); void add_projected_inode(CInode *in); void pop_and_dirty_projected_inodes(); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index d5548a8493c..ba436566dec 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -1487,6 +1487,7 @@ void Server::handle_slave_auth_pin(MDRequest *mdr) // build list of objects list<MDSCacheObject*> objects; + CInode *auth_pin_freeze = NULL; bool fail = false; for (vector<MDSCacheObjectInfo>::iterator p = mdr->slave_request->get_authpins().begin(); @@ -1500,6 +1501,8 @@ void Server::handle_slave_auth_pin(MDRequest *mdr) } objects.push_back(object); + if (*p == mdr->slave_request->get_authpin_freeze()) + auth_pin_freeze = dynamic_cast<CInode*>(object); } // can we auth pin them? @@ -1512,8 +1515,7 @@ void Server::handle_slave_auth_pin(MDRequest *mdr) fail = true; break; } - if (!mdr->is_auth_pinned(*p) && - !(*p)->can_auth_pin()) { + if (!mdr->can_auth_pin(*p)) { // wait dout(10) << " waiting for authpinnable on " << **p << dendl; (*p)->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr)); @@ -1527,6 +1529,22 @@ void Server::handle_slave_auth_pin(MDRequest *mdr) if (fail) { mdr->drop_local_auth_pins(); // just in case } else { + /* handle_slave_rename_prep() call freeze_inode() to wait for all other operations + * on the source inode to complete. This happens after all locks for the rename + * operation are acquired. But to acquire locks, we need auth pin locks' parent + * objects first. So there is an ABBA deadlock if someone auth pins the source inode + * after locks are acquired and before Server::handle_slave_rename_prep() is called. + * The solution is freeze the inode and prevent other MDRequests from getting new + * auth pins. + */ + if (auth_pin_freeze) { + dout(10) << " freezing auth pin on " << *auth_pin_freeze << dendl; + if (!mdr->freeze_auth_pin(auth_pin_freeze)) { + auth_pin_freeze->add_waiter(CInode::WAIT_FROZEN, new C_MDS_RetryRequest(mdcache, mdr)); + mds->mdlog->flush(); + return; + } + } for (list<MDSCacheObject*>::iterator p = objects.begin(); p != objects.end(); ++p) { @@ -1923,7 +1941,8 @@ CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, int n, // do NOT proceed if freezing, as cap release may defer in that case, and // we could deadlock when we try to lock @ref. // if we're already auth_pinned, continue; the release has already been processed. - if (ref->is_frozen() || (ref->is_freezing() && !mdr->is_auth_pinned(ref))) { + if (ref->is_frozen() || ref->is_frozen_auth_pin() || + (ref->is_freezing() && !mdr->is_auth_pinned(ref))) { dout(7) << "waiting for !frozen/authpinnable on " << *ref << dendl; ref->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr)); /* If we have any auth pins, this will deadlock. @@ -5202,25 +5221,27 @@ void Server::handle_client_rename(MDRequest *mdr) wrlocks.insert(&straydn->get_dir()->inode->nestlock); } - // xlock versionlock on srci if remote? - // this ensures it gets safely remotely auth_pinned, avoiding deadlock; - // strictly speaking, having the slave node freeze the inode is - // otherwise sufficient for avoiding conflicts with inode locks, etc. - if (!srcdn->is_auth() && srcdnl->is_primary()) // xlock versionlock on srci if there are any witnesses - xlocks.insert(&srci->versionlock); - // xlock versionlock on dentries if there are witnesses. // replicas can't see projected dentry linkages, and will get // confused if we try to pipeline things. if (!witnesses.empty()) { - if (srcdn->is_projected()) - xlocks.insert(&srcdn->versionlock); - if (destdn->is_projected()) - xlocks.insert(&destdn->versionlock); - // also take rdlock on all ancestor dentries for destdn. this ensures that the - // destdn can be traversed to by the witnesses. - for (int i=0; i<(int)desttrace.size(); i++) - xlocks.insert(&desttrace[i]->versionlock); + // take xlock on all projected ancestor dentries for srcdn and destdn. + // this ensures the srcdn and destdn can be traversed to by the witnesses. + for (int i= 0; i<(int)srctrace.size(); i++) { + if (srctrace[i]->is_auth() && srctrace[i]->is_projected()) + xlocks.insert(&srctrace[i]->versionlock); + } + for (int i=0; i<(int)desttrace.size(); i++) { + if (desttrace[i]->is_auth() && desttrace[i]->is_projected()) + xlocks.insert(&desttrace[i]->versionlock); + } + // xlock srci and oldin's primary dentries, so witnesses can call + // open_remote_ino() with 'want_locked=true' when the srcdn or destdn + // is traversed. + if (srcdnl->is_remote()) + xlocks.insert(&srci->get_projected_parent_dn()->lock); + if (destdnl->is_remote()) + xlocks.insert(&oldin->get_projected_parent_dn()->lock); } // we need to update srci's ctime. xlock its least contended lock to do that... @@ -5244,7 +5265,9 @@ void Server::handle_client_rename(MDRequest *mdr) // take any locks needed for anchor creation/verification mds->mdcache->anchor_create_prep_locks(mdr, srci, rdlocks, xlocks); - if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks, &remote_wrlocks)) + CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : NULL; + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks, + &remote_wrlocks, auth_pin_freeze)) return; if (oldin && @@ -5681,9 +5704,10 @@ void Server::_rename_prepare(MDRequest *mdr, } else if (destdnl->is_remote()) { if (oldin->is_auth()) { // auth for targeti - metablob->add_dir_context(oldin->get_parent_dir()); - mdcache->journal_cow_dentry(mdr, metablob, oldin->parent, CEPH_NOSNAP, 0, destdnl); - metablob->add_primary_dentry(oldin->parent, true, oldin); + metablob->add_dir_context(oldin->get_projected_parent_dir()); + mdcache->journal_cow_dentry(mdr, metablob, oldin->get_projected_parent_dn(), + CEPH_NOSNAP, 0, destdnl); + metablob->add_primary_dentry(oldin->get_projected_parent_dn(), true, oldin); } if (destdn->is_auth()) { // auth for dn, not targeti @@ -5702,10 +5726,10 @@ void Server::_rename_prepare(MDRequest *mdr, if (destdn->is_auth()) metablob->add_remote_dentry(destdn, true, srcdnl->get_remote_ino(), srcdnl->get_remote_d_type()); - if (srci->get_parent_dn()->is_auth()) { // it's remote - metablob->add_dir_context(srci->get_parent_dir()); - mdcache->journal_cow_dentry(mdr, metablob, srci->get_parent_dn(), CEPH_NOSNAP, 0, srcdnl); - metablob->add_primary_dentry(srci->get_parent_dn(), true, srci); + if (srci->get_projected_parent_dn()->is_auth()) { // it's remote + metablob->add_dir_context(srci->get_projected_parent_dir()); + mdcache->journal_cow_dentry(mdr, metablob, srci->get_projected_parent_dn(), CEPH_NOSNAP, 0, srcdnl); + metablob->add_primary_dentry(srci->get_projected_parent_dn(), true, srci); } } else { if (destdn->is_auth() && !destdnl->is_null()) @@ -5994,9 +6018,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) // am i srcdn auth? if (srcdn->is_auth()) { - if (srcdnl->is_primary() && - !srcdnl->get_inode()->is_freezing_inode() && - !srcdnl->get_inode()->is_frozen_inode()) { + if (srcdnl->is_primary()) { // set ambiguous auth for srci /* * NOTE: we don't worry about ambiguous cache expire as we do @@ -6013,7 +6035,13 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) int allowance = 2; // 1 for the mdr auth_pin, 1 for the link lock allowance += srcdnl->get_inode()->is_dir(); // for the snap lock dout(10) << " freezing srci " << *srcdnl->get_inode() << " with allowance " << allowance << dendl; - if (!srcdnl->get_inode()->freeze_inode(allowance)) { + bool frozen_inode = srcdnl->get_inode()->freeze_inode(allowance); + + // unfreeze auth pin after freezing the inode to avoid queueing waiters + if (srcdnl->get_inode()->is_frozen_auth_pin()) + mdr->unfreeze_auth_pin(srcdnl->get_inode()); + + if (!frozen_inode) { srcdnl->get_inode()->add_waiter(CInode::WAIT_FROZEN, new C_MDS_RetryRequest(mdcache, mdr)); return; } @@ -6181,8 +6209,7 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r, destdnl->get_inode()->take_waiting(CInode::WAIT_SINGLEAUTH, finished); // unfreeze - assert(destdnl->get_inode()->is_frozen_inode() || - destdnl->get_inode()->is_freezing_inode()); + assert(destdnl->get_inode()->is_frozen_inode()); destdnl->get_inode()->unfreeze_inode(finished); mds->queue_waiters(finished); @@ -6205,8 +6232,7 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r, destdnl->get_inode()->take_waiting(CInode::WAIT_SINGLEAUTH, finished); // unfreeze - assert(destdnl->get_inode()->is_frozen_inode() || - destdnl->get_inode()->is_freezing_inode()); + assert(destdnl->get_inode()->is_frozen_inode()); destdnl->get_inode()->unfreeze_inode(finished); mds->queue_waiters(finished); diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h index db4dbf1ac61..22e754eb2a1 100644 --- a/src/mds/mdstypes.h +++ b/src/mds/mdstypes.h @@ -1250,6 +1250,13 @@ public: } }; +inline bool operator==(const MDSCacheObjectInfo& l, const MDSCacheObjectInfo& r) { + if (l.ino || r.ino) + return l.ino == r.ino && l.snapid == r.snapid; + else + return l.dirfrag == r.dirfrag && l.dname == r.dname; +} + WRITE_CLASS_ENCODER(MDSCacheObjectInfo) diff --git a/src/messages/MMDSSlaveRequest.h b/src/messages/MMDSSlaveRequest.h index 4f2bb5948bd..03ec582c49e 100644 --- a/src/messages/MMDSSlaveRequest.h +++ b/src/messages/MMDSSlaveRequest.h @@ -112,6 +112,7 @@ public: int get_lock_type() { return lock_type; } MDSCacheObjectInfo &get_object_info() { return object_info; } + MDSCacheObjectInfo &get_authpin_freeze() { return object_info; } vector<MDSCacheObjectInfo>& get_authpins() { return authpins; } diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index e273ad49f2b..1ebf2854473 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -54,6 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) { Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) : reader_thread(this), writer_thread(this), + delay_thread(NULL), msgr(r), conn_id(r->dispatch_queue.get_id()), sd(-1), port(0), @@ -94,6 +95,7 @@ Pipe::~Pipe() if (connection_state) connection_state->put(); delete session_security; + delete delay_thread; } void Pipe::handle_ack(uint64_t seq) @@ -127,6 +129,16 @@ void Pipe::start_reader() reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes); } +void Pipe::maybe_start_delay_thread() +{ + if (!delay_thread && + msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) { + lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl; + delay_thread = new DelayedDelivery(this); + delay_thread->create(); + } +} + void Pipe::start_writer() { assert(pipe_lock.is_locked()); @@ -146,14 +158,54 @@ void Pipe::join_reader() reader_needs_join = false; } +void Pipe::DelayedDelivery::discard() +{ + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::discard" << dendl; + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + pipe->msgr->dispatch_throttle_release(m->get_dispatch_throttle_size()); + m->put(); + delay_queue.pop_front(); + } +} -void Pipe::queue_received(Message *m, int priority) +void Pipe::DelayedDelivery::flush() { - assert(pipe_lock.is_locked()); - in_q->enqueue(m, priority, conn_id); + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl; + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + delay_queue.pop_front(); + pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + } } +void *Pipe::DelayedDelivery::entry() +{ + Mutex::Locker locker(delay_lock); + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry start" << dendl; + while (!stop_delayed_delivery) { + if (delay_queue.empty()) { + lgeneric_subdout(pipe->msgr->cct, ms, 30) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl; + delay_cond.Wait(delay_lock); + continue; + } + utime_t release = delay_queue.front().first; + if (release > ceph_clock_now(pipe->msgr->cct)) { + lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl; + delay_cond.WaitUntil(delay_lock, release); + continue; + } + Message *m = delay_queue.front().second; + lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl; + delay_queue.pop_front(); + pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + } + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl; + return NULL; +} int Pipe::accept() { @@ -503,7 +555,11 @@ int Pipe::accept() // make existing Connection reference us existing->connection_state->reset_pipe(this); - + + // flush/queue any existing delayed messages + if (existing->delay_thread) + existing->delay_thread->flush(); + // steal incoming queue uint64_t replaced_conn_id = conn_id; conn_id = existing->conn_id; @@ -587,6 +643,9 @@ int Pipe::accept() } ldout(msgr->cct,20) << "accept done" << dendl; pipe_lock.Unlock(); + + maybe_start_delay_thread(); + return 0; // success. fail_unlocked: @@ -936,6 +995,7 @@ int Pipe::connect() ldout(msgr->cct,20) << "connect starting reader" << dendl; start_reader(); } + maybe_start_delay_thread(); delete authorizer; return 0; } @@ -1020,7 +1080,6 @@ void Pipe::discard_out_queue() out_q.clear(); } - void Pipe::fault(bool onread) { const md_config_t *conf = msgr->cct->_conf; @@ -1057,6 +1116,8 @@ void Pipe::fault(bool onread) msgr->lock.Unlock(); in_q->discard_queue(conn_id); + if (delay_thread) + delay_thread->discard(); discard_out_queue(); // disconnect from Connection, and mark it failed. future messages @@ -1068,6 +1129,10 @@ void Pipe::fault(bool onread) return; } + // queue delayed items immediately + if (delay_thread) + delay_thread->flush(); + // requeue sent items requeue_sent(); @@ -1075,7 +1140,7 @@ void Pipe::fault(bool onread) ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl; state = STATE_STANDBY; return; - } + } if (state != STATE_CONNECTING) { if (policy.server) { @@ -1122,6 +1187,8 @@ void Pipe::was_session_reset() ldout(msgr->cct,10) << "was_session_reset" << dendl; in_q->discard_queue(conn_id); + if (delay_thread) + delay_thread->discard(); discard_out_queue(); msgr->dispatch_queue.queue_remote_reset(connection_state); @@ -1243,7 +1310,18 @@ void Pipe::reader() ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; - queue_received(m); + + if (delay_thread) { + utime_t release; + if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { + release = m->get_recv_stamp(); + release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; + lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; + } + delay_thread->queue(release, m); + } else { + in_q->enqueue(m, m->get_priority(), conn_id); + } } else if (tag == CEPH_MSGR_TAG_CLOSE) { diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index cc92b2f6f8b..1bcc8263f4a 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -60,6 +60,46 @@ class DispatchQueue; } writer_thread; friend class Writer; + /** + * The DelayedDelivery is for injecting delays into Message delivery off + * the socket. It is only enabled if delays are requested, and if they + * are then it pulls Messages off the DelayQueue and puts them into the + * in_q (SimpleMessenger::dispatch_queue). + * Please note that this probably has issues with Pipe shutdown and + * replacement semantics. I've tried, but no guarantees. + */ + class DelayedDelivery: public Thread { + Pipe *pipe; + std::deque< pair<utime_t,Message*> > delay_queue; + Mutex delay_lock; + Cond delay_cond; + bool stop_delayed_delivery; + + public: + DelayedDelivery(Pipe *p) + : pipe(p), + delay_lock("Pipe::DelayedDelivery::delay_lock"), + stop_delayed_delivery(false) { } + ~DelayedDelivery() { + discard(); + } + void *entry(); + void queue(utime_t release, Message *m) { + Mutex::Locker l(delay_lock); + delay_queue.push_back(make_pair(release, m)); + delay_cond.Signal(); + } + void discard(); + void flush(); + void stop() { + delay_lock.Lock(); + stop_delayed_delivery = true; + delay_cond.Signal(); + delay_lock.Unlock(); + } + } *delay_thread; + friend class DelayedDelivery; + public: Pipe(SimpleMessenger *r, int st, Connection *con); ~Pipe(); @@ -166,25 +206,13 @@ class DispatchQueue; void start_reader(); void start_writer(); + void maybe_start_delay_thread(); void join_reader(); // public constructors static const Pipe& Server(int s); static const Pipe& Client(const entity_addr_t& pi); - //we have two queue_received's to allow local signal delivery - // via Message * (that doesn't actually point to a Message) - void queue_received(Message *m, int priority); - - void queue_received(Message *m) { - // this is just to make sure that a changeset is working - // properly; if you start using the refcounting more and have - // multiple people hanging on to a message, ditch the assert! - assert(m->nref.read() == 1); - - queue_received(m, m->get_priority()); - } - __u32 get_out_seq() { return out_seq; } bool is_queued() { return !out_q.empty() || keepalive; } @@ -208,6 +236,10 @@ class DispatchQueue; writer_thread.join(); if (reader_thread.is_started()) reader_thread.join(); + if (delay_thread) { + delay_thread->stop(); + delay_thread->join(); + } } void stop(); diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 3feb92924a9..2c66a5ea7db 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1886,10 +1886,9 @@ FileStore::Op *FileStore::build_op(list<Transaction*>& tls, void FileStore::queue_op(OpSequencer *osr, Op *o) { - // mark apply start _now_, because we need to drain the entire apply - // queue during commit in order to put the store in a consistent - // state. - apply_manager.op_apply_start(o->op); + // queue op on sequencer, then queue sequencer for the threadpool, + // so that regardless of which order the threads pick up the + // sequencer, the op order will be preserved. osr->queue(o); @@ -1953,16 +1952,12 @@ void FileStore::_do_op(OpSequencer *osr) { osr->apply_lock.Lock(); Op *o = osr->peek_queue(); - + apply_manager.op_apply_start(o->op); dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl; int r = do_transactions(o->tls, o->op); apply_manager.op_apply_finish(o->op); dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl; - - /*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now " - << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl; - */ } void FileStore::_finish_op(OpSequencer *osr) diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc index 42b95c96a58..b1aee62eca8 100644 --- a/src/os/JournalingObjectStore.cc +++ b/src/os/JournalingObjectStore.cc @@ -111,7 +111,8 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) Mutex::Locker l(apply_lock); // if we ops are blocked, or there are already people (left) in // line, get in line. - if (blocked || !ops_apply_blocked.empty()) { + if (op > max_applying_seq && + (blocked || !ops_apply_blocked.empty())) { Cond cond; ops_apply_blocked.push_back(&cond); dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl; @@ -125,9 +126,12 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) ops_apply_blocked.front()->Signal(); } } - dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl; - assert(!blocked); + dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) + << ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl; + if (op > max_applying_seq) + max_applying_seq = op; + assert(op > committed_seq); open_ops++; return op; } @@ -136,15 +140,18 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op) { Mutex::Locker l(apply_lock); dout(10) << "op_apply_finish " << op << " open_ops " << open_ops - << " -> " << (open_ops-1) << dendl; + << " -> " << (open_ops-1) + << ", max_applying_seq " << max_applying_seq + << ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq) + << dendl; if (--open_ops == 0) open_ops_cond.Signal(); // there can be multiple applies in flight; track the max value we // note. note that we can't _read_ this value and learn anything // meaningful unless/until we've quiesced all in-flight applies. - if (op > applied_seq) - applied_seq = op; + if (op > max_applied_seq) + max_applied_seq = op; } uint64_t JournalingObjectStore::SubmitManager::op_submit_start() @@ -185,19 +192,21 @@ bool JournalingObjectStore::ApplyManager::commit_start() { Mutex::Locker l(apply_lock); - dout(10) << "commit_start " - << ", applied_seq " << applied_seq << dendl; + dout(10) << "commit_start max_applying_seq " << max_applying_seq + << ", max_applied_seq " << max_applied_seq + << dendl; blocked = true; while (open_ops > 0) { - dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl; + dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, " + << " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl; open_ops_cond.Wait(apply_lock); } assert(open_ops == 0); - + assert(max_applied_seq == max_applying_seq); dout(10) << "commit_start blocked, all open_ops have completed" << dendl; { Mutex::Locker l(com_lock); - if (applied_seq == committed_seq) { + if (max_applied_seq == committed_seq) { dout(10) << "commit_start nothing to do" << dendl; blocked = false; if (!ops_apply_blocked.empty()) @@ -206,7 +215,7 @@ bool JournalingObjectStore::ApplyManager::commit_start() goto out; } - committing_seq = applied_seq; + committing_seq = max_applying_seq; dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl; diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h index dff49d43cbb..ae74c32cd25 100644 --- a/src/os/JournalingObjectStore.h +++ b/src/os/JournalingObjectStore.h @@ -54,7 +54,8 @@ protected: Cond blocked_cond; int open_ops; Cond open_ops_cond; - uint64_t applied_seq; + uint64_t max_applying_seq; + uint64_t max_applied_seq; Mutex com_lock; map<version_t, vector<Context*> > commit_waiters; @@ -68,7 +69,8 @@ protected: apply_lock("JOS::ApplyManager::apply_lock", false, true, false, g_ceph_context), blocked(false), open_ops(0), - applied_seq(0), + max_applying_seq(0), + max_applied_seq(0), com_lock("JOS::ApplyManager::com_lock", false, true, false, g_ceph_context), committing_seq(0), committed_seq(0) {} void add_waiter(uint64_t, Context*); @@ -97,7 +99,7 @@ protected: } { Mutex::Locker l(apply_lock); - applied_seq = fs_op_seq; + max_applying_seq = max_applied_seq = fs_op_seq; } } } apply_manager; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 58859a5741a..6018587cacc 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5133,6 +5133,9 @@ void OSD::handle_pg_query(OpRequestRef op) continue; } + if (!osdmap->have_pg_pool(pgid.pool())) + continue; + // get active crush mapping vector<int> up, acting; osdmap->pg_to_up_acting_osds(pgid, up, acting); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 617ba9e250f..8c4c29ba7e7 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -4531,10 +4531,6 @@ void PG::proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &oinfo) dirty_info = true; osd->reg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp); - assert(oinfo.last_epoch_started == info.last_epoch_started); - assert(info.history.last_epoch_started == oinfo.last_epoch_started); - assert(oinfo.history.last_epoch_started == oinfo.last_epoch_started); - // Handle changes to purged_snaps ONLY IF we have caught up if (last_complete_ondisk.epoch >= info.history.last_epoch_started) { interval_set<snapid_t> p; |