ocfs2: remove unnecessary dentry_unhash on rmdir/rename_dir
[linux-2.6.git] / fs / ceph / inode.c
index db68468..b54c97d 100644 (file)
@@ -1,8 +1,7 @@
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
 
 #include <linux/module.h>
 #include <linux/fs.h>
-#include <linux/smp_lock.h>
 #include <linux/slab.h>
 #include <linux/string.h>
 #include <linux/uaccess.h>
@@ -10,9 +9,11 @@
 #include <linux/namei.h>
 #include <linux/writeback.h>
 #include <linux/vmalloc.h>
+#include <linux/pagevec.h>
 
 #include "super.h"
-#include "decode.h"
+#include "mds_client.h"
+#include <linux/ceph/decode.h>
 
 /*
  * Ceph inode operations
 
 static const struct inode_operations ceph_symlink_iops;
 
-static void ceph_inode_invalidate_pages(struct work_struct *work);
+static void ceph_invalidate_work(struct work_struct *work);
+static void ceph_writeback_work(struct work_struct *work);
+static void ceph_vmtruncate_work(struct work_struct *work);
 
 /*
  * find or create an inode, given the ceph ino number
  */
+static int ceph_set_ino_cb(struct inode *inode, void *data)
+{
+       ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
+       inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
+       return 0;
+}
+
 struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
 {
        struct inode *inode;
@@ -66,7 +76,7 @@ struct inode *ceph_get_snapdir(struct inode *parent)
 
        BUG_ON(!S_ISDIR(parent->i_mode));
        if (IS_ERR(inode))
-               return ERR_PTR(PTR_ERR(inode));
+               return inode;
        inode->i_mode = parent->i_mode;
        inode->i_uid = parent->i_uid;
        inode->i_gid = parent->i_gid;
@@ -294,6 +304,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_release_count = 0;
        ci->i_symlink = NULL;
 
+       memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
+
        ci->i_fragtree = RB_ROOT;
        mutex_init(&ci->i_fragtree_mutex);
 
@@ -357,14 +369,23 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        INIT_LIST_HEAD(&ci->i_snap_realm_item);
        INIT_LIST_HEAD(&ci->i_snap_flush_item);
 
-       INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
-       INIT_WORK(&ci->i_pg_inv_work, ceph_inode_invalidate_pages);
+       INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
+       INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
 
        INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
 
        return &ci->vfs_inode;
 }
 
+static void ceph_i_callback(struct rcu_head *head)
+{
+       struct inode *inode = container_of(head, struct inode, i_rcu);
+       struct ceph_inode_info *ci = ceph_inode(inode);
+
+       INIT_LIST_HEAD(&inode->i_dentry);
+       kmem_cache_free(ceph_inode_cachep, ci);
+}
+
 void ceph_destroy_inode(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
@@ -375,6 +396,22 @@ void ceph_destroy_inode(struct inode *inode)
 
        ceph_queue_caps_release(inode);
 
+       /*
+        * we may still have a snap_realm reference if there are stray
+        * caps in i_cap_exporting_issued or i_snap_caps.
+        */
+       if (ci->i_snap_realm) {
+               struct ceph_mds_client *mdsc =
+                       ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
+               struct ceph_snap_realm *realm = ci->i_snap_realm;
+
+               dout(" dropping residual ref to snap realm %p\n", realm);
+               spin_lock(&realm->inodes_with_caps_lock);
+               list_del_init(&ci->i_snap_realm_item);
+               spin_unlock(&realm->inodes_with_caps_lock);
+               ceph_put_snap_realm(mdsc, realm);
+       }
+
        kfree(ci->i_symlink);
        while ((n = rb_first(&ci->i_fragtree)) != NULL) {
                frag = rb_entry(n, struct ceph_inode_frag, node);
@@ -388,7 +425,7 @@ void ceph_destroy_inode(struct inode *inode)
        if (ci->i_xattrs.prealloc_blob)
                ceph_buffer_put(ci->i_xattrs.prealloc_blob);
 
-       kmem_cache_free(ceph_inode_cachep, ci);
+       call_rcu(&inode->i_rcu, ceph_i_callback);
 }
 
 
@@ -416,9 +453,18 @@ int ceph_fill_file_size(struct inode *inode, int issued,
                        dout("truncate_seq %u -> %u\n",
                             ci->i_truncate_seq, truncate_seq);
                        ci->i_truncate_seq = truncate_seq;
-                       if (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
-                                     CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
-                                     CEPH_CAP_FILE_EXCL)) {
+                       /*
+                        * If we hold relevant caps, or in the case where we're
+                        * not the only client referencing this file and we
+                        * don't hold those caps, then we need to check whether
+                        * the file is either opened or mmaped
+                        */
+                       if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
+                                      CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
+                                      CEPH_CAP_FILE_EXCL|
+                                      CEPH_CAP_FILE_LAZYIO)) ||
+                           mapping_mapped(inode->i_mapping) ||
+                           __ceph_caps_file_wanted(ci)) {
                                ci->i_truncate_pending++;
                                queue_trunc = 1;
                        }
