Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
Linus Torvalds [Sun, 7 Oct 2012 21:38:18 +0000 (06:38 +0900)]
Pull ceph updates from Sage Weil:
 "The bulk of this pull is a series from Alex that refactors and cleans
  up the RBD code to lay the groundwork for supporting the new image
  format and evolving feature set.  There are also some cleanups in
  libceph, and for ceph there's fixed validation of file striping
  layouts and a bugfix in the code handling a shrinking MDS cluster."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (71 commits)
  ceph: avoid 32-bit page index overflow
  ceph: return EIO on invalid layout on GET_DATALOC ioctl
  rbd: BUG on invalid layout
  ceph: propagate layout error on osd request creation
  libceph: check for invalid mapping
  ceph: convert to use le32_add_cpu()
  ceph: Fix oops when handling mdsmap that decreases max_mds
  rbd: update remaining header fields for v2
  rbd: get snapshot name for a v2 image
  rbd: get the snapshot context for a v2 image
  rbd: get image features for a v2 image
  rbd: get the object prefix for a v2 rbd image
  rbd: add code to get the size of a v2 rbd image
  rbd: lay out header probe infrastructure
  rbd: encapsulate code that gets snapshot info
  rbd: add an rbd features field
  rbd: don't use index in __rbd_add_snap_dev()
  rbd: kill create_snap sysfs entry
  rbd: define rbd_dev_image_id()
  rbd: define some new format constants
  ...

16 files changed:
Documentation/ABI/testing/sysfs-bus-rbd
drivers/block/rbd.c
drivers/block/rbd_types.h
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/file.c
fs/ceph/ioctl.c
fs/ceph/mds_client.c
fs/ceph/super.c
include/linux/ceph/mon_client.h
include/linux/ceph/osd_client.h
include/linux/ceph/osdmap.h
net/ceph/mon_client.c
net/ceph/osd_client.c
net/ceph/osdmap.c
net/ceph/pagelist.c

index 3c17b62..1cf2adf 100644 (file)
@@ -25,6 +25,10 @@ client_id
 
        The ceph unique client id that was assigned for this specific session.
 
+features
+
+       A hexadecimal encoding of the feature bits for this image.
+
 major
 
        The block device major number.
@@ -33,6 +37,11 @@ name
 
        The name of the rbd image.
 
+image_id
+
+       The unique id for the rbd image.  (For rbd image format 1
+       this is empty.)
+
 pool
 
        The name of the storage pool where this rbd image resides.
@@ -57,12 +66,6 @@ current_snap
 
        The current snapshot for which the device is mapped.
 
-create_snap
-
-       Create a snapshot:
-
-        $ echo <snap-name> > /sys/bus/rbd/devices/<dev-id>/snap_create
-
 snap_*
 
        A directory per each snapshot
@@ -79,4 +82,7 @@ snap_size
 
        The size of the image when this snapshot was taken.
 
+snap_features
+
+       A hexadecimal encoding of the feature bits for this snapshot.
 
index 54a55f0..bb3d9be 100644 (file)
@@ -41,6 +41,8 @@
 
 #include "rbd_types.h"
 
+#define RBD_DEBUG      /* Activate rbd_assert() calls */
+
 /*
  * The basic unit of block I/O is a sector.  It is interpreted in a
  * number of contexts in Linux (blk, bio, genhd), but the default is
 #define        SECTOR_SHIFT    9
 #define        SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
 
+/* It might be useful to have this defined elsewhere too */
+
+#define        U64_MAX ((u64) (~0ULL))
+
 #define RBD_DRV_NAME "rbd"
 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
 
 #define RBD_MINORS_PER_MAJOR   256             /* max minors per blkdev */
 
 #define RBD_MAX_SNAP_NAME_LEN  32
+#define RBD_MAX_SNAP_COUNT     510     /* allows max snapc to fit in 4KB */
 #define RBD_MAX_OPT_LEN                1024
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
+#define RBD_IMAGE_ID_LEN_MAX   64
+#define RBD_OBJ_PREFIX_LEN_MAX 64
+
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
 #define DEV_NAME_LEN           32
 #define MAX_INT_FORMAT_WIDTH   ((5 * sizeof (int)) / 2 + 1)
 
-#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
+#define RBD_READ_ONLY_DEFAULT          false
 
 /*
  * block device image metadata (in-memory version)
  */
 struct rbd_image_header {
-       u64 image_size;
+       /* These four fields never change for a given rbd image */
        char *object_prefix;
+       u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
-       struct ceph_snap_context *snapc;
-       size_t snap_names_len;
-       u32 total_snaps;
 
+       /* The remaining fields need to be updated occasionally */
+       u64 image_size;
+       struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;
 
@@ -91,7 +102,7 @@ struct rbd_image_header {
 };
 
 struct rbd_options {
-       int     notify_timeout;
+       bool    read_only;
 };
 
 /*
@@ -99,7 +110,6 @@ struct rbd_options {
  */
 struct rbd_client {
        struct ceph_client      *client;
-       struct rbd_options      *rbd_opts;
        struct kref             kref;
        struct list_head        node;
 };
@@ -141,6 +151,16 @@ struct rbd_snap {
        u64                     size;
        struct list_head        node;
        u64                     id;
+       u64                     features;
+};
+
+struct rbd_mapping {
+       char                    *snap_name;
+       u64                     snap_id;
+       u64                     size;
+       u64                     features;
+       bool                    snap_exists;
+       bool                    read_only;
 };
 
 /*
@@ -151,8 +171,9 @@ struct rbd_device {
 
        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
-       struct request_queue    *q;
 
+       u32                     image_format;   /* Either 1 or 2 */
+       struct rbd_options      rbd_opts;
        struct rbd_client       *rbd_client;
 
        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -160,6 +181,8 @@ struct rbd_device {
        spinlock_t              lock;           /* queue lock */
 
        struct rbd_image_header header;
+       char                    *image_id;
+       size_t                  image_id_len;
        char                    *image_name;
        size_t                  image_name_len;
        char                    *header_name;
@@ -171,13 +194,8 @@ struct rbd_device {
 
        /* protects updating the header */
        struct rw_semaphore     header_rwsem;
-       /* name of the snapshot this device reads from */
-       char                    *snap_name;
-       /* id of the snapshot this device reads from */
-       u64                     snap_id;        /* current snapshot id */
-       /* whether the snap_id this device reads from still exists */
-       bool                    snap_exists;
-       int                     read_only;
+
+       struct rbd_mapping      mapping;
 
        struct list_head        node;
 
@@ -196,12 +214,10 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
 static LIST_HEAD(rbd_client_list);             /* clients */
 static DEFINE_SPINLOCK(rbd_client_list_lock);
 
-static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
+static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
+static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
+
 static void rbd_dev_release(struct device *dev);
-static ssize_t rbd_snap_add(struct device *dev,
-                           struct device_attribute *attr,
-                           const char *buf,
-                           size_t count);
 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
@@ -229,6 +245,18 @@ static struct device rbd_root_dev = {
        .release =      rbd_root_dev_release,
 };
 
+#ifdef RBD_DEBUG
+#define rbd_assert(expr)                                               \
+               if (unlikely(!(expr))) {                                \
+                       printk(KERN_ERR "\nAssertion failure in %s() "  \
+                                               "at line %d:\n\n"       \
+                                       "\trbd_assert(%s);\n\n",        \
+                                       __func__, __LINE__, #expr);     \
+                       BUG();                                          \
+               }
+#else /* !RBD_DEBUG */
+#  define rbd_assert(expr)     ((void) 0)
+#endif /* !RBD_DEBUG */
 
 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
 {
@@ -246,11 +274,11 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 
-       if ((mode & FMODE_WRITE) && rbd_dev->read_only)
+       if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;
 
        rbd_get_dev(rbd_dev);
-       set_device_ro(bdev, rbd_dev->read_only);
+       set_device_ro(bdev, rbd_dev->mapping.read_only);
 
        return 0;
 }
@@ -274,8 +302,7 @@ static const struct block_device_operations rbd_bd_ops = {
  * Initialize an rbd client instance.
  * We own *ceph_opts.
  */
-static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
-                                           struct rbd_options *rbd_opts)
+static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 {
        struct rbd_client *rbdc;
        int ret = -ENOMEM;
@@ -299,8 +326,6 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
        if (ret < 0)
                goto out_err;
 
-       rbdc->rbd_opts = rbd_opts;
-
        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);
@@ -322,36 +347,52 @@ out_opt:
 }
 
 /*
- * Find a ceph client with specific addr and configuration.
+ * Find a ceph client with specific addr and configuration.  If
+ * found, bump its reference count.
  */
-static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
+static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 {
        struct rbd_client *client_node;
+       bool found = false;
 
        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;
 
-       list_for_each_entry(client_node, &rbd_client_list, node)
-               if (!ceph_compare_options(ceph_opts, client_node->client))
-                       return client_node;
-       return NULL;
+       spin_lock(&rbd_client_list_lock);
+       list_for_each_entry(client_node, &rbd_client_list, node) {
+               if (!ceph_compare_options(ceph_opts, client_node->client)) {
+                       kref_get(&client_node->kref);
+                       found = true;
+                       break;
+               }
+       }
+       spin_unlock(&rbd_client_list_lock);
+
+       return found ? client_node : NULL;
 }
 
 /*
  * mount options
  */
 enum {
-       Opt_notify_timeout,
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
+       Opt_read_only,
+       Opt_read_write,
+       /* Boolean args above */
+       Opt_last_bool,
 };
 
 static match_table_t rbd_opts_tokens = {
-       {Opt_notify_timeout, "notify_timeout=%d"},
        /* int args above */
        /* string args above */
+       {Opt_read_only, "mapping.read_only"},
+       {Opt_read_only, "ro"},          /* Alternate spelling */
+       {Opt_read_write, "read_write"},
+       {Opt_read_write, "rw"},         /* Alternate spelling */
+       /* Boolean args above */
        {-1, NULL}
 };
 
@@ -376,16 +417,22 @@ static int parse_rbd_opts_token(char *c, void *private)
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
+       } else if (token > Opt_last_string && token < Opt_last_bool) {
+               dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }
 
        switch (token) {
-       case Opt_notify_timeout:
-               rbd_opts->notify_timeout = intval;
+       case Opt_read_only:
+               rbd_opts->read_only = true;
+               break;
+       case Opt_read_write:
+               rbd_opts->read_only = false;
                break;
        default:
-               BUG_ON(token);
+               rbd_assert(false);
+               break;
        }
        return 0;
 }
@@ -394,48 +441,33 @@ static int parse_rbd_opts_token(char *c, void *private)
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
  */
-static struct rbd_client *rbd_get_client(const char *mon_addr,
-                                        size_t mon_addr_len,
-                                        char *options)
+static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
+                               size_t mon_addr_len, char *options)
 {
-       struct rbd_client *rbdc;
+       struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
        struct ceph_options *ceph_opts;
-       struct rbd_options *rbd_opts;
-
-       rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
-       if (!rbd_opts)
-               return ERR_PTR(-ENOMEM);
+       struct rbd_client *rbdc;
 
-       rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
+       rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
 
        ceph_opts = ceph_parse_options(options, mon_addr,
                                        mon_addr + mon_addr_len,
                                        parse_rbd_opts_token, rbd_opts);
-       if (IS_ERR(ceph_opts)) {
-               kfree(rbd_opts);
-               return ERR_CAST(ceph_opts);
-       }
+       if (IS_ERR(ceph_opts))
+               return PTR_ERR(ceph_opts);
 
-       spin_lock(&rbd_client_list_lock);
-       rbdc = __rbd_client_find(ceph_opts);
+       rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
-               kref_get(&rbdc->kref);
-               spin_unlock(&rbd_client_list_lock);
-
                ceph_destroy_options(ceph_opts);
-               kfree(rbd_opts);
-
-               return rbdc;
+       } else {
+               rbdc = rbd_client_create(ceph_opts);
+               if (IS_ERR(rbdc))
+                       return PTR_ERR(rbdc);
        }
