Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
Linus Torvalds [Fri, 13 Jan 2012 18:29:21 +0000 (10:29 -0800)]
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: ensure prealloc_blob is in place when removing xattr
  rbd: initialize snap_rwsem in rbd_add()
  ceph: enable/disable dentry complete flags via mount option
  vfs: export symbol d_find_any_alias()
  ceph: always initialize the dentry in open_root_dentry()
  libceph: remove useless return value for osd_client __send_request()
  ceph: avoid iput() while holding spinlock in ceph_dir_fsync
  ceph: avoid useless dget/dput in encode_fh
  ceph: dereference pointer after checking for NULL
  crush: fix force for non-root TAKE
  ceph: remove unnecessary d_fsdata conditional checks
  ceph: Use kmemdup rather than duplicating its implementation

Fix up conflicts in fs/ceph/super.c (d_alloc_root() failure handling vs
always initialize the dentry in open_root_dentry)

14 files changed:
Documentation/filesystems/ceph.txt
drivers/block/rbd.c
fs/ceph/dir.c
fs/ceph/export.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/super.c
fs/ceph/super.h
fs/ceph/xattr.c
fs/dcache.c
include/linux/dcache.h
net/ceph/crush/mapper.c
net/ceph/crypto.c
net/ceph/osd_client.c

index 763d8eb..d6030aa 100644 (file)
@@ -119,12 +119,20 @@ Mount Options
        must rely on TCP's error correction to detect data corruption
        in the data payload.
 
-  noasyncreaddir
-       Disable client's use its local cache to satisfy readdir
-       requests.  (This does not change correctness; the client uses
-       cached metadata only when a lease or capability ensures it is
-       valid.)
+  dcache
+        Use the dcache contents to perform negative lookups and
+        readdir when the client has the entire directory contents in
+        its cache.  (This does not change correctness; the client uses
+        cached metadata only when a lease or capability ensures it is
+        valid.)
+
+  nodcache
+        Do not use the dcache as above.  This avoids a significant amount of
+        complex code, sacrificing performance without affecting correctness,
+        and is useful for tracking down bugs.
 
+  noasyncreaddir
+       Do not use the dcache as above for readdir.
 
 More Information
 ================
