Merge git://bedivere.hansenpartnership.com/git/scsi-rc-fixes-2.6
[linux-2.6.git] / fs / ceph / mds_client.c
index 509339c..86c59e1 100644 (file)
@@ -483,22 +483,26 @@ void ceph_mdsc_release_request(struct kref *kref)
                destroy_reply_info(&req->r_reply_info);
        }
        if (req->r_inode) {
-               ceph_put_cap_refs(ceph_inode(req->r_inode),
-                                 CEPH_CAP_PIN);
+               ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                iput(req->r_inode);
        }
        if (req->r_locked_dir)
-               ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
-                                 CEPH_CAP_PIN);
+               ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
        if (req->r_target_inode)
                iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry) {
-               ceph_put_cap_refs(
-                       ceph_inode(req->r_old_dentry->d_parent->d_inode),
-                       CEPH_CAP_PIN);
+               /*
+                * track (and drop pins for) r_old_dentry_dir
+                * separately, since r_old_dentry's d_parent may have
+                * changed between the dir mutex being dropped and
+                * this request being freed.
+                */
+               ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
+                                 CEPH_CAP_PIN);
                dput(req->r_old_dentry);
+               iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
@@ -578,6 +582,7 @@ static void __register_request(struct ceph_mds_client *mdsc,
        if (dir) {
                struct ceph_inode_info *ci = ceph_inode(dir);
 
+               ihold(dir);
                spin_lock(&ci->i_unsafe_lock);
                req->r_unsafe_dir = dir;
                list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
@@ -598,6 +603,9 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
+
+               iput(req->r_unsafe_dir);
+               req->r_unsafe_dir = NULL;
        }
 
        ceph_mdsc_put_request(req);
@@ -613,6 +621,12 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
  */
 struct dentry *get_nonsnap_parent(struct dentry *dentry)
 {
+       /*
+        * we don't need to worry about protecting the d_parent access
+        * here because we never renaming inside the snapped namespace
+        * except to resplice to another snapdir, and either the old or new
+        * result is a valid result.
+        */
        while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
                dentry = dentry->d_parent;
        return dentry;
@@ -648,7 +662,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
-               struct inode *dir = req->r_dentry->d_parent->d_inode;
+               /* ignore race with rename; old or new d_parent is okay */
+               struct dentry *parent = req->r_dentry->d_parent;
+               struct inode *dir = parent->d_inode;
 
                if (dir->i_sb != mdsc->fsc->sb) {
                        /* not this fs! */
@@ -656,8 +672,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
-                       struct dentry *dn =
-                               get_nonsnap_parent(req->r_dentry->d_parent);
+                       struct dentry *dn = get_nonsnap_parent(parent);
                        inode = dn->d_inode;
                        dout("__choose_mds using nonsnap parent %p\n", inode);
                } else if (req->r_dentry->d_inode) {
@@ -666,7 +681,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
                } else {
                        /* dir + name */
                        inode = dir;
-                       hash = ceph_dentry_hash(req->r_dentry);
+                       hash = ceph_dentry_hash(dir, req->r_dentry);
                        is_hash = true;
                }
        }
@@ -693,9 +708,11 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (%d/%d)\n",
                                     inode, ceph_vinop(inode),
-                                    frag.frag, frag.mds,
+                                    frag.frag, mds,
                                     (int)r, frag.ndist);
-                               return mds;
+                               if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
+                                   CEPH_MDS_STATE_ACTIVE)
+                                       return mds;
                        }
 
                        /* since this file/dir wasn't known to be
@@ -708,7 +725,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
-                               return mds;
+                               if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
+                                   CEPH_MDS_STATE_ACTIVE)
+                                       return mds;
                        }
                }
        }
@@ -1430,12 +1449,15 @@ char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
        struct dentry *temp;
        char *path;
        int len, pos;
+       unsigned seq;
 
        if (dentry == NULL)
                return ERR_PTR(-EINVAL);
 
 retry:
        len = 0;
+       seq = read_seqbegin(&rename_lock);
+       rcu_read_lock();
        for (temp = dentry; !IS_ROOT(temp);) {
                struct inode *inode = temp->d_inode;
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
@@ -1447,10 +1469,12 @@ retry:
                        len += 1 + temp->d_name.len;
                temp = temp->d_parent;
                if (temp == NULL) {
+                       rcu_read_unlock();
                        pr_err("build_path corrupt dentry %p\n", dentry);
                        return ERR_PTR(-EINVAL);
                }
        }
+       rcu_read_unlock();
        if (len)
                len--;  /* no leading '/' */
 
@@ -1459,9 +1483,12 @@ retry:
                return ERR_PTR(-ENOMEM);
        pos = len;
        path[pos] = 0;  /* trailing null */
+       rcu_read_lock();
        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
-               struct inode *inode = temp->d_inode;
+               struct inode *inode;
 
+               spin_lock(&temp->d_lock);
+               inode = temp->d_inode;
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
                        dout("build_path path+%d: %p SNAPDIR\n",
                             pos, temp);
@@ -1470,21 +1497,26 @@ retry:
                        break;
                } else {
                        pos -= temp->d_name.len;
-                       if (pos < 0)
+                       if (pos < 0) {
+                               spin_unlock(&temp->d_lock);
                                break;
+                       }
                        strncpy(path + pos, temp->d_name.name,
                                temp->d_name.len);
                }
+               spin_unlock(&temp->d_lock);
                if (pos)
                        path[--pos] = '/';
                temp = temp->d_parent;
                if (temp == NULL) {
+                       rcu_read_unlock();
                        pr_err("build_path corrupt dentry\n");
                        kfree(path);
                        return ERR_PTR(-EINVAL);
                }
        }
-       if (pos != 0) {
+       rcu_read_unlock();
+       if (pos != 0 || read_seqretry(&rename_lock, seq)) {
                pr_err("build_path did not end path lookup where "
                       "expected, namelen is %d, pos is %d\n", len, pos);
                /* presumably this is only possible if racing with a
@@ -1498,7 +1530,7 @@ retry:
        *base = ceph_ino(temp->d_inode);
        *plen = len;
        dout("build_path on %p %d built %llx '%.*s'\n",
-            dentry, atomic_read(&dentry->d_count), *base, len, path);
+            dentry, dentry->d_count, *base, len, path);
        return path;
 }
 
@@ -1563,7 +1595,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
                r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
                     *ppath);
-       } else if (rpath) {
+       } else if (rpath || rino) {
                *ino = rino;
                *ppath = rpath;
                *pathlen = strlen(rpath);
@@ -1910,9 +1942,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
        if (req->r_locked_dir)
                ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
        if (req->r_old_dentry)
-               ceph_get_cap_refs(
-                       ceph_inode(req->r_old_dentry->d_parent->d_inode),
-                       CEPH_CAP_PIN);
+               ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
+                                 CEPH_CAP_PIN);
 
        /* issue */
        mutex_lock(&mdsc->mutex);
@@ -2687,14 +2718,12 @@ static void handle_lease(struct ceph_mds_client *mdsc,
 {
        struct super_block *sb = mdsc->fsc->sb;
        struct inode *inode;
-       struct ceph_inode_info *ci;
        struct dentry *parent, *dentry;
        struct ceph_dentry_info *di;
        int mds = session->s_mds;
        struct ceph_mds_lease *h = msg->front.iov_base;
        u32 seq;
        struct ceph_vino vino;
-       int mask;
        struct qstr dname;
        int release = 0;
 
@@ -2705,7 +2734,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
                goto bad;
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
-       mask = le16_to_cpu(h->mask);
        seq = le32_to_cpu(h->seq);
        dname.name = (void *)h + sizeof(*h) + sizeof(u32);
        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
@@ -2717,14 +2745,13 @@ static void handle_lease(struct ceph_mds_client *mdsc,
 
        /* lookup inode */
        inode = ceph_find_inode(sb, vino);
-       dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
-            ceph_lease_op_name(h->action), mask, vino.ino, inode,
+       dout("handle_lease %s, ino %llx %p %.*s\n",
+            ceph_lease_op_name(h->action), vino.ino, inode,
             dname.len, dname.name);
        if (inode == NULL) {
                dout("handle_lease no inode %llx\n", vino.ino);
                goto release;
        }
-       ci = ceph_inode(inode);
 
        /* dentry */
        parent = d_find_alias(inode);
@@ -2809,7 +2836,6 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
                return;
        lease = msg->front.iov_base;
        lease->action = action;
-       lease->mask = cpu_to_le16(1);
        lease->ino = cpu_to_le64(ceph_vino(inode).ino);
        lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
        lease->seq = cpu_to_le32(seq);
@@ -2831,7 +2857,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
  * Pass @inode always, @dentry is optional.
  */
 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
-                            struct dentry *dentry, int mask)
+                            struct dentry *dentry)
 {
        struct ceph_dentry_info *di;
        struct ceph_mds_session *session;
@@ -2839,7 +2865,6 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
 
        BUG_ON(inode == NULL);
        BUG_ON(dentry == NULL);
-       BUG_ON(mask == 0);
 
        /* is dentry lease valid? */
        spin_lock(&dentry->d_lock);
@@ -2849,8 +2874,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
            di->lease_gen != di->lease_session->s_cap_gen ||
            !time_before(jiffies, dentry->d_time)) {
                dout("lease_release inode %p dentry %p -- "
-                    "no lease on %d\n",
-                    inode, dentry, mask);
+                    "no lease\n",
+                    inode, dentry);
                spin_unlock(&dentry->d_lock);
                return;
        }
@@ -2861,8 +2886,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
        __ceph_mdsc_drop_dentry_lease(dentry);
        spin_unlock(&dentry->d_lock);
 
-       dout("lease_release inode %p dentry %p mask %d to mds%d\n",
-            inode, dentry, mask, session->s_mds);
+       dout("lease_release inode %p dentry %p to mds%d\n",
+            inode, dentry, session->s_mds);
        ceph_mdsc_lease_send_msg(session, inode, dentry,
                                 CEPH_MDS_LEASE_RELEASE, seq);
        ceph_put_mds_session(session);
@@ -2998,6 +3023,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        spin_lock_init(&mdsc->snap_flush_lock);
        mdsc->cap_flush_seq = 0;
        INIT_LIST_HEAD(&mdsc->cap_dirty);
+       INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
        mdsc->num_cap_flushing = 0;
        spin_lock_init(&mdsc->cap_dirty_lock);
        init_waitqueue_head(&mdsc->cap_flushing_wq);
@@ -3211,9 +3237,15 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
 {
        struct ceph_mds_client *mdsc = fsc->mdsc;
 
+       dout("mdsc_destroy %p\n", mdsc);
        ceph_mdsc_stop(mdsc);
+
+       /* flush out any connection work with references to us */
+       ceph_msgr_flush();
+
        fsc->mdsc = NULL;
        kfree(mdsc);
+       dout("mdsc_destroy %p done\n", mdsc);
 }
 
 
@@ -3294,8 +3326,8 @@ static void con_put(struct ceph_connection *con)
 {
        struct ceph_mds_session *s = con->private;
 
+       dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
        ceph_put_mds_session(s);
-       dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
 }
 
 /*