-       spin_unlock(&rbd_client_list_lock);
-
-       rbdc = rbd_client_create(ceph_opts, rbd_opts);
+       rbd_dev->rbd_client = rbdc;
 
-       if (IS_ERR(rbdc))
-               kfree(rbd_opts);
-
-       return rbdc;
+       return 0;
 }
 
 /*
@@ -453,7 +485,6 @@ static void rbd_client_release(struct kref *kref)
        spin_unlock(&rbd_client_list_lock);
 
        ceph_destroy_client(rbdc->client);
-       kfree(rbdc->rbd_opts);
        kfree(rbdc);
 }
 
@@ -479,10 +510,38 @@ static void rbd_coll_release(struct kref *kref)
        kfree(coll);
 }
 
+static bool rbd_image_format_valid(u32 image_format)
+{
+       return image_format == 1 || image_format == 2;
+}
+
 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 {
-       return !memcmp(&ondisk->text,
-                       RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
+       size_t size;
+       u32 snap_count;
+
+       /* The header has to start with the magic rbd header text */
+       if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
+               return false;
+
+       /*
+        * The size of a snapshot header has to fit in a size_t, and
+        * that limits the number of snapshots.
+        */
+       snap_count = le32_to_cpu(ondisk->snap_count);
+       size = SIZE_MAX - sizeof (struct ceph_snap_context);
+       if (snap_count > size / sizeof (__le64))
+               return false;
+
+       /*
+        * Not only that, but the size of the entire the snapshot
+        * header must also be representable in a size_t.
+        */
+       size -= snap_count * sizeof (__le64);
+       if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
+               return false;
+
+       return true;
 }
 
 /*
@@ -490,179 +549,203 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
  * header.
  */
 static int rbd_header_from_disk(struct rbd_image_header *header,
-                                struct rbd_image_header_ondisk *ondisk,
-                                u32 allocated_snaps)
+                                struct rbd_image_header_ondisk *ondisk)
 {
        u32 snap_count;
+       size_t len;
+       size_t size;
+       u32 i;
 
-       if (!rbd_dev_ondisk_valid(ondisk))
-               return -ENXIO;
+       memset(header, 0, sizeof (*header));
 
        snap_count = le32_to_cpu(ondisk->snap_count);
-       if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
-                                / sizeof (u64))
-               return -EINVAL;
-       header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
-                               snap_count * sizeof(u64),
-                               GFP_KERNEL);
-       if (!header->snapc)
+
+       len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
+       header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
+       if (!header->object_prefix)
                return -ENOMEM;
+       memcpy(header->object_prefix, ondisk->object_prefix, len);
+       header->object_prefix[len] = '\0';
 
        if (snap_count) {
-               header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
-               header->snap_names = kmalloc(header->snap_names_len,
-                                            GFP_KERNEL);
+               u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
+
+               /* Save a copy of the snapshot names */
+
+               if (snap_names_len > (u64) SIZE_MAX)
+                       return -EIO;
+               header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
-                       goto err_snapc;
-               header->snap_sizes = kmalloc(snap_count * sizeof(u64),
-                                            GFP_KERNEL);
+                       goto out_err;
+               /*
+                * Note that rbd_dev_v1_header_read() guarantees
+                * the ondisk buffer we're working with has
+                * snap_names_len bytes beyond the end of the
+                * snapshot id array, this memcpy() is safe.
+                */
+               memcpy(header->snap_names, &ondisk->snaps[snap_count],
+                       snap_names_len);
+
+               /* Record each snapshot's size */
+
+               size = snap_count * sizeof (*header->snap_sizes);
+               header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
-                       goto err_names;
+                       goto out_err;
+               for (i = 0; i < snap_count; i++)
+                       header->snap_sizes[i] =
+                               le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                WARN_ON(ondisk->snap_names_len);
-               header->snap_names_len = 0;
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }
 
-       header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
-                                       GFP_KERNEL);
-       if (!header->object_prefix)
-               goto err_sizes;
-
-       memcpy(header->object_prefix, ondisk->block_name,
-              sizeof(ondisk->block_name));
-       header->object_prefix[sizeof (ondisk->block_name)] = '\0';
-
-       header->image_size = le64_to_cpu(ondisk->image_size);
+       header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;
 
+       /* Allocate and fill in the snapshot context */
+
+       header->image_size = le64_to_cpu(ondisk->image_size);
+       size = sizeof (struct ceph_snap_context);
+       size += snap_count * sizeof (header->snapc->snaps[0]);
+       header->snapc = kzalloc(size, GFP_KERNEL);
+       if (!header->snapc)
+               goto out_err;
+
        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
-       header->total_snaps = snap_count;
-
-       if (snap_count && allocated_snaps == snap_count) {
-               int i;
-
-               for (i = 0; i < snap_count; i++) {
-                       header->snapc->snaps[i] =
-                               le64_to_cpu(ondisk->snaps[i].id);
-                       header->snap_sizes[i] =
-                               le64_to_cpu(ondisk->snaps[i].image_size);
-               }
-
-               /* copy snapshot names */
-               memcpy(header->snap_names, &ondisk->snaps[snap_count],
-                       header->snap_names_len);
-       }
+       for (i = 0; i < snap_count; i++)
+               header->snapc->snaps[i] =
+                       le64_to_cpu(ondisk->snaps[i].id);
 
        return 0;
 
-err_sizes:
+out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
-err_names:
        kfree(header->snap_names);
        header->snap_names = NULL;
-err_snapc:
-       kfree(header->snapc);
-       header->snapc = NULL;
+       kfree(header->object_prefix);
+       header->object_prefix = NULL;
 
        return -ENOMEM;
 }
 
-static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
-                       u64 *seq, u64 *size)
+static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
 {
-       int i;
-       char *p = header->snap_names;
 
-       for (i = 0; i < header->total_snaps; i++) {
-               if (!strcmp(snap_name, p)) {
+       struct rbd_snap *snap;
 
-                       /* Found it.  Pass back its id and/or size */
+       list_for_each_entry(snap, &rbd_dev->snaps, node) {
+               if (!strcmp(snap_name, snap->name)) {
+                       rbd_dev->mapping.snap_id = snap->id;
+                       rbd_dev->mapping.size = snap->size;
+                       rbd_dev->mapping.features = snap->features;
 
-                       if (seq)
-                               *seq = header->snapc->snaps[i];
-                       if (size)
-                               *size = header->snap_sizes[i];
-                       return i;
+                       return 0;
                }
-               p += strlen(p) + 1;     /* Skip ahead to the next name */
        }
+
        return -ENOENT;
 }
 
-static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
+static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
 {
        int ret;
 
-       down_write(&rbd_dev->header_rwsem);
-
-       if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
+       if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
-               rbd_dev->snap_id = CEPH_NOSNAP;
-               rbd_dev->snap_exists = false;
-               rbd_dev->read_only = 0;
-               if (size)
-                       *size = rbd_dev->header.image_size;
+               rbd_dev->mapping.snap_id = CEPH_NOSNAP;
+               rbd_dev->mapping.size = rbd_dev->header.image_size;
+               rbd_dev->mapping.features = rbd_dev->header.features;
+               rbd_dev->mapping.snap_exists = false;
+               rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
+               ret = 0;
        } else {
-               u64 snap_id = 0;
-
-               ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
-                                       &snap_id, size);
+               ret = snap_by_name(rbd_dev, snap_name);
                if (ret < 0)
                        goto done;
-               rbd_dev->snap_id = snap_id;
-               rbd_dev->snap_exists = true;
-               rbd_dev->read_only = 1;
+               rbd_dev->mapping.snap_exists = true;
+               rbd_dev->mapping.read_only = true;
        }
-
-       ret = 0;
+       rbd_dev->mapping.snap_name = snap_name;
 done:
-       up_write(&rbd_dev->header_rwsem);
        return ret;
 }
 
 static void rbd_header_free(struct rbd_image_header *header)
 {
        kfree(header->object_prefix);
+       header->object_prefix = NULL;
        kfree(header->snap_sizes);
+       header->snap_sizes = NULL;
        kfree(header->snap_names);
+       header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
+       header->snapc = NULL;
 }
 
-/*
- * get the actual striped segment name, offset and length
- */
-static u64 rbd_get_segment(struct rbd_image_header *header,
-                          const char *object_prefix,
-                          u64 ofs, u64 len,
-                          char *seg_name, u64 *segofs)
+static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
+{
+       char *name;
+       u64 segment;
+       int ret;
+
+       name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+       if (!name)
+               return NULL;
+       segment = offset >> rbd_dev->header.obj_order;
+       ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
+                       rbd_dev->header.object_prefix, segment);
+       if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
+               pr_err("error formatting segment name for #%llu (%d)\n",
+                       segment, ret);
+               kfree(name);
+               name = NULL;
+       }
+
+       return name;
+}
+
+static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 {
-       u64 seg = ofs >> header->obj_order;
+       u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
 
-       if (seg_name)
-               snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
-                        "%s.%012llx", object_prefix, seg);
+       return offset & (segment_size - 1);
+}
+
+static u64 rbd_segment_length(struct rbd_device *rbd_dev,
+                               u64 offset, u64 length)
+{
+       u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
 
-       ofs = ofs & ((1 << header->obj_order) - 1);
-       len = min_t(u64, len, (1 << header->obj_order) - ofs);
+       offset &= segment_size - 1;
 
-       if (segofs)
-               *segofs = ofs;
+       rbd_assert(length <= U64_MAX - offset);
+       if (offset + length > segment_size)
+               length = segment_size - offset;
 
-       return len;
+       return length;
 }
 
 static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
 {
-       u64 start_seg = ofs >> header->obj_order;
-       u64 end_seg = (ofs + len - 1) >> header->obj_order;
+       u64 start_seg;
+       u64 end_seg;
+
+       if (!len)
+               return 0;
+       if (len - 1 > U64_MAX - ofs)
+               return -ERANGE;
+
+       start_seg = ofs >> header->obj_order;
+       end_seg = (ofs + len - 1) >> header->obj_order;
+
        return end_seg - start_seg + 1;
 }
 