index 148ab94..3fd31de 100644 (file)
@@ -2184,6 +2184,8 @@ static ssize_t rbd_add(struct bus_type *bus,
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
 
+       init_rwsem(&rbd_dev->header.snap_rwsem);
+
        /* generate unique id: find highest unique id, add one */
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 
index 74fd747..618246b 100644 (file)
@@ -973,7 +973,7 @@ static int dentry_lease_is_valid(struct dentry *dentry)
 
        spin_lock(&dentry->d_lock);
        di = ceph_dentry(dentry);
-       if (di && di->lease_session) {
+       if (di->lease_session) {
                s = di->lease_session;
                spin_lock(&s->s_cap_lock);
                gen = s->s_cap_gen;
@@ -1072,13 +1072,11 @@ static void ceph_d_release(struct dentry *dentry)
        struct ceph_dentry_info *di = ceph_dentry(dentry);
 
        dout("d_release %p\n", dentry);
-       if (di) {
-               ceph_dentry_lru_del(dentry);
-               if (di->lease_session)
-                       ceph_put_mds_session(di->lease_session);
-               kmem_cache_free(ceph_dentry_cachep, di);
-               dentry->d_fsdata = NULL;
-       }
+       ceph_dentry_lru_del(dentry);
+       if (di->lease_session)
+               ceph_put_mds_session(di->lease_session);
+       kmem_cache_free(ceph_dentry_cachep, di);
+       dentry->d_fsdata = NULL;
 }
 
 static int ceph_snapdir_d_revalidate(struct dentry *dentry,
@@ -1096,17 +1094,36 @@ static int ceph_snapdir_d_revalidate(struct dentry *dentry,
  */
 void ceph_dir_set_complete(struct inode *inode)
 {
-       /* not yet implemented */
+       struct dentry *dentry = d_find_any_alias(inode);
+       
+       if (dentry && ceph_dentry(dentry) &&
+           ceph_test_mount_opt(ceph_sb_to_client(dentry->d_sb), DCACHE)) {
+               dout(" marking %p (%p) complete\n", inode, dentry);
+               set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+       }
+       dput(dentry);
 }
 
 void ceph_dir_clear_complete(struct inode *inode)
 {
-       /* not yet implemented */
+       struct dentry *dentry = d_find_any_alias(inode);
+
+       if (dentry && ceph_dentry(dentry)) {
+               dout(" marking %p (%p) complete\n", inode, dentry);
+               set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+       }
+       dput(dentry);
 }
 
 bool ceph_dir_test_complete(struct inode *inode)
 {
-       /* not yet implemented */
+       struct dentry *dentry = d_find_any_alias(inode);
+
+       if (dentry && ceph_dentry(dentry)) {
+               dout(" marking %p (%p) NOT complete\n", inode, dentry);
+               clear_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+       }
+       dput(dentry);
        return false;
 }
 
@@ -1220,6 +1237,7 @@ static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end,
        do {
                ceph_mdsc_get_request(req);
                spin_unlock(&ci->i_unsafe_lock);
+
                dout("dir_fsync %p wait on tid %llu (until %llu)\n",
                     inode, req->r_tid, last_tid);
                if (req->r_timeout) {
@@ -1232,9 +1250,9 @@ static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end,
                } else {
                        wait_for_completion(&req->r_safe_completion);
                }
-               spin_lock(&ci->i_unsafe_lock);
                ceph_mdsc_put_request(req);
 
+               spin_lock(&ci->i_unsafe_lock);
                if (ret || list_empty(head))
                        break;
                req = list_entry(head->next,
@@ -1259,13 +1277,11 @@ void ceph_dentry_lru_add(struct dentry *dn)
 
        dout("dentry_lru_add %p %p '%.*s'\n", di, dn,
             dn->d_name.len, dn->d_name.name);
-       if (di) {
-               mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
-               spin_lock(&mdsc->dentry_lru_lock);
-               list_add_tail(&di->lru, &mdsc->dentry_lru);
-               mdsc->num_dentry++;
-               spin_unlock(&mdsc->dentry_lru_lock);
-       }
+       mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+       spin_lock(&mdsc->dentry_lru_lock);
+       list_add_tail(&di->lru, &mdsc->dentry_lru);
+       mdsc->num_dentry++;
+       spin_unlock(&mdsc->dentry_lru_lock);
 }
 
 void ceph_dentry_lru_touch(struct dentry *dn)
@@ -1275,12 +1291,10 @@ void ceph_dentry_lru_touch(struct dentry *dn)
 
        dout("dentry_lru_touch %p %p '%.*s' (offset %lld)\n", di, dn,
             dn->d_name.len, dn->d_name.name, di->offset);
-       if (di) {
-               mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
-               spin_lock(&mdsc->dentry_lru_lock);
-               list_move_tail(&di->lru, &mdsc->dentry_lru);
-               spin_unlock(&mdsc->dentry_lru_lock);
-       }
+       mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+       spin_lock(&mdsc->dentry_lru_lock);
+       list_move_tail(&di->lru, &mdsc->dentry_lru);
+       spin_unlock(&mdsc->dentry_lru_lock);
 }
 
 void ceph_dentry_lru_del(struct dentry *dn)
@@ -1290,13 +1304,11 @@ void ceph_dentry_lru_del(struct dentry *dn)
 
        dout("dentry_lru_del %p %p '%.*s'\n", di, dn,
             dn->d_name.len, dn->d_name.name);
-       if (di) {
-               mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
-               spin_lock(&mdsc->dentry_lru_lock);
-               list_del_init(&di->lru);
-               mdsc->num_dentry--;
-               spin_unlock(&mdsc->dentry_lru_lock);
-       }
+       mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+       spin_lock(&mdsc->dentry_lru_lock);
+       list_del_init(&di->lru);
+       mdsc->num_dentry--;
+       spin_unlock(&mdsc->dentry_lru_lock);
 }
 
 /*
index 9fbcdec..fbb2a64 100644 (file)
@@ -56,9 +56,7 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len,
                return -EINVAL;
 
        spin_lock(&dentry->d_lock);
-       parent = dget(dentry->d_parent);
-       spin_unlock(&dentry->d_lock);
-
+       parent = dentry->d_parent;
        if (*max_len >= connected_handle_length) {
                dout("encode_fh %p connectable\n", dentry);
                cfh->ino = ceph_ino(dentry->d_inode);
@@ -81,7 +79,7 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len,
                *max_len = handle_length;
                type = 255;
        }
-       dput(parent);
+       spin_unlock(&dentry->d_lock);
        return type;
 }
 
index 25283e7..2c48937 100644 (file)
@@ -850,11 +850,12 @@ static void ceph_set_dentry_offset(struct dentry *dn)
 {
        struct dentry *dir = dn->d_parent;
        struct inode *inode = dir->d_inode;
-       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_inode_info *ci;
        struct ceph_dentry_info *di;
 
        BUG_ON(!inode);
 
+       ci = ceph_inode(inode);
        di = ceph_dentry(dn);
 
        spin_lock(&ci->i_ceph_lock);
index 6203d80..23ab6a3 100644 (file)
@@ -2772,7 +2772,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
        di = ceph_dentry(dentry);
        switch (h->action) {
        case CEPH_MDS_LEASE_REVOKE:
-               if (di && di->lease_session == session) {
+               if (di->lease_session == session) {
                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
                                h->seq = cpu_to_le32(di->lease_seq);
                        __ceph_mdsc_drop_dentry_lease(dentry);
@@ -2781,7 +2781,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
                break;
 
        case CEPH_MDS_LEASE_RENEW:
-               if (di && di->lease_session == session &&
+               if (di->lease_session == session &&
                    di->lease_gen == session->s_cap_gen &&
                    di->lease_renew_from &&
                    di->lease_renew_after == 0) {
index 48f61a1..00de2c9 100644 (file)
@@ -131,6 +131,8 @@ enum {
        Opt_rbytes,
        Opt_norbytes,
        Opt_noasyncreaddir,
+       Opt_dcache,
+       Opt_nodcache,
        Opt_ino32,
 };
 
@@ -152,6 +154,8 @@ static match_table_t fsopt_tokens = {
        {Opt_rbytes, "rbytes"},
        {Opt_norbytes, "norbytes"},
        {Opt_noasyncreaddir, "noasyncreaddir"},
+       {Opt_dcache, "dcache"},
+       {Opt_nodcache, "nodcache"},
        {Opt_ino32, "ino32"},
        {-1, NULL}
 };
@@ -231,6 +235,12 @@ static int parse_fsopt_token(char *c, void *private)
        case Opt_noasyncreaddir:
                fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
                break;
+       case Opt_dcache:
+               fsopt->flags |= CEPH_MOUNT_OPT_DCACHE;
+               break;
+       case Opt_nodcache:
+               fsopt->flags &= ~CEPH_MOUNT_OPT_DCACHE;
+               break;
        case Opt_ino32:
                fsopt->flags |= CEPH_MOUNT_OPT_INO32;
                break;
@@ -377,6 +387,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
                seq_puts(m, ",norbytes");
        if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
                seq_puts(m, ",noasyncreaddir");
+       if (fsopt->flags & CEPH_MOUNT_OPT_DCACHE)
+               seq_puts(m, ",dcache");
+       else
+               seq_puts(m, ",nodcache");
 
        if (fsopt->wsize)
                seq_printf(m, ",wsize=%d", fsopt->wsize);
@@ -647,10 +661,10 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
                                root = ERR_PTR(-ENOMEM);
                                goto out;
                        }
-                       ceph_init_dentry(root);
                } else {
                        root = d_obtain_alias(inode);
                }
+               ceph_init_dentry(root);
                dout("open_root_inode success, root dentry is %p\n", root);
        } else {
                root = ERR_PTR(err);
index cb3652b..1421f3d 100644 (file)
@@ -28,6 +28,7 @@
 #define CEPH_MOUNT_OPT_RBYTES          (1<<5) /* dir st_bytes = rbytes */
 #define CEPH_MOUNT_OPT_NOASYNCREADDIR  (1<<7) /* no dcache readdir */
 #define CEPH_MOUNT_OPT_INO32           (1<<8) /* 32 bit inos */
+#define CEPH_MOUNT_OPT_DCACHE          (1<<9) /* use dcache for readdir etc */
 
 #define CEPH_MOUNT_OPT_DEFAULT    (CEPH_MOUNT_OPT_RBYTES)
 
index a5e36e4..857214a 100644 (file)
@@ -818,6 +818,7 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
        struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
        int issued;
        int err;
+       int required_blob_size;
        int dirty;
 
        if (ceph_snap(inode) != CEPH_NOSNAP)
@@ -833,14 +834,34 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
                        return -EOPNOTSUPP;
        }
 
+       err = -ENOMEM;
        spin_lock(&ci->i_ceph_lock);
        __build_xattrs(inode);
+retry:
        issued = __ceph_caps_issued(ci, NULL);
        dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
 
        if (!(issued & CEPH_CAP_XATTR_EXCL))
                goto do_sync;
 
+       required_blob_size = __get_required_blob_size(ci, 0, 0);
+
+       if (!ci->i_xattrs.prealloc_blob ||
+           required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
+               struct ceph_buffer *blob;
+
+               spin_unlock(&ci->i_ceph_lock);
+               dout(" preaallocating new blob size=%d\n", required_blob_size);
+               blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
+               if (!blob)
+                       goto out;
+               spin_lock(&ci->i_ceph_lock);
+               if (ci->i_xattrs.prealloc_blob)
+                       ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+               ci->i_xattrs.prealloc_blob = blob;
+               goto retry;
+       }
+
        err = __remove_xattr_by_name(ceph_inode(inode), name);
        dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
        ci->i_xattrs.dirty = true;
@@ -853,6 +874,7 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
 do_sync:
        spin_unlock(&ci->i_ceph_lock);
        err = ceph_send_removexattr(dentry, name);
+out:
        return err;
 }
 
index 616fedf..16a53cc 100644 (file)
@@ -1475,7 +1475,14 @@ static struct dentry * __d_find_any_alias(struct inode *inode)
        return alias;
 }
 