@@ -442,7 +488,9 @@ void ceph_fill_file_time(struct inode *inode, int issued,
 
        if (issued & (CEPH_CAP_FILE_EXCL|
                      CEPH_CAP_FILE_WR|
-                     CEPH_CAP_FILE_BUFFER)) {
+                     CEPH_CAP_FILE_BUFFER|
+                     CEPH_CAP_AUTH_EXCL|
+                     CEPH_CAP_XATTR_EXCL)) {
                if (timespec_compare(ctime, &inode->i_ctime) > 0) {
                        dout("ctime %ld.%09ld -> %ld.%09ld inc w/ cap\n",
                             inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
@@ -482,7 +530,7 @@ void ceph_fill_file_time(struct inode *inode, int issued,
                        warn = 1;
                }
        } else {
-               /* we have no write caps; whatever the MDS says is true */
+               /* we have no write|excl caps; whatever the MDS says is true */
                if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) >= 0) {
                        inode->i_ctime = *ctime;
                        inode->i_mtime = *mtime;
@@ -538,12 +586,17 @@ static int fill_inode(struct inode *inode,
 
        /*
         * provided version will be odd if inode value is projected,
-        * even if stable.  skip the update if we have a newer info
-        * (e.g., due to inode info racing form multiple MDSs), or if
-        * we are getting projected (unstable) inode info.
+        * even if stable.  skip the update if we have newer stable
+        * info (ours>=theirs, e.g. due to racing mds replies), unless
+        * we are getting projected (unstable) info (in which case the
+        * version is odd, and we want ours>theirs).
+        *   us   them
+        *   2    2     skip
+        *   3    2     skip
+        *   3    3     update
         */
        if (le64_to_cpu(info->version) > 0 &&
-           (ci->i_version & ~1) > le64_to_cpu(info->version))
+           (ci->i_version & ~1) >= le64_to_cpu(info->version))
                goto no_change;
 
        issued = __ceph_caps_issued(ci, &implemented);
@@ -577,7 +630,14 @@ static int fill_inode(struct inode *inode,
                            le32_to_cpu(info->time_warp_seq),
                            &ctime, &mtime, &atime);
 
-       ci->i_max_size = le64_to_cpu(info->max_size);
+       /* only update max_size on auth cap */
+       if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+           ci->i_max_size != le64_to_cpu(info->max_size)) {
+               dout("max_size %lld -> %llu\n", ci->i_max_size,
+                    le64_to_cpu(info->max_size));
+               ci->i_max_size = le64_to_cpu(info->max_size);
+       }
+
        ci->i_layout = info->layout;
        inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 
@@ -592,11 +652,12 @@ static int fill_inode(struct inode *inode,
                        memcpy(ci->i_xattrs.blob->vec.iov_base,
                               iinfo->xattr_data, iinfo->xattr_len);
                ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
+               xattr_blob = NULL;
        }
 
        inode->i_mapping->a_ops = &ceph_aops;
        inode->i_mapping->backing_dev_info =
-               &ceph_client(inode->i_sb)->backing_dev_info;
+               &ceph_sb_to_client(inode->i_sb)->backing_dev_info;
 
        switch (inode->i_mode & S_IFMT) {
        case S_IFIFO:
@@ -637,6 +698,8 @@ static int fill_inode(struct inode *inode,
                inode->i_op = &ceph_dir_iops;
                inode->i_fop = &ceph_dir_fops;
 
+               ci->i_dir_layout = iinfo->dir_layout;
+
                ci->i_files = le64_to_cpu(info->files);
                ci->i_subdirs = le64_to_cpu(info->subdirs);
                ci->i_rbytes = le64_to_cpu(info->rbytes);
@@ -647,15 +710,13 @@ static int fill_inode(struct inode *inode,
                /* set dir completion flag? */
                if (ci->i_files == 0 && ci->i_subdirs == 0 &&
                    ceph_snap(inode) == CEPH_NOSNAP &&
-                   (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED)) {
+                   (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
+                   (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+                   (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
                        dout(" marking %p complete (empty)\n", inode);
-                       ci->i_ceph_flags |= CEPH_I_COMPLETE;
+                       /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
                        ci->i_max_offset = 2;
                }
-
-               /* it may be better to set st_size in getattr instead? */
-               if (ceph_test_opt(ceph_client(inode->i_sb), RBYTES))
-                       inode->i_size = ci->i_rbytes;
                break;
        default:
                pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
@@ -667,9 +728,7 @@ no_change:
 
        /* queue truncate if we saw i_size decrease */
        if (queue_trunc)
-               if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
-                              &ci->i_vmtruncate_work))
-                       igrab(inode);
+               ceph_queue_vmtruncate(inode);
 
        /* populate frag tree */
        /* FIXME: move me up, if/when version reflects fragtree changes */
@@ -708,6 +767,10 @@ no_change:
                                __ceph_get_fmode(ci, cap_fmode);
                        spin_unlock(&inode->i_lock);
                }
+       } else if (cap_fmode >= 0) {
+               pr_warning("mds issued no caps on %llx.%llx\n",
+                          ceph_vinop(inode));
+               __ceph_get_fmode(ci, cap_fmode);
        }
 
        /* update delegation info? */
@@ -773,6 +836,37 @@ out_unlock:
 }
 
 /*
+ * Set dentry's directory position based on the current dir's max, and
+ * order it in d_subdirs, so that dcache_readdir behaves.
+ */
+static void ceph_set_dentry_offset(struct dentry *dn)
+{
+       struct dentry *dir = dn->d_parent;
+       struct inode *inode = dn->d_parent->d_inode;
+       struct ceph_dentry_info *di;
+
+       BUG_ON(!inode);
+
+       di = ceph_dentry(dn);
+
+       spin_lock(&inode->i_lock);
+       if ((ceph_inode(inode)->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
+               spin_unlock(&inode->i_lock);
+               return;
+       }
+       di->offset = ceph_inode(inode)->i_max_offset++;
+       spin_unlock(&inode->i_lock);
+
+       spin_lock(&dir->d_lock);
+       spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED);
+       list_move(&dn->d_u.d_child, &dir->d_subdirs);
+       dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset,
+            dn->d_u.d_child.prev, dn->d_u.d_child.next);
+       spin_unlock(&dn->d_lock);
+       spin_unlock(&dir->d_lock);
+}
+
+/*
  * splice a dentry to an inode.
  * caller must hold directory i_mutex for this to be safe.
  *
@@ -781,17 +875,19 @@ out_unlock:
  * the caller) if we fail.
  */
 static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
-                                   bool *prehash)
+                                   bool *prehash, bool set_offset)
 {
        struct dentry *realdn;
 
+       BUG_ON(dn->d_inode);
+
        /* dn must be unhashed */
        if (!d_unhashed(dn))
                d_drop(dn);
        realdn = d_materialise_unique(dn, in);
        if (IS_ERR(realdn)) {
-               pr_err("splice_dentry error %p inode %p ino %llx.%llx\n",
-                      dn, in, ceph_vinop(in));
+               pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
+                      PTR_ERR(realdn), dn, in, ceph_vinop(in));
                if (prehash)
                        *prehash = false; /* don't rehash on error */
                dn = realdn; /* note realdn contains the error */
@@ -799,19 +895,20 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
        } else if (realdn) {
                dout("dn %p (%d) spliced with %p (%d) "
                     "inode %p ino %llx.%llx\n",
-                    dn, atomic_read(&dn->d_count),
-                    realdn, atomic_read(&realdn->d_count),
+                    dn, dn->d_count,
+                    realdn, realdn->d_count,
                     realdn->d_inode, ceph_vinop(realdn->d_inode));
                dput(dn);
                dn = realdn;
        } else {
                BUG_ON(!ceph_dentry(dn));
-
                dout("dn %p attached to %p ino %llx.%llx\n",
                     dn, dn->d_inode, ceph_vinop(dn->d_inode));
        }
        if ((!prehash || *prehash) && d_unhashed(dn))
                d_rehash(dn);