@@ -724,7 +807,9 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                                   struct bio_pair **bp,
                                   int len, gfp_t gfpmask)
 {
-       struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
+       struct bio *old_chain = *old;
+       struct bio *new_chain = NULL;
+       struct bio *tail;
        int total = 0;
 
        if (*bp) {
@@ -733,9 +818,12 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
        }
 
        while (old_chain && (total < len)) {
+               struct bio *tmp;
+
                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
                if (!tmp)
                        goto err_out;
+               gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
 
                if (total + old_chain->bi_size > len) {
                        struct bio_pair *bp;
@@ -763,24 +851,18 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                }
 
                tmp->bi_bdev = NULL;
-               gfpmask &= ~__GFP_WAIT;
                tmp->bi_next = NULL;
-
-               if (!new_chain) {
-                       new_chain = tail = tmp;
-               } else {
+               if (new_chain)
                        tail->bi_next = tmp;
-                       tail = tmp;
-               }
+               else
+                       new_chain = tmp;
+               tail = tmp;
                old_chain = old_chain->bi_next;
 
                total += tmp->bi_size;
        }
 
-       BUG_ON(total < len);
-
-       if (tail)
-               tail->bi_next = NULL;
+       rbd_assert(total == len);
 
        *old = old_chain;
 
@@ -938,8 +1020,9 @@ static int rbd_do_request(struct request *rq,
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
-       ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
-                               req, ops);
+       ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
+                                  req, ops);
+       rbd_assert(ret == 0);
 
        ceph_osdc_build_request(req, ofs, &len,
                                ops,
@@ -1030,8 +1113,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           int flags,
                           struct ceph_osd_req_op *ops,
                           const char *object_name,
-                          u64 ofs, u64 len,
-                          char *buf,
+                          u64 ofs, u64 inbound_size,
+                          char *inbound,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
 {
@@ -1039,15 +1122,15 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
        struct page **pages;
        int num_pages;
 
-       BUG_ON(ops == NULL);
+       rbd_assert(ops != NULL);
 
-       num_pages = calc_pages_for(ofs , len);
+       num_pages = calc_pages_for(ofs, inbound_size);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);
 
        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
-                         object_name, ofs, len, NULL,
+                         object_name, ofs, inbound_size, NULL,
                          pages, num_pages,
                          flags,
                          ops,
@@ -1057,8 +1140,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
        if (ret < 0)
                goto done;
 
-       if ((flags & CEPH_OSD_FLAG_READ) && buf)
-               ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
+       if ((flags & CEPH_OSD_FLAG_READ) && inbound)
+               ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
 
 done:
        ceph_release_page_vector(pages, num_pages);
@@ -1085,14 +1168,11 @@ static int rbd_do_op(struct request *rq,
        struct ceph_osd_req_op *ops;
        u32 payload_len;
 
-       seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+       seg_name = rbd_segment_name(rbd_dev, ofs);
        if (!seg_name)
                return -ENOMEM;
-
-       seg_len = rbd_get_segment(&rbd_dev->header,
-                                 rbd_dev->header.object_prefix,
-                                 ofs, len,
-                                 seg_name, &seg_ofs);
+       seg_len = rbd_segment_length(rbd_dev, ofs, len);
+       seg_ofs = rbd_segment_offset(rbd_dev, ofs);
 
        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
 
@@ -1104,7 +1184,7 @@ static int rbd_do_op(struct request *rq,
        /* we've taken care of segment sizes earlier when we
           cloned the bios. We should never have a segment
           truncated at this point */
-       BUG_ON(seg_len < len);
+       rbd_assert(seg_len == len);
 
        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
                             seg_name, seg_ofs, seg_len,
@@ -1306,89 +1386,36 @@ static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
        return ret;
 }
 
-struct rbd_notify_info {
-       struct rbd_device *rbd_dev;
-};
-
-static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
-{
-       struct rbd_device *rbd_dev = (struct rbd_device *)data;
-       if (!rbd_dev)
-               return;
-
-       dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
-                       rbd_dev->header_name, (unsigned long long) notify_id,
-                       (unsigned int) opcode);
-}
-
 /*
- * Request sync osd notify
- */
-static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
-{
-       struct ceph_osd_req_op *ops;
-       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       struct ceph_osd_event *event;
-       struct rbd_notify_info info;
-       int payload_len = sizeof(u32) + sizeof(u32);
-       int ret;
-
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
-       if (!ops)
-               return -ENOMEM;
-
-       info.rbd_dev = rbd_dev;
-
-       ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
-                                    (void *)&info, &event);
-       if (ret < 0)
-               goto fail;
-
-       ops[0].watch.ver = 1;
-       ops[0].watch.flag = 1;
-       ops[0].watch.cookie = event->cookie;
-       ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
-       ops[0].watch.timeout = 12;
-
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                              CEPH_NOSNAP,
-                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                              ops,
-                              rbd_dev->header_name,
-                              0, 0, NULL, NULL, NULL);
-       if (ret < 0)
-               goto fail_event;
-
-       ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
-       dout("ceph_osdc_wait_event returned %d\n", ret);
-       rbd_destroy_ops(ops);
-       return 0;
-
-fail_event:
-       ceph_osdc_cancel_event(event);
-fail:
-       rbd_destroy_ops(ops);
-       return ret;
-}
-
-/*
- * Request sync osd read
+ * Synchronous osd object method call
  */
 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
-                            const char *data,
-                            int len,
+                            const char *outbound,
+                            size_t outbound_size,
+                            char *inbound,
+                            size_t inbound_size,
+                            int flags,
                             u64 *ver)
 {
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
+       int payload_size;
        int ret;
 
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
-                                   class_name_len + method_name_len + len);
+       /*
+        * Any input parameters required by the method we're calling
+        * will be sent along with the class and method names as
+        * part of the message payload.  That data and its size are
+        * supplied via the indata and indata_len fields (named from
+        * the perspective of the server side) in the OSD request
+        * operation.
+        */
+       payload_size = class_name_len + method_name_len + outbound_size;
+       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
        if (!ops)
                return -ENOMEM;
 
@@ -1397,14 +1424,14 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
-       ops[0].cls.indata = data;
-       ops[0].cls.indata_len = len;
+       ops[0].cls.indata = outbound;
+       ops[0].cls.indata_len = outbound_size;
 
        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
-                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                              ops,
-                              object_name, 0, 0, NULL, NULL, ver);
+                              flags, ops,
+                              object_name, 0, inbound_size, inbound,
+                              NULL, ver);
 
        rbd_destroy_ops(ops);
 
@@ -1446,10 +1473,6 @@ static void rbd_rq_fn(struct request_queue *q)
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;
 
-               /* peek at request from block layer */
-               if (!rq)
-                       break;
-
                dout("fetched request\n");
 
                /* filter out block requests we don't understand */
@@ -1464,7 +1487,7 @@ static void rbd_rq_fn(struct request_queue *q)
                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
-               if (do_write && rbd_dev->read_only) {
+               if (do_write && rbd_dev->mapping.read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }
@@ -1473,7 +1496,8 @@ static void rbd_rq_fn(struct request_queue *q)
 
                down_read(&rbd_dev->header_rwsem);
 
-               if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
+               if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
+                               !rbd_dev->mapping.snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
@@ -1490,6 +1514,12 @@ static void rbd_rq_fn(struct request_queue *q)
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
 
                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
+               if (num_segs <= 0) {
+                       spin_lock_irq(q->queue_lock);
+                       __blk_end_request_all(rq, num_segs);
+                       ceph_put_snap_context(snapc);
+                       continue;
+               }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
@@ -1501,10 +1531,7 @@ static void rbd_rq_fn(struct request_queue *q)
                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-                       op_size = rbd_get_segment(&rbd_dev->header,
-                                                 rbd_dev->header.object_prefix,
-                                                 ofs, size,
-                                                 NULL, NULL);
+                       op_size = rbd_segment_length(rbd_dev, ofs, size);
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
@@ -1524,7 +1551,7 @@ static void rbd_rq_fn(struct request_queue *q)
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
-                                            rbd_dev->snap_id,
+                                            rbd_dev->mapping.snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);
@@ -1580,8 +1607,6 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
        if (!disk)
                return;
 
-       rbd_header_free(&rbd_dev->header);
-
        if (disk->flags & GENHD_FL_UP)
                del_gendisk(disk);
        if (disk->queue)
@@ -1590,105 +1615,96 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 }
 
 /*
- * reload the ondisk the header 
+ * Read the complete header for the given rbd device.
+ *
+ * Returns a pointer to a dynamically-allocated buffer containing
+ * the complete and validated header.  Caller can pass the address
+ * of a variable that will be filled in with the version of the
+ * header object at the time it was read.
+ *
+ * Returns a pointer-coded errno if a failure occurs.
  */
-static int rbd_read_header(struct rbd_device *rbd_dev,
-                          struct rbd_image_header *header)
+static struct rbd_image_header_ondisk *
+rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
 {
-       ssize_t rc;
-       struct rbd_image_header_ondisk *dh;
+       struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
-       u64 ver;
-       size_t len;
+       u64 names_size = 0;
+       u32 want_count;
+       int ret;
 
        /*
-        * First reads the fixed-size header to determine the number
-        * of snapshots, then re-reads it, along with all snapshot
-        * records as well as their stored names.
+        * The complete header will include an array of its 64-bit
+        * snapshot ids, followed by the names of those snapshots as
+        * a contiguous block of NUL-terminated strings.  Note that
+        * the number of snapshots could change by the time we read
+        * it in, in which case we re-read it.
         */
-       len = sizeof (*dh);
-       while (1) {
-               dh = kmalloc(len, GFP_KERNEL);
-               if (!dh)
-                       return -ENOMEM;
-
-               rc = rbd_req_sync_read(rbd_dev,
-                                      CEPH_NOSNAP,
+       do {
+               size_t size;
+
+               kfree(ondisk);
+
+               size = sizeof (*ondisk);
+               size += snap_count * sizeof (struct rbd_image_snap_ondisk);
+               size += names_size;
+               ondisk = kmalloc(size, GFP_KERNEL);
+               if (!ondisk)
+                       return ERR_PTR(-ENOMEM);
+
+               ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                       rbd_dev->header_name,
-                                      0, len,
-                                      (char *)dh, &ver);
-               if (rc < 0)
-                       goto out_dh;
-
-               rc = rbd_header_from_disk(header, dh, snap_count);
-               if (rc < 0) {
-                       if (rc == -ENXIO)
-                               pr_warning("unrecognized header format"
-                                          " for image %s\n",
-                                          rbd_dev->image_name);
-                       goto out_dh;
+                                      0, size,
+                                      (char *) ondisk, version);
+
+               if (ret < 0)
+                       goto out_err;
+               if (WARN_ON((size_t) ret < size)) {
+                       ret = -ENXIO;
+                       pr_warning("short header read for image %s"
+                                       " (want %zd got %d)\n",
+                               rbd_dev->image_name, size, ret);
+                       goto out_err;
+               }
+               if (!rbd_dev_ondisk_valid(ondisk)) {
+                       ret = -ENXIO;
+                       pr_warning("invalid header for image %s\n",
+                               rbd_dev->image_name);
+                       goto out_err;
                }
 
-               if (snap_count == header->total_snaps)
-                       break;
+               names_size = le64_to_cpu(ondisk->snap_names_len);
+               want_count = snap_count;
+               snap_count = le32_to_cpu(ondisk->snap_count);
+       } while (snap_count != want_count);
 
-               snap_count = header->total_snaps;
-               len = sizeof (*dh) +
-                       snap_count * sizeof(struct rbd_image_snap_ondisk) +
-                       header->snap_names_len;
+       return ondisk;
 
-               rbd_header_free(header);
-               kfree(dh);
-       }
-       header->obj_version = ver;
+out_err:
+       kfree(ondisk);
 
-out_dh:
-       kfree(dh);
-       return rc;
+       return ERR_PTR(ret);
 }
 
 /*
- * create a snapshot
+ * reload the ondisk the header
  */