-static struct dentry * d_find_any_alias(struct inode *inode)
+/**
+ * d_find_any_alias - find any alias for a given inode
+ * @inode: inode to find an alias for
+ *
+ * If any aliases exist for the given inode, take and return a
+ * reference for one of them.  If no aliases exist, return %NULL.
+ */
+struct dentry *d_find_any_alias(struct inode *inode)
 {
        struct dentry *de;
 
@@ -1484,7 +1491,7 @@ static struct dentry * d_find_any_alias(struct inode *inode)
        spin_unlock(&inode->i_lock);
        return de;
 }
-
+EXPORT_SYMBOL(d_find_any_alias);
 
 /**
  * d_obtain_alias - find or allocate a dentry for a given inode
index 31f7322..d64a55b 100644 (file)
@@ -242,6 +242,7 @@ extern struct dentry * d_alloc(struct dentry *, const struct qstr *);
 extern struct dentry * d_alloc_pseudo(struct super_block *, const struct qstr *);
 extern struct dentry * d_splice_alias(struct inode *, struct dentry *);
 extern struct dentry * d_add_ci(struct dentry *, struct inode *, struct qstr *);
+extern struct dentry *d_find_any_alias(struct inode *inode);
 extern struct dentry * d_obtain_alias(struct inode *);
 extern void shrink_dcache_sb(struct super_block *);
 extern void shrink_dcache_parent(struct dentry *);
index 3a94eae..b79747c 100644 (file)
@@ -510,10 +510,15 @@ int crush_do_rule(struct crush_map *map,
                switch (rule->steps[step].op) {
                case CRUSH_RULE_TAKE:
                        w[0] = rule->steps[step].arg1;
-                       if (force_pos >= 0) {
-                               BUG_ON(force_context[force_pos] != w[0]);
+
+                       /* find position in force_context/hierarchy */
+                       while (force_pos >= 0 &&
+                              force_context[force_pos] != w[0])
                                force_pos--;