+       if (set_offset)
+               ceph_set_dentry_offset(dn);
 out:
        return dn;
 }
@@ -834,6 +931,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        struct inode *in = NULL;
        struct ceph_mds_reply_inode *ininfo;
        struct ceph_vino vino;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
        int i = 0;
        int err = 0;
 
@@ -876,18 +974,29 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 
        if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
                dout("fill_trace reply is empty!\n");
-               if (rinfo->head->result == 0 && req->r_locked_dir) {
-                       struct ceph_inode_info *ci =
-                               ceph_inode(req->r_locked_dir);
-                       dout(" clearing %p complete (empty trace)\n",
-                            req->r_locked_dir);
-                       ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
-                       ci->i_release_count++;
-               }
+               if (rinfo->head->result == 0 && req->r_locked_dir)
+                       ceph_invalidate_dir_request(req);
                return 0;
        }
 
        if (rinfo->head->is_dentry) {
+               struct inode *dir = req->r_locked_dir;
+
+               err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
+                                session, req->r_request_started, -1,
+                                &req->r_caps_reservation);
+               if (err < 0)
+                       return err;
+       }
+
+       /*
+        * ignore null lease/binding on snapdir ENOENT, or else we
+        * will have trouble splicing in the virtual snapdir later
+        */
+       if (rinfo->head->is_dentry && !req->r_aborted &&
+           (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
+                                              fsc->mount_options->snapdir_name,
+                                              req->r_dentry->d_name.len))) {
                /*
                 * lookup link rename   : null -> possibly existing inode
                 * mknod symlink mkdir  : null -> new inode
@@ -905,12 +1014,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                BUG_ON(ceph_snap(dir) !=
                       le64_to_cpu(rinfo->diri.in->snapid));
 
-               err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
-                                session, req->r_request_started, -1,
-                                &req->r_caps_reservation);
-               if (err < 0)
-                       return err;
-
                /* do we have a lease on the whole dir? */
                have_dir_cap =
                        (le32_to_cpu(rinfo->diri.in->cap.caps) &
@@ -933,15 +1036,28 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                             dn, dn->d_name.len, dn->d_name.name);
                        dout("fill_trace doing d_move %p -> %p\n",
                             req->r_old_dentry, dn);
+
                        d_move(req->r_old_dentry, dn);
                        dout(" src %p '%.*s' dst %p '%.*s'\n",
                             req->r_old_dentry,
                             req->r_old_dentry->d_name.len,
                             req->r_old_dentry->d_name.name,
                             dn, dn->d_name.len, dn->d_name.name);
-                       /* take overwritten dentry's readdir offset */
-                       ceph_dentry(req->r_old_dentry)->offset =
-                               ceph_dentry(dn)->offset;
+
+                       /* ensure target dentry is invalidated, despite
+                          rehashing bug in vfs_rename_dir */
+                       ceph_invalidate_dentry_lease(dn);
+
+                       /*
+                        * d_move() puts the renamed dentry at the end of
+                        * d_subdirs.  We need to assign it an appropriate
+                        * directory offset so we can behave when holding
+                        * I_COMPLETE.
+                        */
+                       ceph_set_dentry_offset(req->r_old_dentry);
+                       dout("dn %p gets new offset %lld\n", req->r_old_dentry, 
+                            ceph_dentry(req->r_old_dentry)->offset);
+
                        dn = req->r_old_dentry;  /* use old_dentry */
                        in = dn->d_inode;
                }
@@ -968,7 +1084,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                ininfo = rinfo->targeti.in;
                vino.ino = le64_to_cpu(ininfo->ino);
                vino.snap = le64_to_cpu(ininfo->snapid);
-               if (!dn->d_inode) {
+               in = dn->d_inode;
+               if (!in) {
                        in = ceph_get_inode(sb, vino);
                        if (IS_ERR(in)) {
                                pr_err("fill_trace bad get_inode "
@@ -977,7 +1094,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                                d_delete(dn);
                                goto done;
                        }
-                       dn = splice_dentry(dn, in, &have_lease);
+                       dn = splice_dentry(dn, in, &have_lease, true);
                        if (IS_ERR(dn)) {
                                err = PTR_ERR(dn);
                                goto done;
@@ -1020,7 +1137,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                        goto done;
                }
                dout(" linking snapped dir %p to dn %p\n", in, dn);
-               dn = splice_dentry(dn, in, NULL);
+               dn = splice_dentry(dn, in, NULL, true);
                if (IS_ERR(dn)) {
                        err = PTR_ERR(dn);
                        goto done;
@@ -1116,8 +1233,10 @@ retry_lookup:
                                goto out;
                        }
                        err = ceph_init_dentry(dn);
-                       if (err < 0)
+                       if (err < 0) {
+                               dput(dn);
                                goto out;
+                       }
                } else if (dn->d_inode &&
                           (ceph_ino(dn->d_inode) != vino.ino ||
                            ceph_snap(dn->d_inode) != vino.snap)) {
@@ -1128,11 +1247,11 @@ retry_lookup:
                        goto retry_lookup;
                } else {
                        /* reorder parent's d_subdirs */
-                       spin_lock(&dcache_lock);
-                       spin_lock(&dn->d_lock);
+                       spin_lock(&parent->d_lock);
+                       spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED);
                        list_move(&dn->d_u.d_child, &parent->d_subdirs);
                        spin_unlock(&dn->d_lock);
-                       spin_unlock(&dcache_lock);
+                       spin_unlock(&parent->d_lock);
                }
 
                di = dn->d_fsdata;
@@ -1143,26 +1262,31 @@ retry_lookup:
                        in = dn->d_inode;
                } else {
                        in = ceph_get_inode(parent->d_sb, vino);
-                       if (in == NULL) {
+                       if (IS_ERR(in)) {
                                dout("new_inode badness\n");
                                d_delete(dn);
                                dput(dn);
-                               err = -ENOMEM;
+                               err = PTR_ERR(in);
                                goto out;
                        }
-                       dn = splice_dentry(dn, in, NULL);
+                       dn = splice_dentry(dn, in, NULL, false);
+                       if (IS_ERR(dn))
+                               dn = NULL;
                }
 
                if (fill_inode(in, &rinfo->dir_in[i], NULL, session,
                               req->r_request_started, -1,
                               &req->r_caps_reservation) < 0) {
                        pr_err("fill_inode badness on %p\n", in);
-                       dput(dn);
-                       continue;
+                       goto next_item;
                }
-               update_dentry_lease(dn, rinfo->dir_dlease[i],
-                                   req->r_session, req->r_request_started);
-               dput(dn);
+               if (dn)
+                       update_dentry_lease(dn, rinfo->dir_dlease[i],
+                                           req->r_session,
+                                           req->r_request_started);
+next_item:
+               if (dn)
+                       dput(dn);
        }
        req->r_did_prepopulate = true;
 