-static int rbd_header_add_snap(struct rbd_device *rbd_dev,
-                              const char *snap_name,
-                              gfp_t gfp_flags)
+static int rbd_read_header(struct rbd_device *rbd_dev,
+                          struct rbd_image_header *header)
 {
-       int name_len = strlen(snap_name);
-       u64 new_snapid;
+       struct rbd_image_header_ondisk *ondisk;
+       u64 ver = 0;
        int ret;
-       void *data, *p, *e;
-       struct ceph_mon_client *monc;
-
-       /* we should create a snapshot only if we're pointing at the head */
-       if (rbd_dev->snap_id != CEPH_NOSNAP)
-               return -EINVAL;
 
-       monc = &rbd_dev->rbd_client->client->monc;
-       ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
-       dout("created snapid=%llu\n", (unsigned long long) new_snapid);
-       if (ret < 0)
-               return ret;
-
-       data = kmalloc(name_len + 16, gfp_flags);
-       if (!data)
-               return -ENOMEM;
+       ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
+       if (IS_ERR(ondisk))
+               return PTR_ERR(ondisk);
+       ret = rbd_header_from_disk(header, ondisk);
+       if (ret >= 0)
+               header->obj_version = ver;
+       kfree(ondisk);
 
-       p = data;
-       e = data + name_len + 16;
-
-       ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
-       ceph_encode_64_safe(&p, e, new_snapid, bad);
-
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
-                               "rbd", "snap_add",
-                               data, p - data, NULL);
-
-       kfree(data);
-
-       return ret < 0 ? ret : 0;
-bad:
-       return -ERANGE;
+       return ret;
 }
 
 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
@@ -1715,11 +1731,15 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
        down_write(&rbd_dev->header_rwsem);
 
        /* resized? */
-       if (rbd_dev->snap_id == CEPH_NOSNAP) {
+       if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
                sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
 
-               dout("setting size to %llu sectors", (unsigned long long) size);
-               set_capacity(rbd_dev->disk, size);
+               if (size != (sector_t) rbd_dev->mapping.size) {
+                       dout("setting size to %llu sectors",
+                               (unsigned long long) size);
+                       rbd_dev->mapping.size = (u64) size;
+                       set_capacity(rbd_dev->disk, size);
+               }
        }
 
        /* rbd_dev->header.object_prefix shouldn't change */
@@ -1732,16 +1752,16 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
-       rbd_dev->header.total_snaps = h.total_snaps;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
-       rbd_dev->header.snap_names_len = h.snap_names_len;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);
 
-       ret = __rbd_init_snaps_header(rbd_dev);
+       ret = rbd_dev_snaps_update(rbd_dev);
+       if (!ret)
+               ret = rbd_dev_snaps_register(rbd_dev);
 
        up_write(&rbd_dev->header_rwsem);
 
@@ -1763,29 +1783,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 {
        struct gendisk *disk;
        struct request_queue *q;
-       int rc;
        u64 segment_size;
-       u64 total_size = 0;
-
-       /* contact OSD, request size info about the object being mapped */
-       rc = rbd_read_header(rbd_dev, &rbd_dev->header);
-       if (rc)
-               return rc;
-
-       /* no need to lock here, as rbd_dev is not registered yet */
-       rc = __rbd_init_snaps_header(rbd_dev);
-       if (rc)
-               return rc;
-
-       rc = rbd_header_set_snap(rbd_dev, &total_size);
-       if (rc)
-               return rc;
 
        /* create gendisk info */
-       rc = -ENOMEM;
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
-               goto out;
+               return -ENOMEM;
 
        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
@@ -1795,7 +1798,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        disk->private_data = rbd_dev;
 
        /* init rq */
-       rc = -ENOMEM;
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;
@@ -1816,20 +1818,14 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        q->queuedata = rbd_dev;
 
        rbd_dev->disk = disk;
-       rbd_dev->q = q;
 
-       /* finally, announce the disk to the world */
-       set_capacity(disk, total_size / SECTOR_SIZE);
-       add_disk(disk);
+       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 
-       pr_info("%s: added with size 0x%llx\n",
-               disk->disk_name, (unsigned long long)total_size);
        return 0;
-
 out_disk:
        put_disk(disk);
-out:
-       return rc;
+
+       return -ENOMEM;
 }
 
 /*
@@ -1854,6 +1850,19 @@ static ssize_t rbd_size_show(struct device *dev,
        return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
 }
 
+/*
+ * Note this shows the features for whatever's mapped, which is not
+ * necessarily the base image.
+ */
+static ssize_t rbd_features_show(struct device *dev,
+                            struct device_attribute *attr, char *buf)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+       return sprintf(buf, "0x%016llx\n",
+                       (unsigned long long) rbd_dev->mapping.features);
+}
+
 static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
 {
@@ -1895,13 +1904,25 @@ static ssize_t rbd_name_show(struct device *dev,
        return sprintf(buf, "%s\n", rbd_dev->image_name);
 }
 
+static ssize_t rbd_image_id_show(struct device *dev,
+                            struct device_attribute *attr, char *buf)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+       return sprintf(buf, "%s\n", rbd_dev->image_id);
+}
+
+/*
+ * Shows the name of the currently-mapped snapshot (or
+ * RBD_SNAP_HEAD_NAME for the base image).
+ */
 static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->snap_name);
+       return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -1918,25 +1939,27 @@ static ssize_t rbd_image_refresh(struct device *dev,
 }
 
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
+static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
+static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
-static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
 
 static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
+       &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
+       &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
-       &dev_attr_create_snap.attr,
        NULL
 };
 
@@ -1982,12 +2005,24 @@ static ssize_t rbd_snap_id_show(struct device *dev,
        return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
 }
 
+static ssize_t rbd_snap_features_show(struct device *dev,
+                               struct device_attribute *attr,
+                               char *buf)
+{
+       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
+
+       return sprintf(buf, "0x%016llx\n",
+                       (unsigned long long) snap->features);
+}
+
 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
+static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
 
 static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
+       &dev_attr_snap_features.attr,
        NULL,
 };
 
@@ -2012,10 +2047,21 @@ static struct device_type rbd_snap_device_type = {
        .release        = rbd_snap_dev_release,
 };
 
+static bool rbd_snap_registered(struct rbd_snap *snap)
+{
+       bool ret = snap->dev.type == &rbd_snap_device_type;
+       bool reg = device_is_registered(&snap->dev);
+
+       rbd_assert(!ret ^ reg);
+
+       return ret;
+}
+
 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
 {
        list_del(&snap->node);
-       device_unregister(&snap->dev);
+       if (device_is_registered(&snap->dev))
+               device_unregister(&snap->dev);
 }
 
 static int rbd_register_snap_dev(struct rbd_snap *snap,
@@ -2028,13 +2074,17 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
        dev->parent = parent;
        dev->release = rbd_snap_dev_release;
        dev_set_name(dev, "snap_%s", snap->name);
+       dout("%s: registering device for snapshot %s\n", __func__, snap->name);
+
        ret = device_register(dev);
 
        return ret;
 }
 
 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
-                                             int i, const char *name)
+                                               const char *snap_name,
+                                               u64 snap_id, u64 snap_size,
+                                               u64 snap_features)
 {
        struct rbd_snap *snap;
        int ret;
@@ -2044,17 +2094,13 @@ static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                return ERR_PTR(-ENOMEM);
 
        ret = -ENOMEM;
-       snap->name = kstrdup(name, GFP_KERNEL);
+       snap->name = kstrdup(snap_name, GFP_KERNEL);
        if (!snap->name)
                goto err;
 
-       snap->size = rbd_dev->header.snap_sizes[i];
-       snap->id = rbd_dev->header.snapc->snaps[i];
-       if (device_is_registered(&rbd_dev->dev)) {
-               ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
-               if (ret < 0)
-                       goto err;
-       }
+       snap->id = snap_id;
+       snap->size = snap_size;
+       snap->features = snap_features;
 
        return snap;
 
@@ -2065,128 +2111,439 @@ err:
        return ERR_PTR(ret);
 }
 
+static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
+               u64 *snap_size, u64 *snap_features)
+{
+       char *snap_name;
+
+       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
+
+       *snap_size = rbd_dev->header.snap_sizes[which];
+       *snap_features = 0;     /* No features for v1 */
+
+       /* Skip over names until we find the one we are looking for */
+
+       snap_name = rbd_dev->header.snap_names;
+       while (which--)
+               snap_name += strlen(snap_name) + 1;
+
+       return snap_name;
+}
+
 /*
- * search for the previous snap in a null delimited string list
+ * Get the size and object order for an image snapshot, or if
+ * snap_id is CEPH_NOSNAP, gets this information for the base
+ * image.
  */
-const char *rbd_prev_snap_name(const char *name, const char *start)
+static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+                               u8 *order, u64 *snap_size)
 {
-       if (name < start + 2)
-               return NULL;
+       __le64 snapid = cpu_to_le64(snap_id);
+       int ret;
+       struct {
+               u8 order;
+               __le64 size;
+       } __attribute__ ((packed)) size_buf = { 0 };
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_size",
+                               (char *) &snapid, sizeof (snapid),
+                               (char *) &size_buf, sizeof (size_buf),
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               return ret;
+
+       *order = size_buf.order;
+       *snap_size = le64_to_cpu(size_buf.size);
+
+       dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
+               (unsigned long long) snap_id, (unsigned int) *order,
+               (unsigned long long) *snap_size);
+
+       return 0;
+}
+
+static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
+{
+       return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
+                                       &rbd_dev->header.obj_order,
+                                       &rbd_dev->header.image_size);
+}
+
+static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
+{
+       void *reply_buf;
+       int ret;
+       void *p;
+
+       reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
+       if (!reply_buf)
+               return -ENOMEM;
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_object_prefix",
+                               NULL, 0,
+                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+
+       p = reply_buf;
+       rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
+                                               p + RBD_OBJ_PREFIX_LEN_MAX,
+                                               NULL, GFP_NOIO);
+
+       if (IS_ERR(rbd_dev->header.object_prefix)) {
+               ret = PTR_ERR(rbd_dev->header.object_prefix);
+               rbd_dev->header.object_prefix = NULL;
+       } else {
+               dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
+       }
 
