]> nv-tegra.nvidia Code Review - linux-2.6.git/blobdiff - fs/ceph/dir.c
fs: push i_mutex and filemap_write_and_wait down into ->fsync() handlers
[linux-2.6.git] / fs / ceph / dir.c
index f94ed3c7f6a5a862a3276b4b222231ab4c40e97f..1065ac779840ac4bb4c7f740c47c1ec33ef79787 100644 (file)
@@ -1,4 +1,4 @@
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
 
 #include <linux/spinlock.h>
 #include <linux/fs_struct.h>
@@ -7,6 +7,7 @@
 #include <linux/sched.h>
 
 #include "super.h"
+#include "mds_client.h"
 
 /*
  * Directory operations: readdir, lookup, create, link, unlink,
@@ -27,7 +28,7 @@
 
 const struct inode_operations ceph_dir_iops;
 const struct file_operations ceph_dir_fops;
-struct dentry_operations ceph_dentry_ops;
+const struct dentry_operations ceph_dentry_ops;
 
 /*
  * Initialize ceph dentry state.
@@ -39,14 +40,15 @@ int ceph_init_dentry(struct dentry *dentry)
        if (dentry->d_fsdata)
                return 0;
 
-       if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
-               dentry->d_op = &ceph_dentry_ops;
+       if (dentry->d_parent == NULL ||   /* nfs fh_to_dentry */
+           ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
+               d_set_d_op(dentry, &ceph_dentry_ops);
        else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR)
-               dentry->d_op = &ceph_snapdir_dentry_ops;
+               d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
        else
-               dentry->d_op = &ceph_snap_dentry_ops;
+               d_set_d_op(dentry, &ceph_snap_dentry_ops);
 
-       di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS);
+       di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO);
        if (!di)
                return -ENOMEM;          /* oh well */
 
@@ -95,7 +97,6 @@ static unsigned fpos_off(loff_t p)
 static int __dcache_readdir(struct file *filp,
                            void *dirent, filldir_t filldir)
 {
-       struct inode *inode = filp->f_dentry->d_inode;
        struct ceph_file_info *fi = filp->private_data;
        struct dentry *parent = filp->f_dentry;
        struct inode *dir = parent->d_inode;
@@ -111,11 +112,11 @@ static int __dcache_readdir(struct file *filp,
        dout("__dcache_readdir %p at %llu (last %p)\n", dir, filp->f_pos,
             last);
 
-       spin_lock(&dcache_lock);
+       spin_lock(&parent->d_lock);
 
        /* start at beginning? */
-       if (filp->f_pos == 2 || (last &&
-                                filp->f_pos < ceph_dentry(last)->offset)) {
+       if (filp->f_pos == 2 || last == NULL ||
+           filp->f_pos < ceph_dentry(last)->offset) {
                if (list_empty(&parent->d_subdirs))
                        goto out_unlock;
                p = parent->d_subdirs.prev;
@@ -135,6 +136,7 @@ more:
                        fi->at_end = 1;
                        goto out_unlock;
                }
+               spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
                if (!d_unhashed(dentry) && dentry->d_inode &&
                    ceph_snap(dentry->d_inode) != CEPH_SNAPDIR &&
                    ceph_ino(dentry->d_inode) != CEPH_INO_CEPH &&
@@ -144,21 +146,22 @@ more:
                     dentry->d_name.len, dentry->d_name.name, di->offset,
                     filp->f_pos, d_unhashed(dentry) ? " unhashed" : "",
                     !dentry->d_inode ? " null" : "");
+               spin_unlock(&dentry->d_lock);
                p = p->prev;
                dentry = list_entry(p, struct dentry, d_u.d_child);
                di = ceph_dentry(dentry);
        }
 
-       atomic_inc(&dentry->d_count);
-       spin_unlock(&dcache_lock);
-       spin_unlock(&inode->i_lock);
+       dget_dlock(dentry);
+       spin_unlock(&dentry->d_lock);
+       spin_unlock(&parent->d_lock);
 
        dout(" %llu (%llu) dentry %p %.*s %p\n", di->offset, filp->f_pos,
             dentry, dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
        filp->f_pos = di->offset;
        err = filldir(dirent, dentry->d_name.name,
                      dentry->d_name.len, di->offset,
-                     dentry->d_inode->i_ino,
+                     ceph_translate_ino(dentry->d_sb, dentry->d_inode->i_ino),
                      dentry->d_inode->i_mode >> 12);
 
        if (last) {
@@ -169,35 +172,30 @@ more:
                } else {
                        dput(last);
                }
-               last = NULL;
        }
-
-       spin_lock(&inode->i_lock);
-       spin_lock(&dcache_lock);
-
        last = dentry;
 
        if (err < 0)
-               goto out_unlock;
+               goto out;
 
-       p = p->prev;
        filp->f_pos++;
 
-       /* make sure a dentry wasn't dropped while we didn't have dcache_lock */
-       if ((ceph_inode(dir)->i_ceph_flags & CEPH_I_COMPLETE))
-               goto more;
-       dout(" lost I_COMPLETE on %p; falling back to mds\n", dir);
-       err = -EAGAIN;
+       /* make sure a dentry wasn't dropped while we didn't have parent lock */
+       if (!ceph_i_test(dir, CEPH_I_COMPLETE)) {
+               dout(" lost I_COMPLETE on %p; falling back to mds\n", dir);
+               err = -EAGAIN;
+               goto out;
+       }
 
-out_unlock:
-       spin_unlock(&dcache_lock);
+       spin_lock(&parent->d_lock);
+       p = p->prev;    /* advance to next dentry */
+       goto more;
 
-       if (last) {
-               spin_unlock(&inode->i_lock);
+out_unlock:
+       spin_unlock(&parent->d_lock);
+out:
+       if (last)
                dput(last);
-               spin_lock(&inode->i_lock);
-       }
-
        return err;
 }
 
@@ -225,15 +223,15 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
        struct ceph_file_info *fi = filp->private_data;
        struct inode *inode = filp->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_client *client = ceph_inode_to_client(inode);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        unsigned frag = fpos_frag(filp->f_pos);
        int off = fpos_off(filp->f_pos);
        int err;
        u32 ftype;
        struct ceph_mds_reply_info_parsed *rinfo;
-       const int max_entries = client->mount_args->max_readdir;
-       const int max_bytes = client->mount_args->max_readdir_bytes;
+       const int max_entries = fsc->mount_options->max_readdir;
+       const int max_bytes = fsc->mount_options->max_readdir_bytes;
 
        dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off);
        if (fi->at_end)