@@ -1198,7 +1322,18 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
  * Write back inode data in a worker thread.  (This can't be done
  * in the message handler context.)
  */
-void ceph_inode_writeback(struct work_struct *work)
+void ceph_queue_writeback(struct inode *inode)
+{
+       if (queue_work(ceph_inode_to_client(inode)->wb_wq,
+                      &ceph_inode(inode)->i_wb_work)) {
+               dout("ceph_queue_writeback %p\n", inode);
+               igrab(inode);
+       } else {
+               dout("ceph_queue_writeback %p failed\n", inode);
+       }
+}
+
+static void ceph_writeback_work(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_wb_work);
@@ -1210,10 +1345,67 @@ void ceph_inode_writeback(struct work_struct *work)
 }
 
 /*
+ * queue an async invalidation
+ */
+void ceph_queue_invalidate(struct inode *inode)
+{
+       if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
+                      &ceph_inode(inode)->i_pg_inv_work)) {
+               dout("ceph_queue_invalidate %p\n", inode);
+               igrab(inode);
+       } else {
+               dout("ceph_queue_invalidate %p failed\n", inode);
+       }
+}
+
+/*
+ * invalidate any pages that are not dirty or under writeback.  this
+ * includes pages that are clean and mapped.
+ */
+static void ceph_invalidate_nondirty_pages(struct address_space *mapping)
+{
+       struct pagevec pvec;
+       pgoff_t next = 0;
+       int i;
+
+       pagevec_init(&pvec, 0);
+       while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) {
+               for (i = 0; i < pagevec_count(&pvec); i++) {
+                       struct page *page = pvec.pages[i];
+                       pgoff_t index;
+                       int skip_page =
+                               (PageDirty(page) || PageWriteback(page));
+
+                       if (!skip_page)
+                               skip_page = !trylock_page(page);
+
+                       /*
+                        * We really shouldn't be looking at the ->index of an
+                        * unlocked page.  But we're not allowed to lock these
+                        * pages.  So we rely upon nobody altering the ->index
+                        * of this (pinned-by-us) page.
+                        */
+                       index = page->index;
+                       if (index > next)
+                               next = index;
+                       next++;
+
+                       if (skip_page)
+                               continue;
+
+                       generic_error_remove_page(mapping, page);
+                       unlock_page(page);
+               }
+               pagevec_release(&pvec);
+               cond_resched();
+       }
+}
+
+/*
  * Invalidate inode pages in a worker thread.  (This can't be done
  * in the message handler context.)
  */