-       name -= 2;
-       while (*name) {
-               if (name == start)
-                       return start;
-               name--;
+out:
+       kfree(reply_buf);
+
+       return ret;
+}
+
+static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+               u64 *snap_features)
+{
+       __le64 snapid = cpu_to_le64(snap_id);
+       struct {
+               __le64 features;
+               __le64 incompat;
+       } features_buf = { 0 };
+       int ret;
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_features",
+                               (char *) &snapid, sizeof (snapid),
+                               (char *) &features_buf, sizeof (features_buf),
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               return ret;
+       *snap_features = le64_to_cpu(features_buf.features);
+
+       dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
+               (unsigned long long) snap_id,
+               (unsigned long long) *snap_features,
+               (unsigned long long) le64_to_cpu(features_buf.incompat));
+
+       return 0;
+}
+
+static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
+{
+       return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
+                                               &rbd_dev->header.features);
+}
+
+static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
+{
+       size_t size;
+       int ret;
+       void *reply_buf;
+       void *p;
+       void *end;
+       u64 seq;
+       u32 snap_count;
+       struct ceph_snap_context *snapc;
+       u32 i;
+
+       /*
+        * We'll need room for the seq value (maximum snapshot id),
+        * snapshot count, and array of that many snapshot ids.
+        * For now we have a fixed upper limit on the number we're
+        * prepared to receive.
+        */
+       size = sizeof (__le64) + sizeof (__le32) +
+                       RBD_MAX_SNAP_COUNT * sizeof (__le64);
+       reply_buf = kzalloc(size, GFP_KERNEL);
+       if (!reply_buf)
+               return -ENOMEM;
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_snapcontext",
+                               NULL, 0,
+                               reply_buf, size,
+                               CEPH_OSD_FLAG_READ, ver);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+
+       ret = -ERANGE;
+       p = reply_buf;
+       end = (char *) reply_buf + size;
+       ceph_decode_64_safe(&p, end, seq, out);
+       ceph_decode_32_safe(&p, end, snap_count, out);
+
+       /*
+        * Make sure the reported number of snapshot ids wouldn't go
+        * beyond the end of our buffer.  But before checking that,
+        * make sure the computed size of the snapshot context we
+        * allocate is representable in a size_t.
+        */
+       if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
+                                / sizeof (u64)) {
+               ret = -EINVAL;
+               goto out;
        }
-       return name + 1;
+       if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
+               goto out;
+
+       size = sizeof (struct ceph_snap_context) +
+                               snap_count * sizeof (snapc->snaps[0]);
+       snapc = kmalloc(size, GFP_KERNEL);
+       if (!snapc) {
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       atomic_set(&snapc->nref, 1);
+       snapc->seq = seq;
+       snapc->num_snaps = snap_count;
+       for (i = 0; i < snap_count; i++)
+               snapc->snaps[i] = ceph_decode_64(&p);
+
+       rbd_dev->header.snapc = snapc;
+
+       dout("  snap context seq = %llu, snap_count = %u\n",
+               (unsigned long long) seq, (unsigned int) snap_count);
+
+out:
+       kfree(reply_buf);
+
+       return 0;
 }
 
-/*
- * compare the old list of snapshots that we have to what's in the header
- * and update it accordingly. Note that the header holds the snapshots
- * in a reverse order (from newest to oldest) and we need to go from
- * older to new so that we don't get a duplicate snap name when
- * doing the process (e.g., removed snapshot and recreated a new
- * one with the same name.
- */
-static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
+static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
 {
-       const char *name, *first_name;
-       int i = rbd_dev->header.total_snaps;
-       struct rbd_snap *snap, *old_snap = NULL;
-       struct list_head *p, *n;
+       size_t size;
+       void *reply_buf;
+       __le64 snap_id;
+       int ret;
+       void *p;
+       void *end;
+       size_t snap_name_len;
+       char *snap_name;
+
+       size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
+       reply_buf = kmalloc(size, GFP_KERNEL);
+       if (!reply_buf)
+               return ERR_PTR(-ENOMEM);
 
-       first_name = rbd_dev->header.snap_names;
-       name = first_name + rbd_dev->header.snap_names_len;
+       snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_snapshot_name",
+                               (char *) &snap_id, sizeof (snap_id),
+                               reply_buf, size,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
 
-       list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
-               u64 cur_id;
+       p = reply_buf;
+       end = (char *) reply_buf + size;
+       snap_name_len = 0;
+       snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
+                               GFP_KERNEL);
+       if (IS_ERR(snap_name)) {
+               ret = PTR_ERR(snap_name);
+               goto out;
+       } else {
+               dout("  snap_id 0x%016llx snap_name = %s\n",
+                       (unsigned long long) le64_to_cpu(snap_id), snap_name);
+       }
+       kfree(reply_buf);
 
-               old_snap = list_entry(p, struct rbd_snap, node);
+       return snap_name;
+out:
+       kfree(reply_buf);
 
-               if (i)
-                       cur_id = rbd_dev->header.snapc->snaps[i - 1];
+       return ERR_PTR(ret);
+}
 
-               if (!i || old_snap->id < cur_id) {
-                       /*
-                        * old_snap->id was skipped, thus was
-                        * removed.  If this rbd_dev is mapped to
-                        * the removed snapshot, record that it no
-                        * longer exists, to prevent further I/O.
-                        */
-                       if (rbd_dev->snap_id == old_snap->id)
-                               rbd_dev->snap_exists = false;
-                       __rbd_remove_snap_dev(old_snap);
-                       continue;
-               }
-               if (old_snap->id == cur_id) {
-                       /* we have this snapshot already */
-                       i--;
-                       name = rbd_prev_snap_name(name, first_name);
+static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
+               u64 *snap_size, u64 *snap_features)
+{
+       __le64 snap_id;
+       u8 order;
+       int ret;
+
+       snap_id = rbd_dev->header.snapc->snaps[which];
+       ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
+       if (ret)
+               return ERR_PTR(ret);
+       ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
+       if (ret)
+               return ERR_PTR(ret);
+
+       return rbd_dev_v2_snap_name(rbd_dev, which);
+}
+
+static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
+               u64 *snap_size, u64 *snap_features)
+{
+       if (rbd_dev->image_format == 1)
+               return rbd_dev_v1_snap_info(rbd_dev, which,
+                                       snap_size, snap_features);
+       if (rbd_dev->image_format == 2)
+               return rbd_dev_v2_snap_info(rbd_dev, which,
+                                       snap_size, snap_features);
+       return ERR_PTR(-EINVAL);
+}
+
+/*
+ * Scan the rbd device's current snapshot list and compare it to the
+ * newly-received snapshot context.  Remove any existing snapshots
+ * not present in the new snapshot context.  Add a new snapshot for
+ * any snaphots in the snapshot context not in the current list.
+ * And verify there are no changes to snapshots we already know
+ * about.
+ *
+ * Assumes the snapshots in the snapshot context are sorted by
+ * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
+ * are also maintained in that order.)
+ */
+static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
+{
+       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+       const u32 snap_count = snapc->num_snaps;
+       struct list_head *head = &rbd_dev->snaps;
+       struct list_head *links = head->next;
+       u32 index = 0;
+
+       dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
+       while (index < snap_count || links != head) {
+               u64 snap_id;
+               struct rbd_snap *snap;
+               char *snap_name;
+               u64 snap_size = 0;
+               u64 snap_features = 0;
+
+               snap_id = index < snap_count ? snapc->snaps[index]
+                                            : CEPH_NOSNAP;
+               snap = links != head ? list_entry(links, struct rbd_snap, node)
+                                    : NULL;
+               rbd_assert(!snap || snap->id != CEPH_NOSNAP);
+
+               if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
+                       struct list_head *next = links->next;
+
+                       /* Existing snapshot not in the new snap context */
+
+                       if (rbd_dev->mapping.snap_id == snap->id)
+                               rbd_dev->mapping.snap_exists = false;
+                       __rbd_remove_snap_dev(snap);
+                       dout("%ssnap id %llu has been removed\n",
+                               rbd_dev->mapping.snap_id == snap->id ?
+                                                               "mapped " : "",
+                               (unsigned long long) snap->id);
+
+                       /* Done with this list entry; advance */
+
+                       links = next;
                        continue;
                }
-               for (; i > 0;
-                    i--, name = rbd_prev_snap_name(name, first_name)) {
-                       if (!name) {
-                               WARN_ON(1);
-                               return -EINVAL;
+
+               snap_name = rbd_dev_snap_info(rbd_dev, index,
+                                       &snap_size, &snap_features);
+               if (IS_ERR(snap_name))
+                       return PTR_ERR(snap_name);
+
+               dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
+                       (unsigned long long) snap_id);
+               if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
+                       struct rbd_snap *new_snap;
+
+                       /* We haven't seen this snapshot before */
+
+                       new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
+                                       snap_id, snap_size, snap_features);
+                       if (IS_ERR(new_snap)) {
+                               int err = PTR_ERR(new_snap);
+
+                               dout("  failed to add dev, error %d\n", err);
+
+                               return err;
                        }
-                       cur_id = rbd_dev->header.snapc->snaps[i];
-                       /* snapshot removal? handle it above */
-                       if (cur_id >= old_snap->id)
-                               break;
-                       /* a new snapshot */
-                       snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
-                       if (IS_ERR(snap))
-                               return PTR_ERR(snap);
-
-                       /* note that we add it backward so using n and not p */
-                       list_add(&snap->node, n);
-                       p = &snap->node;
+
+                       /* New goes before existing, or at end of list */
+
+                       dout("  added dev%s\n", snap ? "" : " at end\n");
+                       if (snap)
+                               list_add_tail(&new_snap->node, &snap->node);
+                       else
+                               list_add_tail(&new_snap->node, head);
+               } else {
+                       /* Already have this one */
+
+                       dout("  already present\n");
+
+                       rbd_assert(snap->size == snap_size);
+                       rbd_assert(!strcmp(snap->name, snap_name));
+                       rbd_assert(snap->features == snap_features);
+
+                       /* Done with this list entry; advance */
+
+                       links = links->next;
                }
+
+               /* Advance to the next entry in the snapshot context */
+
+               index++;
        }
-       /* we're done going over the old snap list, just add what's left */
-       for (; i > 0; i--) {
-               name = rbd_prev_snap_name(name, first_name);
-               if (!name) {
-                       WARN_ON(1);
-                       return -EINVAL;
+       dout("%s: done\n", __func__);
+
+       return 0;
+}
+
+/*
+ * Scan the list of snapshots and register the devices for any that
+ * have not already been registered.
+ */
+static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
+{
+       struct rbd_snap *snap;
+       int ret = 0;
+
+       dout("%s called\n", __func__);
+       if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
+               return -EIO;
+
+       list_for_each_entry(snap, &rbd_dev->snaps, node) {
+               if (!rbd_snap_registered(snap)) {
+                       ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
+                       if (ret < 0)
+                               break;
                }
-               snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
-               if (IS_ERR(snap))
-                       return PTR_ERR(snap);
-               list_add(&snap->node, &rbd_dev->snaps);
        }
+       dout("%s: returning %d\n", __func__, ret);
 
-       return 0;
+       return ret;
 }
 
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
-       int ret;
        struct device *dev;
-       struct rbd_snap *snap;
+       int ret;
 
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       dev = &rbd_dev->dev;
 
+       dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);
-       if (ret < 0)
-               goto out;
 
-       list_for_each_entry(snap, &rbd_dev->snaps, node) {
-               ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
-               if (ret < 0)
-                       break;
-       }
-out:
        mutex_unlock(&ctl_mutex);
+
        return ret;
 }
 
