ceph: fix iterate_caps removal race
Sage Weil [Tue, 16 Feb 2010 19:39:45 +0000 (11:39 -0800)]
We need to be able to iterate over all caps on a session with a
possibly slow callback on each cap.  To allow this, we used to
prevent cap reordering while we were iterating.  However, we were
not safe from races with removal: removing the 'next' cap would
make the next pointer from list_for_each_entry_safe be invalid,
and cause a lock up or similar badness.

Instead, we keep an iterator pointer in the session pointing to
the current cap.  As before, we avoid reordering.  For removal,
if the cap isn't the current cap we are iterating over, we are
fine.  If it is, we clear cap->ci (to mark the cap as pending
removal) but leave it in the session list.  In iterate_caps, we
can safely finish removal and get the next cap pointer.

While we're at it, clean up put_cap to not take a cap reservation
context, as it was never used.

Signed-off-by: Sage Weil <sage@newdream.net>

fs/ceph/caps.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h

index f94b56f..4958a2e 100644 (file)
@@ -266,12 +266,11 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
        return cap;
 }
 
-static void put_cap(struct ceph_cap *cap,
-                   struct ceph_cap_reservation *ctx)
+void ceph_put_cap(struct ceph_cap *cap)
 {
        spin_lock(&caps_list_lock);
-       dout("put_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
-            ctx, ctx ? ctx->count : 0, caps_total_count, caps_use_count,
+       dout("put_cap %p %d = %d used + %d resv + %d avail\n",
+            cap, caps_total_count, caps_use_count,
             caps_reserve_count, caps_avail_count);
        caps_use_count--;
        /*
@@ -282,12 +281,7 @@ static void put_cap(struct ceph_cap *cap,
                caps_total_count--;
                kmem_cache_free(ceph_cap_cachep, cap);
        } else {
-               if (ctx) {
-                       ctx->count++;
-                       caps_reserve_count++;
-               } else {
-                       caps_avail_count++;
-               }
+               caps_avail_count++;
                list_add(&cap->caps_item, &caps_list);
        }
 
@@ -709,7 +703,7 @@ static void __touch_cap(struct ceph_cap *cap)
        struct ceph_mds_session *s = cap->session;
 
        spin_lock(&s->s_cap_lock);
-       if (!s->s_iterating_caps) {
+       if (s->s_cap_iterator == NULL) {
                dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
                     s->s_mds);
                list_move_tail(&cap->session_caps, &s->s_caps);
@@ -865,8 +859,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
  * caller should hold i_lock, and session s_mutex.
  * returns true if this is the last cap.  if so, caller should iput.
  */
-void __ceph_remove_cap(struct ceph_cap *cap,
-                      struct ceph_cap_reservation *ctx)
+void __ceph_remove_cap(struct ceph_cap *cap)
 {
        struct ceph_mds_session *session = cap->session;
        struct ceph_inode_info *ci = cap->ci;
@@ -874,19 +867,27 @@ void __ceph_remove_cap(struct ceph_cap *cap,
 
        dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
 
-       /* remove from session list */
-       spin_lock(&session->s_cap_lock);
-       list_del_init(&cap->session_caps);
-       session->s_nr_caps--;
-       spin_unlock(&session->s_cap_lock);
-
        /* remove from inode list */
        rb_erase(&cap->ci_node, &ci->i_caps);
-       cap->session = NULL;
+       cap->ci = NULL;
        if (ci->i_auth_cap == cap)
                ci->i_auth_cap = NULL;
 
-       put_cap(cap, ctx);
+       /* remove from session list */
+       spin_lock(&session->s_cap_lock);
+       if (session->s_cap_iterator == cap) {
+               /* not yet, we are iterating over this very cap */
+               dout("__ceph_remove_cap  delaying %p removal from session %p\n",
+                    cap, cap->session);
+       } else {
+               list_del_init(&cap->session_caps);
+               session->s_nr_caps--;
+               cap->session = NULL;
+       }
+       spin_unlock(&session->s_cap_lock);
+
+       if (cap->session == NULL)
+               ceph_put_cap(cap);
 
        if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
                struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -1022,7 +1023,7 @@ void ceph_queue_caps_release(struct inode *inode)
                }
                spin_unlock(&session->s_cap_lock);
                p = rb_next(p);
-               __ceph_remove_cap(cap, NULL);
+               __ceph_remove_cap(cap);
 
        }
        spin_unlock(&inode->i_lock);
@@ -2521,7 +2522,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
                        ci->i_cap_exporting_mseq = mseq;
                        ci->i_cap_exporting_issued = cap->issued;
                }
-               __ceph_remove_cap(cap, NULL);
+               __ceph_remove_cap(cap);
        } else {
                WARN_ON(!cap);
        }