-static void ceph_inode_invalidate_pages(struct work_struct *work)
+static void ceph_invalidate_work(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_pg_inv_work);
@@ -1224,29 +1416,27 @@ static void ceph_inode_invalidate_pages(struct work_struct *work)
        spin_lock(&inode->i_lock);
        dout("invalidate_pages %p gen %d revoking %d\n", inode,
             ci->i_rdcache_gen, ci->i_rdcache_revoking);
-       if (ci->i_rdcache_gen == 0 ||
-           ci->i_rdcache_revoking != ci->i_rdcache_gen) {
-               BUG_ON(ci->i_rdcache_revoking > ci->i_rdcache_gen);
+       if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
                /* nevermind! */
-               ci->i_rdcache_revoking = 0;
                spin_unlock(&inode->i_lock);
                goto out;
        }
        orig_gen = ci->i_rdcache_gen;
        spin_unlock(&inode->i_lock);
 
-       truncate_inode_pages(&inode->i_data, 0);
+       ceph_invalidate_nondirty_pages(inode->i_mapping);
 
        spin_lock(&inode->i_lock);
-       if (orig_gen == ci->i_rdcache_gen) {
+       if (orig_gen == ci->i_rdcache_gen &&
+           orig_gen == ci->i_rdcache_revoking) {
                dout("invalidate_pages %p gen %d successful\n", inode,
                     ci->i_rdcache_gen);
-               ci->i_rdcache_gen = 0;
-               ci->i_rdcache_revoking = 0;
+               ci->i_rdcache_revoking--;
                check = 1;
        } else {
-               dout("invalidate_pages %p gen %d raced, gen now %d\n",
-                    inode, orig_gen, ci->i_rdcache_gen);
+               dout("invalidate_pages %p gen %d raced, now %d revoking %d\n",
+                    inode, orig_gen, ci->i_rdcache_gen,
+                    ci->i_rdcache_revoking);
        }
        spin_unlock(&inode->i_lock);
 
@@ -1262,7 +1452,7 @@ out:
  *
  * We also truncate in a separate thread as well.
  */
-void ceph_vmtruncate_work(struct work_struct *work)
+static void ceph_vmtruncate_work(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_vmtruncate_work);
@@ -1276,6 +1466,24 @@ void ceph_vmtruncate_work(struct work_struct *work)
 }
 
 /*
+ * Queue an async vmtruncate.  If we fail to queue work, we will handle
+ * the truncation the next time we call __ceph_do_pending_vmtruncate.
+ */
+void ceph_queue_vmtruncate(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+
+       if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
+                      &ci->i_vmtruncate_work)) {
+               dout("ceph_queue_vmtruncate %p\n", inode);
+               igrab(inode);
+       } else {
+               dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
+                    inode, ci->i_truncate_pending);
+       }
+}
+
+/*
  * called with i_mutex held.
  *
  * Make sure any pending truncation is applied before doing anything
@@ -1325,7 +1533,7 @@ retry:
        if (wrbuffer_refs == 0)
                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
        if (wake)
-               wake_up(&ci->i_cap_wq);
+               wake_up_all(&ci->i_cap_wq);
 }
 
 
@@ -1354,12 +1562,11 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        struct inode *parent_inode = dentry->d_parent->d_inode;
        const unsigned int ia_valid = attr->ia_valid;
        struct ceph_mds_request *req;
-       struct ceph_mds_client *mdsc = &ceph_client(dentry->d_sb)->mdsc;
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
        int issued;
        int release = 0, dirtied = 0;
        int mask = 0;
        int err = 0;
-       int queue_trunc = 0;
 
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
@@ -1473,11 +1680,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
                if ((issued & CEPH_CAP_FILE_EXCL) &&
                    attr->ia_size > inode->i_size) {
                        inode->i_size = attr->ia_size;
-                       if (attr->ia_size < inode->i_size) {
-                               ci->i_truncate_size = attr->ia_size;
-                               ci->i_truncate_pending++;
-                               queue_trunc = 1;
-                       }
                        inode->i_blocks =
                                (attr->ia_size + (1 << 9) - 1) >> 9;
                        inode->i_ctime = attr->ia_ctime;
@@ -1530,9 +1732,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        release &= issued;
        spin_unlock(&inode->i_lock);
 
-       if (queue_trunc)
-               __ceph_do_pending_vmtruncate(inode);
-
        if (mask) {
                req->r_inode = igrab(inode);
                req->r_inode_drop = release;
@@ -1558,8 +1757,8 @@ out:
  */
 int ceph_do_getattr(struct inode *inode, int mask)
 {
-       struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int err;
 
@@ -1568,7 +1767,7 @@ int ceph_do_getattr(struct inode *inode, int mask)
                return 0;
        }
 