@@ -2211,33 +2568,37 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
        return ret;
 }
 
-static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
+static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
 
 /*
  * Get a unique rbd identifier for the given new rbd_dev, and add
  * the rbd_dev to the global list.  The minimum rbd id is 1.
  */
-static void rbd_id_get(struct rbd_device *rbd_dev)
+static void rbd_dev_id_get(struct rbd_device *rbd_dev)
 {
-       rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
+       rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
 
        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
+       dout("rbd_dev %p given dev id %llu\n", rbd_dev,
+               (unsigned long long) rbd_dev->dev_id);
 }
 
 /*
  * Remove an rbd_dev from the global list, and record that its
  * identifier is no longer in use.
  */
-static void rbd_id_put(struct rbd_device *rbd_dev)
+static void rbd_dev_id_put(struct rbd_device *rbd_dev)
 {
        struct list_head *tmp;
        int rbd_id = rbd_dev->dev_id;
        int max_id;
 
-       BUG_ON(rbd_id < 1);
+       rbd_assert(rbd_id > 0);
 
+       dout("rbd_dev %p released dev id %llu\n", rbd_dev,
+               (unsigned long long) rbd_dev->dev_id);
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);
 
@@ -2245,7 +2606,7 @@ static void rbd_id_put(struct rbd_device *rbd_dev)
         * If the id being "put" is not the current maximum, there
         * is nothing special we need to do.
         */
-       if (rbd_id != atomic64_read(&rbd_id_max)) {
+       if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
                spin_unlock(&rbd_dev_list_lock);
                return;
        }
@@ -2266,12 +2627,13 @@ static void rbd_id_put(struct rbd_device *rbd_dev)
        spin_unlock(&rbd_dev_list_lock);
 
        /*
-        * The max id could have been updated by rbd_id_get(), in
+        * The max id could have been updated by rbd_dev_id_get(), in
         * which case it now accurately reflects the new maximum.
         * Be careful not to overwrite the maximum value in that
         * case.
         */
-       atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
+       atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
+       dout("  max dev id has been reset\n");
 }
 
 /*
@@ -2360,28 +2722,31 @@ static inline char *dup_token(const char **buf, size_t *lenp)
 }
 
 /*
- * This fills in the pool_name, image_name, image_name_len, snap_name,
- * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
- * on the list of monitor addresses and other options provided via
- * /sys/bus/rbd/add.
+ * This fills in the pool_name, image_name, image_name_len, rbd_dev,
+ * rbd_md_name, and name fields of the given rbd_dev, based on the
+ * list of monitor addresses and other options provided via
+ * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
+ * copy of the snapshot name to map if successful, or a
+ * pointer-coded error otherwise.
  *
  * Note: rbd_dev is assumed to have been initially zero-filled.
  */
