author    Josh Durgin <josh.durgin@inktank.com>  2012-12-05 15:55:35 -0800
committer Josh Durgin <josh.durgin@inktank.com>  2012-12-05 15:55:35 -0800
commit    930bb55006aff45f717462b95690d6d0d0fb8150 (patch)
tree      bcb13d491bb779c99f9a5c9eeda454756c0b8775
parent    27793255963f8d3ce329ce70326316175b2a101e (diff)
parent    2a5549cc0cc1e99a8f7eb5db092674425ccdb075 (diff)
Merge branch 'next'
-rw-r--r--  qa/run_xfstests_qemu.sh          |  7
-rw-r--r--  src/common/config_opts.h         |  3
-rw-r--r--  src/logrotate.conf               |  6
-rw-r--r--  src/mds/CDir.cc                  |  6
-rw-r--r--  src/mds/CInode.cc                | 36
-rw-r--r--  src/mds/CInode.h                 |  5
-rw-r--r--  src/mds/Capability.h             |  6
-rw-r--r--  src/mds/Locker.cc                | 27
-rw-r--r--  src/mds/Locker.h                 |  3
-rw-r--r--  src/mds/MDCache.cc               | 87
-rw-r--r--  src/mds/MDCache.h                |  8
-rw-r--r--  src/mds/Migrator.cc              |  1
-rw-r--r--  src/mds/Mutation.cc              | 31
-rw-r--r--  src/mds/Mutation.h               |  6
-rw-r--r--  src/mds/Server.cc                | 94
-rw-r--r--  src/mds/mdstypes.h               |  7
-rw-r--r--  src/messages/MMDSSlaveRequest.h  |  1
-rw-r--r--  src/msg/Pipe.cc                  | 92
-rw-r--r--  src/msg/Pipe.h                   | 58
-rw-r--r--  src/os/FileStore.cc              | 13
-rw-r--r--  src/os/JournalingObjectStore.cc  | 33
-rw-r--r--  src/os/JournalingObjectStore.h   |  8
-rw-r--r--  src/osd/OSD.cc                   |  3
-rw-r--r--  src/osd/PG.cc                    |  4
24 files changed, 405 insertions(+), 140 deletions(-)
diff --git a/qa/run_xfstests_qemu.sh b/qa/run_xfstests_qemu.sh
new file mode 100644
index 00000000000..0b5b86de090
--- /dev/null
+++ b/qa/run_xfstests_qemu.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+mkdir /tmp/cephtest
+wget https://raw.github.com/ceph/ceph/master/qa/run_xfstests.sh
+chmod +x run_xfstests.sh
+# the excluded tests require extra packages for advanced acl and quota support
+./run_xfstests.sh -c 1 -f xfs -t /dev/vdb -s /dev/vdc 1-26 28-49 51-63 65-83 85-233 235-291
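
[note] For reference, the run_xfstests.sh flags used above appear to be -c (iteration count), -f (filesystem type), -t (test device) and -s (scratch device), followed by the ranges of xfstests to run; the gaps in the ranges skip the tests needing the extra acl/quota packages mentioned in the comment. A minimal sketch of a hand-run invocation, assuming the same helper script and two spare virtio disks:

    # hypothetical: a single pass of the first range only
    wget https://raw.github.com/ceph/ceph/master/qa/run_xfstests.sh
    chmod +x run_xfstests.sh
    ./run_xfstests.sh -c 1 -f xfs -t /dev/vdb -s /dev/vdc 1-26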
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 0cdb16c3ebf..eedb6d6a2f5 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -106,6 +106,9 @@ OPTION(ms_bind_port_max, OPT_INT, 7100)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
OPTION(ms_inject_socket_failures, OPT_U64, 0)
+OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed
+OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds
+OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id")
OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster
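
[note] A usage sketch for the three new injection knobs (illustrative values, not part of this commit): set in ceph.conf, the following would delay roughly 0.5% of messages to OSDs by up to one second each:

    [global]
        ms inject delay type = osd
        ms inject delay probability = 0.005
        ms inject delay max = 1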
diff --git a/src/logrotate.conf b/src/logrotate.conf
index c4857f00c10..9af310413d9 100644
--- a/src/logrotate.conf
+++ b/src/logrotate.conf
@@ -4,13 +4,13 @@
compress
sharedscripts
postrotate
- if which invoke-rc.d && [ -x `which invoke-rc.d` ]; then
+ if which invoke-rc.d > /dev/null && [ -x `which invoke-rc.d` ]; then
invoke-rc.d ceph reload >/dev/null
- elif which service && [ -x `which service` ]; then
+ elif which service > /dev/null && [ -x `which service` ]; then
service ceph reload >/dev/null
fi
# Possibly reload twice, but depending on ceph.conf the reload above may be a no-op
- if which initctl && [ -x `which initctl` ]; then
+ if which initctl > /dev/null && [ -x `which initctl` ]; then
# upstart reload isn't very helpful here:
# https://bugs.launchpad.net/upstart/+bug/1012938
for type in mon osd mds; do
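
[note] The logrotate fix is about stray stdout: `which` prints the resolved path, and inside a postrotate script logrotate treats any output as something to report, while only the exit status was wanted here. A quick shell illustration of the behaviour being silenced:

    $ which invoke-rc.d
    /usr/sbin/invoke-rc.d                      # path leaks to stdout
    $ which invoke-rc.d > /dev/null && echo ok
    ok                                         # exit status still usable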
diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc
index 55b76d3a298..4b1d3ef76bc 100644
--- a/src/mds/CDir.cc
+++ b/src/mds/CDir.cc
@@ -615,8 +615,12 @@ void CDir::unlink_inode_work( CDentry *dn )
void CDir::add_to_bloom(CDentry *dn)
{
- if (!bloom)
+ if (!bloom) {
+ /* don't create a bloom filter for an incomplete dir that was added by log replay */
+ if (!is_complete())
+ return;
bloom = new bloom_filter(100, 0.05, 0);
+ }
/* This size and false positive probability are completely arbitrary. */
bloom->insert(dn->name.c_str(), dn->name.size());
}
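
[note] The guard matters because a bloom filter can only answer "definitely not present" if it was fed every relevant name; a dir populated piecemeal by log replay is incomplete, so a filter seeded from it could wrongly rule out names it never saw. A simplified sketch of the lazy-create-with-precondition pattern (standalone, hypothetical types):

    // sketch: create the filter only once the backing set is complete,
    // so that "not in filter" can safely mean "not in dir"
    void add_name(Dir *dir, const std::string &name) {
      if (!dir->bloom) {
        if (!dir->is_complete())
          return;                // an incomplete dir would poison the filter
        dir->bloom = new bloom_filter(100, 0.05, 0);
      }
      dir->bloom->insert(name.c_str(), name.size());
    }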
diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc
index c12930837df..af70b681ffc 100644
--- a/src/mds/CInode.cc
+++ b/src/mds/CInode.cc
@@ -130,6 +130,7 @@ ostream& operator<<(ostream& out, CInode& in)
if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
if (in.is_frozen_inode()) out << " FROZEN";
+ if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
inode_t *pi = in.get_projected_inode();
if (pi->is_truncating())
@@ -1862,7 +1863,8 @@ void CInode::add_waiter(uint64_t tag, Context *c)
// wait on the directory?
// make sure it's not the inode that is explicitly ambiguous|freezing|frozen
if (((tag & WAIT_SINGLEAUTH) && !state_test(STATE_AMBIGUOUSAUTH)) ||
- ((tag & WAIT_UNFREEZE) && !is_frozen_inode() && !is_freezing_inode())) {
+ ((tag & WAIT_UNFREEZE) &&
+ !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) {
dout(15) << "passing waiter up tree" << dendl;
parent->dir->add_waiter(tag, c);
return;
@@ -1885,8 +1887,10 @@ bool CInode::freeze_inode(int auth_pin_allowance)
dout(10) << "freeze_inode - frozen" << dendl;
assert(auth_pins == auth_pin_allowance);
- get(PIN_FROZEN);
- state_set(STATE_FROZEN);
+ if (!state_test(STATE_FROZEN)) {
+ get(PIN_FROZEN);
+ state_set(STATE_FROZEN);
+ }
return true;
}
@@ -1904,10 +1908,34 @@ void CInode::unfreeze_inode(list<Context*>& finished)
take_waiting(WAIT_UNFREEZE, finished);
}
+void CInode::unfreeze_inode()
+{
+ list<Context*> finished;
+ unfreeze_inode(finished);
+ mdcache->mds->queue_waiters(finished);
+}
+
+void CInode::freeze_auth_pin()
+{
+ assert(state_test(CInode::STATE_FROZEN));
+ state_set(CInode::STATE_FROZENAUTHPIN);
+}
+
+void CInode::unfreeze_auth_pin()
+{
+ assert(state_test(CInode::STATE_FROZENAUTHPIN));
+ state_clear(CInode::STATE_FROZENAUTHPIN);
+ if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
+ list<Context*> finished;
+ take_waiting(WAIT_UNFREEZE, finished);
+ mdcache->mds->queue_waiters(finished);
+ }
+}
// auth_pins
bool CInode::can_auth_pin() {
- if (is_freezing_inode() || is_frozen_inode()) return false;
+ if (is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin())
+ return false;
if (parent)
return parent->can_auth_pin();
return true;
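
[note] Taken together, STATE_FROZENAUTHPIN acts as a weaker freeze: can_auth_pin() now refuses new auth pins while it is set, but the inode is not held in the fully frozen state. The intended conversion sequence, mirroring Mutation::freeze_auth_pin() later in this diff:

    // sketch: downgrade a full freeze into an auth-pin-only freeze
    if (in->freeze_inode(1)) {    // drain until our pin is the only one left
      in->freeze_auth_pin();      // keep new auth pins blocked
      in->unfreeze_inode();       // but let other activity on the inode resume
    }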
diff --git a/src/mds/CInode.h b/src/mds/CInode.h
index b76b52414c9..e43ecf50fa3 100644
--- a/src/mds/CInode.h
+++ b/src/mds/CInode.h
@@ -181,6 +181,7 @@ public:
static const int STATE_DIRTYPARENT = (1<<14);
static const int STATE_DIRTYRSTAT = (1<<15);
static const int STATE_STRAYPINNED = (1<<16);
+ static const int STATE_FROZENAUTHPIN = (1<<17);
// -- waiters --
static const uint64_t WAIT_DIR = (1<<0);
@@ -856,6 +857,7 @@ public:
// -- freeze --
bool is_freezing_inode() { return state_test(STATE_FREEZING); }
bool is_frozen_inode() { return state_test(STATE_FROZEN); }
+ bool is_frozen_auth_pin() { return state_test(STATE_FROZENAUTHPIN); }
bool is_frozen();
bool is_frozen_dir();
bool is_freezing();
@@ -864,7 +866,10 @@ public:
* auth_pins it is itself holding/responsible for. */
bool freeze_inode(int auth_pin_allowance=0);
void unfreeze_inode(list<Context*>& finished);
+ void unfreeze_inode();
+ void freeze_auth_pin();
+ void unfreeze_auth_pin();
// -- reference counting --
void bad_put(int by) {
diff --git a/src/mds/Capability.h b/src/mds/Capability.h
index f3743281c90..6fe67f45b1d 100644
--- a/src/mds/Capability.h
+++ b/src/mds/Capability.h
@@ -297,7 +297,8 @@ public:
int newpending = other.pending | pending();
if (other.issued & ~newpending)
issue(other.issued | newpending);
- issue(newpending);
+ else
+ issue(newpending);
last_issue_stamp = other.last_issue_stamp;
client_follows = other.client_follows;
@@ -311,7 +312,8 @@ public:
int newpending = pending();
if (otherissued & ~newpending)
issue(otherissued | newpending);
- issue(newpending);
+ else
+ issue(newpending);
// wanted
_wanted = _wanted | otherwanted;
diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc
index c29ac34ec85..ee4799e18f8 100644
--- a/src/mds/Locker.cc
+++ b/src/mds/Locker.cc
@@ -174,7 +174,8 @@ bool Locker::acquire_locks(MDRequest *mdr,
set<SimpleLock*> &rdlocks,
set<SimpleLock*> &wrlocks,
set<SimpleLock*> &xlocks,
- map<SimpleLock*,int> *remote_wrlocks)
+ map<SimpleLock*,int> *remote_wrlocks,
+ CInode *auth_pin_freeze)
{
if (mdr->done_locking &&
!mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
@@ -196,6 +197,8 @@ bool Locker::acquire_locks(MDRequest *mdr,
// augment xlock with a versionlock?
if ((*p)->get_type() == CEPH_LOCK_DN) {
CDentry *dn = (CDentry*)(*p)->get_parent();
+ if (!dn->is_auth())
+ continue;
if (xlocks.count(&dn->versionlock))
continue; // we're xlocking the versionlock too; don't wrlock it!
@@ -213,6 +216,8 @@ bool Locker::acquire_locks(MDRequest *mdr,
if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
// inode version lock?
CInode *in = (CInode*)(*p)->get_parent();
+ if (!in->is_auth())
+ continue;
if (mdr->is_master()) {
// master. wrlock versionlock so we can pipeline inode updates to journal.
wrlocks.insert(&in->versionlock);
@@ -282,11 +287,12 @@ bool Locker::acquire_locks(MDRequest *mdr,
continue;
if (!object->is_auth()) {
+ if (!mdr->locks.empty())
+ mds->locker->drop_locks(mdr);
if (object->is_ambiguous_auth()) {
// wait
dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
- mds->locker->drop_locks(mdr);
mdr->drop_local_auth_pins();
return false;
}
@@ -331,7 +337,9 @@ bool Locker::acquire_locks(MDRequest *mdr,
dout(10) << " req remote auth_pin of " << **q << dendl;
MDSCacheObjectInfo info;
(*q)->set_object_info(info);
- req->get_authpins().push_back(info);
+ req->get_authpins().push_back(info);
+ if (*q == auth_pin_freeze)
+ (*q)->set_object_info(req->get_authpin_freeze());
mdr->pin(*q);
}
mds->send_message_mds(req, p->first);
@@ -845,8 +853,8 @@ void Locker::try_eval(MDSCacheObject *p, int mask)
return;
}
- if (p->is_auth() && !p->can_auth_pin()) {
- dout(7) << "try_eval can't auth_pin, waiting on " << *p << dendl;
+ if (p->is_auth() && p->is_frozen()) {
+ dout(7) << "try_eval frozen, waiting on " << *p << dendl;
p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
return;
}
@@ -3920,6 +3928,7 @@ void Locker::local_wrlock_grab(LocalLock *lock, Mutation *mut)
dout(7) << "local_wrlock_grab on " << *lock
<< " on " << *lock->get_parent() << dendl;
+ assert(lock->get_parent()->is_auth());
assert(lock->can_wrlock());
assert(!mut->wrlocks.count(lock));
lock->get_wrlock(mut->get_client());
@@ -3932,6 +3941,7 @@ bool Locker::local_wrlock_start(LocalLock *lock, MDRequest *mut)
dout(7) << "local_wrlock_start on " << *lock
<< " on " << *lock->get_parent() << dendl;
+ assert(lock->get_parent()->is_auth());
if (lock->can_wrlock()) {
assert(!mut->wrlocks.count(lock));
lock->get_wrlock(mut->get_client());
@@ -3963,6 +3973,7 @@ bool Locker::local_xlock_start(LocalLock *lock, MDRequest *mut)
dout(7) << "local_xlock_start on " << *lock
<< " on " << *lock->get_parent() << dendl;
+ assert(lock->get_parent()->is_auth());
if (!lock->can_xlock_local()) {
lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
return false;
@@ -4397,8 +4408,12 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
if (lock->get_state() == LOCK_MIX_LOCK ||
lock->get_state() == LOCK_MIX_LOCK2 ||
lock->get_state() == LOCK_MIX_EXCL ||
- lock->get_state() == LOCK_MIX_TSYN)
+ lock->get_state() == LOCK_MIX_TSYN) {
lock->decode_locked_state(m->get_data());
+ // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
+ // delay calling scatter_writebehind().
+ lock->clear_flushed();
+ }
if (lock->is_gathering()) {
dout(7) << "handle_file_lock " << *in << " from " << from
diff --git a/src/mds/Locker.h b/src/mds/Locker.h
index a1cf59e3185..b3b9919e7fd 100644
--- a/src/mds/Locker.h
+++ b/src/mds/Locker.h
@@ -88,7 +88,8 @@ public:
set<SimpleLock*> &rdlocks,
set<SimpleLock*> &wrlocks,
set<SimpleLock*> &xlocks,
- map<SimpleLock*,int> *remote_wrlocks=NULL);
+ map<SimpleLock*,int> *remote_wrlocks=NULL,
+ CInode *auth_pin_freeze=NULL);
void cancel_locking(Mutation *mut, set<CInode*> *pneed_issue);
void drop_locks(Mutation *mut, set<CInode*> *pneed_issue=0);
diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc
index 5f0ba16defd..58a8b8a2a34 100644
--- a/src/mds/MDCache.cc
+++ b/src/mds/MDCache.cc
@@ -5498,9 +5498,10 @@ void MDCache::trim_dentry(CDentry *dn, map<int, MCacheExpire*>& expiremap)
// adjust the dir state
// NOTE: we can safely remove a clean, null dentry without affecting
// directory completeness.
- // (do this _before_ we unlink the inode, below!)
+ // (check this _before_ we unlink the inode, below!)
+ bool clear_complete = false;
if (!(dnl->is_null() && dn->is_clean()))
- dir->state_clear(CDir::STATE_COMPLETE);
+ clear_complete = true;
// unlink the dentry
if (dnl->is_remote()) {
@@ -5520,6 +5521,9 @@ void MDCache::trim_dentry(CDentry *dn, map<int, MCacheExpire*>& expiremap)
// remove dentry
dir->add_to_bloom(dn);
dir->remove_dentry(dn);
+
+ if (clear_complete)
+ dir->state_clear(CDir::STATE_COMPLETE);
// reexport?
if (dir->get_num_head_items() == 0 && dir->is_subtree_root())
@@ -5708,9 +5712,8 @@ void MDCache::trim_non_auth()
else {
assert(dnl->is_null());
}
- dir->add_to_bloom(dn);
+
dir->remove_dentry(dn);
-
// adjust the dir state
dir->state_clear(CDir::STATE_COMPLETE); // dir incomplete!
}
@@ -5811,7 +5814,6 @@ bool MDCache::trim_non_auth_subtree(CDir *dir)
dout(20) << "trim_non_auth_subtree(" << dir << ") removing inode " << in << " with dentry" << dn << dendl;
dir->unlink_inode(dn);
remove_inode(in);
- dir->add_to_bloom(dn);
dir->remove_dentry(dn);
} else {
dout(20) << "trim_non_auth_subtree(" << dir << ") keeping inode " << in << " with dentry " << dn <<dendl;
@@ -5928,6 +5930,7 @@ void MDCache::handle_cache_expire(MCacheExpire *m)
continue;
}
assert(!(parent_dir->is_auth() && parent_dir->is_exporting()) ||
+ migrator->get_export_state(parent_dir) <= Migrator::EXPORT_PREPPING ||
(migrator->get_export_state(parent_dir) == Migrator::EXPORT_WARNING &&
!migrator->export_has_warned(parent_dir, from)));
@@ -6706,11 +6709,11 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, Context *fin, // wh
// can we conclude ENOENT?
if (dnl && dnl->is_null()) {
- if (dn->lock.can_read(client)) {
- dout(12) << "traverse: miss on null+readable dentry " << path[depth] << " " << *dn << dendl;
+ if (mds->locker->rdlock_try(&dn->lock, client, NULL)) {
+ dout(10) << "traverse: miss on null+readable dentry " << path[depth] << " " << *dn << dendl;
return -ENOENT;
} else {
- dout(12) << "miss on dentry " << *dn << ", can't read due to lock" << dendl;
+ dout(10) << "miss on dentry " << *dn << ", can't read due to lock" << dendl;
dn->lock.add_waiter(SimpleLock::WAIT_RD, _get_waiter(mdr, req, fin));
return 1;
}
@@ -6730,7 +6733,8 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, Context *fin, // wh
} else {
dout(7) << "remote link to " << dnl->get_remote_ino() << ", which i don't have" << dendl;
assert(mdr); // we shouldn't hit non-primary dentries doing a non-mdr traversal!
- open_remote_ino(dnl->get_remote_ino(), _get_waiter(mdr, req, fin));
+ open_remote_ino(dnl->get_remote_ino(), _get_waiter(mdr, req, fin),
+ (null_okay && depth == path.depth() - 1));
if (mds->logger) mds->logger->inc(l_mds_trino);
return 1;
}
@@ -7016,12 +7020,13 @@ CInode *MDCache::get_dentry_inode(CDentry *dn, MDRequest *mdr, bool projected)
class C_MDC_RetryOpenRemoteIno : public Context {
MDCache *mdcache;
inodeno_t ino;
+ bool want_xlocked;
Context *onfinish;
public:
- C_MDC_RetryOpenRemoteIno(MDCache *mdc, inodeno_t i, Context *c) :
- mdcache(mdc), ino(i), onfinish(c) {}
+ C_MDC_RetryOpenRemoteIno(MDCache *mdc, inodeno_t i, Context *c, bool wx) :
+ mdcache(mdc), ino(i), want_xlocked(wx), onfinish(c) {}
void finish(int r) {
- mdcache->open_remote_ino(ino, onfinish);
+ mdcache->open_remote_ino(ino, onfinish, want_xlocked);
}
};
@@ -7031,19 +7036,20 @@ class C_MDC_OpenRemoteIno : public Context {
inodeno_t ino;
inodeno_t hadino;
version_t hadv;
+ bool want_xlocked;
Context *onfinish;
public:
vector<Anchor> anchortrace;
- C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, inodeno_t hi, version_t hv, Context *c) :
- mdcache(mdc), ino(i), hadino(hi), hadv(hv), onfinish(c) {}
- C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, vector<Anchor>& at, Context *c) :
- mdcache(mdc), ino(i), hadino(0), hadv(0), onfinish(c), anchortrace(at) {}
+ C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, bool wx, inodeno_t hi, version_t hv, Context *c) :
+ mdcache(mdc), ino(i), hadino(hi), hadv(hv), want_xlocked(wx), onfinish(c) {}
+ C_MDC_OpenRemoteIno(MDCache *mdc, inodeno_t i, bool wx, vector<Anchor>& at, Context *c) :
+ mdcache(mdc), ino(i), hadino(0), hadv(0), want_xlocked(wx), onfinish(c), anchortrace(at) {}
void finish(int r) {
assert(r == 0);
if (r == 0)
- mdcache->open_remote_ino_2(ino, anchortrace, hadino, hadv, onfinish);
+ mdcache->open_remote_ino_2(ino, anchortrace, want_xlocked, hadino, hadv, onfinish);
else {
onfinish->finish(r);
delete onfinish;
@@ -7051,18 +7057,18 @@ public:
}
};
-void MDCache::open_remote_ino(inodeno_t ino, Context *onfinish, inodeno_t hadino, version_t hadv)
+void MDCache::open_remote_ino(inodeno_t ino, Context *onfinish, bool want_xlocked,
+ inodeno_t hadino, version_t hadv)
{
- dout(7) << "open_remote_ino on " << ino << dendl;
+ dout(7) << "open_remote_ino on " << ino << (want_xlocked ? " want_xlocked":"") << dendl;
- C_MDC_OpenRemoteIno *c = new C_MDC_OpenRemoteIno(this, ino, hadino, hadv, onfinish);
+ C_MDC_OpenRemoteIno *c = new C_MDC_OpenRemoteIno(this, ino, want_xlocked,
+ hadino, hadv, onfinish);
mds->anchorclient->lookup(ino, c->anchortrace, c);
}
-void MDCache::open_remote_ino_2(inodeno_t ino,
- vector<Anchor>& anchortrace,
- inodeno_t hadino, version_t hadv,
- Context *onfinish)
+void MDCache::open_remote_ino_2(inodeno_t ino, vector<Anchor>& anchortrace, bool want_xlocked,
+ inodeno_t hadino, version_t hadv, Context *onfinish)
{
dout(7) << "open_remote_ino_2 on " << ino
<< ", trace depth is " << anchortrace.size() << dendl;
@@ -7105,7 +7111,7 @@ void MDCache::open_remote_ino_2(inodeno_t ino,
if (!in->dirfragtree.contains(frag)) {
dout(10) << "frag " << frag << " not valid, requerying anchortable" << dendl;
- open_remote_ino(ino, onfinish);
+ open_remote_ino(ino, onfinish, want_xlocked);
return;
}
@@ -7115,14 +7121,15 @@ void MDCache::open_remote_ino_2(inodeno_t ino,
dout(10) << "opening remote dirfrag " << frag << " under " << *in << dendl;
/* we re-query the anchortable just to avoid a fragtree update race */
open_remote_dirfrag(in, frag,
- new C_MDC_RetryOpenRemoteIno(this, ino, onfinish));
+ new C_MDC_RetryOpenRemoteIno(this, ino, onfinish, want_xlocked));
return;
}
if (!dir && in->is_auth()) {
if (in->is_frozen_dir()) {
dout(7) << "traverse: " << *in << " is frozen_dir, waiting" << dendl;
- in->parent->dir->add_waiter(CDir::WAIT_UNFREEZE, onfinish);
+ in->parent->dir->add_waiter(CDir::WAIT_UNFREEZE,
+ new C_MDC_RetryOpenRemoteIno(this, ino, onfinish, want_xlocked));
return;
}
dir = in->get_or_open_dirfrag(this, frag);
@@ -7144,20 +7151,22 @@ void MDCache::open_remote_ino_2(inodeno_t ino,
<< " in complete dir " << *dir
<< ", requerying anchortable"
<< dendl;
- open_remote_ino(ino, onfinish, anchortrace[i].ino, anchortrace[i].updated);
+ open_remote_ino(ino, onfinish, want_xlocked,
+ anchortrace[i].ino, anchortrace[i].updated);
}
} else {
dout(10) << "need ino " << anchortrace[i].ino
<< ", fetching incomplete dir " << *dir
<< dendl;
- dir->fetch(new C_MDC_OpenRemoteIno(this, ino, anchortrace, onfinish));
+ dir->fetch(new C_MDC_OpenRemoteIno(this, ino, want_xlocked, anchortrace, onfinish));
}
} else {
// hmm, discover.
dout(10) << "have remote dirfrag " << *dir << ", discovering "
<< anchortrace[i].ino << dendl;
- discover_ino(dir, anchortrace[i].ino,
- new C_MDC_OpenRemoteIno(this, ino, anchortrace, onfinish));
+ discover_ino(dir, anchortrace[i].ino,
+ new C_MDC_RetryOpenRemoteIno(this, ino, onfinish, want_xlocked),
+ (want_xlocked && i == anchortrace.size() - 1));
}
}
@@ -7476,13 +7485,17 @@ void MDCache::request_finish(MDRequest *mdr)
void MDCache::request_forward(MDRequest *mdr, int who, int port)
{
- dout(7) << "request_forward " << *mdr << " to mds." << who << " req " << *mdr << dendl;
-
- mds->forward_message_mds(mdr->client_request, who);
- mdr->client_request = 0;
+ if (mdr->client_request->get_source().is_client()) {
+ dout(7) << "request_forward " << *mdr << " to mds." << who << " req "
+ << *mdr->client_request << dendl;
+ mds->forward_message_mds(mdr->client_request, who);
+ mdr->client_request = 0;
+ if (mds->logger) mds->logger->inc(l_mds_fw);
+ } else {
+ dout(7) << "request_forward drop " << *mdr << " req " << *mdr->client_request
+ << " was from mds" << dendl;
+ }
request_cleanup(mdr);
-
- if (mds->logger) mds->logger->inc(l_mds_fw);
}
diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h
index 64290aa97b9..31c7467bf41 100644
--- a/src/mds/MDCache.h
+++ b/src/mds/MDCache.h
@@ -701,11 +701,11 @@ public:
void open_remote_dirfrag(CInode *diri, frag_t fg, Context *fin);
CInode *get_dentry_inode(CDentry *dn, MDRequest *mdr, bool projected=false);
- void open_remote_ino(inodeno_t ino, Context *fin, inodeno_t hadino=0, version_t hadv=0);
+ void open_remote_ino(inodeno_t ino, Context *fin, bool want_xlocked=false,
+ inodeno_t hadino=0, version_t hadv=0);
void open_remote_ino_2(inodeno_t ino,
- vector<Anchor>& anchortrace,
- inodeno_t hadino, version_t hadv,
- Context *onfinish);
+ vector<Anchor>& anchortrace, bool want_xlocked,
+ inodeno_t hadino, version_t hadv, Context *onfinish);
void open_remote_dentry(CDentry *dn, bool projected, Context *fin);
void _open_remote_dentry_finish(int r, CDentry *dn, bool projected, Context *fin);
diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc
index b66b54cbc83..a804eab7731 100644
--- a/src/mds/Migrator.cc
+++ b/src/mds/Migrator.cc
@@ -1051,6 +1051,7 @@ void Migrator::finish_export_inode_caps(CInode *in)
mds->send_message_client_counted(m, it->first);
}
in->clear_client_caps_after_export();
+ mds->locker->eval(in, CEPH_CAP_LOCKS);
}
void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& finished)
diff --git a/src/mds/Mutation.cc b/src/mds/Mutation.cc
index 6321ffc160a..a9c35134bc8 100644
--- a/src/mds/Mutation.cc
+++ b/src/mds/Mutation.cc
@@ -82,8 +82,39 @@ void Mutation::auth_unpin(MDSCacheObject *object)
auth_pins.erase(object);
}
+bool Mutation::freeze_auth_pin(CInode *inode)
+{
+ assert(!auth_pin_freeze || auth_pin_freeze == inode);
+ auth_pin_freeze = inode;
+ auth_pin(inode);
+ if (!inode->freeze_inode(1))
+ return false;
+
+ inode->freeze_auth_pin();
+ inode->unfreeze_inode();
+ return true;
+}
+
+void Mutation::unfreeze_auth_pin(CInode *inode)
+{
+ assert(auth_pin_freeze == inode);
+ assert(is_auth_pinned(inode));
+ if (inode->is_frozen_auth_pin())
+ inode->unfreeze_auth_pin();
+ else
+ inode->unfreeze_inode();
+ auth_pin_freeze = NULL;
+}
+
+bool Mutation::can_auth_pin(MDSCacheObject *object)
+{
+ return object->can_auth_pin() || (is_auth_pinned(object) && object == auth_pin_freeze);
+}
+
void Mutation::drop_local_auth_pins()
{
+ if (auth_pin_freeze)
+ unfreeze_auth_pin(auth_pin_freeze);
for (set<MDSCacheObject*>::iterator it = auth_pins.begin();
it != auth_pins.end();
it++) {
diff --git a/src/mds/Mutation.h b/src/mds/Mutation.h
index cba6223864e..37cc764254d 100644
--- a/src/mds/Mutation.h
+++ b/src/mds/Mutation.h
@@ -50,6 +50,7 @@ struct Mutation {
// auth pins
set< MDSCacheObject* > remote_auth_pins;
set< MDSCacheObject* > auth_pins;
+ CInode *auth_pin_freeze;
// held locks
set< SimpleLock* > rdlocks; // always local.
@@ -81,12 +82,14 @@ struct Mutation {
: attempt(0),
ls(0),
slave_to_mds(-1),
+ auth_pin_freeze(NULL),
locking(NULL),
done_locking(false), committing(false), aborted(false), killed(false) { }
Mutation(metareqid_t ri, __u32 att=0, int slave_to=-1)
: reqid(ri), attempt(att),
ls(0),
slave_to_mds(slave_to),
+ auth_pin_freeze(NULL),
locking(NULL),
done_locking(false), committing(false), aborted(false), killed(false) { }
virtual ~Mutation() {
@@ -120,6 +123,9 @@ struct Mutation {
bool is_auth_pinned(MDSCacheObject *object);
void auth_pin(MDSCacheObject *object);
void auth_unpin(MDSCacheObject *object);
+ bool freeze_auth_pin(CInode *inode);
+ void unfreeze_auth_pin(CInode *inode);
+ bool can_auth_pin(MDSCacheObject *object);
void drop_local_auth_pins();
void add_projected_inode(CInode *in);
void pop_and_dirty_projected_inodes();
diff --git a/src/mds/Server.cc b/src/mds/Server.cc
index d5548a8493c..ba436566dec 100644
--- a/src/mds/Server.cc
+++ b/src/mds/Server.cc
@@ -1487,6 +1487,7 @@ void Server::handle_slave_auth_pin(MDRequest *mdr)
// build list of objects
list<MDSCacheObject*> objects;
+ CInode *auth_pin_freeze = NULL;
bool fail = false;
for (vector<MDSCacheObjectInfo>::iterator p = mdr->slave_request->get_authpins().begin();
@@ -1500,6 +1501,8 @@ void Server::handle_slave_auth_pin(MDRequest *mdr)
}
objects.push_back(object);
+ if (*p == mdr->slave_request->get_authpin_freeze())
+ auth_pin_freeze = dynamic_cast<CInode*>(object);
}
// can we auth pin them?
@@ -1512,8 +1515,7 @@ void Server::handle_slave_auth_pin(MDRequest *mdr)
fail = true;
break;
}
- if (!mdr->is_auth_pinned(*p) &&
- !(*p)->can_auth_pin()) {
+ if (!mdr->can_auth_pin(*p)) {
// wait
dout(10) << " waiting for authpinnable on " << **p << dendl;
(*p)->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
@@ -1527,6 +1529,22 @@ void Server::handle_slave_auth_pin(MDRequest *mdr)
if (fail) {
mdr->drop_local_auth_pins(); // just in case
} else {
+ /* handle_slave_rename_prep() calls freeze_inode() to wait for all other operations
+ * on the source inode to complete. This happens after all locks for the rename
+ * operation are acquired. But to acquire locks, we need to auth pin the locks'
+ * parent objects first. So there is an ABBA deadlock if someone auth pins the
+ * source inode after locks are acquired and before Server::handle_slave_rename_prep()
+ * is called. The solution is to freeze the inode and prevent other MDRequests
+ * from getting new auth pins.
+ */
+ if (auth_pin_freeze) {
+ dout(10) << " freezing auth pin on " << *auth_pin_freeze << dendl;
+ if (!mdr->freeze_auth_pin(auth_pin_freeze)) {
+ auth_pin_freeze->add_waiter(CInode::WAIT_FROZEN, new C_MDS_RetryRequest(mdcache, mdr));
+ mds->mdlog->flush();
+ return;
+ }
+ }
for (list<MDSCacheObject*>::iterator p = objects.begin();
p != objects.end();
++p) {
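
[note] An illustrative interleaving of the ABBA deadlock the comment above describes (hypothetical requests A and B, not from the commit):

    // A: acquire_locks(...)                 rename locks acquired
    // B: auth_pin(srci)                     slips in before rename_prep
    // A: handle_slave_rename_prep():
    //      freeze_inode(srci)           ->  waits for B's pin to drop
    // B: acquire_locks(...)             ->  waits on a lock A holds
    //
    // The fix freezes srci when A's remote auth pin is granted (during
    // lock acquisition), so B's auth_pin now blocks instead of slipping in.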
@@ -1923,7 +1941,8 @@ CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, int n,
// do NOT proceed if freezing, as cap release may defer in that case, and
// we could deadlock when we try to lock @ref.
// if we're already auth_pinned, continue; the release has already been processed.
- if (ref->is_frozen() || (ref->is_freezing() && !mdr->is_auth_pinned(ref))) {
+ if (ref->is_frozen() || ref->is_frozen_auth_pin() ||
+ (ref->is_freezing() && !mdr->is_auth_pinned(ref))) {
dout(7) << "waiting for !frozen/authpinnable on " << *ref << dendl;
ref->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
/* If we have any auth pins, this will deadlock.
@@ -5202,25 +5221,27 @@ void Server::handle_client_rename(MDRequest *mdr)
wrlocks.insert(&straydn->get_dir()->inode->nestlock);
}
- // xlock versionlock on srci if remote?
- // this ensures it gets safely remotely auth_pinned, avoiding deadlock;
- // strictly speaking, having the slave node freeze the inode is
- // otherwise sufficient for avoiding conflicts with inode locks, etc.
- if (!srcdn->is_auth() && srcdnl->is_primary()) // xlock versionlock on srci if there are any witnesses
- xlocks.insert(&srci->versionlock);
-
// xlock versionlock on dentries if there are witnesses.
// replicas can't see projected dentry linkages, and will get
// confused if we try to pipeline things.
if (!witnesses.empty()) {
- if (srcdn->is_projected())
- xlocks.insert(&srcdn->versionlock);
- if (destdn->is_projected())
- xlocks.insert(&destdn->versionlock);
- // also take rdlock on all ancestor dentries for destdn. this ensures that the
- // destdn can be traversed to by the witnesses.
- for (int i=0; i<(int)desttrace.size(); i++)
- xlocks.insert(&desttrace[i]->versionlock);
+ // take xlock on all projected ancestor dentries for srcdn and destdn.
+ // this ensures the srcdn and destdn can be traversed to by the witnesses.
+ for (int i= 0; i<(int)srctrace.size(); i++) {
+ if (srctrace[i]->is_auth() && srctrace[i]->is_projected())
+ xlocks.insert(&srctrace[i]->versionlock);
+ }
+ for (int i=0; i<(int)desttrace.size(); i++) {
+ if (desttrace[i]->is_auth() && desttrace[i]->is_projected())
+ xlocks.insert(&desttrace[i]->versionlock);
+ }
+ // xlock srci and oldin's primary dentries, so witnesses can call
+ // open_remote_ino() with 'want_xlocked=true' when the srcdn or destdn
+ // is traversed.
+ if (srcdnl->is_remote())
+ xlocks.insert(&srci->get_projected_parent_dn()->lock);
+ if (destdnl->is_remote())
+ xlocks.insert(&oldin->get_projected_parent_dn()->lock);
}
// we need to update srci's ctime. xlock its least contended lock to do that...
@@ -5244,7 +5265,9 @@ void Server::handle_client_rename(MDRequest *mdr)
// take any locks needed for anchor creation/verification
mds->mdcache->anchor_create_prep_locks(mdr, srci, rdlocks, xlocks);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks, &remote_wrlocks))
+ CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : NULL;
+ if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks,
+ &remote_wrlocks, auth_pin_freeze))
return;
if (oldin &&
@@ -5681,9 +5704,10 @@ void Server::_rename_prepare(MDRequest *mdr,
} else if (destdnl->is_remote()) {
if (oldin->is_auth()) {
// auth for targeti
- metablob->add_dir_context(oldin->get_parent_dir());
- mdcache->journal_cow_dentry(mdr, metablob, oldin->parent, CEPH_NOSNAP, 0, destdnl);
- metablob->add_primary_dentry(oldin->parent, true, oldin);
+ metablob->add_dir_context(oldin->get_projected_parent_dir());
+ mdcache->journal_cow_dentry(mdr, metablob, oldin->get_projected_parent_dn(),
+ CEPH_NOSNAP, 0, destdnl);
+ metablob->add_primary_dentry(oldin->get_projected_parent_dn(), true, oldin);
}
if (destdn->is_auth()) {
// auth for dn, not targeti
@@ -5702,10 +5726,10 @@ void Server::_rename_prepare(MDRequest *mdr,
if (destdn->is_auth())
metablob->add_remote_dentry(destdn, true, srcdnl->get_remote_ino(), srcdnl->get_remote_d_type());
- if (srci->get_parent_dn()->is_auth()) { // it's remote
- metablob->add_dir_context(srci->get_parent_dir());
- mdcache->journal_cow_dentry(mdr, metablob, srci->get_parent_dn(), CEPH_NOSNAP, 0, srcdnl);
- metablob->add_primary_dentry(srci->get_parent_dn(), true, srci);
+ if (srci->get_projected_parent_dn()->is_auth()) { // it's remote
+ metablob->add_dir_context(srci->get_projected_parent_dir());
+ mdcache->journal_cow_dentry(mdr, metablob, srci->get_projected_parent_dn(), CEPH_NOSNAP, 0, srcdnl);
+ metablob->add_primary_dentry(srci->get_projected_parent_dn(), true, srci);
}
} else {
if (destdn->is_auth() && !destdnl->is_null())
@@ -5994,9 +6018,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
// am i srcdn auth?
if (srcdn->is_auth()) {
- if (srcdnl->is_primary() &&
- !srcdnl->get_inode()->is_freezing_inode() &&
- !srcdnl->get_inode()->is_frozen_inode()) {
+ if (srcdnl->is_primary()) {
// set ambiguous auth for srci
/*
* NOTE: we don't worry about ambiguous cache expire as we do
@@ -6013,7 +6035,13 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
int allowance = 2; // 1 for the mdr auth_pin, 1 for the link lock
allowance += srcdnl->get_inode()->is_dir(); // for the snap lock
dout(10) << " freezing srci " << *srcdnl->get_inode() << " with allowance " << allowance << dendl;
- if (!srcdnl->get_inode()->freeze_inode(allowance)) {
+ bool frozen_inode = srcdnl->get_inode()->freeze_inode(allowance);
+
+ // unfreeze auth pin after freezing the inode to avoid queueing waiters
+ if (srcdnl->get_inode()->is_frozen_auth_pin())
+ mdr->unfreeze_auth_pin(srcdnl->get_inode());
+
+ if (!frozen_inode) {
srcdnl->get_inode()->add_waiter(CInode::WAIT_FROZEN, new C_MDS_RetryRequest(mdcache, mdr));
return;
}
@@ -6181,8 +6209,7 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r,
destdnl->get_inode()->take_waiting(CInode::WAIT_SINGLEAUTH, finished);
// unfreeze
- assert(destdnl->get_inode()->is_frozen_inode() ||
- destdnl->get_inode()->is_freezing_inode());
+ assert(destdnl->get_inode()->is_frozen_inode());
destdnl->get_inode()->unfreeze_inode(finished);
mds->queue_waiters(finished);
@@ -6205,8 +6232,7 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r,
destdnl->get_inode()->take_waiting(CInode::WAIT_SINGLEAUTH, finished);
// unfreeze
- assert(destdnl->get_inode()->is_frozen_inode() ||
- destdnl->get_inode()->is_freezing_inode());
+ assert(destdnl->get_inode()->is_frozen_inode());
destdnl->get_inode()->unfreeze_inode(finished);
mds->queue_waiters(finished);
diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h
index db4dbf1ac61..22e754eb2a1 100644
--- a/src/mds/mdstypes.h
+++ b/src/mds/mdstypes.h
@@ -1250,6 +1250,13 @@ public:
}
};
+inline bool operator==(const MDSCacheObjectInfo& l, const MDSCacheObjectInfo& r) {
+ if (l.ino || r.ino)
+ return l.ino == r.ino && l.snapid == r.snapid;
+ else
+ return l.dirfrag == r.dirfrag && l.dname == r.dname;
+}
+
WRITE_CLASS_ENCODER(MDSCacheObjectInfo)
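
[note] A usage sketch of the new comparison (hypothetical values): entries naming an inode compare by (ino, snapid), while dentry-shaped entries, where ino is 0 on both sides, fall back to (dirfrag, dname). This is what lets handle_slave_auth_pin() above match the authpin_freeze info against each authpin entry with a plain ==:

    // sketch of the inode-identity path
    MDSCacheObjectInfo a, b;
    a.ino = b.ino = inodeno_t(0x10000000000ull);
    a.snapid = b.snapid = CEPH_NOSNAP;
    assert(a == b);           // equal by ino + snapid; dirfrag/dname ignored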
diff --git a/src/messages/MMDSSlaveRequest.h b/src/messages/MMDSSlaveRequest.h
index 4f2bb5948bd..03ec582c49e 100644
--- a/src/messages/MMDSSlaveRequest.h
+++ b/src/messages/MMDSSlaveRequest.h
@@ -112,6 +112,7 @@ public:
int get_lock_type() { return lock_type; }
MDSCacheObjectInfo &get_object_info() { return object_info; }
+ MDSCacheObjectInfo &get_authpin_freeze() { return object_info; }
vector<MDSCacheObjectInfo>& get_authpins() { return authpins; }
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index e273ad49f2b..1ebf2854473 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -54,6 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
: reader_thread(this), writer_thread(this),
+ delay_thread(NULL),
msgr(r),
conn_id(r->dispatch_queue.get_id()),
sd(-1), port(0),
@@ -94,6 +95,7 @@ Pipe::~Pipe()
if (connection_state)
connection_state->put();
delete session_security;
+ delete delay_thread;
}
void Pipe::handle_ack(uint64_t seq)
@@ -127,6 +129,16 @@ void Pipe::start_reader()
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
}
+void Pipe::maybe_start_delay_thread()
+{
+ if (!delay_thread &&
+ msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) {
+ lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
+ delay_thread = new DelayedDelivery(this);
+ delay_thread->create();
+ }
+}
+
void Pipe::start_writer()
{
assert(pipe_lock.is_locked());
@@ -146,14 +158,54 @@ void Pipe::join_reader()
reader_needs_join = false;
}
+void Pipe::DelayedDelivery::discard()
+{
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::discard" << dendl;
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ pipe->msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
+ m->put();
+ delay_queue.pop_front();
+ }
+}
-void Pipe::queue_received(Message *m, int priority)
+void Pipe::DelayedDelivery::flush()
{
- assert(pipe_lock.is_locked());
- in_q->enqueue(m, priority, conn_id);
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl;
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ delay_queue.pop_front();
+ pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ }
}
+void *Pipe::DelayedDelivery::entry()
+{
+ Mutex::Locker locker(delay_lock);
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry start" << dendl;
+ while (!stop_delayed_delivery) {
+ if (delay_queue.empty()) {
+ lgeneric_subdout(pipe->msgr->cct, ms, 30) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
+ delay_cond.Wait(delay_lock);
+ continue;
+ }
+ utime_t release = delay_queue.front().first;
+ if (release > ceph_clock_now(pipe->msgr->cct)) {
+ lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
+ delay_cond.WaitUntil(delay_lock, release);
+ continue;
+ }
+ Message *m = delay_queue.front().second;
+ lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
+ delay_queue.pop_front();
+ pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ }
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl;
+ return NULL;
+}
int Pipe::accept()
{
@@ -503,7 +555,11 @@ int Pipe::accept()
// make existing Connection reference us
existing->connection_state->reset_pipe(this);
-
+
+ // flush/queue any existing delayed messages
+ if (existing->delay_thread)
+ existing->delay_thread->flush();
+
// steal incoming queue
uint64_t replaced_conn_id = conn_id;
conn_id = existing->conn_id;
@@ -587,6 +643,9 @@ int Pipe::accept()
}
ldout(msgr->cct,20) << "accept done" << dendl;
pipe_lock.Unlock();
+
+ maybe_start_delay_thread();
+
return 0; // success.
fail_unlocked:
@@ -936,6 +995,7 @@ int Pipe::connect()
ldout(msgr->cct,20) << "connect starting reader" << dendl;
start_reader();
}
+ maybe_start_delay_thread();
delete authorizer;
return 0;
}
@@ -1020,7 +1080,6 @@ void Pipe::discard_out_queue()
out_q.clear();
}
-
void Pipe::fault(bool onread)
{
const md_config_t *conf = msgr->cct->_conf;
@@ -1057,6 +1116,8 @@ void Pipe::fault(bool onread)
msgr->lock.Unlock();
in_q->discard_queue(conn_id);
+ if (delay_thread)
+ delay_thread->discard();
discard_out_queue();
// disconnect from Connection, and mark it failed. future messages
@@ -1068,6 +1129,10 @@ void Pipe::fault(bool onread)
return;
}
+ // queue delayed items immediately
+ if (delay_thread)
+ delay_thread->flush();
+
// requeue sent items
requeue_sent();
@@ -1075,7 +1140,7 @@ void Pipe::fault(bool onread)
ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
return;
- }
+ }
if (state != STATE_CONNECTING) {
if (policy.server) {
@@ -1122,6 +1187,8 @@ void Pipe::was_session_reset()
ldout(msgr->cct,10) << "was_session_reset" << dendl;
in_q->discard_queue(conn_id);
+ if (delay_thread)
+ delay_thread->discard();
discard_out_queue();
msgr->dispatch_queue.queue_remote_reset(connection_state);
@@ -1243,7 +1310,18 @@ void Pipe::reader()
ldout(msgr->cct,10) << "reader got message "
<< m->get_seq() << " " << m << " " << *m
<< dendl;
- queue_received(m);
+
+ if (delay_thread) {
+ utime_t release;
+ if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+ release = m->get_recv_stamp();
+ release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+ lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
+ }
+ delay_thread->queue(release, m);
+ } else {
+ in_q->enqueue(m, m->get_priority(), conn_id);
+ }
}
else if (tag == CEPH_MSGR_TAG_CLOSE) {
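
[note] About the sampling above: rand() % 10000 is uniform over [0, 9999], so comparing it against ms_inject_delay_probability * 10000.0 delays roughly that fraction of incoming messages, and each delayed message is released at its receive stamp plus a uniform fraction of ms_inject_delay_max. A worked example with the hypothetical settings sketched earlier:

    probability = 0.005  ->  rand() % 10000 < 50, i.e. ~1 in 200 messages
    delay       = 1.0 * (rand() % 10000) / 10000.0   (uniform in [0, 1) s)
    release     = m->get_recv_stamp() + delay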
diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h
index cc92b2f6f8b..1bcc8263f4a 100644
--- a/src/msg/Pipe.h
+++ b/src/msg/Pipe.h
@@ -60,6 +60,46 @@ class DispatchQueue;
} writer_thread;
friend class Writer;
+ /**
+ * The DelayedDelivery is for injecting delays into Message delivery off
+ * the socket. It is only enabled if delays are requested, and if they
+ * are then it pulls Messages off the DelayQueue and puts them into the
+ * in_q (SimpleMessenger::dispatch_queue).
+ * Please note that this probably has issues with Pipe shutdown and
+ * replacement semantics. I've tried, but no guarantees.
+ */
+ class DelayedDelivery: public Thread {
+ Pipe *pipe;
+ std::deque< pair<utime_t,Message*> > delay_queue;
+ Mutex delay_lock;
+ Cond delay_cond;
+ bool stop_delayed_delivery;
+
+ public:
+ DelayedDelivery(Pipe *p)
+ : pipe(p),
+ delay_lock("Pipe::DelayedDelivery::delay_lock"),
+ stop_delayed_delivery(false) { }
+ ~DelayedDelivery() {
+ discard();
+ }
+ void *entry();
+ void queue(utime_t release, Message *m) {
+ Mutex::Locker l(delay_lock);
+ delay_queue.push_back(make_pair(release, m));
+ delay_cond.Signal();
+ }
+ void discard();
+ void flush();
+ void stop() {
+ delay_lock.Lock();
+ stop_delayed_delivery = true;
+ delay_cond.Signal();
+ delay_lock.Unlock();
+ }
+ } *delay_thread;
+ friend class DelayedDelivery;
+
public:
Pipe(SimpleMessenger *r, int st, Connection *con);
~Pipe();
@@ -166,25 +206,13 @@ class DispatchQueue;
void start_reader();
void start_writer();
+ void maybe_start_delay_thread();
void join_reader();
// public constructors
static const Pipe& Server(int s);
static const Pipe& Client(const entity_addr_t& pi);
- //we have two queue_received's to allow local signal delivery
- // via Message * (that doesn't actually point to a Message)
- void queue_received(Message *m, int priority);
-
- void queue_received(Message *m) {
- // this is just to make sure that a changeset is working
- // properly; if you start using the refcounting more and have
- // multiple people hanging on to a message, ditch the assert!
- assert(m->nref.read() == 1);
-
- queue_received(m, m->get_priority());
- }
-
__u32 get_out_seq() { return out_seq; }
bool is_queued() { return !out_q.empty() || keepalive; }
@@ -208,6 +236,10 @@ class DispatchQueue;
writer_thread.join();
if (reader_thread.is_started())
reader_thread.join();
+ if (delay_thread) {
+ delay_thread->stop();
+ delay_thread->join();
+ }
}
void stop();
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 3feb92924a9..2c66a5ea7db 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -1886,10 +1886,9 @@ FileStore::Op *FileStore::build_op(list<Transaction*>& tls,
void FileStore::queue_op(OpSequencer *osr, Op *o)
{
- // mark apply start _now_, because we need to drain the entire apply
- // queue during commit in order to put the store in a consistent
- // state.
- apply_manager.op_apply_start(o->op);
+ // queue op on sequencer, then queue sequencer for the threadpool,
+ // so that regardless of which order the threads pick up the
+ // sequencer, the op order will be preserved.
osr->queue(o);
@@ -1953,16 +1952,12 @@ void FileStore::_do_op(OpSequencer *osr)
{
osr->apply_lock.Lock();
Op *o = osr->peek_queue();
-
+ apply_manager.op_apply_start(o->op);
dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
int r = do_transactions(o->tls, o->op);
apply_manager.op_apply_finish(o->op);
dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
<< ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
-
- /*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now "
- << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl;
- */
}
void FileStore::_finish_op(OpSequencer *osr)
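
[note] Moving op_apply_start() from queue_op() into _do_op() means an op only counts as "applying" once a thread-pool worker actually picks it up; together with the JournalingObjectStore change below, commit_start() then waits only for genuinely in-flight applies rather than everything merely queued. A rough before/after timeline (illustrative):

    before: queue_op -> op_apply_start ............ _do_op -> apply -> finish
            (blocks commit from the moment the op is queued)
    after:  queue_op ............ _do_op -> op_apply_start -> apply -> finish
            (blocks commit only while the op is actually applying)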
diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc
index 42b95c96a58..b1aee62eca8 100644
--- a/src/os/JournalingObjectStore.cc
+++ b/src/os/JournalingObjectStore.cc
@@ -111,7 +111,8 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
Mutex::Locker l(apply_lock);
// if ops are blocked, or there are already people (left) in
// line, get in line.
- if (blocked || !ops_apply_blocked.empty()) {
+ if (op > max_applying_seq &&
+ (blocked || !ops_apply_blocked.empty())) {
Cond cond;
ops_apply_blocked.push_back(&cond);
dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
@@ -125,9 +126,12 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
ops_apply_blocked.front()->Signal();
}
}
- dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
- assert(!blocked);
+ dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1)
+ << ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl;
+ if (op > max_applying_seq)
+ max_applying_seq = op;
+ assert(op > committed_seq);
open_ops++;
return op;
}
@@ -136,15 +140,18 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
{
Mutex::Locker l(apply_lock);
dout(10) << "op_apply_finish " << op << " open_ops " << open_ops
- << " -> " << (open_ops-1) << dendl;
+ << " -> " << (open_ops-1)
+ << ", max_applying_seq " << max_applying_seq
+ << ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq)
+ << dendl;
if (--open_ops == 0)
open_ops_cond.Signal();
// there can be multiple applies in flight; track the max value we
// note. note that we can't _read_ this value and learn anything
// meaningful unless/until we've quiesced all in-flight applies.
- if (op > applied_seq)
- applied_seq = op;
+ if (op > max_applied_seq)
+ max_applied_seq = op;
}
uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
@@ -185,19 +192,21 @@ bool JournalingObjectStore::ApplyManager::commit_start()
{
Mutex::Locker l(apply_lock);
- dout(10) << "commit_start "
- << ", applied_seq " << applied_seq << dendl;
+ dout(10) << "commit_start max_applying_seq " << max_applying_seq
+ << ", max_applied_seq " << max_applied_seq
+ << dendl;
blocked = true;
while (open_ops > 0) {
- dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
+ dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, "
+ << " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl;
open_ops_cond.Wait(apply_lock);
}
assert(open_ops == 0);
-
+ assert(max_applied_seq == max_applying_seq);
dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
{
Mutex::Locker l(com_lock);
- if (applied_seq == committed_seq) {
+ if (max_applied_seq == committed_seq) {
dout(10) << "commit_start nothing to do" << dendl;
blocked = false;
if (!ops_apply_blocked.empty())
@@ -206,7 +215,7 @@ bool JournalingObjectStore::ApplyManager::commit_start()
goto out;
}
- committing_seq = applied_seq;
+ committing_seq = max_applying_seq;
dout(10) << "commit_start committing " << committing_seq
<< ", still blocked" << dendl;
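
[note] A worked trace of the renamed counters (illustrative sequence numbers): max_applying_seq advances in op_apply_start() and max_applied_seq in op_apply_finish(), so applied <= applying at all times, with equality exactly when open_ops drains to zero; that is the new assert in commit_start(), and why committing_seq can safely be taken from max_applying_seq:

    op_apply_start(5)    open_ops=1  applying=5  applied=0
    op_apply_start(7)    open_ops=2  applying=7  applied=0
    op_apply_finish(7)   open_ops=1  applying=7  applied=7
    op_apply_finish(5)   open_ops=0  applying=7  applied=7   (max keeps 7)
    commit_start()       open_ops==0, applied == applying, committing_seq=7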
diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h
index dff49d43cbb..ae74c32cd25 100644
--- a/src/os/JournalingObjectStore.h
+++ b/src/os/JournalingObjectStore.h
@@ -54,7 +54,8 @@ protected:
Cond blocked_cond;
int open_ops;
Cond open_ops_cond;
- uint64_t applied_seq;
+ uint64_t max_applying_seq;
+ uint64_t max_applied_seq;
Mutex com_lock;
map<version_t, vector<Context*> > commit_waiters;
@@ -68,7 +69,8 @@ protected:
apply_lock("JOS::ApplyManager::apply_lock", false, true, false, g_ceph_context),
blocked(false),
open_ops(0),
- applied_seq(0),
+ max_applying_seq(0),
+ max_applied_seq(0),
com_lock("JOS::ApplyManager::com_lock", false, true, false, g_ceph_context),
committing_seq(0), committed_seq(0) {}
void add_waiter(uint64_t, Context*);
@@ -97,7 +99,7 @@ protected:
}
{
Mutex::Locker l(apply_lock);
- applied_seq = fs_op_seq;
+ max_applying_seq = max_applied_seq = fs_op_seq;
}
}
} apply_manager;
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 58859a5741a..6018587cacc 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -5133,6 +5133,9 @@ void OSD::handle_pg_query(OpRequestRef op)
continue;
}
+ if (!osdmap->have_pg_pool(pgid.pool()))
+ continue;
+
// get active crush mapping
vector<int> up, acting;
osdmap->pg_to_up_acting_osds(pgid, up, acting);
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 617ba9e250f..8c4c29ba7e7 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -4531,10 +4531,6 @@ void PG::proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &oinfo)
dirty_info = true;
osd->reg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
- assert(oinfo.last_epoch_started == info.last_epoch_started);
- assert(info.history.last_epoch_started == oinfo.last_epoch_started);
- assert(oinfo.history.last_epoch_started == oinfo.last_epoch_started);
-
// Handle changes to purged_snaps ONLY IF we have caught up
if (last_complete_ondisk.epoch >= info.history.last_epoch_started) {
interval_set<snapid_t> p;