-       dout("do_getattr inode %p mask %s\n", inode, ceph_cap_string(mask));
+       dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
        if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
                return 0;
 
@@ -1589,12 +1788,17 @@ int ceph_do_getattr(struct inode *inode, int mask)
  * Check inode permissions.  We verify we have a valid value for
  * the AUTH cap, then call the generic handler.
  */
-int ceph_permission(struct inode *inode, int mask)
+int ceph_permission(struct inode *inode, int mask, unsigned int flags)
 {
-       int err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED);
+       int err;
+
+       if (flags & IPERM_FLAG_RCU)
+               return -ECHILD;
+
+       err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED);
 
        if (!err)
-               err = generic_permission(inode, mask, NULL);
+               err = generic_permission(inode, mask, flags, NULL);
        return err;
 }
 
@@ -1612,13 +1816,17 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
        err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL);
        if (!err) {
                generic_fillattr(inode, stat);
-               stat->ino = inode->i_ino;
+               stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
                if (ceph_snap(inode) != CEPH_NOSNAP)
                        stat->dev = ceph_snap(inode);
                else
                        stat->dev = 0;
                if (S_ISDIR(inode->i_mode)) {
-                       stat->size = ci->i_rbytes;
+                       if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
+                                               RBYTES))
+                               stat->size = ci->i_rbytes;
+                       else
+                               stat->size = ci->i_files + ci->i_subdirs;
                        stat->blocks = 0;
                        stat->blksize = 65536;
                }