-static int rbd_add_parse_args(struct rbd_device *rbd_dev,
-                             const char *buf,
-                             const char **mon_addrs,
-                             size_t *mon_addrs_size,
-                             char *options,
-                            size_t options_size)
+static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
+                               const char *buf,
+                               const char **mon_addrs,
+                               size_t *mon_addrs_size,
+                               char *options,
+                               size_t options_size)
 {
        size_t len;
-       int ret;
+       char *err_ptr = ERR_PTR(-EINVAL);
+       char *snap_name;
 
        /* The first four tokens are required */
 
        len = next_token(&buf);
        if (!len)
-               return -EINVAL;
+               return err_ptr;
        *mon_addrs_size = len + 1;
        *mon_addrs = buf;
 
@@ -2389,9 +2754,9 @@ static int rbd_add_parse_args(struct rbd_device *rbd_dev,
 
        len = copy_token(&buf, options, options_size);
        if (!len || len >= options_size)
-               return -EINVAL;
+               return err_ptr;
 
-       ret = -ENOMEM;
+       err_ptr = ERR_PTR(-ENOMEM);
        rbd_dev->pool_name = dup_token(&buf, NULL);
        if (!rbd_dev->pool_name)
                goto out_err;
@@ -2400,41 +2765,227 @@ static int rbd_add_parse_args(struct rbd_device *rbd_dev,
        if (!rbd_dev->image_name)
                goto out_err;
 
-       /* Create the name of the header object */
+       /* Snapshot name is optional */
+       len = next_token(&buf);
+       if (!len) {
+               buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
+               len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
+       }
+       snap_name = kmalloc(len + 1, GFP_KERNEL);
+       if (!snap_name)
+               goto out_err;
+       memcpy(snap_name, buf, len);
+       *(snap_name + len) = '\0';
 
-       rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
-                                               + sizeof (RBD_SUFFIX),
-                                       GFP_KERNEL);
-       if (!rbd_dev->header_name)
+dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
+
+       return snap_name;
+
+out_err:
+       kfree(rbd_dev->image_name);
+       rbd_dev->image_name = NULL;
+       rbd_dev->image_name_len = 0;
+       kfree(rbd_dev->pool_name);
+       rbd_dev->pool_name = NULL;
+
+       return err_ptr;
+}
+
+/*
+ * An rbd format 2 image has a unique identifier, distinct from the
+ * name given to it by the user.  Internally, that identifier is
+ * what's used to specify the names of objects related to the image.
+ *
+ * A special "rbd id" object is used to map an rbd image name to its
+ * id.  If that object doesn't exist, then there is no v2 rbd image
+ * with the supplied name.
+ *
+ * This function will record the given rbd_dev's image_id field if
+ * it can be determined, and in that case will return 0.  If any
+ * errors occur a negative errno will be returned and the rbd_dev's
+ * image_id field will be unchanged (and should be NULL).
+ */
+static int rbd_dev_image_id(struct rbd_device *rbd_dev)
+{
+       int ret;
+       size_t size;
+       char *object_name;
+       void *response;
+       void *p;
+
+       /*
+        * First, see if the format 2 image id file exists, and if
+        * so, get the image's persistent id from it.
+        */
+       size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
+       object_name = kmalloc(size, GFP_NOIO);
+       if (!object_name)
+               return -ENOMEM;
+       sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
+       dout("rbd id object name is %s\n", object_name);
+
+       /* Response will be an encoded string, which includes a length */
+
+       size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
+       response = kzalloc(size, GFP_NOIO);
+       if (!response) {
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       ret = rbd_req_sync_exec(rbd_dev, object_name,
+                               "rbd", "get_id",
+                               NULL, 0,
+                               response, RBD_IMAGE_ID_LEN_MAX,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+
+       p = response;
+       rbd_dev->image_id = ceph_extract_encoded_string(&p,
+                                               p + RBD_IMAGE_ID_LEN_MAX,
+                                               &rbd_dev->image_id_len,
+                                               GFP_NOIO);
+       if (IS_ERR(rbd_dev->image_id)) {
+               ret = PTR_ERR(rbd_dev->image_id);
+               rbd_dev->image_id = NULL;
+       } else {
+               dout("image_id is %s\n", rbd_dev->image_id);
+       }
+out:
+       kfree(response);
+       kfree(object_name);
+
+       return ret;
+}
+
+static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
+{
+       int ret;
+       size_t size;
+
+       /* Version 1 images have no id; empty string is used */
+
+       rbd_dev->image_id = kstrdup("", GFP_KERNEL);
+       if (!rbd_dev->image_id)
+               return -ENOMEM;
+       rbd_dev->image_id_len = 0;
+
+       /* Record the header object name for this rbd image. */
+
+       size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
+       rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+       if (!rbd_dev->header_name) {
+               ret = -ENOMEM;
                goto out_err;
+       }
        sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
 
+       /* Populate rbd image metadata */
+
+       ret = rbd_read_header(rbd_dev, &rbd_dev->header);
+       if (ret < 0)
+               goto out_err;
+       rbd_dev->image_format = 1;
+
+       dout("discovered version 1 image, header name is %s\n",
+               rbd_dev->header_name);
+
+       return 0;
+
+out_err:
+       kfree(rbd_dev->header_name);
+       rbd_dev->header_name = NULL;
+       kfree(rbd_dev->image_id);
+       rbd_dev->image_id = NULL;
+
+       return ret;
+}
+
+static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
+{
+       size_t size;
+       int ret;
+       u64 ver = 0;
+
        /*
-        * The snapshot name is optional.  If none is is supplied,
-        * we use the default value.
+        * Image id was filled in by the caller.  Record the header
+        * object name for this rbd image.
         */
-       rbd_dev->snap_name = dup_token(&buf, &len);
-       if (!rbd_dev->snap_name)
+       size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
+       rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+       if (!rbd_dev->header_name)
+               return -ENOMEM;
+       sprintf(rbd_dev->header_name, "%s%s",
+                       RBD_HEADER_PREFIX, rbd_dev->image_id);
+
+       /* Get the size and object order for the image */
+
+       ret = rbd_dev_v2_image_size(rbd_dev);
+       if (ret < 0)
                goto out_err;
-       if (!len) {
-               /* Replace the empty name with the default */
-               kfree(rbd_dev->snap_name);
-               rbd_dev->snap_name
-                       = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
-               if (!rbd_dev->snap_name)
-                       goto out_err;
 
-               memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
-                       sizeof (RBD_SNAP_HEAD_NAME));
-       }
+       /* Get the object prefix (a.k.a. block_name) for the image */
 
-       return 0;
+       ret = rbd_dev_v2_object_prefix(rbd_dev);
+       if (ret < 0)
+               goto out_err;
+
+       /* Get the features for the image */
 
+       ret = rbd_dev_v2_features(rbd_dev);
+       if (ret < 0)
+               goto out_err;
+
+       /* crypto and compression type aren't (yet) supported for v2 images */
+
+       rbd_dev->header.crypt_type = 0;
+       rbd_dev->header.comp_type = 0;
+
+       /* Get the snapshot context, plus the header version */
+
+       ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
+       if (ret)
+               goto out_err;
+       rbd_dev->header.obj_version = ver;
+
+       rbd_dev->image_format = 2;
+
+       dout("discovered version 2 image, header name is %s\n",
+               rbd_dev->header_name);
+
+       return -ENOTSUPP;
 out_err:
        kfree(rbd_dev->header_name);
-       kfree(rbd_dev->image_name);
-       kfree(rbd_dev->pool_name);
-       rbd_dev->pool_name = NULL;
+       rbd_dev->header_name = NULL;
+       kfree(rbd_dev->header.object_prefix);
+       rbd_dev->header.object_prefix = NULL;
+
+       return ret;
+}
+
+/*
+ * Probe for the existence of the header object for the given rbd
+ * device.  For format 2 images this includes determining the image
+ * id.
+ */
+static int rbd_dev_probe(struct rbd_device *rbd_dev)
+{
+       int ret;
+
+       /*
+        * Get the id from the image id object.  If it's not a
+        * format 2 image, we'll get ENOENT back, and we'll assume
+        * it's a format 1 image.
+        */
+       ret = rbd_dev_image_id(rbd_dev);
+       if (ret)
+               ret = rbd_dev_v1_probe(rbd_dev);
+       else
+               ret = rbd_dev_v2_probe(rbd_dev);
+       if (ret)
+               dout("probe failed, returning %d\n", ret);
 
        return ret;
 }
@@ -2449,16 +3000,17 @@ static ssize_t rbd_add(struct bus_type *bus,
        size_t mon_addrs_size = 0;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;
+       char *snap_name;
 
        if (!try_module_get(THIS_MODULE))
                return -ENODEV;
 
        options = kmalloc(count, GFP_KERNEL);
        if (!options)
-               goto err_nomem;
+               goto err_out_mem;
        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
-               goto err_nomem;
+               goto err_out_mem;
 
        /* static rbd_device initialization */
        spin_lock_init(&rbd_dev->lock);
@@ -2466,27 +3018,18 @@ static ssize_t rbd_add(struct bus_type *bus,
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);
 
-       /* generate unique id: find highest unique id, add one */
-       rbd_id_get(rbd_dev);
-
-       /* Fill in the device name, now that we have its id. */
-       BUILD_BUG_ON(DEV_NAME_LEN
-                       < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
-       sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
-
        /* parse add command */
-       rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
-                               options, count);
-       if (rc)
-               goto err_put_id;
-
-       rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
-                                               options);
-       if (IS_ERR(rbd_dev->rbd_client)) {
-               rc = PTR_ERR(rbd_dev->rbd_client);
-               goto err_put_id;
+       snap_name = rbd_add_parse_args(rbd_dev, buf,
+                               &mon_addrs, &mon_addrs_size, options, count);
+       if (IS_ERR(snap_name)) {
+               rc = PTR_ERR(snap_name);
+               goto err_out_mem;
        }
 
+       rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
+       if (rc < 0)
+               goto err_out_args;
+
        /* pick the pool */
        osdc = &rbd_dev->rbd_client->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
@@ -2494,23 +3037,53 @@ static ssize_t rbd_add(struct bus_type *bus,
                goto err_out_client;
        rbd_dev->pool_id = rc;
 
-       /* register our block device */
-       rc = register_blkdev(0, rbd_dev->name);
+       rc = rbd_dev_probe(rbd_dev);
        if (rc < 0)
                goto err_out_client;
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+       /* no need to lock here, as rbd_dev is not registered yet */
+       rc = rbd_dev_snaps_update(rbd_dev);
+       if (rc)
+               goto err_out_header;
+
+       rc = rbd_dev_set_mapping(rbd_dev, snap_name);
+       if (rc)
+               goto err_out_header;
+
+       /* generate unique id: find highest unique id, add one */
+       rbd_dev_id_get(rbd_dev);
+
+       /* Fill in the device name, now that we have its id. */
+       BUILD_BUG_ON(DEV_NAME_LEN
+                       < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
+       sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
+
+       /* Get our block major device number. */
+
+       rc = register_blkdev(0, rbd_dev->name);
+       if (rc < 0)
+               goto err_out_id;
        rbd_dev->major = rc;
 
-       rc = rbd_bus_add_dev(rbd_dev);
+       /* Set up the blkdev mapping. */
+
+       rc = rbd_init_disk(rbd_dev);
        if (rc)
                goto err_out_blkdev;
 
+       rc = rbd_bus_add_dev(rbd_dev);
+       if (rc)
+               goto err_out_disk;
+
        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
-        *
-        * Set up and announce blkdev mapping.
         */
-       rc = rbd_init_disk(rbd_dev);
+
+       down_write(&rbd_dev->header_rwsem);
+       rc = rbd_dev_snaps_register(rbd_dev);
+       up_write(&rbd_dev->header_rwsem);
        if (rc)
                goto err_out_bus;
 
@@ -2518,6 +3091,13 @@ static ssize_t rbd_add(struct bus_type *bus,
        if (rc)
                goto err_out_bus;
 
+       /* Everything's ready.  Announce the disk to the world. */
+
+       add_disk(rbd_dev->disk);
+
+       pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
+               (unsigned long long) rbd_dev->mapping.size);
+
        return count;
 
 err_out_bus:
@@ -2527,19 +3107,23 @@ err_out_bus:
        kfree(options);
        return rc;
 
+err_out_disk:
+       rbd_free_disk(rbd_dev);
 err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
+err_out_id:
+       rbd_dev_id_put(rbd_dev);
+err_out_header:
+       rbd_header_free(&rbd_dev->header);
 err_out_client:
+       kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev);
-err_put_id:
-       if (rbd_dev->pool_name) {
-               kfree(rbd_dev->snap_name);
-               kfree(rbd_dev->header_name);
-               kfree(rbd_dev->image_name);
-               kfree(rbd_dev->pool_name);
-       }
-       rbd_id_put(rbd_dev);
-err_nomem:
+       kfree(rbd_dev->image_id);
+err_out_args:
+       kfree(rbd_dev->mapping.snap_name);
+       kfree(rbd_dev->image_name);
+       kfree(rbd_dev->pool_name);
+err_out_mem:
        kfree(rbd_dev);
        kfree(options);
 
@@ -2585,12 +3169,16 @@ static void rbd_dev_release(struct device *dev)
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
 
+       /* release allocated disk header fields */
+       rbd_header_free(&rbd_dev->header);
+
        /* done with the id, and with the rbd_dev */
-       kfree(rbd_dev->snap_name);
+       kfree(rbd_dev->mapping.snap_name);
+       kfree(rbd_dev->image_id);
        kfree(rbd_dev->header_name);
        kfree(rbd_dev->pool_name);
        kfree(rbd_dev->image_name);
-       rbd_id_put(rbd_dev);
+       rbd_dev_id_put(rbd_dev);
        kfree(rbd_dev);
 
        /* release module ref */
@@ -2628,47 +3216,7 @@ static ssize_t rbd_remove(struct bus_type *bus,
 
 done:
        mutex_unlock(&ctl_mutex);
-       return ret;
-}
 
-static ssize_t rbd_snap_add(struct device *dev,
-                           struct device_attribute *attr,
-                           const char *buf,
-                           size_t count)
-{
-       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-       int ret;
-       char *name = kmalloc(count + 1, GFP_KERNEL);
-       if (!name)
-               return -ENOMEM;
-
-       snprintf(name, count, "%s", buf);
-
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
-       ret = rbd_header_add_snap(rbd_dev,
-                                 name, GFP_KERNEL);
-       if (ret < 0)
-               goto err_unlock;
-
-       ret = __rbd_refresh_header(rbd_dev, NULL);
-       if (ret < 0)
-               goto err_unlock;
-
-       /* shouldn't hold ctl_mutex when notifying.. notify might
-          trigger a watch callback that would need to get that mutex */
-       mutex_unlock(&ctl_mutex);
-
-       /* make a best effort, don't error if failed */
-       rbd_req_sync_notify(rbd_dev);
-
-       ret = count;
-       kfree(name);
-       return ret;
-
-err_unlock:
-       mutex_unlock(&ctl_mutex);
-       kfree(name);
        return ret;
 }
 
index 0924e9e..cbe77fa 100644 (file)
 
 #include <linux/types.h>
 
+/* For format version 2, rbd image 'foo' consists of objects
+ *   rbd_id.foo                - id of image
+ *   rbd_header.<id>   - image metadata
+ *   rbd_data.<id>.0000000000000000
+ *   rbd_data.<id>.0000000000000001
+ *   ...               - data
+ * Clients do not access header data directly in rbd format 2.
+ */
+
+#define RBD_HEADER_PREFIX      "rbd_header."
+#define RBD_DATA_PREFIX        "rbd_data."
+#define RBD_ID_PREFIX          "rbd_id."
+
 /*
- * rbd image 'foo' consists of objects
- *   foo.rbd      - image metadata
- *   foo.00000000
- *   foo.00000001
- *   ...          - data
+ * For format version 1, rbd image 'foo' consists of objects
+ *   foo.rbd           - image metadata
+ *   rb.<idhi>.<idlo>.00000000
+ *   rb.<idhi>.<idlo>.00000001
+ *   ...               - data
+ * There is no notion of a persistent image id in rbd format 1.
  */
 
 #define RBD_SUFFIX             ".rbd"
+
 #define RBD_DIRECTORY           "rbd_directory"
 #define RBD_INFO                "rbd_info"
 
@@ -47,7 +62,7 @@ struct rbd_image_snap_ondisk {
 
 struct rbd_image_header_ondisk {
        char text[40];
-       char block_name[24];
+       char object_prefix[24];
        char signature[4];
        char version[8];
        struct {
index 452e71a..22b6e45 100644 (file)
@@ -205,7 +205,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
        dout("readpage inode %p file %p page %p index %lu\n",
             inode, filp, page, page->index);
        err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
-                                 page->index << PAGE_CACHE_SHIFT, &len,
+                                 (u64) page_offset(page), &len,
                                  ci->i_truncate_seq, ci->i_truncate_size,
                                  &page, 1, 0);
        if (err == -ENOENT)
@@ -286,7 +286,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
        int nr_pages = 0;
        int ret;
 
-       off = page->index << PAGE_CACHE_SHIFT;
+       off = (u64) page_offset(page);
 
        /* count pages */
        next_index = page->index;
@@ -308,8 +308,8 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
                                    NULL, 0,
                                    ci->i_truncate_seq, ci->i_truncate_size,
                                    NULL, false, 1, 0);
-       if (!req)
-               return -ENOMEM;
+       if (IS_ERR(req))
+               return PTR_ERR(req);
 
        /* build page vector */
        nr_pages = len >> PAGE_CACHE_SHIFT;
@@ -426,7 +426,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        struct ceph_inode_info *ci;
        struct ceph_fs_client *fsc;
        struct ceph_osd_client *osdc;
-       loff_t page_off = page->index << PAGE_CACHE_SHIFT;
+       loff_t page_off = page_offset(page);
        int len = PAGE_CACHE_SIZE;
        loff_t i_size;
        int err = 0;
@@ -817,8 +817,7 @@ get_more_pages:
                        /* ok */
                        if (locked_pages == 0) {
                                /* prepare async write request */
-                               offset = (unsigned long long)page->index
-                                       << PAGE_CACHE_SHIFT;
+                               offset = (u64) page_offset(page);
                                len = wsize;
                                req = ceph_osdc_new_request(&fsc->client->osdc,
                                            &ci->i_layout,
@@ -832,8 +831,8 @@ get_more_pages:
                                            ci->i_truncate_size,
                                            &inode->i_mtime, true, 1, 0);
 
-                               if (!req) {
-                                       rc = -ENOMEM;
+                               if (IS_ERR(req)) {
+                                       rc = PTR_ERR(req);
                                        unlock_page(page);
                                        break;
                                }
@@ -1180,7 +1179,7 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
        struct inode *inode = vma->vm_file->f_dentry->d_inode;
        struct page *page = vmf->page;
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
-       loff_t off = page->index << PAGE_CACHE_SHIFT;
+       loff_t off = page_offset(page);
        loff_t size, len;
        int ret;
 
index 620daad..3251e9c 100644 (file)
@@ -1005,7 +1005,7 @@ static void __queue_cap_release(struct ceph_mds_session *session,
 
        BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
        head = msg->front.iov_base;
-       head->num = cpu_to_le32(le32_to_cpu(head->num) + 1);
+       le32_add_cpu(&head->num, 1);
        item = msg->front.iov_base + msg->front.iov_len;
        item->ino = cpu_to_le64(ino);
        item->cap_id = cpu_to_le64(cap_id);
index ecebbc0..5840d2a 100644 (file)
@@ -536,8 +536,8 @@ more:
                                    do_sync,
                                    ci->i_truncate_seq, ci->i_truncate_size,
                                    &mtime, false, 2, page_align);
-       if (!req)
-               return -ENOMEM;
+       if (IS_ERR(req))
+               return PTR_ERR(req);
 
        if (file->f_flags & O_DIRECT) {
                pages = ceph_get_direct_page_vector(data, num_pages, false);
index 1396ceb..36549a4 100644 (file)
@@ -187,14 +187,18 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
        u64 tmp;
        struct ceph_object_layout ol;
        struct ceph_pg pgid;
+       int r;
 
        /* copy and validate */
        if (copy_from_user(&dl, arg, sizeof(dl)))
                return -EFAULT;
 
        down_read(&osdc->map_sem);
-       ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len,
-                                     &dl.object_no, &dl.object_offset, &olen);
+       r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len,
+                                         &dl.object_no, &dl.object_offset,
+                                         &olen);
+       if (r < 0)
+               return -EIO;
        dl.file_offset -= dl.object_offset;
        dl.object_size = ceph_file_layout_object_size(ci->i_layout);
        dl.block_size = ceph_file_layout_su(ci->i_layout);
index a5a7354..1bcf712 100644 (file)
@@ -2625,7 +2625,8 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
                     session_state_name(s->s_state));
 
-               if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
+               if (i >= newmap->m_max_mds ||
+                   memcmp(ceph_mdsmap_get_addr(oldmap, i),
                           ceph_mdsmap_get_addr(newmap, i),
                           sizeof(struct ceph_entity_addr))) {
                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
index 3a42d93..2eb43f2 100644 (file)
@@ -307,7 +307,10 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
 {
        struct ceph_mount_options *fsopt;
        const char *dev_name_end;
-       int err = -ENOMEM;
+       int err;
+
+       if (!dev_name || !*dev_name)
+               return -EINVAL;
 
        fsopt = kzalloc(sizeof(*fsopt), GFP_KERNEL);
        if (!fsopt)
@@ -328,21 +331,33 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
        fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
        fsopt->congestion_kb = default_congestion_kb();
 
-       /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
+       /*
+        * Distinguish the server list from the path in "dev_name".
+        * Internally we do not include the leading '/' in the path.
+        *
+        * "dev_name" will look like:
+        *     <server_spec>[,<server_spec>...]:[<path>]
+        * where
+        *     <server_spec> is <ip>[:<port>]
+        *     <path> is optional, but if present must begin with '/'
+        */
+       dev_name_end = strchr(dev_name, '/');
+       if (dev_name_end) {
+               /* skip over leading '/' for path */
+               *path = dev_name_end + 1;
+       } else {
+               /* path is empty */
+               dev_name_end = dev_name + strlen(dev_name);
+               *path = dev_name_end;
+       }
        err = -EINVAL;
-       if (!dev_name)
-               goto out;
-       *path = strstr(dev_name, ":/");
-       if (*path == NULL) {
-               pr_err("device name is missing path (no :/ in %s)\n",
+       dev_name_end--;         /* back up to ':' separator */
+       if (*dev_name_end != ':') {
+               pr_err("device name is missing path (no : separator in %s)\n",
                                dev_name);
                goto out;
        }
-       dev_name_end = *path;
        dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
-
-       /* path on server */
-       *path += 2;
        dout("server path '%s'\n", *path);
 
        *popt = ceph_parse_options(options, dev_name, dev_name_end,
index 1fb93e9..a486f39 100644 (file)
@@ -71,7 +71,6 @@ struct ceph_mon_client {
        int cur_mon;                       /* last monitor i contacted */
        unsigned long sub_sent, sub_renew_after;
        struct ceph_connection con;
-       bool have_fsid;
 
        /* pending generic requests */
        struct rb_root generic_request_tree;
index cedfb1a..d9b880e 100644 (file)
@@ -207,7 +207,7 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
                                 struct ceph_msg *msg);
 
-extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+extern int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
                        struct ceph_file_layout *layout,
                        u64 snapid,
                        u64 off, u64 *plen, u64 *bno,
index 25b930b..e37acbe 100644 (file)
@@ -109,9 +109,9 @@ extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
 
 /* calculate mapping of a file extent to an object */
-extern void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
-                                         u64 off, u64 *plen,
-                                         u64 *bno, u64 *oxoff, u64 *oxlen);
+extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
+                                        u64 off, u64 *plen,
+                                        u64 *bno, u64 *oxoff, u64 *oxlen);
 
 /* calculate mapping of object to a placement group */
 extern int ceph_calc_object_layout(struct ceph_object_layout *ol,
index 900ea0f..812eb3b 100644 (file)
@@ -637,7 +637,7 @@ bad:
 /*
  * Do a synchronous pool op.
  */
-int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+static int do_poolop(struct ceph_mon_client *monc, u32 op,
                        u32 pool, u64 snapid,
                        char *buf, int len)
 {
@@ -687,7 +687,7 @@ out:
 int ceph_monc_create_snapid(struct ceph_mon_client *monc,
                            u32 pool, u64 *snapid)
 {
-       return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+       return do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
                                   pool, 0, (char *)snapid, sizeof(*snapid));
 
 }
@@ -696,7 +696,7 @@ EXPORT_SYMBOL(ceph_monc_create_snapid);
 int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
                            u32 pool, u64 snapid)
 {
-       return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+       return do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
                                   pool, snapid, 0, 0);
 
 }
@@ -769,7 +769,6 @@ static int build_initial_monmap(struct ceph_mon_client *monc)
                monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
        }
        monc->monmap->num_mon = num_mon;
-       monc->have_fsid = false;
        return 0;
 }
 
index 42119c0..ccbdfbb 100644 (file)
@@ -52,7 +52,7 @@ static int op_has_extent(int op)
                op == CEPH_OSD_OP_WRITE);
 }
 
-void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
                        struct ceph_file_layout *layout,
                        u64 snapid,
                        u64 off, u64 *plen, u64 *bno,
@@ -62,12 +62,15 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        u64 orig_len = *plen;
        u64 objoff, objlen;    /* extent in object */
+       int r;
 
        reqhead->snapid = cpu_to_le64(snapid);
 
        /* object extent? */
-       ceph_calc_file_object_mapping(layout, off, plen, bno,
-                                     &objoff, &objlen);
+       r = ceph_calc_file_object_mapping(layout, off, plen, bno,
+                                         &objoff, &objlen);
+       if (r < 0)
+               return r;
        if (*plen < orig_len)
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
@@ -83,7 +86,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
 
        dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
             *bno, objoff, objlen, req->r_num_pages);
-
+       return 0;
 }
 EXPORT_SYMBOL(ceph_calc_raw_layout);
 
@@ -112,20 +115,25 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
  *
  * fill osd op in request message.
  */
-static void calc_layout(struct ceph_osd_client *osdc,
-                       struct ceph_vino vino,
-                       struct ceph_file_layout *layout,
-                       u64 off, u64 *plen,
-                       struct ceph_osd_request *req,
-                       struct ceph_osd_req_op *op)
+static int calc_layout(struct ceph_osd_client *osdc,
+                      struct ceph_vino vino,
+                      struct ceph_file_layout *layout,
+                      u64 off, u64 *plen,
+                      struct ceph_osd_request *req,
+                      struct ceph_osd_req_op *op)
 {
        u64 bno;
+       int r;
 
-       ceph_calc_raw_layout(osdc, layout, vino.snap, off,
-                            plen, &bno, req, op);
+       r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
+                                plen, &bno, req, op);
+       if (r < 0)
+               return r;
 
        snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
        req->r_oid_len = strlen(req->r_oid);
+
+       return r;
 }
 
 /*
@@ -456,6 +464,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_req_op ops[3];
        struct ceph_osd_request *req;
+       int r;
 
        ops[0].op = opcode;
        ops[0].extent.truncate_seq = truncate_seq;
@@ -474,10 +483,12 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                         use_mempool,
                                         GFP_NOFS, NULL, NULL);
        if (!req)
-               return NULL;
+               return ERR_PTR(-ENOMEM);
 
        /* calculate max write size */