index 02834ce..124c0c1 100644 (file)
@@ -344,7 +344,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
-       s->s_iterating_caps = false;
+       s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
@@ -729,28 +729,61 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
                                            void *), void *arg)
 {
-       struct ceph_cap *cap, *ncap;
-       struct inode *inode;
+       struct list_head *p;
+       struct ceph_cap *cap;
+       struct inode *inode, *last_inode = NULL;
+       struct ceph_cap *old_cap = NULL;
        int ret;
 
        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
-       session->s_iterating_caps = true;
-       list_for_each_entry_safe(cap, ncap, &session->s_caps, session_caps) {
+       p = session->s_caps.next;
+       while (p != &session->s_caps) {
+               cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
-               if (!inode)
+               if (!inode) {
+                       p = p->next;
                        continue;
+               }
+               session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);
+
+               if (last_inode) {
+                       iput(last_inode);
+                       last_inode = NULL;
+               }
+               if (old_cap) {
+                       ceph_put_cap(old_cap);
+                       old_cap = NULL;
+               }
+
                ret = cb(inode, cap, arg);
-               iput(inode);
+               last_inode = inode;
+
                spin_lock(&session->s_cap_lock);
+               p = p->next;
+               if (cap->ci == NULL) {
+                       dout("iterate_session_caps  finishing cap %p removal\n",
+                            cap);
+                       BUG_ON(cap->session != session);
+                       list_del_init(&cap->session_caps);
+                       session->s_nr_caps--;
+                       cap->session = NULL;
+                       old_cap = cap;  /* put_cap it w/o locks held */
+               }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
 out:
-       session->s_iterating_caps = false;
+       session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);
+
+       if (last_inode)
+               iput(last_inode);
+       if (old_cap)
+               ceph_put_cap(old_cap);
+
        return ret;
 }
 
@@ -942,7 +975,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
        session->s_trim_caps--;
        if (oissued) {
                /* we aren't the only cap.. just remove us */
-               __ceph_remove_cap(cap, NULL);
+               __ceph_remove_cap(cap);
        } else {
                /* try to drop referring dentries */
                spin_unlock(&inode->i_lock);
index 9d6b901..961cc6f 100644 (file)
@@ -114,7 +114,7 @@ struct ceph_mds_session {
        int               s_num_cap_releases;
        struct list_head  s_cap_releases; /* waiting cap_release messages */
        struct list_head  s_cap_releases_done; /* ready to send */
-       bool              s_iterating_caps;
+       struct ceph_cap  *s_cap_iterator;
 
        /* protected by mutex */
        struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
index 3b5faf9..384f0e2 100644 (file)
@@ -795,15 +795,15 @@ extern int ceph_add_cap(struct inode *inode,
                        int fmode, unsigned issued, unsigned wanted,
                        unsigned cap, unsigned seq, u64 realmino, int flags,
                        struct ceph_cap_reservation *caps_reservation);
-extern void __ceph_remove_cap(struct ceph_cap *cap,
-                             struct ceph_cap_reservation *ctx);
+extern void __ceph_remove_cap(struct ceph_cap *cap);
 static inline void ceph_remove_cap(struct ceph_cap *cap)
 {
        struct inode *inode = &cap->ci->vfs_inode;
        spin_lock(&inode->i_lock);
-       __ceph_remove_cap(cap, NULL);
+       __ceph_remove_cap(cap);
        spin_unlock(&inode->i_lock);
 }
+extern void ceph_put_cap(struct ceph_cap *cap);
 
 extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, int unused);