ceph: add dir_layout to inode
[linux-2.6.git] / fs / ceph / snap.c
index 2e3cb40..39c243a 100644 (file)
@@ -1,10 +1,12 @@
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
 
-#include <linux/radix-tree.h>
 #include <linux/sort.h>
+#include <linux/slab.h>
 
 #include "super.h"
-#include "decode.h"
+#include "mds_client.h"
+
+#include <linux/ceph/decode.h>
 
 /*
  * Snapshots in ceph are driven in large part by cooperation from the
@@ -77,6 +79,28 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
        atomic_inc(&realm->nref);
 }
 
+static void __insert_snap_realm(struct rb_root *root,
+                               struct ceph_snap_realm *new)
+{
+       struct rb_node **p = &root->rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_snap_realm *r = NULL;
+
+       while (*p) {
+               parent = *p;
+               r = rb_entry(parent, struct ceph_snap_realm, node);
+               if (new->ino < r->ino)
+                       p = &(*p)->rb_left;
+               else if (new->ino > r->ino)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&new->node, parent, p);
+       rb_insert_color(&new->node, root);
+}
+
 /*
  * create and get the realm rooted at @ino and bump its ref count.
  *
@@ -92,33 +116,42 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
        if (!realm)
                return ERR_PTR(-ENOMEM);
 
-       radix_tree_insert(&mdsc->snap_realms, ino, realm);
-
        atomic_set(&realm->nref, 0);    /* tree does not take a ref */
        realm->ino = ino;
        INIT_LIST_HEAD(&realm->children);
        INIT_LIST_HEAD(&realm->child_item);
        INIT_LIST_HEAD(&realm->empty_item);
+       INIT_LIST_HEAD(&realm->dirty_item);
        INIT_LIST_HEAD(&realm->inodes_with_caps);
        spin_lock_init(&realm->inodes_with_caps_lock);
+       __insert_snap_realm(&mdsc->snap_realms, realm);
        dout("create_snap_realm %llx %p\n", realm->ino, realm);
        return realm;
 }
 
 /*
- * find and get (if found) the realm rooted at @ino and bump its ref count.
+ * lookup the realm rooted at @ino.
  *
  * caller must hold snap_rwsem for write.
  */
 struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
                                               u64 ino)
 {
-       struct ceph_snap_realm *realm;
-
-       realm = radix_tree_lookup(&mdsc->snap_realms, ino);
-       if (realm)
-               dout("lookup_snap_realm %llx %p\n", realm->ino, realm);
-       return realm;
+       struct rb_node *n = mdsc->snap_realms.rb_node;
+       struct ceph_snap_realm *r;
+
+       while (n) {
+               r = rb_entry(n, struct ceph_snap_realm, node);
+               if (ino < r->ino)
+                       n = n->rb_left;
+               else if (ino > r->ino)
+                       n = n->rb_right;
+               else {
+                       dout("lookup_snap_realm %llx %p\n", r->ino, r);
+                       return r;
+               }
+       }
+       return NULL;
 }
 
 static void __put_snap_realm(struct ceph_mds_client *mdsc,
@@ -132,7 +165,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
 {
        dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
 
-       radix_tree_delete(&mdsc->snap_realms, realm->ino);
+       rb_erase(&realm->node, &mdsc->snap_realms);
 
        if (realm->parent) {
                list_del_init(&realm->child_item);
@@ -226,8 +259,6 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
                return 0;
 
        parent = ceph_lookup_snap_realm(mdsc, parentino);
-       if (IS_ERR(parent))
-               return PTR_ERR(parent);
        if (!parent) {
                parent = ceph_create_snap_realm(mdsc, parentino);
                if (IS_ERR(parent))
@@ -287,9 +318,9 @@ static int build_snap_context(struct ceph_snap_realm *realm)
           because we rebuild_snap_realms() works _downward_ in
           hierarchy after each update.) */
        if (realm->cached_context &&
-           realm->cached_context->seq <= realm->seq &&
+           realm->cached_context->seq == realm->seq &&
            (!parent ||
-            realm->cached_context->seq <= parent->cached_context->seq)) {
+            realm->cached_context->seq >= parent->cached_context->seq)) {
                dout("build_snap_context %llx %p: %p seq %lld (%d snaps)"
                     " (unchanged)\n",
                     realm->ino, realm, realm->cached_context,
@@ -403,12 +434,11 @@ static int dup_array(u64 **dst, __le64 *src, int num)
  * Caller must hold snap_rwsem for read (i.e., the realm topology won't
  * change).
  */
-void ceph_queue_cap_snap(struct ceph_inode_info *ci,
-                        struct ceph_snap_context *snapc)
+void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 {
        struct inode *inode = &ci->vfs_inode;
        struct ceph_cap_snap *capsnap;
-       int used;
+       int used, dirty;
 
        capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
        if (!capsnap) {
@@ -418,42 +448,55 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
 
        spin_lock(&inode->i_lock);
        used = __ceph_caps_used(ci);
+       dirty = __ceph_caps_dirty(ci);
        if (__ceph_have_pending_cap_snap(ci)) {
                /* there is no point in queuing multiple "pending" cap_snaps,
                   as no new writes are allowed to start when pending, so any
                   writes in progress now were started before the previous
                   cap_snap.  lucky us. */
-               dout("queue_cap_snap %p snapc %p seq %llu used %d"
-                    " already pending\n", inode, snapc, snapc->seq, used);
+               dout("queue_cap_snap %p already pending\n", inode);
                kfree(capsnap);
-       } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) {
-               igrab(inode);
+       } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR) ||
+                  (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
+                            CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
+               struct ceph_snap_context *snapc = ci->i_head_snapc;
 
+               dout("queue_cap_snap %p cap_snap %p queuing under %p\n", inode,
+                    capsnap, snapc);
+               igrab(inode);
+               
                atomic_set(&capsnap->nref, 1);
                capsnap->ci = ci;
                INIT_LIST_HEAD(&capsnap->ci_item);
                INIT_LIST_HEAD(&capsnap->flushing_item);
 
-               capsnap->follows = snapc->seq - 1;
-               capsnap->context = ceph_get_snap_context(snapc);
+               capsnap->follows = snapc->seq;
                capsnap->issued = __ceph_caps_issued(ci, NULL);
-               capsnap->dirty = __ceph_caps_dirty(ci);
+               capsnap->dirty = dirty;
 
                capsnap->mode = inode->i_mode;
                capsnap->uid = inode->i_uid;
                capsnap->gid = inode->i_gid;
 
-               /* fixme? */
-               capsnap->xattr_blob = NULL;
-               capsnap->xattr_len = 0;
+               if (dirty & CEPH_CAP_XATTR_EXCL) {
+                       __ceph_build_xattrs_blob(ci);
+                       capsnap->xattr_blob =
+                               ceph_buffer_get(ci->i_xattrs.blob);
+                       capsnap->xattr_version = ci->i_xattrs.version;
+               } else {
+                       capsnap->xattr_blob = NULL;
+                       capsnap->xattr_version = 0;
+               }
 
                /* dirty page count moved from _head to this cap_snap;
                   all subsequent writes page dirties occur _after_ this
                   snapshot. */
                capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
                ci->i_wrbuffer_ref_head = 0;
-               ceph_put_snap_context(ci->i_head_snapc);
-               ci->i_head_snapc = NULL;
+               capsnap->context = snapc;
+               ci->i_head_snapc =
+                       ceph_get_snap_context(ci->i_snap_realm->cached_context);
+               dout(" new snapc is %p\n", ci->i_head_snapc);
                list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
 
                if (used & CEPH_CAP_FILE_WR) {
@@ -485,7 +528,7 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
                            struct ceph_cap_snap *capsnap)
 {
        struct inode *inode = &ci->vfs_inode;
-       struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 
        BUG_ON(capsnap->writing);
        capsnap->size = inode->i_size;
@@ -494,15 +537,17 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
        capsnap->ctime = inode->i_ctime;
        capsnap->time_warp_seq = ci->i_time_warp_seq;
        if (capsnap->dirty_pages) {
-               dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu "
+               dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
                     "still has %d dirty pages\n", inode, capsnap,
                     capsnap->context, capsnap->context->seq,
-                    capsnap->size, capsnap->dirty_pages);
+                    ceph_cap_string(capsnap->dirty), capsnap->size,
+                    capsnap->dirty_pages);
                return 0;
        }
-       dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu clean\n",
+       dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
             inode, capsnap, capsnap->context,
-            capsnap->context->seq, capsnap->size);
+            capsnap->context->seq, ceph_cap_string(capsnap->dirty),
+            capsnap->size);
 
        spin_lock(&mdsc->snap_flush_lock);
        list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
@@ -510,6 +555,41 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
        return 1;  /* caller may want to ceph_flush_snaps */
 }
 
+/*
+ * Queue cap_snaps for snap writeback for this realm and its children.
+ * Called under snap_rwsem, so realm topology won't change.
+ */
+static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
+{
+       struct ceph_inode_info *ci;
+       struct inode *lastinode = NULL;
+       struct ceph_snap_realm *child;
+
+       dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino);
+
+       spin_lock(&realm->inodes_with_caps_lock);
+       list_for_each_entry(ci, &realm->inodes_with_caps,
+                           i_snap_realm_item) {
+               struct inode *inode = igrab(&ci->vfs_inode);
+               if (!inode)
+                       continue;
+               spin_unlock(&realm->inodes_with_caps_lock);
+               if (lastinode)
+                       iput(lastinode);
+               lastinode = inode;
+               ceph_queue_cap_snap(ci);
+               spin_lock(&realm->inodes_with_caps_lock);
+       }
+       spin_unlock(&realm->inodes_with_caps_lock);
+       if (lastinode)
+               iput(lastinode);
+
+       dout("queue_realm_cap_snaps %p %llx children\n", realm, realm->ino);
+       list_for_each_entry(child, &realm->children, child_item)
+               queue_realm_cap_snaps(child);
+
+       dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
+}
 
 /*
  * Parse and apply a snapblob "snap trace" from the MDS.  This specifies
@@ -527,6 +607,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
        struct ceph_snap_realm *realm;
        int invalidate = 0;
        int err = -ENOMEM;
+       LIST_HEAD(dirty_realms);
 
        dout("update_snap_trace deletion=%d\n", deletion);
 more:
@@ -541,10 +622,6 @@ more:
        p += sizeof(u64) * le32_to_cpu(ri->num_prior_parent_snaps);
 
        realm = ceph_lookup_snap_realm(mdsc, le64_to_cpu(ri->ino));
-       if (IS_ERR(realm)) {
-               err = PTR_ERR(realm);
-               goto fail;
-       }
        if (!realm) {
                realm = ceph_create_snap_realm(mdsc, le64_to_cpu(ri->ino));
                if (IS_ERR(realm)) {
@@ -553,45 +630,6 @@ more:
                }
        }
 
-       if (le64_to_cpu(ri->seq) > realm->seq) {
-               dout("update_snap_trace updating %llx %p %lld -> %lld\n",
-                    realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
-               /*
-                * if the realm seq has changed, queue a cap_snap for every
-                * inode with open caps.  we do this _before_ we update
-                * the realm info so that we prepare for writeback under the
-                * _previous_ snap context.
-                *
-                * ...unless it's a snap deletion!
-                */
-               if (!deletion) {
-                       struct ceph_inode_info *ci;
-                       struct inode *lastinode = NULL;
-
-                       spin_lock(&realm->inodes_with_caps_lock);
-                       list_for_each_entry(ci, &realm->inodes_with_caps,
-                                           i_snap_realm_item) {
-                               struct inode *inode = igrab(&ci->vfs_inode);
-                               if (!inode)
-                                       continue;
-                               spin_unlock(&realm->inodes_with_caps_lock);
-                               if (lastinode)
-                                       iput(lastinode);
-                               lastinode = inode;
-                               ceph_queue_cap_snap(ci, realm->cached_context);
-                               spin_lock(&realm->inodes_with_caps_lock);
-                       }
-                       spin_unlock(&realm->inodes_with_caps_lock);
-                       if (lastinode)
-                               iput(lastinode);
-                       dout("update_snap_trace cap_snaps queued\n");
-               }
-
-       } else {
-               dout("update_snap_trace %llx %p seq %lld unchanged\n",
-                    realm->ino, realm, realm->seq);
-       }
-
        /* ensure the parent is correct */
        err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
        if (err < 0)