-       calc_layout(osdc, vino, layout, off, plen, req, ops);
+       r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+       if (r < 0)
+               return ERR_PTR(r);
        req->r_file_layout = *layout;  /* keep a copy */
 
        /* in case it differs from natural (file) alignment that
@@ -1920,8 +1931,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, 0, truncate_seq, truncate_size, NULL,
                                    false, 1, page_align);
-       if (!req)
-               return -ENOMEM;
+       if (IS_ERR(req))
+               return PTR_ERR(req);
 
        /* it may be a short read due to an object boundary */
        req->r_pages = pages;
@@ -1963,8 +1974,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                                    snapc, do_sync,
                                    truncate_seq, truncate_size, mtime,
                                    nofail, 1, page_align);
-       if (!req)
-               return -ENOMEM;
+       if (IS_ERR(req))
+               return PTR_ERR(req);
 
        /* it may be a short write due to an object boundary */
        req->r_pages = pages;
index 3124b71..5433fb0 100644 (file)
@@ -984,7 +984,7 @@ bad:
  * for now, we write only a single su, until we can
  * pass a stride back to the caller.
  */
-void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
+int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
                                   u64 off, u64 *plen,
                                   u64 *ono,
                                   u64 *oxoff, u64 *oxlen)
@@ -998,11 +998,17 @@ void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
 
        dout("mapping %llu~%llu  osize %u fl_su %u\n", off, *plen,
             osize, su);
+       if (su == 0 || sc == 0)
+               goto invalid;
        su_per_object = osize / su;
+       if (su_per_object == 0)
+               goto invalid;
        dout("osize %u / su %u = su_per_object %u\n", osize, su,
             su_per_object);
 
-       BUG_ON((su & ~PAGE_MASK) != 0);
+       if ((su & ~PAGE_MASK) != 0)
+               goto invalid;
+
        /* bl = *off / su; */
        t = off;
        do_div(t, su);
@@ -1030,6 +1036,14 @@ void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
        *plen = *oxlen;
 
        dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
+       return 0;
+
+invalid:
+       dout(" invalid layout\n");
+       *ono = 0;
+       *oxoff = 0;
+       *oxlen = 0;
+       return -EINVAL;
 }
 EXPORT_SYMBOL(ceph_calc_file_object_mapping);
 
index 665cd23..92866be 100644 (file)
@@ -1,4 +1,3 @@
-
 #include <linux/module.h>
 #include <linux/gfp.h>
 #include <linux/pagemap.h>
@@ -134,8 +133,8 @@ int ceph_pagelist_truncate(struct ceph_pagelist *pl,
        ceph_pagelist_unmap_tail(pl);
        while (pl->head.prev != c->page_lru) {
                page = list_entry(pl->head.prev, struct page, lru);
-               list_del(&page->lru);                /* remove from pagelist */
-               list_add_tail(&page->lru, &pl->free_list); /* add to reserve */
+               /* move from pagelist to reserve */
+               list_move_tail(&page->lru, &pl->free_list);
                ++pl->num_pages_free;
        }
        pl->room = c->room;