summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-02-20 10:23:17 -0800
committerSage Weil <sage@newdream.net>2009-02-20 17:16:20 -0800
commit5cb6c31076a56338472e5e58ca1650c19ac7b72d (patch)
tree891216f18f55ac31d22bbe1c568425e1301e77a1
parent978c598f90b0f3f4eb84c165c23c68a601376743 (diff)
downloadceph-5cb6c31076a56338472e5e58ca1650c19ac7b72d.tar.gz
kclient: initial pass at async create, writeback
-rw-r--r--src/common/Timer.cc1
-rw-r--r--src/include/ceph_fs.h3
-rw-r--r--src/kernel/caps.c7
-rw-r--r--src/kernel/dir.c16
-rw-r--r--src/kernel/export.c2
-rw-r--r--src/kernel/file.c4
-rw-r--r--src/kernel/inode.c155
-rw-r--r--src/kernel/ioctl.c2
-rw-r--r--src/kernel/mds_client.c12
-rw-r--r--src/kernel/mds_client.h9
-rw-r--r--src/kernel/super.c2
-rw-r--r--src/kernel/super.h17
12 files changed, 183 insertions, 47 deletions
diff --git a/src/common/Timer.cc b/src/common/Timer.cc
index 75b0de42101..e9024582da0 100644
--- a/src/common/Timer.cc
+++ b/src/common/Timer.cc
@@ -329,6 +329,7 @@ bool SafeTimer::cancel_event(Context *c)
canceled[c] = scheduled[c];
}
scheduled.erase(c);
+ return true;
}
void SafeTimer::cancel_all()
diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h
index f2cade77fd7..83d63790f08 100644
--- a/src/include/ceph_fs.h
+++ b/src/include/ceph_fs.h
@@ -729,6 +729,7 @@ static inline const char *ceph_mds_op_name(int op)
case CEPH_MDS_OP_MKDIR: return "mkdir";
case CEPH_MDS_OP_RMDIR: return "rmdir";
case CEPH_MDS_OP_SYMLINK: return "symlink";
+ case CEPH_MDS_OP_CREATE: return "create";
case CEPH_MDS_OP_OPEN: return "open";
case CEPH_MDS_OP_TRUNCATE: return "truncate";
case CEPH_MDS_OP_LTRUNCATE: return "ltruncate";
@@ -980,7 +981,7 @@ static inline int ceph_flags_to_mode(int flags)
#define CEPH_CAP_ANY_FILE_WR (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_WRBUFFER)
#define CEPH_CAP_ANY_WR (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR)
-#define CEPH_CAP_ANY (CEPH_CAP_ANY_WR|CEPH_CAP_ANY_RD)
+#define CEPH_CAP_ANY (CEPH_CAP_PIN|CEPH_CAP_ANY_WR|CEPH_CAP_ANY_RD)
/*
* these cap bits time out, if no others are held and nothing is
diff --git a/src/kernel/caps.c b/src/kernel/caps.c
index 1e4850dd371..06756cc1da0 100644
--- a/src/kernel/caps.c
+++ b/src/kernel/caps.c
@@ -975,6 +975,8 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
dout(30, "__take_cap_refs %p wrbuffer %d -> %d (?)\n",
&ci->vfs_inode, ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref);
}
+ if (got & CEPH_CAP_FILE_EXCL)
+ ci->i_excl_ref++;
}
/*
@@ -1086,6 +1088,9 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
}
}
}
+ if (had & CEPH_CAP_FILE_EXCL)
+ if (--ci->i_excl_ref == 0)
+ last++;
spin_unlock(&inode->i_lock);
dout(30, "put_cap_refs %p had %s %s\n", inode, ceph_cap_string(had),
@@ -1323,7 +1328,7 @@ start:
if (cap->issued & ~newcaps) {
dout(10, "revocation: %s -> %s\n", ceph_cap_string(cap->issued),
ceph_cap_string(newcaps));
- if ((used & ~newcaps) & CEPH_CAP_FILE_WRBUFFER) {
+ if ((used & ~newcaps) & (CEPH_CAP_FILE_WRBUFFER|CEPH_CAP_FILE_EXCL)) {
writeback = 1; /* will delay ack */
} else if (dirty & ~newcaps)
reply = 2; /* initiate writeback in check_caps */
diff --git a/src/kernel/dir.c b/src/kernel/dir.c
index 136350ed94b..54c47b317e3 100644
--- a/src/kernel/dir.c
+++ b/src/kernel/dir.c
@@ -90,7 +90,7 @@ nextfrag:
req->r_direct_hash = frag_value(frag);
req->r_direct_is_hash = true;
req->r_args.readdir.frag = cpu_to_le32(frag);
- err = ceph_mdsc_do_request(mdsc, NULL, req);
+ err = ceph_mdsc_do_request(mdsc, req, NULL);
if (err < 0) {
ceph_mdsc_put_request(req);
return err;
@@ -312,7 +312,7 @@ struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry,
return ERR_PTR(PTR_ERR(req));
req->r_args.stat.mask = cpu_to_le32(mask);
req->r_locked_dir = dentry->d_parent->d_inode; /* by the VFS */
- err = ceph_mdsc_do_request(mdsc, NULL, req);
+ err = ceph_mdsc_do_request(mdsc, req, NULL);
dentry = ceph_finish_lookup(req, dentry, err);
ceph_mdsc_put_request(req); /* will dput(dentry) */
dout(20, "do_lookup result=%p\n", dentry);
@@ -378,7 +378,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
req->r_args.mknod.rdev = cpu_to_le32(rdev);
if ((issued & CEPH_CAP_FILE_EXCL) == 0)
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, dir, req);
+ err = ceph_mdsc_do_request(mdsc, req, dir);
if (!err && req->r_reply_info.trace_numd == 0) {
/*
* no trace. do lookup, in case we are called from create
@@ -450,7 +450,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
req->r_locked_dir = dir;
if ((issued & CEPH_CAP_FILE_EXCL) == 0)
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, dir, req);
+ err = ceph_mdsc_do_request(mdsc, req, dir);
ceph_mdsc_put_request(req);
if (err)
d_drop(dentry);
@@ -500,7 +500,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode)
if ((issued & CEPH_CAP_FILE_EXCL) == 0)
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, dir, req);
+ err = ceph_mdsc_do_request(mdsc, req, dir);
ceph_mdsc_put_request(req);
if (err < 0)
d_drop(dentry);
@@ -531,7 +531,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
if ((ceph_caps_issued(ceph_inode(dir)) & CEPH_CAP_FILE_EXCL) == 0)
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, dir, req);
+ err = ceph_mdsc_do_request(mdsc, req, dir);
if (err) {
d_drop(dentry);
} else if (req->r_reply_info.trace_numd == 0) {
@@ -590,7 +590,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
ceph_mdsc_lease_release(mdsc, dir, dentry, CEPH_LOCK_DN);
ceph_release_caps(inode, CEPH_CAP_LINK_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, dir, req);
+ err = ceph_mdsc_do_request(mdsc, req, dir);
ceph_mdsc_put_request(req);
return err;
}
@@ -623,7 +623,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
if ((ceph_caps_issued(ceph_inode(new_dir)) & CEPH_CAP_FILE_EXCL) == 0)
ceph_release_caps(new_dir, CEPH_CAP_FILE_RDCACHE);
ceph_mdsc_lease_release(mdsc, new_dir, new_dentry, CEPH_LOCK_DN);
- err = ceph_mdsc_do_request(mdsc, old_dir, req);
+ err = ceph_mdsc_do_request(mdsc, req, old_dir);
if (!err && req->r_reply_info.trace_numd == 0) {
/*
* no trace
diff --git a/src/kernel/export.c b/src/kernel/export.c
index a6e3537fd05..f92712fffe0 100644
--- a/src/kernel/export.c
+++ b/src/kernel/export.c
@@ -88,7 +88,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb,
USE_ANY_MDS);
if (IS_ERR(req))
return ERR_PTR(PTR_ERR(req));
- err = ceph_mdsc_do_request(mdsc, NULL, req);
+ err = ceph_mdsc_do_request(mdsc, req, NULL);
ceph_mdsc_put_request(req);
inode = ceph_find_inode(sb, vino);
if (!inode)
diff --git a/src/kernel/file.c b/src/kernel/file.c
index 5bb93b612a0..b75368cf91a 100644
--- a/src/kernel/file.c
+++ b/src/kernel/file.c
@@ -127,7 +127,7 @@ int ceph_open(struct inode *inode, struct file *file)
err = PTR_ERR(req);
goto out;
}
- err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+ err = ceph_mdsc_do_request(mdsc, req, parent_inode);
if (!err)
err = ceph_init_file(inode, file, req->r_fmode);
ceph_mdsc_put_request(req);
@@ -173,7 +173,7 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
(ceph_caps_issued(ceph_inode(dir)) & CEPH_CAP_FILE_EXCL) == 0)
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
req->r_locked_dir = dir; /* caller holds dir->i_mutex */
- err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+ err = ceph_mdsc_do_request(mdsc, req, parent_inode);
dentry = ceph_finish_lookup(req, dentry, err);
if (!err)
err = ceph_init_file(req->r_last_inode, file, req->r_fmode);
diff --git a/src/kernel/inode.c b/src/kernel/inode.c
index 2830972a50a..9a9667cd8db 100644
--- a/src/kernel/inode.c
+++ b/src/kernel/inode.c
@@ -285,6 +285,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_wr_ref = 0;
ci->i_wrbuffer_ref = 0;
ci->i_wrbuffer_ref_head = 0;
+ ci->i_excl_ref = 0;
ci->i_rdcache_gen = 0;
ci->i_rdcache_revoking = 0;
@@ -300,6 +301,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
INIT_LIST_HEAD(&ci->i_listener_list);
spin_lock_init(&ci->i_listener_lock);
+ INIT_LIST_HEAD(&ci->i_new_child);
+ INIT_LIST_HEAD(&ci->i_new_children);
return &ci->vfs_inode;
}
@@ -434,45 +437,57 @@ static void init_inode_ops(struct inode *inode)
&ceph_client(inode->i_sb)->backing_dev_info;
}
+/*
+ * caller must hold dir->i_mutex
+ */
int ceph_async_create(struct inode *dir, struct dentry *dentry,
int issued, int mode, const char *symdest)
{
struct ceph_vino vino;
struct inode *inode;
- struct ceph_inode_info *ci;
+ struct ceph_inode_info *dirci = ceph_inode(dir), *ci;
struct ceph_client *client = ceph_client(dir->i_sb);
struct ceph_mds_client *mdsc;
-
+ int got;
+ int err;
+
if ((client->mount_args.flags & CEPH_MOUNT_ASYNCMETA) == 0)
return -EPERM;
mdsc = &client->mdsc;
dout(10, "async_create %p dn %p issued %s mode 0%o\n",
dir, dentry, ceph_cap_string(issued), mode);
- if ((issued & CEPH_CAP_FILE_EXCL) == 0 ||
- (issued & CEPH_CAP_AUTH_RDCACHE) == 0)
- return -EPERM;
- if ((ceph_inode(dir)->i_ceph_flags & CEPH_I_COMPLETE) == 0)
- return -EPERM;
+ err = -EPERM;
+ if (ceph_get_cap_refs(dirci, CEPH_CAP_FILE_EXCL,
+ CEPH_CAP_FILE_EXCL, &got, 0) == 0)
+ goto out;
+
+ if ((dirci->i_ceph_flags & CEPH_I_COMPLETE) == 0)
+ goto out_put;
vino.ino = ceph_mdsc_prealloc_dequeue(mdsc);
if (!vino.ino)
- return -EAGAIN;
+ goto out_put;
vino.snap = CEPH_NOSNAP;
+ err = -EIO;
inode = ceph_get_inode(dir->i_sb, vino);
if (!inode)
- return -EIO;
+ goto out_put;
ci = ceph_inode(inode);
if (symdest) {
inode->i_size = strlen(symdest);
ci->i_symlink = kmalloc(inode->i_size + 1, GFP_NOFS);
+ err = -ENOMEM;
if (!ci->i_symlink)
- return -ENOMEM;
+ goto out_iput;
strcpy(ci->i_symlink, symdest);
}
+ /* add to dir child list */
+ list_add(&ci->i_new_child, &dirci->i_new_children);
+
inode->i_uid = current->fsuid;
if (dir->i_mode & S_ISGID) {
inode->i_gid = dir->i_gid;
@@ -488,7 +503,7 @@ int ceph_async_create(struct inode *dir, struct dentry *dentry,
ci->i_version = 1;
ci->i_ceph_flags = CEPH_I_NEW | CEPH_I_COMPLETE;
- if (S_ISDIR(mode))
+ if (S_ISDIR(mode))
ci->i_rsubdirs = 1;
else
ci->i_rfiles = 1;
@@ -500,6 +515,107 @@ int ceph_async_create(struct inode *dir, struct dentry *dentry,
d_add(dentry, inode);
return 0;
+
+out_iput:
+ iput(inode);
+out_put:
+ ceph_put_cap_refs(ceph_inode(dir), got);
+out:
+ return err;
+}
+
+
+static void ceph_create_completion(struct ceph_mds_client *mds,
+ struct ceph_mds_request *req)
+{
+ struct inode *inode = req->r_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+
+ dout(10, "create_completion %p on %p\n", req, inode);
+ mutex_lock(&mdsc->create_mutex);
+ if (ci->i_ceph_flags & CEPH_I_NEW) {
+ ci->i_ceph_flags &= ~(CEPH_I_NEW|CEPH_I_CREATING);
+ list_del_init(&ci->i_new_child);
+ ceph_put_cap_refs(ceph_inode(d_find_alias(inode)->d_parent->d_inode),
+ CEPH_CAP_FILE_EXCL);
+ }
+ mutex_unlock(&mdsc->create_mutex);
+ ceph_mdsc_put_request(req);
+}
+
+static int __flush_create(struct dentry *dentry)
+{
+ struct inode *inode = dentry->d_inode;
+ struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_mds_request *req;
+ struct inode *dir = dentry->d_parent->d_inode;
+ struct ceph_inode_info *dirci = ceph_inode(dir);
+
+ /* make sure parent is created first */
+ if ((dirci->i_ceph_flags & CEPH_I_NEW) &&
+ (dirci->i_ceph_flags & CEPH_I_CREATING) == 0)
+ __flush_create(dentry->d_parent);
+
+ dout(10, "__flush_create dn %p inode %p\n", dentry, inode);
+ BUG_ON((ci->i_ceph_flags & CEPH_I_NEW) == 0);
+
+ req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_CREATE,
+ dentry, NULL, NULL, NULL,
+ USE_AUTH_MDS);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+ req->r_callback = ceph_create_completion;
+ req->r_last_inode = igrab(inode);
+
+ spin_lock(&inode->i_lock);
+ req->r_args.create.uid = cpu_to_le32(inode->i_uid);
+ req->r_args.create.gid = cpu_to_le32(inode->i_gid);
+ req->r_args.create.mode = cpu_to_le32(inode->i_mode);
+ req->r_args.create.rdev = cpu_to_le32(inode->i_rdev);
+ req->r_args.create.size = cpu_to_le64(inode->i_size);
+ ceph_encode_timespec(&req->r_args.create.ctime, &inode->i_ctime);
+ ceph_encode_timespec(&req->r_args.create.mtime, &inode->i_mtime);
+ ceph_encode_timespec(&req->r_args.create.atime, &inode->i_atime);
+ req->r_args.create.caps = cpu_to_le32(__ceph_caps_issued(ci, NULL));
+ req->r_args.create.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+ spin_unlock(&inode->i_lock);
+
+ ceph_mdsc_submit_request(mdsc, req, dir);
+
+ ci->i_ceph_flags |= CEPH_I_CREATING;
+ return 0;
+}
+
+int ceph_flush_create(struct dentry *dentry)
+{
+ struct ceph_mds_client *mdsc = &ceph_client(dentry->d_sb)->mdsc;
+ int err = 0;
+
+ dout(10, "flush_create %p %p\n", dentry, dentry->d_inode);
+ mutex_lock(&mdsc->create_mutex);
+ if (ceph_inode(dentry->d_inode)->i_ceph_flags & CEPH_I_NEW)
+ err = __flush_create(dentry);
+ mutex_unlock(&mdsc->create_mutex);
+ return err;
+}
+
+static int flush_child_creates(struct inode *dir)
+{
+ struct ceph_mds_client *mdsc = &ceph_client(dir->i_sb)->mdsc;
+ struct list_head *p, *n;
+ struct ceph_inode_info *dirci = ceph_inode(dir);
+
+ dout(10, "flush_child_creates %p\n", dir);
+ mutex_lock(&mdsc->create_mutex);
+ list_for_each_safe(p, n, &dirci->i_new_children) {
+ struct ceph_inode_info *ci =
+ list_entry(p, struct ceph_inode_info, i_new_child);
+ __flush_create(d_find_alias(&ci->vfs_inode));
+ }
+ mutex_unlock(&mdsc->create_mutex);
+ return 0;
}
/*
@@ -1391,7 +1507,10 @@ void ceph_inode_writeback(struct work_struct *work)
struct inode *inode = &ci->vfs_inode;
dout(10, "writeback %p\n", inode);
- filemap_fdatawrite(&inode->i_data);
+ if (S_ISDIR(inode->i_mode))
+ flush_child_creates(inode);
+ else
+ filemap_fdatawrite(&inode->i_data);
iput(inode);
}
@@ -1597,7 +1716,7 @@ static int ceph_setattr_chown(struct dentry *dentry, struct iattr *attr)
}
req->r_args.chown.mask = cpu_to_le32(mask);
ceph_release_caps(inode, CEPH_CAP_AUTH_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+ err = ceph_mdsc_do_request(mdsc, req, parent_inode);
ceph_mdsc_put_request(req);
dout(10, "chown result %d\n", err);
return err;
@@ -1629,7 +1748,7 @@ static int ceph_setattr_chmod(struct dentry *dentry, struct iattr *attr)
return PTR_ERR(req);
req->r_args.chmod.mode = cpu_to_le32(attr->ia_mode);
ceph_release_caps(inode, CEPH_CAP_AUTH_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+ err = ceph_mdsc_do_request(mdsc, req, parent_inode);
ceph_mdsc_put_request(req);
dout(10, "chmod result %d\n", err);
return err;
@@ -1697,7 +1816,7 @@ static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr)
req->r_args.utime.mask |= cpu_to_le32(CEPH_UTIME_MTIME);
ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+ err = ceph_mdsc_do_request(mdsc, req, parent_inode);
ceph_mdsc_put_request(req);
dout(10, "utime result %d\n", err);
return err;
@@ -1740,7 +1859,7 @@ static int ceph_setattr_size(struct dentry *dentry, struct iattr *attr)
req->r_args.truncate.length = cpu_to_le64(attr->ia_size);
req->r_args.truncate.old_length = cpu_to_le64(inode->i_size);
//ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+ err = ceph_mdsc_do_request(mdsc, req, parent_inode);
ceph_mdsc_put_request(req);
dout(10, "truncate result %d\n", err);
__ceph_do_pending_vmtruncate(inode);
@@ -2155,7 +2274,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
req->r_request->hdr.data_off = cpu_to_le16(0);
ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+ err = ceph_mdsc_do_request(mdsc, req, parent_inode);
ceph_mdsc_put_request(req);
out:
@@ -2192,7 +2311,7 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
return PTR_ERR(req);
ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+ err = ceph_mdsc_do_request(mdsc, req, parent_inode);
ceph_mdsc_put_request(req);
return err;
}
diff --git a/src/kernel/ioctl.c b/src/kernel/ioctl.c
index ee0ffd3b076..401635cf302 100644
--- a/src/kernel/ioctl.c
+++ b/src/kernel/ioctl.c
@@ -43,7 +43,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
USE_AUTH_MDS);
req->r_args.setlayout.layout = layout;
ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+ err = ceph_mdsc_do_request(mdsc, req, parent_inode);
ceph_mdsc_put_request(req);
return err;
}
diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c
index 6bf4d9be722..5328e09040b 100644
--- a/src/kernel/mds_client.c
+++ b/src/kernel/mds_client.c
@@ -736,7 +736,7 @@ static int request_prealloc(struct ceph_mds_client *mdsc)
return PTR_ERR(req);
req->r_args.prealloc.num = cpu_to_le32(want);
req->r_callback = prealloc_completion;
- ceph_mdsc_submit_request(mdsc, req);
+ ceph_mdsc_submit_request(mdsc, req, NULL);
return 0;
}
@@ -1344,11 +1344,12 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
}
void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
- struct ceph_mds_request *req)
+ struct ceph_mds_request *req,
+ struct inode *listener)
{
dout(30, "submit_request on %p\n", req);
mutex_lock(&mdsc->mutex);
- __register_request(mdsc, req, NULL);
+ __register_request(mdsc, req, listener);
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
}
@@ -1358,8 +1359,8 @@ void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
* session setup, forwarding, retry details.
*/
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
- struct inode *listener,
- struct ceph_mds_request *req)
+ struct ceph_mds_request *req,
+ struct inode *listener)
{
int err;
@@ -2261,6 +2262,7 @@ void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
spin_lock_init(&mdsc->cap_delay_lock);
INIT_LIST_HEAD(&mdsc->snap_flush_list);
spin_lock_init(&mdsc->snap_flush_lock);
+ mutex_init(&mdsc->create_mutex);
}
/*
diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h
index 2bc424bb111..934bec6940e 100644
--- a/src/kernel/mds_client.h
+++ b/src/kernel/mds_client.h
@@ -244,6 +244,8 @@ struct ceph_mds_client {
spinlock_t cap_delay_lock; /* protects cap_delay_list */
struct list_head snap_flush_list; /* cap_snaps ready to flush */
spinlock_t snap_flush_lock;
+
+ struct mutex create_mutex; /* for doing async CREATE requests */
};
extern const char *ceph_mds_op_name(int op);
@@ -297,10 +299,11 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
const char *path1, const char *path2,
int mode);
extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
- struct ceph_mds_request *req);
+ struct ceph_mds_request *req,
+ struct inode *listener);
extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
- struct inode *listener,
- struct ceph_mds_request *req);
+ struct ceph_mds_request *req,
+ struct inode *listener);
extern void ceph_mdsc_put_request(struct ceph_mds_request *req);
extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
diff --git a/src/kernel/super.c b/src/kernel/super.c
index caa7c47f331..ec9688a988b 100644
--- a/src/kernel/super.c
+++ b/src/kernel/super.c
@@ -743,7 +743,7 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
req->r_started = started;
req->r_timeout = client->mount_args.mount_timeout * HZ;
req->r_args.stat.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
- err = ceph_mdsc_do_request(mdsc, NULL, req);
+ err = ceph_mdsc_do_request(mdsc, req, NULL);
if (err == 0) {
root = req->r_dentry;
dget(root);
diff --git a/src/kernel/super.h b/src/kernel/super.h
index a7efff3a527..4a8a5cabf95 100644
--- a/src/kernel/super.h
+++ b/src/kernel/super.h
@@ -217,6 +217,7 @@ struct ceph_inode_frag {
#define CEPH_I_COMPLETE 1 /* we have complete directory cached */
#define CEPH_I_READDIR 2 /* no dentries trimmed since readdir start */
#define CEPH_I_NEW 4 /* not yet created on mds */
+#define CEPH_I_CREATING 8 /* create request submitted to mds */
struct ceph_inode_info {
struct ceph_vino i_vino; /* ceph ino + snap */
@@ -278,7 +279,7 @@ struct ceph_inode_info {
/* held references to caps */
int i_rd_ref, i_rdcache_ref, i_wr_ref;
- int i_wrbuffer_ref, i_wrbuffer_ref_head;
+ int i_wrbuffer_ref, i_wrbuffer_ref_head, i_excl_ref;
u32 i_rdcache_gen; /* we increment this each time we get RDCACHE.
If it's non-zero, we _may_ have cached
pages. */
@@ -296,6 +297,7 @@ struct ceph_inode_info {
struct list_head i_listener_list; /* requests we pend on */
spinlock_t i_listener_lock;
+ struct list_head i_new_child, i_new_children; /* protected by i_mutex */
struct inode vfs_inode; /* at end */
};
@@ -453,14 +455,16 @@ static inline int __ceph_caps_used(struct ceph_inode_info *ci)
{
int used = 0;
if (ci->i_rd_ref)
- used |= CEPH_CAP_GRD;
+ used |= CEPH_CAP_FILE_RD;
if (ci->i_rdcache_ref || ci->i_rdcache_gen)
- used |= CEPH_CAP_GRDCACHE;
+ used |= CEPH_CAP_FILE_RDCACHE;
if (ci->i_wr_ref)
- used |= CEPH_CAP_GWR;
+ used |= CEPH_CAP_FILE_WR;
if (ci->i_wrbuffer_ref)
- used |= CEPH_CAP_GWRBUFFER;
- return CEPH_CAP_FILE(used);
+ used |= CEPH_CAP_FILE_WRBUFFER;
+ if (ci->i_excl_ref)
+ used |= CEPH_CAP_FILE_EXCL;
+ return used;
}
/*
@@ -682,6 +686,7 @@ extern struct inode *ceph_alloc_inode(struct super_block *sb);
extern void ceph_destroy_inode(struct inode *inode);
extern int ceph_async_create(struct inode *dir, struct dentry *dentry,
int issued, int mode, const char *symdest);
+extern int ceph_flush_create(struct dentry *dentry);
extern struct inode *ceph_get_inode(struct super_block *sb,
struct ceph_vino vino);