@@ -599,6 +637,8 @@ more:
        invalidate += err;
 
        if (le64_to_cpu(ri->seq) > realm->seq) {
+               dout("update_snap_trace updating %llx %p %lld -> %lld\n",
+                    realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
                /* update realm parameters, snap lists */
                realm->seq = le64_to_cpu(ri->seq);
                realm->created = le64_to_cpu(ri->created);
@@ -616,9 +656,17 @@ more:
                if (err < 0)
                        goto fail;
 
+               /* queue realm for cap_snap creation */
+               list_add(&realm->dirty_item, &dirty_realms);
+
                invalidate = 1;
        } else if (!realm->cached_context) {
+               dout("update_snap_trace %llx %p seq %lld new\n",
+                    realm->ino, realm, realm->seq);
                invalidate = 1;
+       } else {
+               dout("update_snap_trace %llx %p seq %lld unchanged\n",
+                    realm->ino, realm, realm->seq);
        }
 
        dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
@@ -631,6 +679,14 @@ more:
        if (invalidate)
                rebuild_snap_realms(realm);
 
+       /*
+        * queue cap snaps _after_ we've built the new snap contexts,
+        * so that i_head_snapc can be set appropriately.
+        */
+       list_for_each_entry(realm, &dirty_realms, dirty_item) {
+               queue_realm_cap_snaps(realm);
+       }
+
        __cleanup_empty_realms(mdsc);
        return 0;
 
@@ -663,7 +719,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
                igrab(inode);
                spin_unlock(&mdsc->snap_flush_lock);
                spin_lock(&inode->i_lock);
-               __ceph_flush_snaps(ci, &session);
+               __ceph_flush_snaps(ci, &session, 0);
                spin_unlock(&inode->i_lock);
                iput(inode);
                spin_lock(&mdsc->snap_flush_lock);
@@ -690,11 +746,11 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
  * directory into another realm.
  */
 void ceph_handle_snap(struct ceph_mds_client *mdsc,
+                     struct ceph_mds_session *session,
                      struct ceph_msg *msg)
 {
-       struct super_block *sb = mdsc->client->sb;
-       struct ceph_mds_session *session;
-       int mds;
+       struct super_block *sb = mdsc->fsc->sb;
+       int mds = session->s_mds;
        u64 split;
        int op;
        int trace_len;
@@ -707,10 +763,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
        int i;
        int locked_rwsem = 0;
 
-       if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
-               return;
-       mds = le64_to_cpu(msg->hdr.src.name.num);
-
        /* decode */
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
@@ -726,15 +778,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
        dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
             ceph_snap_op_name(op), split, trace_len);
 
-       /* find session */
-       mutex_lock(&mdsc->mutex);
-       session = __ceph_lookup_mds_session(mdsc, mds);
-       mutex_unlock(&mdsc->mutex);
-       if (!session) {
-               dout("WTF, got snap but no session for mds%d\n", mds);
-               return;
-       }
-
        mutex_lock(&session->s_mutex);
        session->s_seq++;
        mutex_unlock(&session->s_mutex);
@@ -762,8 +805,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                ri = p;
 
                realm = ceph_lookup_snap_realm(mdsc, split);
-               if (IS_ERR(realm))
-                       goto out;
                if (!realm) {
                        realm = ceph_create_snap_realm(mdsc, split);
                        if (IS_ERR(realm))
@@ -779,6 +820,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                        };
                        struct inode *inode = ceph_find_inode(sb, vino);
                        struct ceph_inode_info *ci;
+                       struct ceph_snap_realm *oldrealm;
 
                        if (!inode)
                                continue;
@@ -804,17 +846,19 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                        dout(" will move %p to split realm %llx %p\n",
                             inode, realm->ino, realm);
                        /*
-                        * Remove the inode from the realm's inode
-                        * list, but don't add it to the new realm
-                        * yet.  We don't want the cap_snap to be
-                        * queued (again) by ceph_update_snap_trace()
-                        * below.  Queue it _now_, under the old context.
+                        * Move the inode to the new realm
                         */
+                       spin_lock(&realm->inodes_with_caps_lock);
                        list_del_init(&ci->i_snap_realm_item);
+                       list_add(&ci->i_snap_realm_item,
+                                &realm->inodes_with_caps);
+                       oldrealm = ci->i_snap_realm;
+                       ci->i_snap_realm = realm;
+                       spin_unlock(&realm->inodes_with_caps_lock);
                        spin_unlock(&inode->i_lock);
 
-                       ceph_queue_cap_snap(ci,
-                                           ci->i_snap_realm->cached_context);
+                       ceph_get_snap_realm(mdsc, realm);
+                       ceph_put_snap_realm(mdsc, oldrealm);
 
                        iput(inode);
                        continue;
@@ -829,8 +873,6 @@ skip_inode:
                        struct ceph_snap_realm *child =
                                ceph_lookup_snap_realm(mdsc,
                                           le64_to_cpu(split_realms[i]));
-                       if (IS_ERR(child))
-                               continue;
                        if (!child)
                                continue;
                        adjust_snap_realm_parent(mdsc, child, realm->ino);
@@ -844,39 +886,9 @@ skip_inode:
        ceph_update_snap_trace(mdsc, p, e,
                               op == CEPH_SNAP_OP_DESTROY);
 
-       if (op == CEPH_SNAP_OP_SPLIT) {
-               /*
-                * ok, _now_ add the inodes into the new realm.
-                */
-               for (i = 0; i < num_split_inos; i++) {
-                       struct ceph_vino vino = {
-                               .ino = le64_to_cpu(split_inos[i]),
-                               .snap = CEPH_NOSNAP,
-                       };
-                       struct inode *inode = ceph_find_inode(sb, vino);
-                       struct ceph_inode_info *ci;
-
-                       if (!inode)
-                               continue;
-                       ci = ceph_inode(inode);
-                       spin_lock(&inode->i_lock);
-                       if (!ci->i_snap_realm)
-                               goto split_skip_inode;
-                       ceph_put_snap_realm(mdsc, ci->i_snap_realm);
-                       spin_lock(&realm->inodes_with_caps_lock);
-                       list_add(&ci->i_snap_realm_item,
-                                &realm->inodes_with_caps);
-                       ci->i_snap_realm = realm;
-                       spin_unlock(&realm->inodes_with_caps_lock);
-                       ceph_get_snap_realm(mdsc, realm);
-split_skip_inode:
-                       spin_unlock(&inode->i_lock);
-                       iput(inode);
-               }
-
+       if (op == CEPH_SNAP_OP_SPLIT)
                /* we took a reference when we created the realm, above */
                ceph_put_snap_realm(mdsc, realm);
-       }
 
        __cleanup_empty_realms(mdsc);
 
@@ -887,6 +899,7 @@ split_skip_inode:
 
 bad:
        pr_err("corrupt snap message from mds%d\n", mds);
+       ceph_msg_dump(msg);
 out:
        if (locked_rwsem)
                up_write(&mdsc->snap_rwsem);