-                       }
+                       /* and move past it */
+                       if (force_pos >= 0)
+                               force_pos--;
+
                        wsize = 1;
                        break;
 
index 85f3bc0..b780cb7 100644 (file)
@@ -15,10 +15,9 @@ int ceph_crypto_key_clone(struct ceph_crypto_key *dst,
                          const struct ceph_crypto_key *src)
 {
        memcpy(dst, src, sizeof(struct ceph_crypto_key));
-       dst->key = kmalloc(src->len, GFP_NOFS);
+       dst->key = kmemdup(src->key, src->len, GFP_NOFS);
        if (!dst->key)
                return -ENOMEM;
-       memcpy(dst->key, src->key, src->len);
        return 0;
 }
 
index f4f3f58..5e25405 100644 (file)
@@ -29,8 +29,8 @@ static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
-static int __send_request(struct ceph_osd_client *osdc,
-                         struct ceph_osd_request *req);
+static void __send_request(struct ceph_osd_client *osdc,
+                          struct ceph_osd_request *req);
 
 static int op_needs_trail(int op)
 {
@@ -1022,8 +1022,8 @@ out:
 /*
  * caller should hold map_sem (for read) and request_mutex
  */
-static int __send_request(struct ceph_osd_client *osdc,
-                         struct ceph_osd_request *req)
+static void __send_request(struct ceph_osd_client *osdc,
+                          struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead;
 
@@ -1041,7 +1041,6 @@ static int __send_request(struct ceph_osd_client *osdc,
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
        req->r_sent = req->r_osd->o_incarnation;
-       return 0;
 }
 
 /*
@@ -1726,17 +1725,9 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                        dout("send_request %p no up osds in pg\n", req);
                        ceph_monc_request_next_osdmap(&osdc->client->monc);
                } else {
-                       rc = __send_request(osdc, req);
-                       if (rc) {
-                               if (nofail) {
-                                       dout("osdc_start_request failed send, "
-                                            " will retry %lld\n", req->r_tid);
-                                       rc = 0;
-                               } else {
-                                       __unregister_request(osdc, req);
-                               }
-                       }
+                       __send_request(osdc, req);
                }
+               rc = 0;
        }
 
 out_unlock: