ceph: use complete_all and wake_up_all
[linux-2.6.git] / fs / ceph / mds_client.c
index bec8a7aeb3006d211eb444930545e1fbe61007b3..dd440bd438a930a5dbd30cfa5e6ea63ae24f5947 100644 (file)
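Note on the recurring change in this blobdiff: complete() becomes complete_all() and wake_up() becomes wake_up_all(). As a reminder of the generic completion semantics (not ceph-specific): completion waiters queue exclusively, so complete() releases exactly one of them, while complete_all() releases every current and future waiter. Several threads can block on a request's r_completion / r_safe_completion at once, so waking only one can leave the rest stuck. A minimal sketch:

    #include <linux/completion.h>

    static DECLARE_COMPLETION(done);

    static void waiter(void)            /* may run in many threads */
    {
            wait_for_completion(&done);
    }

    static void finisher(void)
    {
            complete(&done);            /* releases exactly one waiter */
            complete_all(&done);        /* releases all, now and later */
    }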
@@ -1,6 +1,7 @@
 #include "ceph_debug.h"
 
 #include <linux/wait.h>
+#include <linux/slab.h>
 #include <linux/sched.h>
 
 #include "mds_client.h"
@@ -39,7 +40,7 @@
 static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
 
-const static struct ceph_connection_operations mds_con_ops;
+static const struct ceph_connection_operations mds_con_ops;
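Note on the const/static swap here (and in the initializer at the bottom of the file): C accepts the specifiers in either order, but a storage-class specifier placed anywhere other than first is an obsolescent form and gcc warns about it (-Wold-style-declaration, enabled by -Wextra), so `static const` is the canonical spelling:

    const static struct ceph_connection_operations mds_con_ops;  /* warns */
    static const struct ceph_connection_operations mds_con_ops;  /* preferred */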
 
 
 /*
@@ -328,6 +329,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        struct ceph_mds_session *s;
 
        s = kzalloc(sizeof(*s), GFP_NOFS);
+       if (!s)
+               return ERR_PTR(-ENOMEM);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
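register_session() can no longer crash on a failed kzalloc(); it reports the failure with the standard ERR_PTR encoding, which the new caller check in __do_request() (further down) unwraps. For reference, the <linux/err.h> idiom, sketched with a hypothetical struct foo:

    #include <linux/err.h>
    #include <linux/slab.h>

    struct foo *alloc_foo(void)
    {
            struct foo *f = kzalloc(sizeof(*f), GFP_NOFS);

            if (!f)
                    return ERR_PTR(-ENOMEM);  /* errno encoded in pointer */
            return f;
    }

    /* caller */
    f = alloc_foo();
    if (IS_ERR(f))
            return PTR_ERR(f);                /* errno recovered */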
@@ -529,7 +532,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
 {
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        rb_erase(&req->r_node, &mdsc->request_tree);
-       ceph_mdsc_put_request(req);
+       RB_CLEAR_NODE(&req->r_node);
 
        if (req->r_unsafe_dir) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
@@ -538,6 +541,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }
+
+       ceph_mdsc_put_request(req);
 }
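Two things happen in this __unregister_request() hunk: the rb_node is cleared after rb_erase() so RB_EMPTY_NODE() can later distinguish "erased" from "still linked" (wait_unsafe_requests() near the bottom of the diff relies on this), and ceph_mdsc_put_request() moves to the end, since dropping what may be the last reference mid-function would leave the r_unsafe_dir cleanup touching freed memory. Shape of the pattern:

    rb_erase(&req->r_node, &mdsc->request_tree);
    RB_CLEAR_NODE(&req->r_node);   /* RB_EMPTY_NODE(&req->r_node) is now true */

    /* ... finish every access to *req ... */

    ceph_mdsc_put_request(req);    /* may free req, so it must come last */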
 
 /*
@@ -660,10 +665,10 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
 
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
-       if (IS_ERR(msg)) {
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
+       if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
-               return ERR_PTR(PTR_ERR(msg));
+               return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
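ceph_msg_new() now takes explicit gfp flags, and every call site in this file passes GFP_NOFS. The rationale is the usual filesystem one (general kernel convention, not ceph-specific): a GFP_KERNEL allocation may enter direct reclaim and call back into filesystem writeback, which can deadlock if the allocator already holds fs locks such as session->s_mutex; GFP_NOFS masks __GFP_FS so reclaim skips those paths. Note the failure convention also changed from ERR_PTR to plain NULL, and the callers below are updated accordingly:

    msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
    if (!msg)                      /* NULL, not IS_ERR(), on failure */
            return NULL;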
@@ -682,7 +687,6 @@ static int __open_session(struct ceph_mds_client *mdsc,
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;
-       int err = 0;
 
        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
@@ -693,13 +697,9 @@ static int __open_session(struct ceph_mds_client *mdsc,
 
        /* send connect message */
        msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
-       if (IS_ERR(msg)) {
-               err = PTR_ERR(msg);
-               goto out;
-       }
+       if (!msg)
+               return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
-
-out:
        return 0;
 }
 
@@ -731,9 +731,10 @@ static void cleanup_cap_releases(struct ceph_mds_session *session)
 }
 
 /*
- * Helper to safely iterate over all caps associated with a session.
+ * Helper to safely iterate over all caps associated with a session, with
+ * special care taken to handle a racing __ceph_remove_cap().
  *
- * caller must hold session s_mutex
+ * Caller must hold session s_mutex.
  */
 static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
@@ -798,12 +799,49 @@ out:
 }
 
 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
-                                  void *arg)
+                                 void *arg)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
+       int drop = 0;
+
        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
-       ceph_remove_cap(cap);
+       spin_lock(&inode->i_lock);
+       __ceph_remove_cap(cap);
+       if (!__ceph_is_any_real_caps(ci)) {
+               struct ceph_mds_client *mdsc =
+                       &ceph_sb_to_client(inode->i_sb)->mdsc;
+
+               spin_lock(&mdsc->cap_dirty_lock);
+               if (!list_empty(&ci->i_dirty_item)) {
+                       pr_info(" dropping dirty %s state for %p %lld\n",
+                               ceph_cap_string(ci->i_dirty_caps),
+                               inode, ceph_ino(inode));
+                       ci->i_dirty_caps = 0;
+                       list_del_init(&ci->i_dirty_item);
+                       drop = 1;
+               }
+               if (!list_empty(&ci->i_flushing_item)) {
+                       pr_info(" dropping dirty+flushing %s state for %p %lld\n",
+                               ceph_cap_string(ci->i_flushing_caps),
+                               inode, ceph_ino(inode));
+                       ci->i_flushing_caps = 0;
+                       list_del_init(&ci->i_flushing_item);
+                       mdsc->num_cap_flushing--;
+                       drop = 1;
+               }
+               if (drop && ci->i_wrbuffer_ref) {
+                       pr_info(" dropping dirty data for %p %lld\n",
+                               inode, ceph_ino(inode));
+                       ci->i_wrbuffer_ref = 0;
+                       ci->i_wrbuffer_ref_head = 0;
+                       drop++;
+               }
+               spin_unlock(&mdsc->cap_dirty_lock);
+       }
+       spin_unlock(&inode->i_lock);
+       while (drop--)
+               iput(inode);
        return 0;
 }
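The rewritten callback counts, while still under inode->i_lock and cap_dirty_lock, how many inode references the discarded dirty/flushing state was holding, and drops them only after unlocking. The split matters because iput() can reach iput_final() and sleep, so it must never run under a spinlock. Condensed:

    int drop = 0;

    spin_lock(&inode->i_lock);
    /* detach dirty/flushing state; each item held one inode ref */
    drop++;
    spin_unlock(&inode->i_lock);

    while (drop--)
            iput(inode);           /* may sleep: only safe after unlock */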
 
@@ -815,6 +853,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
        dout("remove_session_caps on %p\n", session);
        iterate_session_caps(session, remove_session_caps_cb, NULL);
        BUG_ON(session->s_nr_caps > 0);
+       BUG_ON(!list_empty(&session->s_cap_flushing));
        cleanup_cap_releases(session);
 }
 
@@ -829,7 +868,7 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
 
-       wake_up(&ci->i_cap_wq);
+       wake_up_all(&ci->i_cap_wq);
        if (arg) {
                spin_lock(&inode->i_lock);
                ci->i_wanted_max_size = 0;
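Same theme as complete_all, but for wait queues: wake_up() wakes all non-exclusive waiters yet at most one exclusive waiter, whereas wake_up_all() wakes everything. Switching i_cap_wq to wake_up_all() means no cap waiter can be left sleeping regardless of how it queued. Generic sketch with a hypothetical condition flag:

    #include <linux/wait.h>

    static DECLARE_WAIT_QUEUE_HEAD(wq);
    static int cond;

    /* waiter */
    wait_event(wq, cond);

    /* waker */
    cond = 1;
    wake_up_all(&wq);   /* wake_up() could leave extra exclusive waiters */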
@@ -862,6 +901,7 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
        if (time_after_eq(jiffies, session->s_cap_ttl) &&
            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
                pr_info("mds%d caps stale\n", session->s_mds);
+       session->s_renew_requested = jiffies;
 
        /* do not try to renew caps until a recovering mds has reconnected
         * with its clients. */
@@ -874,11 +914,10 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
 
        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
                ceph_mds_state_name(state));
-       session->s_renew_requested = jiffies;
        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
                                 ++session->s_renew_seq);
-       if (IS_ERR(msg))
-               return PTR_ERR(msg);
+       if (!msg)
+               return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
 }
@@ -925,17 +964,15 @@ static int request_close_session(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
 {
        struct ceph_msg *msg;
-       int err = 0;
 
        dout("request_close_session mds%d state %s seq %lld\n",
             session->s_mds, session_state_name(session->s_state),
             session->s_seq);
        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
-       if (IS_ERR(msg))
-               err = PTR_ERR(msg);
-       else
-               ceph_con_send(&session->s_con, msg);
-       return err;
+       if (!msg)
+               return -ENOMEM;
+       ceph_con_send(&session->s_con, msg);
+       return 0;
 }
 
 /*
@@ -1029,9 +1066,9 @@ static int trim_caps(struct ceph_mds_client *mdsc,
  *
  * Called under s_mutex.
  */
-static int add_cap_releases(struct ceph_mds_client *mdsc,
-                           struct ceph_mds_session *session,
-                           int extra)
+int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
+                         struct ceph_mds_session *session,
+                         int extra)
 {
        struct ceph_msg *msg;
        struct ceph_mds_cap_release *head;
@@ -1053,7 +1090,7 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,
        while (session->s_num_cap_releases < session->s_nr_caps + extra) {
                spin_unlock(&session->s_cap_lock);
                msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
-                                  0, 0, NULL);
+                                  GFP_NOFS);
                if (!msg)
                        goto out_unlocked;
                dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1139,16 +1176,14 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 /*
  * called under s_mutex
  */
-static void send_cap_releases(struct ceph_mds_client *mdsc,
-                      struct ceph_mds_session *session)
+void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+                           struct ceph_mds_session *session)
 {
        struct ceph_msg *msg;
 
        dout("send_cap_releases mds%d\n", session->s_mds);
-       while (1) {
-               spin_lock(&session->s_cap_lock);
-               if (list_empty(&session->s_cap_releases_done))
-                       break;
+       spin_lock(&session->s_cap_lock);
+       while (!list_empty(&session->s_cap_releases_done)) {
                msg = list_first_entry(&session->s_cap_releases_done,
                                 struct ceph_msg, list_head);
                list_del_init(&msg->list_head);
@@ -1156,10 +1191,49 @@ static void send_cap_releases(struct ceph_mds_client *mdsc,
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
                ceph_con_send(&session->s_con, msg);
+               spin_lock(&session->s_cap_lock);
        }
        spin_unlock(&session->s_cap_lock);
 }
 
+static void discard_cap_releases(struct ceph_mds_client *mdsc,
+                                struct ceph_mds_session *session)
+{
+       struct ceph_msg *msg;
+       struct ceph_mds_cap_release *head;
+       unsigned num;
+
+       dout("discard_cap_releases mds%d\n", session->s_mds);
+       spin_lock(&session->s_cap_lock);
+
+       /* zero out the in-progress message */
+       msg = list_first_entry(&session->s_cap_releases,
+                              struct ceph_msg, list_head);
+       head = msg->front.iov_base;
+       num = le32_to_cpu(head->num);
+       dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
+       head->num = cpu_to_le32(0);
+       session->s_num_cap_releases += num;
+
+       /* requeue completed messages */
+       while (!list_empty(&session->s_cap_releases_done)) {
+               msg = list_first_entry(&session->s_cap_releases_done,
+                                struct ceph_msg, list_head);
+               list_del_init(&msg->list_head);
+
+               head = msg->front.iov_base;
+               num = le32_to_cpu(head->num);
+               dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
+                    num);
+               session->s_num_cap_releases += num;
+               head->num = cpu_to_le32(0);
+               msg->front.iov_len = sizeof(*head);
+               list_add(&msg->list_head, &session->s_cap_releases);
+       }
+
+       spin_unlock(&session->s_cap_lock);
+}
+
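discard_cap_releases() is new: before cap state is re-stated to a recovering MDS (see the "drop old cap expires" comment in send_mds_reconnect further down), any release message built so far is emptied and its slots returned to s_num_cap_releases, so stale releases are never sent. Note the endianness round-trip on the batch count, since head->num is in wire (little-endian) format:

    num = le32_to_cpu(head->num);          /* read wire count */
    session->s_num_cap_releases += num;    /* give the slots back */
    head->num = cpu_to_le32(0);            /* empty the batch */
    msg->front.iov_len = sizeof(*head);    /* header only */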
 /*
  * requests
  */
@@ -1175,6 +1249,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
        if (!req)
                return ERR_PTR(-ENOMEM);
 
+       mutex_init(&req->r_fill_mutex);
        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1245,7 +1320,7 @@ retry:
                        len += 1 + temp->d_name.len;
                temp = temp->d_parent;
                if (temp == NULL) {
-                       pr_err("build_path_dentry corrupt dentry %p\n", dentry);
+                       pr_err("build_path corrupt dentry %p\n", dentry);
                        return ERR_PTR(-EINVAL);
                }
        }
@@ -1261,7 +1336,7 @@ retry:
                struct inode *inode = temp->d_inode;
 
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
-                       dout("build_path_dentry path+%d: %p SNAPDIR\n",
+                       dout("build_path path+%d: %p SNAPDIR\n",
                             pos, temp);
                } else if (stop_on_nosnap && inode &&
                           ceph_snap(inode) == CEPH_NOSNAP) {
@@ -1272,20 +1347,18 @@ retry:
                                break;
                        strncpy(path + pos, temp->d_name.name,
                                temp->d_name.len);
-                       dout("build_path_dentry path+%d: %p '%.*s'\n",
-                            pos, temp, temp->d_name.len, path + pos);
                }
                if (pos)
                        path[--pos] = '/';
                temp = temp->d_parent;
                if (temp == NULL) {
-                       pr_err("build_path_dentry corrupt dentry\n");
+                       pr_err("build_path corrupt dentry\n");
                        kfree(path);
                        return ERR_PTR(-EINVAL);
                }
        }
        if (pos != 0) {
-               pr_err("build_path_dentry did not end path lookup where "
+               pr_err("build_path did not end path lookup where "
                       "expected, namelen is %d, pos is %d\n", len, pos);
                /* presumably this is only possible if racing with a
                   rename of one of the parent directories (we can not
@@ -1297,7 +1370,7 @@ retry:
 
        *base = ceph_ino(temp->d_inode);
        *plen = len;
-       dout("build_path_dentry on %p %d built %llx '%.*s'\n",
+       dout("build_path on %p %d built %llx '%.*s'\n",
             dentry, atomic_read(&dentry->d_count), *base, len, path);
        return path;
 }
@@ -1420,9 +1493,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        if (req->r_old_dentry_drop)
                len += req->r_old_dentry->d_name.len;
 
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
-       if (IS_ERR(msg))
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
+       if (!msg) {
+               msg = ERR_PTR(-ENOMEM);
                goto out_free2;
+       }
 
        msg->hdr.tid = cpu_to_le64(req->r_tid);
 
@@ -1439,6 +1514,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        ceph_encode_filepath(&p, end, ino1, path1);
        ceph_encode_filepath(&p, end, ino2, path2);
 
+       /* make note of release offset, in case we need to replay */
+       req->r_request_release_offset = p - msg->front.iov_base;
+
        /* cap releases */
        releases = 0;
        if (req->r_inode_drop)
@@ -1486,7 +1564,7 @@ static void complete_request(struct ceph_mds_client *mdsc,
        if (req->r_callback)
                req->r_callback(mdsc, req);
        else
-               complete(&req->r_completion);
+               complete_all(&req->r_completion);
 }
 
 /*
@@ -1505,15 +1583,41 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
 
+       if (req->r_got_unsafe) {
+               /*
+                * Replay.  Do not regenerate message (and rebuild
+                * paths, etc.); just use the original message.
+                * Rebuilding paths will break for renames because
+                * d_move mangles the src name.
+                */
+               msg = req->r_request;
+               rhead = msg->front.iov_base;
+
+               flags = le32_to_cpu(rhead->flags);
+               flags |= CEPH_MDS_FLAG_REPLAY;
+               rhead->flags = cpu_to_le32(flags);
+
+               if (req->r_target_inode)
+                       rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
+
+               rhead->num_retry = req->r_attempts - 1;
+
+               /* remove cap/dentry releases from message */
+               rhead->num_releases = 0;
+               msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
+               msg->front.iov_len = req->r_request_release_offset;
+               return 0;
+       }
+
        if (req->r_request) {
                ceph_msg_put(req->r_request);
                req->r_request = NULL;
        }
        msg = create_request_message(mdsc, req, mds);
        if (IS_ERR(msg)) {
-               req->r_reply = ERR_PTR(PTR_ERR(msg));
+               req->r_err = PTR_ERR(msg);
                complete_request(mdsc, req);
-               return -PTR_ERR(msg);
+               return PTR_ERR(msg);
        }
        req->r_request = msg;
 
@@ -1526,13 +1630,9 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
        rhead->flags = cpu_to_le32(flags);
        rhead->num_fwd = req->r_num_fwd;
        rhead->num_retry = req->r_attempts - 1;
+       rhead->ino = 0;
 
        dout(" r_locked_dir = %p\n", req->r_locked_dir);
-
-       if (req->r_target_inode && req->r_got_unsafe)
-               rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
-       else
-               rhead->ino = 0;
        return 0;
 }
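The new replay branch above deliberately reuses the original message instead of rebuilding it, because d_move() during a rename mangles the source dentry's name and a rebuilt path could be wrong. It also truncates the message at r_request_release_offset (recorded in create_request_message above) so cap/dentry releases are not sent twice. Judging from the encode order, the message front presumably looks like:

    /*
     *  [ceph_mds_request_head][filepath1][filepath2][releases ...]
     *                                               ^
     *                                r_request_release_offset
     *
     * shrinking front.iov_len / hdr.front_len to the offset drops the
     * releases; rhead->num_releases = 0 keeps the header consistent.
     */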
 
@@ -1546,7 +1646,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
        int mds = -1;
        int err = -EAGAIN;
 
-       if (req->r_reply)
+       if (req->r_err || req->r_got_result)
                goto out;
 
        if (req->r_timeout &&
@@ -1566,8 +1666,13 @@ static int __do_request(struct ceph_mds_client *mdsc,
 
        /* get, open session */
        session = __ceph_lookup_mds_session(mdsc, mds);
-       if (!session)
+       if (!session) {
                session = register_session(mdsc, mds);
+               if (IS_ERR(session)) {
+                       err = PTR_ERR(session);
+                       goto finish;
+               }
+       }
        dout("do_request mds%d session %p state %s\n", mds, session,
             session_state_name(session->s_state));
        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
@@ -1598,7 +1703,7 @@ out:
        return err;
 
 finish:
-       req->r_reply = ERR_PTR(err);
+       req->r_err = err;
        complete_request(mdsc, req);
        goto out;
 }
@@ -1619,10 +1724,9 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
 
 /*
  * Wake up threads with requests pending for @mds, so that they can
- * resubmit their requests to a possibly different mds.  If @all is set,
- * wake up if their requests has been forwarded to @mds, too.
+ * resubmit their requests to a possibly different mds.
  */
-static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
+static void kick_requests(struct ceph_mds_client *mdsc, int mds)
 {
        struct ceph_mds_request *req;
        struct rb_node *p;
@@ -1678,63 +1782,77 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
        __register_request(mdsc, req, dir);
        __do_request(mdsc, req);
 
-       /* wait */
-       if (!req->r_reply) {
-               mutex_unlock(&mdsc->mutex);
-               if (req->r_timeout) {
-                       err = (long)wait_for_completion_interruptible_timeout(
-                               &req->r_completion, req->r_timeout);
-                       if (err == 0)
-                               req->r_reply = ERR_PTR(-EIO);
-                       else if (err < 0)
-                               req->r_reply = ERR_PTR(err);
-               } else {
-                        err = wait_for_completion_interruptible(
-                                &req->r_completion);
-                        if (err)
-                                req->r_reply = ERR_PTR(err);
-               }
-               mutex_lock(&mdsc->mutex);
+       if (req->r_err) {
+               err = req->r_err;
+               __unregister_request(mdsc, req);
+               dout("do_request early error %d\n", err);
+               goto out;
        }
 
-       if (IS_ERR(req->r_reply)) {
-               err = PTR_ERR(req->r_reply);
-               req->r_reply = NULL;
+       /* wait */
+       mutex_unlock(&mdsc->mutex);
+       dout("do_request waiting\n");
+       if (req->r_timeout) {
+               err = (long)wait_for_completion_killable_timeout(
+                       &req->r_completion, req->r_timeout);
+               if (err == 0)
+                       err = -EIO;
+       } else {
+               err = wait_for_completion_killable(&req->r_completion);
+       }
+       dout("do_request waited, got %d\n", err);
+       mutex_lock(&mdsc->mutex);
 
-               if (err == -ERESTARTSYS) {
-                       /* aborted */
-                       req->r_aborted = true;
+       /* only abort if we didn't race with a real reply */
+       if (req->r_got_result) {
+               err = le32_to_cpu(req->r_reply_info.head->result);
+       } else if (err < 0) {
+               dout("aborted request %lld with %d\n", req->r_tid, err);
 
-                       if (req->r_locked_dir &&
-                           (req->r_op & CEPH_MDS_OP_WRITE)) {
-                               struct ceph_inode_info *ci =
-                                       ceph_inode(req->r_locked_dir);
+               /*
+                * ensure we aren't running concurrently with
+                * ceph_fill_trace or ceph_readdir_prepopulate, which
+                * rely on locks (dir mutex) held by our caller.
+                */
+               mutex_lock(&req->r_fill_mutex);
+               req->r_err = err;
+               req->r_aborted = true;
+               mutex_unlock(&req->r_fill_mutex);
 
-                               dout("aborted, clearing I_COMPLETE on %p\n", 
-                                    req->r_locked_dir);
-                               spin_lock(&req->r_locked_dir->i_lock);
-                               ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
-                               ci->i_release_count++;
-                               spin_unlock(&req->r_locked_dir->i_lock);
-                       }
-               } else {
-                       /* clean up this request */
-                       __unregister_request(mdsc, req);
-                       if (!list_empty(&req->r_unsafe_item))
-                               list_del_init(&req->r_unsafe_item);
-                       complete(&req->r_safe_completion);
-               }
-       } else if (req->r_err) {
-               err = req->r_err;
+               if (req->r_locked_dir &&
+                   (req->r_op & CEPH_MDS_OP_WRITE))
+                       ceph_invalidate_dir_request(req);
        } else {
-               err = le32_to_cpu(req->r_reply_info.head->result);
+               err = req->r_err;
        }
-       mutex_unlock(&mdsc->mutex);
 
+out:
+       mutex_unlock(&mdsc->mutex);
        dout("do_request %p done, result %d\n", req, err);
        return err;
 }
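The wait switches from the interruptible to the killable completion variants, so a non-fatal signal no longer aborts an in-flight MDS request; only a fatal one (SIGKILL) does. Return-value handling per the generic API:

    /* 0 on completion, -ERESTARTSYS if a fatal signal arrived */
    err = wait_for_completion_killable(&req->r_completion);

    /* timeout variant: >0 jiffies left on completion, 0 on timeout
     * (mapped to -EIO above), <0 on fatal signal */
    err = (long)wait_for_completion_killable_timeout(&req->r_completion,
                                                     req->r_timeout);
    if (err == 0)
            err = -EIO;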
 
+/*
+ * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
+ * namespace request.
+ */
+void ceph_invalidate_dir_request(struct ceph_mds_request *req)
+{
+       struct inode *inode = req->r_locked_dir;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+
+       dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
+       spin_lock(&inode->i_lock);
+       ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
+       ci->i_release_count++;
+       spin_unlock(&inode->i_lock);
+
+       if (req->r_dentry)
+               ceph_invalidate_dentry_lease(req->r_dentry);
+       if (req->r_old_dentry)
+               ceph_invalidate_dentry_lease(req->r_old_dentry);
+}
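r_fill_mutex (initialized in ceph_mdsc_create_request above) is what makes this abort path safe against a racing reply: handle_reply holds it across ceph_fill_trace()/ceph_readdir_prepopulate(), and the abort path takes it just long enough to set r_err/r_aborted, so the fill code sees a stable abort flag and never populates the dcache using directory locks the aborted caller has already released. The two sides, condensed:

    /* aborting caller */
    mutex_lock(&req->r_fill_mutex);
    req->r_err = err;
    req->r_aborted = true;
    mutex_unlock(&req->r_fill_mutex);

    /* reply path */
    mutex_lock(&req->r_fill_mutex);
    err = ceph_fill_trace(sb, req, req->r_session);
    mutex_unlock(&req->r_fill_mutex);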
+
 /*
  * Handle mds reply.
  *
@@ -1770,7 +1888,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        dout("handle_reply %p\n", req);
 
        /* correct session? */
-       if (!req->r_session && req->r_session != session) {
+       if (req->r_session != session) {
                pr_err("mdsc_handle_reply got %llu on session mds%d"
                       " not mds%d\n", tid, session->s_mds,
                       req->r_session ? req->r_session->s_mds : -1);
@@ -1786,6 +1904,12 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                mutex_unlock(&mdsc->mutex);
                goto out;
        }
+       if (req->r_got_safe && !head->safe) {
+               pr_warning("got unsafe after safe on %llu from mds%d\n",
+                          tid, mds);
+               mutex_unlock(&mdsc->mutex);
+               goto out;
+       }
 
        result = le32_to_cpu(head->result);
 
@@ -1808,7 +1932,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        if (head->safe) {
                req->r_got_safe = true;
                __unregister_request(mdsc, req);
-               complete(&req->r_safe_completion);
+               complete_all(&req->r_safe_completion);
 
                if (req->r_got_unsafe) {
                        /*
@@ -1823,15 +1947,11 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 
                        /* last unsafe request during umount? */
                        if (mdsc->stopping && !__get_oldest_req(mdsc))
-                               complete(&mdsc->safe_umount_waiters);
+                               complete_all(&mdsc->safe_umount_waiters);
                        mutex_unlock(&mdsc->mutex);
                        goto out;
                }
-       }
-
-       BUG_ON(req->r_reply);
-
-       if (!head->safe) {
+       } else {
                req->r_got_unsafe = true;
                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
        }
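For context on the safe/unsafe split: the MDS sends an early "unsafe" reply once an update is applied in memory, then a second "safe" reply once it is journaled; only the safe reply fires r_safe_completion and unregisters the request. The new r_got_safe check above drops a late unsafe reply that arrives after the safe one. Roughly:

       client                          mds
         |----------- request ---------->|
         |<-------- unsafe reply --------|   applied, not yet durable
         |              ...              |   (journal commit)
         |<--------- safe reply ---------|   durable; complete_all()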
@@ -1860,23 +1980,32 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        }
 
        /* insert trace into our cache */
+       mutex_lock(&req->r_fill_mutex);
        err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
        if (err == 0) {
                if (result == 0 && rinfo->dir_nr)
                        ceph_readdir_prepopulate(req, req->r_session);
                ceph_unreserve_caps(&req->r_caps_reservation);
        }
+       mutex_unlock(&req->r_fill_mutex);
 
        up_read(&mdsc->snap_rwsem);
 out_err:
-       if (err) {
-               req->r_err = err;
+       mutex_lock(&mdsc->mutex);
+       if (!req->r_aborted) {
+               if (err) {
+                       req->r_err = err;
+               } else {
+                       req->r_reply = msg;
+                       ceph_msg_get(msg);
+                       req->r_got_result = true;
+               }
        } else {
-               req->r_reply = msg;
-               ceph_msg_get(msg);
+               dout("reply arrived after request %lld was aborted\n", tid);
        }
+       mutex_unlock(&mdsc->mutex);
 
-       add_cap_releases(mdsc, req->r_session, -1);
+       ceph_add_cap_releases(mdsc, req->r_session, -1);
        mutex_unlock(&session->s_mutex);
 
        /* kick calling process */
@@ -1899,7 +2028,6 @@ static void handle_forward(struct ceph_mds_client *mdsc,
        u64 tid = le64_to_cpu(msg->hdr.tid);
        u32 next_mds;
        u32 fwd_seq;
-       u8 must_resend;
        int err = -EINVAL;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
@@ -1907,23 +2035,25 @@ static void handle_forward(struct ceph_mds_client *mdsc,
        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
        next_mds = ceph_decode_32(&p);
        fwd_seq = ceph_decode_32(&p);
-       must_resend = ceph_decode_8(&p);
-
-       WARN_ON(must_resend);  /* shouldn't happen. */
 
        mutex_lock(&mdsc->mutex);
        req = __lookup_request(mdsc, tid);
        if (!req) {
-               dout("forward %llu dne\n", tid);
+               dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
                goto out;  /* dup reply? */
        }
 
-       if (fwd_seq <= req->r_num_fwd) {
-               dout("forward %llu to mds%d - old seq %d <= %d\n",
+       if (req->r_aborted) {
+               dout("forward tid %llu aborted, unregistering\n", tid);
+               __unregister_request(mdsc, req);
+       } else if (fwd_seq <= req->r_num_fwd) {
+               dout("forward tid %llu to mds%d - old seq %d <= %d\n",
                     tid, next_mds, req->r_num_fwd, fwd_seq);
        } else {
                /* resend. forward race not possible; mds would drop */
-               dout("forward %llu to mds%d (we resend)\n", tid, next_mds);
+               dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
+               BUG_ON(req->r_err);
+               BUG_ON(req->r_got_result);
                req->r_num_fwd = fwd_seq;
                req->r_resend_mds = next_mds;
                put_request_session(req);
@@ -1977,6 +2107,8 @@ static void handle_session(struct ceph_mds_session *session,
 
        switch (op) {
        case CEPH_SESSION_OPEN:
+               if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
+                       pr_info("mds%d reconnect success\n", session->s_mds);
                session->s_state = CEPH_MDS_SESSION_OPEN;
                renewed_caps(mdsc, session, 0);
                wake = 1;
@@ -1990,10 +2122,12 @@ static void handle_session(struct ceph_mds_session *session,
                break;
 
        case CEPH_SESSION_CLOSE:
+               if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
+                       pr_info("mds%d reconnect denied\n", session->s_mds);
                remove_session_caps(session);
                wake = 1; /* for good measure */
-               complete(&mdsc->session_close_waiters);
-               kick_requests(mdsc, mds, 0);      /* cur only */
+               complete_all(&mdsc->session_close_waiters);
+               kick_requests(mdsc, mds);
                break;
 
        case CEPH_SESSION_STALE:
@@ -2125,61 +2259,51 @@ out:
  *
  * called with mdsc->mutex held.
  */
-static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
+static void send_mds_reconnect(struct ceph_mds_client *mdsc,
+                              struct ceph_mds_session *session)
 {
-       struct ceph_mds_session *session = NULL;
        struct ceph_msg *reply;
        struct rb_node *p;
-       int err;
+       int mds = session->s_mds;
+       int err = -ENOMEM;
        struct ceph_pagelist *pagelist;
 
-       pr_info("reconnect to recovering mds%d\n", mds);
+       pr_info("mds%d reconnect start\n", mds);
 
        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
        if (!pagelist)
                goto fail_nopagelist;
        ceph_pagelist_init(pagelist);
 
-       reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
-       if (IS_ERR(reply)) {
-               err = PTR_ERR(reply);
+       reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
+       if (!reply)
                goto fail_nomsg;
-       }
 
-       /* find session */
-       session = __ceph_lookup_mds_session(mdsc, mds);
-       mutex_unlock(&mdsc->mutex);    /* drop lock for duration */
-
-       if (session) {
-               mutex_lock(&session->s_mutex);
-
-               session->s_state = CEPH_MDS_SESSION_RECONNECTING;
-               session->s_seq = 0;
+       mutex_lock(&session->s_mutex);
+       session->s_state = CEPH_MDS_SESSION_RECONNECTING;
+       session->s_seq = 0;
 
-               ceph_con_open(&session->s_con,
-                             ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
+       ceph_con_open(&session->s_con,
+                     ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 
-               /* replay unsafe requests */
-               replay_unsafe_requests(mdsc, session);
-       } else {
-               dout("no session for mds%d, will send short reconnect\n",
-                    mds);
-       }
+       /* replay unsafe requests */
+       replay_unsafe_requests(mdsc, session);
 
        down_read(&mdsc->snap_rwsem);
 
-       if (!session)
-               goto send;
        dout("session %p state %s\n", session,
             session_state_name(session->s_state));
 
+       /* drop old cap expires; we're about to reestablish that state */
+       discard_cap_releases(mdsc, session);
+
        /* traverse this session's caps */
        err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
        if (err)
                goto fail;
        err = iterate_session_caps(session, encode_caps_cb, pagelist);
        if (err < 0)
-               goto out;
+               goto fail;
 
        /*
         * snaprealms.  we provide mds with the ino, seq (version), and
@@ -2201,34 +2325,30 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
                        goto fail;
        }
 
-send:
        reply->pagelist = pagelist;
        reply->hdr.data_len = cpu_to_le32(pagelist->length);
        reply->nr_pages = calc_pages_for(0, pagelist->length);
        ceph_con_send(&session->s_con, reply);
 
-       if (session) {
-               session->s_state = CEPH_MDS_SESSION_OPEN;
-               __wake_requests(mdsc, &session->s_waiting);
-       }
+       mutex_unlock(&session->s_mutex);
 
-out:
-       up_read(&mdsc->snap_rwsem);
-       if (session) {
-               mutex_unlock(&session->s_mutex);
-               ceph_put_mds_session(session);
-       }
        mutex_lock(&mdsc->mutex);
+       __wake_requests(mdsc, &session->s_waiting);
+       mutex_unlock(&mdsc->mutex);
+
+       up_read(&mdsc->snap_rwsem);
        return;
 
 fail:
        ceph_msg_put(reply);
+       up_read(&mdsc->snap_rwsem);
+       mutex_unlock(&session->s_mutex);
 fail_nomsg:
        ceph_pagelist_release(pagelist);
        kfree(pagelist);
 fail_nopagelist:
-       pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
-       goto out;
+       pr_err("error %d preparing reconnect for mds%d\n", err, mds);
+       return;
 }
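send_mds_reconnect() now takes the session (already looked up and referenced by the caller) rather than an mds rank, and it runs with mdsc->mutex dropped, as the check_new_map hunk below shows, presumably because it takes session->s_mutex and blocks for the whole exchange, which should not happen under the global mutex:

    /* caller side in check_new_map */
    mutex_unlock(&mdsc->mutex);
    send_mds_reconnect(mdsc, s);   /* takes s->s_mutex, may block */
    mutex_lock(&mdsc->mutex);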
 
 
@@ -2280,7 +2400,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                        }
 
                        /* kick any requests waiting on the recovering mds */
-                       kick_requests(mdsc, i, 1);
+                       kick_requests(mdsc, i);
                } else if (oldstate == newstate) {
                        continue;  /* nothing new with this mds */
                }
@@ -2289,22 +2409,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                 * send reconnect?
                 */
                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
-                   newstate >= CEPH_MDS_STATE_RECONNECT)
-                       send_mds_reconnect(mdsc, i);
+                   newstate >= CEPH_MDS_STATE_RECONNECT) {
+                       mutex_unlock(&mdsc->mutex);
+                       send_mds_reconnect(mdsc, s);
+                       mutex_lock(&mdsc->mutex);
+               }
 
                /*
-                * kick requests on any mds that has gone active.
-                *
-                * kick requests on cur or forwarder: we may have sent
-                * the request to mds1, mds1 told us it forwarded it
-                * to mds2, but then we learn mds1 failed and can't be
-                * sure it successfully forwarded our request before
-                * it died.
+                * kick requests on any mds that has gone active.
                 */
                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
                    newstate >= CEPH_MDS_STATE_ACTIVE) {
-                       pr_info("mds%d reconnect completed\n", s->s_mds);
-                       kick_requests(mdsc, i, 1);
+                       if (oldstate != CEPH_MDS_STATE_CREATING &&
+                           oldstate != CEPH_MDS_STATE_STARTING)
+                               pr_info("mds%d recovery completed\n", s->s_mds);
+                       kick_requests(mdsc, i);
                        ceph_kick_flushing_caps(mdsc, s);
                        wake_up_session_caps(s, 1);
                }
@@ -2339,6 +2458,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
        struct ceph_dentry_info *di;
        int mds = session->s_mds;
        struct ceph_mds_lease *h = msg->front.iov_base;
+       u32 seq;
        struct ceph_vino vino;
        int mask;
        struct qstr dname;
@@ -2352,6 +2472,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
        mask = le16_to_cpu(h->mask);
+       seq = le32_to_cpu(h->seq);
        dname.name = (void *)h + sizeof(*h) + sizeof(u32);
        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
        if (dname.len != get_unaligned_le32(h+1))
@@ -2362,8 +2483,9 @@ static void handle_lease(struct ceph_mds_client *mdsc,
 
        /* lookup inode */
        inode = ceph_find_inode(sb, vino);
-       dout("handle_lease '%s', mask %d, ino %llx %p\n",
-            ceph_lease_op_name(h->action), mask, vino.ino, inode);
+       dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
+            ceph_lease_op_name(h->action), mask, vino.ino, inode,
+            dname.len, dname.name);
        if (inode == NULL) {
                dout("handle_lease no inode %llx\n", vino.ino);
                goto release;
@@ -2388,7 +2510,8 @@ static void handle_lease(struct ceph_mds_client *mdsc,
        switch (h->action) {
        case CEPH_MDS_LEASE_REVOKE:
                if (di && di->lease_session == session) {
-                       h->seq = cpu_to_le32(di->lease_seq);
+                       if (ceph_seq_cmp(di->lease_seq, seq) > 0)
+                               h->seq = cpu_to_le32(di->lease_seq);
                        __ceph_mdsc_drop_dentry_lease(dentry);
                }
                release = 1;
@@ -2402,7 +2525,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
                        unsigned long duration =
                                le32_to_cpu(h->duration_ms) * HZ / 1000;
 
-                       di->lease_seq = le32_to_cpu(h->seq);
+                       di->lease_seq = seq;
                        dentry->d_time = di->lease_renew_from + duration;
                        di->lease_renew_after = di->lease_renew_from +
                                (duration >> 1);
@@ -2447,12 +2570,12 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
        dnamelen = dentry->d_name.len;
        len += dnamelen;
 
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
-       if (IS_ERR(msg))
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
+       if (!msg)
                return;
        lease = msg->front.iov_base;
        lease->action = action;
-       lease->mask = cpu_to_le16(CEPH_LOCK_DN);
+       lease->mask = cpu_to_le16(1);
        lease->ino = cpu_to_le64(ceph_vino(inode).ino);
        lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
        lease->seq = cpu_to_le32(seq);
@@ -2482,7 +2605,7 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
 
        BUG_ON(inode == NULL);
        BUG_ON(dentry == NULL);
-       BUG_ON(mask != CEPH_LOCK_DN);
+       BUG_ON(mask == 0);
 
        /* is dentry lease valid? */
        spin_lock(&dentry->d_lock);
@@ -2592,8 +2715,10 @@ static void delayed_work(struct work_struct *work)
                        send_renew_caps(mdsc, s);
                else
                        ceph_con_keepalive(&s->s_con);
-               add_cap_releases(mdsc, s, -1);
-               send_cap_releases(mdsc, s);
+               ceph_add_cap_releases(mdsc, s, -1);
+               if (s->s_state == CEPH_MDS_SESSION_OPEN ||
+                   s->s_state == CEPH_MDS_SESSION_HUNG)
+                       ceph_send_cap_releases(mdsc, s);
                mutex_unlock(&s->s_mutex);
                ceph_put_mds_session(s);
 
@@ -2610,6 +2735,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        mdsc->client = client;
        mutex_init(&mdsc->mutex);
        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
+       if (mdsc->mdsmap == NULL)
+               return -ENOMEM;
+
        init_completion(&mdsc->safe_umount_waiters);
        init_completion(&mdsc->session_close_waiters);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
@@ -2635,6 +2763,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        init_waitqueue_head(&mdsc->cap_flushing_wq);
        spin_lock_init(&mdsc->dentry_lru_lock);
        INIT_LIST_HEAD(&mdsc->dentry_lru);
+
        return 0;
 }
 
@@ -2679,6 +2808,12 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
        drop_leases(mdsc);
        ceph_flush_dirty_caps(mdsc);
        wait_requests(mdsc);
+
+       /*
+        * wait for reply handlers to drop their request refs and
+        * their inode/dcache refs
+        */
+       ceph_msgr_flush();
 }
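ceph_msgr_flush() is added so pre-umount waits for messenger work items to drain; as the comment says, in-flight reply handlers may still hold request and inode/dcache references. Presumably it is a thin wrapper over the messenger workqueue, something like:

    /* sketch; the real definition lives in the messenger code */
    void ceph_msgr_flush(void)
    {
            flush_workqueue(ceph_msgr_wq);   /* drain queued con work */
    }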
 
 /*
@@ -2686,29 +2821,41 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
  */
 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
 {
-       struct ceph_mds_request *req = NULL;
+       struct ceph_mds_request *req = NULL, *nextreq;
        struct rb_node *n;
 
        mutex_lock(&mdsc->mutex);
        dout("wait_unsafe_requests want %lld\n", want_tid);
+restart:
        req = __get_oldest_req(mdsc);
        while (req && req->r_tid <= want_tid) {
+               /* find next request */
+               n = rb_next(&req->r_node);
+               if (n)
+                       nextreq = rb_entry(n, struct ceph_mds_request, r_node);
+               else
+                       nextreq = NULL;
                if ((req->r_op & CEPH_MDS_OP_WRITE)) {
                        /* write op */
                        ceph_mdsc_get_request(req);
+                       if (nextreq)
+                               ceph_mdsc_get_request(nextreq);
                        mutex_unlock(&mdsc->mutex);
                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
                             req->r_tid, want_tid);
                        wait_for_completion(&req->r_safe_completion);
                        mutex_lock(&mdsc->mutex);
-                       n = rb_next(&req->r_node);
                        ceph_mdsc_put_request(req);
-               } else {
-                       n = rb_next(&req->r_node);
+                       if (!nextreq)
+                               break;  /* next dne before, so we're done! */
+                       if (RB_EMPTY_NODE(&nextreq->r_node)) {
+                               /* next request was removed from tree */
+                               ceph_mdsc_put_request(nextreq);
+                               goto restart;
+                       }
+                       ceph_mdsc_put_request(nextreq);  /* won't go away */
                }
-               if (!n)
-                       break;
-               req = rb_entry(n, struct ceph_mds_request, r_node);
+               req = nextreq;
        }
        mutex_unlock(&mdsc->mutex);
        dout("wait_unsafe_requests done\n");
@@ -2718,6 +2865,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 {
        u64 want_tid, want_flush;
 
+       if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
+               return;
+
        dout("sync\n");
        mutex_lock(&mdsc->mutex);
        want_tid = mdsc->last_tid;
@@ -2900,9 +3050,10 @@ static void con_put(struct ceph_connection *con)
 static void peer_reset(struct ceph_connection *con)
 {
        struct ceph_mds_session *s = con->private;
+       struct ceph_mds_client *mdsc = s->s_mdsc;
 
-       pr_err("mds%d gave us the boot.  IMPLEMENT RECONNECT.\n",
-              s->s_mds);
+       pr_warning("mds%d closed our session\n", s->s_mds);
+       send_mds_reconnect(mdsc, s);
 }
 
 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
@@ -3009,7 +3160,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
        return ceph_monc_validate_auth(&mdsc->client->monc);
 }
 
-const static struct ceph_connection_operations mds_con_ops = {
+static const struct ceph_connection_operations mds_con_ops = {
        .get = con_get,
        .put = con_put,
        .dispatch = dispatch,