@@ -247,15 +245,17 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
 
                dout("readdir off 0 -> '.'\n");
                if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0),
-                           inode->i_ino, inode->i_mode >> 12) < 0)
+                           ceph_translate_ino(inode->i_sb, inode->i_ino),
+                           inode->i_mode >> 12) < 0)
                        return 0;
                filp->f_pos = 1;
                off = 1;
        }
        if (filp->f_pos == 1) {
+               ino_t ino = parent_ino(filp->f_dentry);
                dout("readdir off 1 -> '..'\n");
                if (filldir(dirent, "..", 2, ceph_make_fpos(0, 1),
-                           filp->f_dentry->d_parent->d_inode->i_ino,
+                           ceph_translate_ino(inode->i_sb, ino),
                            inode->i_mode >> 12) < 0)
                        return 0;
                filp->f_pos = 2;
@@ -265,17 +265,17 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
        /* can we use the dcache? */
        spin_lock(&inode->i_lock);
        if ((filp->f_pos == 2 || fi->dentry) &&
-           !ceph_test_opt(client, NOASYNCREADDIR) &&
+           !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
            ceph_snap(inode) != CEPH_SNAPDIR &&
            (ci->i_ceph_flags & CEPH_I_COMPLETE) &&
            __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
+               spin_unlock(&inode->i_lock);
                err = __dcache_readdir(filp, dirent, filldir);
-               if (err != -EAGAIN) {
-                       spin_unlock(&inode->i_lock);
+               if (err != -EAGAIN)
                        return err;
-               }
+       } else {
+               spin_unlock(&inode->i_lock);
        }
-       spin_unlock(&inode->i_lock);
        if (fi->dentry) {
                err = note_last_dentry(fi, fi->dentry->d_name.name,
                                       fi->dentry->d_name.len);
@@ -308,7 +308,8 @@ more:
                req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
                if (IS_ERR(req))
                        return PTR_ERR(req);
-               req->r_inode = igrab(inode);
+               req->r_inode = inode;
+               ihold(inode);
                req->r_dentry = dget(filp->f_dentry);
                /* hints to request -> mds selection code */
                req->r_direct_mode = USE_AUTH_MDS;
@@ -342,7 +343,10 @@ more:
                if (req->r_reply_info.dir_end) {
                        kfree(fi->last_name);
                        fi->last_name = NULL;
-                       fi->next_offset = 2;
+                       if (ceph_frag_is_rightmost(frag))
+                               fi->next_offset = 2;
+                       else
+                               fi->next_offset = 0;
                } else {
                        rinfo = &req->r_reply_info;
                        err = note_last_dentry(fi,
@@ -357,22 +361,27 @@ more:
        rinfo = &fi->last_readdir->r_reply_info;
        dout("readdir frag %x num %d off %d chunkoff %d\n", frag,
             rinfo->dir_nr, off, fi->offset);
-       while (off - fi->offset >= 0 && off - fi->offset < rinfo->dir_nr) {
+       while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) {
                u64 pos = ceph_make_fpos(frag, off);
                struct ceph_mds_reply_inode *in =
                        rinfo->dir_in[off - fi->offset].in;
+               struct ceph_vino vino;
+               ino_t ino;
+
                dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
                     off, off - fi->offset, rinfo->dir_nr, pos,
                     rinfo->dir_dname_len[off - fi->offset],
                     rinfo->dir_dname[off - fi->offset], in);
                BUG_ON(!in);
                ftype = le32_to_cpu(in->mode) >> 12;
+               vino.ino = le64_to_cpu(in->ino);
+               vino.snap = le64_to_cpu(in->snapid);
+               ino = ceph_vino_to_ino(vino);
                if (filldir(dirent,
                            rinfo->dir_dname[off - fi->offset],
                            rinfo->dir_dname_len[off - fi->offset],
                            pos,
-                           le64_to_cpu(in->ino),
-                           ftype) < 0) {
+                           ceph_translate_ino(inode->i_sb, ino), ftype) < 0) {
                        dout("filldir stopping us...\n");
                        return 0;
                }
@@ -404,7 +413,7 @@ more:
        spin_lock(&inode->i_lock);
        if (ci->i_release_count == fi->dir_release_count) {
                dout(" marking %p complete\n", inode);
-               ci->i_ceph_flags |= CEPH_I_COMPLETE;
+               /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
                ci->i_max_offset = filp->f_pos;
        }
        spin_unlock(&inode->i_lock);
@@ -420,6 +429,7 @@ static void reset_readdir(struct ceph_file_info *fi)
                fi->last_readdir = NULL;
        }
        kfree(fi->last_name);
+       fi->last_name = NULL;
        fi->next_offset = 2;  /* compensate for . and .. */
        if (fi->dentry) {
                dput(fi->dentry);
@@ -436,14 +446,19 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
        loff_t retval;
 
        mutex_lock(&inode->i_mutex);
+       retval = -EINVAL;
        switch (origin) {
        case SEEK_END:
                offset += inode->i_size + 2;   /* FIXME */
                break;
        case SEEK_CUR:
                offset += file->f_pos;
+       case SEEK_SET:
+               break;
+       default:
+               goto out;
        }
-       retval = -EINVAL;
+
        if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
                if (offset != file->f_pos) {
                        file->f_pos = offset;
@@ -467,6 +482,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
                if (offset > old_offset)
                        fi->dir_release_count--;
        }
+out:
        mutex_unlock(&inode->i_mutex);
        return retval;
 }
@@ -485,14 +501,14 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
 struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
                                  struct dentry *dentry, int err)
 {
-       struct ceph_client *client = ceph_sb_to_client(dentry->d_sb);
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
        struct inode *parent = dentry->d_parent->d_inode;
 
        /* .snap dir? */
        if (err == -ENOENT &&
-           ceph_vino(parent).ino != CEPH_INO_ROOT && /* no .snap in root dir */
+           ceph_snap(parent) == CEPH_NOSNAP &&
            strcmp(dentry->d_name.name,
-                  client->mount_args->snapdir_name) == 0) {
+                  fsc->mount_options->snapdir_name) == 0) {
                struct inode *inode = ceph_get_snapdir(parent);
                dout("ENOENT on snapdir %p '%.*s', linking to snapdir %p\n",
                     dentry, dentry->d_name.len, dentry->d_name.name, inode);
@@ -537,8 +553,8 @@ static int is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
 static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
                                  struct nameidata *nd)
 {
-       struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int op;
        int err;
@@ -556,7 +572,6 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
        /* open (but not create!) intent? */
        if (nd &&
            (nd->flags & LOOKUP_OPEN) &&
-           (nd->flags & LOOKUP_CONTINUE) == 0 && /* only open last component */
            !(nd->intent.open.flags & O_CREAT)) {
                int mode = nd->intent.open.create_mode & ~current->fs->umask;
                return ceph_lookup_open(dir, dentry, nd, mode, 1);
@@ -570,7 +585,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
                spin_lock(&dir->i_lock);
                dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
                if (strncmp(dentry->d_name.name,
-                           client->mount_args->snapdir_name,
+                           fsc->mount_options->snapdir_name,
                            dentry->d_name.len) &&
                    !is_root_ceph_dentry(dir, dentry) &&
                    (ci->i_ceph_flags & CEPH_I_COMPLETE) &&
@@ -627,8 +642,8 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
 static int ceph_mknod(struct inode *dir, struct dentry *dentry,
                      int mode, dev_t rdev)
 {
-       struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int err;
 
@@ -683,8 +698,8 @@ static int ceph_create(struct inode *dir, struct dentry *dentry, int mode,
 static int ceph_symlink(struct inode *dir, struct dentry *dentry,
                            const char *dest)
 {
-       struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int err;
 
@@ -714,8 +729,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
 
 static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode)
 {
-       struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int err = -EROFS;
        int op;
@@ -756,8 +771,8 @@ out:
 static int ceph_link(struct dentry *old_dentry, struct inode *dir,
                     struct dentry *dentry)
 {
-       struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int err;
 
@@ -778,10 +793,12 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        err = ceph_mdsc_do_request(mdsc, dir, req);
-       if (err)
+       if (err) {
                d_drop(dentry);
-       else if (!req->r_reply_info.head->is_dentry)
-               d_instantiate(dentry, igrab(old_dentry->d_inode));
+       } else if (!req->r_reply_info.head->is_dentry) {
+               ihold(old_dentry->d_inode);
+               d_instantiate(dentry, old_dentry->d_inode);
+       }
        ceph_mdsc_put_request(req);
        return err;
 }
@@ -811,8 +828,8 @@ static int drop_caps_for_unlink(struct inode *inode)
  */
 static int ceph_unlink(struct inode *dir, struct dentry *dentry)
 {
-       struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct inode *inode = dentry->d_inode;
        struct ceph_mds_request *req;
        int err = -EROFS;
@@ -852,8 +869,8 @@ out:
 static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
                       struct inode *new_dir, struct dentry *new_dentry)
 {
-       struct ceph_client *client = ceph_sb_to_client(old_dir->i_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(old_dir->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int err;
 
@@ -985,7 +1002,12 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
  */
 static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd)
 {
-       struct inode *dir = dentry->d_parent->d_inode;
+       struct inode *dir;
+
+       if (nd && nd->flags & LOOKUP_RCU)
+               return -ECHILD;
+
+       dir = dentry->d_parent->d_inode;
 
        dout("d_revalidate %p '%.*s' inode %p offset %lld\n", dentry,
             dentry->d_name.len, dentry->d_name.name, dentry->d_inode,
@@ -1013,30 +1035,13 @@ out_touch:
 }
 
 /*
- * When a dentry is released, clear the dir I_COMPLETE if it was part
- * of the current dir gen or if this is in the snapshot namespace.
+ * Release our ceph_dentry_info.
  */
-static void ceph_dentry_release(struct dentry *dentry)
+static void ceph_d_release(struct dentry *dentry)
 {
        struct ceph_dentry_info *di = ceph_dentry(dentry);
-       struct inode *parent_inode = dentry->d_parent->d_inode;
-       u64 snapid = ceph_snap(parent_inode);
-
-       dout("dentry_release %p parent %p\n", dentry, parent_inode);
 
-       if (parent_inode && snapid != CEPH_SNAPDIR) {
-               struct ceph_inode_info *ci = ceph_inode(parent_inode);
-
-               spin_lock(&parent_inode->i_lock);
-               if (ci->i_shared_gen == di->lease_shared_gen ||
-                   snapid <= CEPH_MAXSNAP) {
-                       dout(" clearing %p complete (d_release)\n",
-                            parent_inode);
-                       ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
-                       ci->i_release_count++;
-               }
-               spin_unlock(&parent_inode->i_lock);
-       }
+       dout("d_release %p\n", dentry);
        if (di) {
                ceph_dentry_lru_del(dentry);
                if (di->lease_session)
@@ -1069,16 +1074,17 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int left;
+       const int bufsize = 1024;
 
-       if (!ceph_test_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT))
+       if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT))
                return -EISDIR;
 
        if (!cf->dir_info) {
-               cf->dir_info = kmalloc(1024, GFP_NOFS);
+               cf->dir_info = kmalloc(bufsize, GFP_NOFS);
                if (!cf->dir_info)
                        return -ENOMEM;
                cf->dir_info_len =
-                       sprintf(cf->dir_info,
+                       snprintf(cf->dir_info, bufsize,
                                "entries:   %20lld\n"
                                " files:    %20lld\n"
                                " subdirs:  %20lld\n"
@@ -1112,7 +1118,8 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
  * an fsync() on a dir will wait for any uncommitted directory
  * operations to commit.
  */
-static int ceph_dir_fsync(struct file *file, int datasync)
+static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end,
+                         int datasync)
 {
        struct inode *inode = file->f_path.dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
@@ -1122,6 +1129,11 @@ static int ceph_dir_fsync(struct file *file, int datasync)
        int ret = 0;
 
        dout("dir_fsync %p\n", inode);
+       ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
+       if (ret)
+               return ret;
+       mutex_lock(&inode->i_mutex);
+
        spin_lock(&ci->i_unsafe_lock);
        if (list_empty(head))
                goto out;
@@ -1155,6 +1167,8 @@ static int ceph_dir_fsync(struct file *file, int datasync)
        } while (req->r_tid < last_tid);
 out:
        spin_unlock(&ci->i_unsafe_lock);
+       mutex_unlock(&inode->i_mutex);
+
        return ret;
 }
 
@@ -1171,7 +1185,7 @@ void ceph_dentry_lru_add(struct dentry *dn)
        dout("dentry_lru_add %p %p '%.*s'\n", di, dn,
             dn->d_name.len, dn->d_name.name);
        if (di) {
-               mdsc = &ceph_sb_to_client(dn->d_sb)->mdsc;
+               mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
                spin_lock(&mdsc->dentry_lru_lock);
                list_add_tail(&di->lru, &mdsc->dentry_lru);
                mdsc->num_dentry++;
@@ -1187,7 +1201,7 @@ void ceph_dentry_lru_touch(struct dentry *dn)
        dout("dentry_lru_touch %p %p '%.*s' (offset %lld)\n", di, dn,
             dn->d_name.len, dn->d_name.name, di->offset);
        if (di) {
-               mdsc = &ceph_sb_to_client(dn->d_sb)->mdsc;
+               mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
                spin_lock(&mdsc->dentry_lru_lock);
                list_move_tail(&di->lru, &mdsc->dentry_lru);
                spin_unlock(&mdsc->dentry_lru_lock);
@@ -1202,7 +1216,7 @@ void ceph_dentry_lru_del(struct dentry *dn)
        dout("dentry_lru_del %p %p '%.*s'\n", di, dn,
             dn->d_name.len, dn->d_name.name);
        if (di) {
-               mdsc = &ceph_sb_to_client(dn->d_sb)->mdsc;
+               mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
                spin_lock(&mdsc->dentry_lru_lock);
                list_del_init(&di->lru);
                mdsc->num_dentry--;
@@ -1210,6 +1224,26 @@ void ceph_dentry_lru_del(struct dentry *dn)
        }
 }
 
+/*
+ * Return name hash for a given dentry.  This is dependent on
+ * the parent directory's hash function.
+ */
+unsigned ceph_dentry_hash(struct dentry *dn)
+{
+       struct inode *dir = dn->d_parent->d_inode;
+       struct ceph_inode_info *dci = ceph_inode(dir);
+
+       switch (dci->i_dir_layout.dl_dir_hash) {
+       case 0: /* for backward compat */
+       case CEPH_STR_HASH_LINUX:
+               return dn->d_name.hash;
+
+       default:
+               return ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
+                                    dn->d_name.name, dn->d_name.len);
+       }
+}
+
 const struct file_operations ceph_dir_fops = {
        .read = ceph_read_dir,
        .readdir = ceph_readdir,
@@ -1239,16 +1273,16 @@ const struct inode_operations ceph_dir_iops = {
        .create = ceph_create,
 };
 
-struct dentry_operations ceph_dentry_ops = {
+const struct dentry_operations ceph_dentry_ops = {
        .d_revalidate = ceph_d_revalidate,
-       .d_release = ceph_dentry_release,
+       .d_release = ceph_d_release,
 };
 
-struct dentry_operations ceph_snapdir_dentry_ops = {
+const struct dentry_operations ceph_snapdir_dentry_ops = {
        .d_revalidate = ceph_snapdir_d_revalidate,
-       .d_release = ceph_dentry_release,
+       .d_release = ceph_d_release,
 };
 
-struct dentry_operations ceph_snap_dentry_ops = {
-       .d_release = ceph_dentry_release,
+const struct dentry_operations ceph_snap_dentry_ops = {
+       .d_release = ceph_d_release,
 };