block_device_operations->release() should return void
[linux-3.10.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have these defined elsewhere */
56
57 #define U8_MAX  ((u8)   (~0U))
58 #define U16_MAX ((u16)  (~0U))
59 #define U32_MAX ((u32)  (~0U))
60 #define U64_MAX ((u64)  (~0ULL))
61
62 #define RBD_DRV_NAME "rbd"
63 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
64
65 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
66
67 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
68 #define RBD_MAX_SNAP_NAME_LEN   \
69                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
70
71 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
72
73 #define RBD_SNAP_HEAD_NAME      "-"
74
75 /* This allows a single page to hold an image name sent by OSD */
76 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
77 #define RBD_IMAGE_ID_LEN_MAX    64
78
79 #define RBD_OBJ_PREFIX_LEN_MAX  64
80
81 /* Feature bits */
82
83 #define RBD_FEATURE_LAYERING      1
84
85 /* Features supported by this (client software) implementation. */
86
87 #define RBD_FEATURES_ALL          (0)
88
89 /*
90  * An RBD device name will be "rbd#", where the "rbd" comes from
91  * RBD_DRV_NAME above, and # is a unique integer identifier.
92  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
93  * enough to hold all possible device names.
94  */
95 #define DEV_NAME_LEN            32
96 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
97
98 /*
99  * block device image metadata (in-memory version)
100  */
101 struct rbd_image_header {
102         /* These five fields never change for a given rbd image */
103         char *object_prefix;    /* NUL-terminated; released by rbd_header_free() */
104         u64 features;
105         __u8 obj_order;         /* log2 of the object (segment) size in bytes */
106         __u8 crypt_type;
107         __u8 comp_type;
108
109         /* The remaining fields need to be updated occasionally */
110         u64 image_size;
111         struct ceph_snap_context *snapc;        /* dropped via ceph_put_snap_context() */
112         char *snap_names;       /* raw copy of the on-disk snapshot name bytes */
113         u64 *snap_sizes;        /* one image size per snapshot */
114
115         u64 obj_version;
116 };
117
118 /*
119  * An rbd image specification.
120  *
121  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122  * identify an image.  Each rbd_dev structure includes a pointer to
123  * an rbd_spec structure that encapsulates this identity.
124  *
125  * Each of the id's in an rbd_spec has an associated name.  For a
126  * user-mapped image, the names are supplied and the id's associated
127  * with them are looked up.  For a layered image, a parent image is
128  * defined by the tuple, and the names are looked up.
129  *
130  * An rbd_dev structure contains a parent_spec pointer which is
131  * non-null if the image it represents is a child in a layered
132  * image.  This pointer will refer to the rbd_spec structure used
133  * by the parent rbd_dev for its own identity (i.e., the structure
134  * is shared between the parent and child).
135  *
136  * Since these structures are populated once, during the discovery
137  * phase of image construction, they are effectively immutable so
138  * we make no effort to synchronize access to them.
139  *
140  * Note that code herein does not assume the image name is known (it
141  * could be a null pointer).
142  */
143 struct rbd_spec {
144         u64             pool_id;
145         char            *pool_name;
146
147         char            *image_id;
148         char            *image_name;
149
150         u64             snap_id;
151         char            *snap_name;
152
153         struct kref     kref;
154 };
155
156 /*
157  * an instance of the client.  multiple devices may share an rbd client.
158  */
159 struct rbd_client {
160         struct ceph_client      *client;
161         struct kref             kref;
162         struct list_head        node;
163 };
164
165 struct rbd_img_request;
166 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
167
168 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
169
170 struct rbd_obj_request;
171 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
172
173 enum obj_request_type {
174         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
175 };
176
/* State for a request against one named object; may be linked to an image request. */
177 struct rbd_obj_request {
178         const char              *object_name;
179         u64                     offset;         /* object start byte */
180         u64                     length;         /* bytes from offset */
181
182         struct rbd_img_request  *img_request;
183         struct list_head        links;          /* img_request->obj_requests */
184         u32                     which;          /* posn image request list */
185
186         /* NOTE(review): type presumably selects the valid union member - confirm */
187         enum obj_request_type   type;
188         union {
189                 struct bio      *bio_list;
190                 struct {
191                         struct page     **pages;
192                         u32             page_count;
193                 };
194         };
195
196         struct ceph_osd_request *osd_req;
197
198         u64                     xferred;        /* bytes transferred */
199         u64                     version;
200         int                     result;
201         atomic_t                done;
202
203         rbd_obj_callback_t      callback;
204         struct completion       completion;
205
206         struct kref             kref;           /* reference count */
207 };
207
/* State for a request against an image; it owns a list of object requests. */
208 struct rbd_img_request {
209         struct request          *rq;
210         struct rbd_device       *rbd_dev;
211         u64                     offset; /* starting image byte offset */
212         u64                     length; /* byte count from offset */
213         bool                    write_request;  /* false for read */
214         union {
215                 struct ceph_snap_context *snapc;        /* for writes */
216                 u64             snap_id;                /* for reads */
217         };
218         spinlock_t              completion_lock;/* protects next_completion */
219         u32                     next_completion;
220         rbd_img_callback_t      callback;
221
222         u32                     obj_request_count;
223         struct list_head        obj_requests;   /* rbd_obj_request structs */
224
225         struct kref             kref;           /* reference count */
226 };
227
/*
 * Iterators over the object requests linked on an image request's
 * obj_requests list (through each object request's "links" field).
 *
 * Note that the _safe variant walks the list in REVERSE order: it
 * expands to list_for_each_entry_safe_reverse().
 */
228 #define for_each_obj_request(ireq, oreq) \
229         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
230 #define for_each_obj_request_from(ireq, oreq) \
231         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
232 #define for_each_obj_request_safe(ireq, oreq, n) \
233         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
234
/* One snapshot of an image, kept on the owning rbd_device's "snaps" list. */
235 struct rbd_snap {
236         struct  device          dev;
237         const char              *name;          /* matched by snap_by_name() */
238         u64                     size;           /* becomes mapping.size when mapped */
239         struct list_head        node;           /* rbd_dev->snaps */
240         u64                     id;             /* matched by rbd_snap_name() */
241         u64                     features;       /* becomes mapping.features when mapped */
242 };
243
/*
 * Properties of what is actually mapped: taken from the image header
 * for the head, or from the selected snapshot (see
 * rbd_dev_set_mapping()).
 */
244 struct rbd_mapping {
245         u64                     size;
246         u64                     features;
247         bool                    read_only;      /* forced true for snapshot mappings */
248 };
249
250 /*
251  * a single device
252  */
253 struct rbd_device {
254         int                     dev_id;         /* blkdev unique id */
255
256         int                     major;          /* blkdev assigned major */
257         struct gendisk          *disk;          /* blkdev's gendisk and rq */
258
259         u32                     image_format;   /* Either 1 or 2 */
260         struct rbd_client       *rbd_client;
261
262         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
263
264         spinlock_t              lock;           /* queue, flags, open_count */
265
266         struct rbd_image_header header;
267         unsigned long           flags;          /* possibly lock protected */
268         struct rbd_spec         *spec;
269
270         char                    *header_name;
271
272         struct ceph_file_layout layout;
273
274         struct ceph_osd_event   *watch_event;
275         struct rbd_obj_request  *watch_request;
276
277         struct rbd_spec         *parent_spec;
278         u64                     parent_overlap;
279
280         /* protects updating the header */
281         struct rw_semaphore     header_rwsem;
282
283         struct rbd_mapping      mapping;
284
285         struct list_head        node;
286
287         /* list of snapshots */
288         struct list_head        snaps;
289
290         /* sysfs related */
291         struct device           dev;
292         unsigned long           open_count;     /* protected by lock */
293 };
294
295 /*
296  * Flag bits for rbd_dev->flags.  If atomicity is required,
297  * rbd_dev->lock is used to protect access.
298  *
299  * Currently, only the "removing" flag (which is coupled with the
300  * "open_count" field) requires atomic access.
301  */
302 enum rbd_dev_flags {
303         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
304         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
305 };
306
307 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
308
309 static LIST_HEAD(rbd_dev_list);    /* devices */
310 static DEFINE_SPINLOCK(rbd_dev_list_lock);
311
312 static LIST_HEAD(rbd_client_list);              /* clients */
313 static DEFINE_SPINLOCK(rbd_client_list_lock);
314
315 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
316 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
317
318 static void rbd_dev_release(struct device *dev);
319 static void rbd_remove_snap_dev(struct rbd_snap *snap);
320
321 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
322                        size_t count);
323 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
324                           size_t count);
325
326 static struct bus_attribute rbd_bus_attrs[] = {
327         __ATTR(add, S_IWUSR, NULL, rbd_add),
328         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
329         __ATTR_NULL
330 };
331
332 static struct bus_type rbd_bus_type = {
333         .name           = "rbd",
334         .bus_attrs      = rbd_bus_attrs,
335 };
336
/*
 * Release callback installed on rbd_root_dev.  Intentionally empty:
 * the body performs no cleanup.
 */
337 static void rbd_root_dev_release(struct device *dev)
338 {
339 }
340
341 static struct device rbd_root_dev = {
342         .init_name =    "rbd",
343         .release =      rbd_root_dev_release,
344 };
345
346 static __printf(2, 3)
347 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
348 {
349         struct va_format vaf;
350         va_list args;
351
352         va_start(args, fmt);
353         vaf.fmt = fmt;
354         vaf.va = &args;
355
356         if (!rbd_dev)
357                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
358         else if (rbd_dev->disk)
359                 printk(KERN_WARNING "%s: %s: %pV\n",
360                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
361         else if (rbd_dev->spec && rbd_dev->spec->image_name)
362                 printk(KERN_WARNING "%s: image %s: %pV\n",
363                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
364         else if (rbd_dev->spec && rbd_dev->spec->image_id)
365                 printk(KERN_WARNING "%s: id %s: %pV\n",
366                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
367         else    /* punt */
368                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
369                         RBD_DRV_NAME, rbd_dev, &vaf);
370         va_end(args);
371 }
372
#ifdef RBD_DEBUG
/*
 * Log the failed expression with its function and line, then BUG().
 *
 * The body is wrapped in do { } while (0) so the macro behaves as a
 * single statement: a bare "if" would capture a following "else" when
 * the assertion is used as the body of an unbraced if/else.
 */
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
385
386 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
387 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
388
389 static int rbd_open(struct block_device *bdev, fmode_t mode)
390 {
391         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
392         bool removing = false;
393
394         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
395                 return -EROFS;
396
397         spin_lock_irq(&rbd_dev->lock);
398         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
399                 removing = true;
400         else
401                 rbd_dev->open_count++;
402         spin_unlock_irq(&rbd_dev->lock);
403         if (removing)
404                 return -ENOENT;
405
406         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
407         (void) get_device(&rbd_dev->dev);
408         set_device_ro(bdev, rbd_dev->mapping.read_only);
409         mutex_unlock(&ctl_mutex);
410
411         return 0;
412 }
413
414 static void rbd_release(struct gendisk *disk, fmode_t mode)
415 {
416         struct rbd_device *rbd_dev = disk->private_data;
417         unsigned long open_count_before;
418
419         spin_lock_irq(&rbd_dev->lock);
420         open_count_before = rbd_dev->open_count--;
421         spin_unlock_irq(&rbd_dev->lock);
422         rbd_assert(open_count_before > 0);
423
424         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
425         put_device(&rbd_dev->dev);
426         mutex_unlock(&ctl_mutex);
427 }
428
/* Block device callbacks for rbd disks: only open and release are provided. */
429 static const struct block_device_operations rbd_bd_ops = {
430         .owner                  = THIS_MODULE,
431         .open                   = rbd_open,
432         .release                = rbd_release,
433 };
434
435 /*
436  * Initialize an rbd client instance.
437  * We own *ceph_opts.
438  */
439 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
440 {
441         struct rbd_client *rbdc;
442         int ret = -ENOMEM;
443
444         dout("%s:\n", __func__);
445         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
446         if (!rbdc)
447                 goto out_opt;
448
449         kref_init(&rbdc->kref);
450         INIT_LIST_HEAD(&rbdc->node);
451
452         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
453
454         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
455         if (IS_ERR(rbdc->client))
456                 goto out_mutex;
457         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
458
459         ret = ceph_open_session(rbdc->client);
460         if (ret < 0)
461                 goto out_err;
462
463         spin_lock(&rbd_client_list_lock);
464         list_add_tail(&rbdc->node, &rbd_client_list);
465         spin_unlock(&rbd_client_list_lock);
466
467         mutex_unlock(&ctl_mutex);
468         dout("%s: rbdc %p\n", __func__, rbdc);
469
470         return rbdc;
471
472 out_err:
473         ceph_destroy_client(rbdc->client);
474 out_mutex:
475         mutex_unlock(&ctl_mutex);
476         kfree(rbdc);
477 out_opt:
478         if (ceph_opts)
479                 ceph_destroy_options(ceph_opts);
480         dout("%s: error %d\n", __func__, ret);
481
482         return ERR_PTR(ret);
483 }
484
485 /*
486  * Find a ceph client with specific addr and configuration.  If
487  * found, bump its reference count.
488  */
489 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
490 {
491         struct rbd_client *client_node;
492         bool found = false;
493
494         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
495                 return NULL;
496
497         spin_lock(&rbd_client_list_lock);
498         list_for_each_entry(client_node, &rbd_client_list, node) {
499                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
500                         kref_get(&client_node->kref);
501                         found = true;
502                         break;
503                 }
504         }
505         spin_unlock(&rbd_client_list_lock);
506
507         return found ? client_node : NULL;
508 }
509
510 /*
511  * mount options
512  */
/*
 * Option tokens.  The Opt_last_* entries are sentinels that split the
 * token space by argument type; parse_rbd_opts_token() compares
 * against them.  No int or string options are currently defined.
 */
513 enum {
514         Opt_last_int,
515         /* int args above */
516         Opt_last_string,
517         /* string args above */
518         Opt_read_only,
519         Opt_read_write,
520         /* Boolean args above */
521         Opt_last_bool,
522 };
523
/* Option spellings accepted by match_token(), including short aliases. */
524 static match_table_t rbd_opts_tokens = {
525         /* int args above */
526         /* string args above */
527         {Opt_read_only, "read_only"},
528         {Opt_read_only, "ro"},          /* Alternate spelling */
529         {Opt_read_write, "read_write"},
530         {Opt_read_write, "rw"},         /* Alternate spelling */
531         /* Boolean args above */
532         {-1, NULL}
533 };
534
/* Parsed rbd-specific options; filled in by parse_rbd_opts_token(). */
535 struct rbd_options {
536         bool    read_only;
537 };
538
539 #define RBD_READ_ONLY_DEFAULT   false
540
541 static int parse_rbd_opts_token(char *c, void *private)
542 {
543         struct rbd_options *rbd_opts = private;
544         substring_t argstr[MAX_OPT_ARGS];
545         int token, intval, ret;
546
547         token = match_token(c, rbd_opts_tokens, argstr);
548         if (token < 0)
549                 return -EINVAL;
550
551         if (token < Opt_last_int) {
552                 ret = match_int(&argstr[0], &intval);
553                 if (ret < 0) {
554                         pr_err("bad mount option arg (not int) "
555                                "at '%s'\n", c);
556                         return ret;
557                 }
558                 dout("got int token %d val %d\n", token, intval);
559         } else if (token > Opt_last_int && token < Opt_last_string) {
560                 dout("got string token %d val %s\n", token,
561                      argstr[0].from);
562         } else if (token > Opt_last_string && token < Opt_last_bool) {
563                 dout("got Boolean token %d\n", token);
564         } else {
565                 dout("got token %d\n", token);
566         }
567
568         switch (token) {
569         case Opt_read_only:
570                 rbd_opts->read_only = true;
571                 break;
572         case Opt_read_write:
573                 rbd_opts->read_only = false;
574                 break;
575         default:
576                 rbd_assert(false);
577                 break;
578         }
579         return 0;
580 }
581
/*
 * Return a client matching the given options, creating one if no
 * existing client can be shared.  The options are consumed either
 * way: destroyed here when an existing client is reused, or handed to
 * rbd_client_create() otherwise.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *existing = rbd_client_find(ceph_opts);

        if (!existing)
                return rbd_client_create(ceph_opts);

        /* Reusing a client, so this copy of the options is not needed */
        ceph_destroy_options(ceph_opts);

        return existing;
}
598
599 /*
600  * Destroy ceph client
601  *
602  * Caller must hold rbd_client_list_lock.
603  */
604 static void rbd_client_release(struct kref *kref)
605 {
606         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
607
608         dout("%s: rbdc %p\n", __func__, rbdc);
609         spin_lock(&rbd_client_list_lock);
610         list_del(&rbdc->node);
611         spin_unlock(&rbd_client_list_lock);
612
613         ceph_destroy_client(rbdc->client);
614         kfree(rbdc);
615 }
616
617 /*
618  * Drop reference to ceph client node. If it's not referenced anymore, release
619  * it.
620  */
621 static void rbd_put_client(struct rbd_client *rbdc)
622 {
623         if (rbdc)
624                 kref_put(&rbdc->kref, rbd_client_release);
625 }
626
627 static bool rbd_image_format_valid(u32 image_format)
628 {
629         return image_format == 1 || image_format == 2;
630 }
631
632 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
633 {
634         size_t size;
635         u32 snap_count;
636
637         /* The header has to start with the magic rbd header text */
638         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
639                 return false;
640
641         /* The bio layer requires at least sector-sized I/O */
642
643         if (ondisk->options.order < SECTOR_SHIFT)
644                 return false;
645
646         /* If we use u64 in a few spots we may be able to loosen this */
647
648         if (ondisk->options.order > 8 * sizeof (int) - 1)
649                 return false;
650
651         /*
652          * The size of a snapshot header has to fit in a size_t, and
653          * that limits the number of snapshots.
654          */
655         snap_count = le32_to_cpu(ondisk->snap_count);
656         size = SIZE_MAX - sizeof (struct ceph_snap_context);
657         if (snap_count > size / sizeof (__le64))
658                 return false;
659
660         /*
661          * Not only that, but the size of the entire the snapshot
662          * header must also be representable in a size_t.
663          */
664         size -= snap_count * sizeof (__le64);
665         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
666                 return false;
667
668         return true;
669 }
670
671 /*
672  * Create a new header structure, translate header format from the on-disk
673  * header.
674  */
675 static int rbd_header_from_disk(struct rbd_image_header *header,
676                                  struct rbd_image_header_ondisk *ondisk)
677 {
678         u32 snap_count;
679         size_t len;
680         size_t size;
681         u32 i;
682
683         memset(header, 0, sizeof (*header));
684
685         snap_count = le32_to_cpu(ondisk->snap_count);
686
687         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
688         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
689         if (!header->object_prefix)
690                 return -ENOMEM;
691         memcpy(header->object_prefix, ondisk->object_prefix, len);
692         header->object_prefix[len] = '\0';
693
694         if (snap_count) {
695                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
696
697                 /* Save a copy of the snapshot names */
698
699                 if (snap_names_len > (u64) SIZE_MAX)
700                         return -EIO;
701                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
702                 if (!header->snap_names)
703                         goto out_err;
704                 /*
705                  * Note that rbd_dev_v1_header_read() guarantees
706                  * the ondisk buffer we're working with has
707                  * snap_names_len bytes beyond the end of the
708                  * snapshot id array, this memcpy() is safe.
709                  */
710                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
711                         snap_names_len);
712
713                 /* Record each snapshot's size */
714
715                 size = snap_count * sizeof (*header->snap_sizes);
716                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
717                 if (!header->snap_sizes)
718                         goto out_err;
719                 for (i = 0; i < snap_count; i++)
720                         header->snap_sizes[i] =
721                                 le64_to_cpu(ondisk->snaps[i].image_size);
722         } else {
723                 WARN_ON(ondisk->snap_names_len);
724                 header->snap_names = NULL;
725                 header->snap_sizes = NULL;
726         }
727
728         header->features = 0;   /* No features support in v1 images */
729         header->obj_order = ondisk->options.order;
730         header->crypt_type = ondisk->options.crypt_type;
731         header->comp_type = ondisk->options.comp_type;
732
733         /* Allocate and fill in the snapshot context */
734
735         header->image_size = le64_to_cpu(ondisk->image_size);
736         size = sizeof (struct ceph_snap_context);
737         size += snap_count * sizeof (header->snapc->snaps[0]);
738         header->snapc = kzalloc(size, GFP_KERNEL);
739         if (!header->snapc)
740                 goto out_err;
741
742         atomic_set(&header->snapc->nref, 1);
743         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
744         header->snapc->num_snaps = snap_count;
745         for (i = 0; i < snap_count; i++)
746                 header->snapc->snaps[i] =
747                         le64_to_cpu(ondisk->snaps[i].id);
748
749         return 0;
750
751 out_err:
752         kfree(header->snap_sizes);
753         header->snap_sizes = NULL;
754         kfree(header->snap_names);
755         header->snap_names = NULL;
756         kfree(header->object_prefix);
757         header->object_prefix = NULL;
758
759         return -ENOMEM;
760 }
761
762 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
763 {
764         struct rbd_snap *snap;
765
766         if (snap_id == CEPH_NOSNAP)
767                 return RBD_SNAP_HEAD_NAME;
768
769         list_for_each_entry(snap, &rbd_dev->snaps, node)
770                 if (snap_id == snap->id)
771                         return snap->name;
772
773         return NULL;
774 }
775
776 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
777 {
778
779         struct rbd_snap *snap;
780
781         list_for_each_entry(snap, &rbd_dev->snaps, node) {
782                 if (!strcmp(snap_name, snap->name)) {
783                         rbd_dev->spec->snap_id = snap->id;
784                         rbd_dev->mapping.size = snap->size;
785                         rbd_dev->mapping.features = snap->features;
786
787                         return 0;
788                 }
789         }
790
791         return -ENOENT;
792 }
793
794 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
795 {
796         int ret;
797
798         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
799                     sizeof (RBD_SNAP_HEAD_NAME))) {
800                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
801                 rbd_dev->mapping.size = rbd_dev->header.image_size;
802                 rbd_dev->mapping.features = rbd_dev->header.features;
803                 ret = 0;
804         } else {
805                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
806                 if (ret < 0)
807                         goto done;
808                 rbd_dev->mapping.read_only = true;
809         }
810         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811
812 done:
813         return ret;
814 }
815
816 static void rbd_header_free(struct rbd_image_header *header)
817 {
818         kfree(header->object_prefix);
819         header->object_prefix = NULL;
820         kfree(header->snap_sizes);
821         header->snap_sizes = NULL;
822         kfree(header->snap_names);
823         header->snap_names = NULL;
824         ceph_put_snap_context(header->snapc);
825         header->snapc = NULL;
826 }
827
828 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
829 {
830         char *name;
831         u64 segment;
832         int ret;
833
834         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
835         if (!name)
836                 return NULL;
837         segment = offset >> rbd_dev->header.obj_order;
838         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
839                         rbd_dev->header.object_prefix, segment);
840         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
841                 pr_err("error formatting segment name for #%llu (%d)\n",
842                         segment, ret);
843                 kfree(name);
844                 name = NULL;
845         }
846
847         return name;
848 }
849
850 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
851 {
852         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
853
854         return offset & (segment_size - 1);
855 }
856
857 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
858                                 u64 offset, u64 length)
859 {
860         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
861
862         offset &= segment_size - 1;
863
864         rbd_assert(length <= U64_MAX - offset);
865         if (offset + length > segment_size)
866                 length = segment_size - offset;
867
868         return length;
869 }
870
871 /*
872  * returns the size of an object in the image
873  */
874 static u64 rbd_obj_bytes(struct rbd_image_header *header)
875 {
876         return 1 << header->obj_order;
877 }
878
879 /*
880  * bio helpers
881  */
882
883 static void bio_chain_put(struct bio *chain)
884 {
885         struct bio *tmp;
886
887         while (chain) {
888                 tmp = chain;
889                 chain = chain->bi_next;
890                 bio_put(tmp);
891         }
892 }
893
894 /*
895  * zeros a bio chain, starting at specific offset
896  */
897 static void zero_bio_chain(struct bio *chain, int start_ofs)
898 {
899         struct bio_vec *bv;
900         unsigned long flags;
901         void *buf;
902         int i;
903         int pos = 0;
904
905         while (chain) {
906                 bio_for_each_segment(bv, chain, i) {
907                         if (pos + bv->bv_len > start_ofs) {
908                                 int remainder = max(start_ofs - pos, 0);
909                                 buf = bvec_kmap_irq(bv, &flags);
910                                 memset(buf + remainder, 0,
911                                        bv->bv_len - remainder);
912                                 bvec_kunmap_irq(buf, &flags);
913                         }
914                         pos += bv->bv_len;
915                 }
916
917                 chain = chain->bi_next;
918         }
919 }
920
921 /*
922  * Clone a portion of a bio, starting at the given byte offset
923  * and continuing for the number of bytes indicated.
924  */
925 static struct bio *bio_clone_range(struct bio *bio_src,
926                                         unsigned int offset,
927                                         unsigned int len,
928                                         gfp_t gfpmask)
929 {
930         struct bio_vec *bv;
931         unsigned int resid;
932         unsigned short idx;
933         unsigned int voff;
934         unsigned short end_idx;
935         unsigned short vcnt;
936         struct bio *bio;
937
938         /* Handle the easy case for the caller */
939
940         if (!offset && len == bio_src->bi_size)
941                 return bio_clone(bio_src, gfpmask);
942
943         if (WARN_ON_ONCE(!len))
944                 return NULL;
945         if (WARN_ON_ONCE(len > bio_src->bi_size))
946                 return NULL;
947         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
948                 return NULL;
949
950         /* Find first affected segment... */
951
952         resid = offset;
953         __bio_for_each_segment(bv, bio_src, idx, 0) {
954                 if (resid < bv->bv_len)
955                         break;
956                 resid -= bv->bv_len;
957         }
958         voff = resid;
959
960         /* ...and the last affected segment */
961
962         resid += len;
963         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
964                 if (resid <= bv->bv_len)
965                         break;
966                 resid -= bv->bv_len;
967         }
968         vcnt = end_idx - idx + 1;
969
970         /* Build the clone */
971
972         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
973         if (!bio)
974                 return NULL;    /* ENOMEM */
975
976         bio->bi_bdev = bio_src->bi_bdev;
977         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
978         bio->bi_rw = bio_src->bi_rw;
979         bio->bi_flags |= 1 << BIO_CLONED;
980
981         /*
982          * Copy over our part of the bio_vec, then update the first
983          * and last (or only) entries.
984          */
985         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
986                         vcnt * sizeof (struct bio_vec));
987         bio->bi_io_vec[0].bv_offset += voff;
988         if (vcnt > 1) {
989                 bio->bi_io_vec[0].bv_len -= voff;
990                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
991         } else {
992                 bio->bi_io_vec[0].bv_len = len;
993         }
994
995         bio->bi_vcnt = vcnt;
996         bio->bi_size = len;
997         bio->bi_idx = 0;
998
999         return bio;
1000 }
1001
1002 /*
1003  * Clone a portion of a bio chain, starting at the given byte offset
1004  * into the first bio in the source chain and continuing for the
1005  * number of bytes indicated.  The result is another bio chain of
1006  * exactly the given length, or a null pointer on error.
1007  *
1008  * The bio_src and offset parameters are both in-out.  On entry they
1009  * refer to the first source bio and the offset into that bio where
1010  * the start of data to be cloned is located.
1011  *
1012  * On return, bio_src is updated to refer to the bio in the source
1013  * chain that contains first un-cloned byte, and *offset will
1014  * contain the offset of that byte within that bio.
1015  */
1016 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1017                                         unsigned int *offset,
1018                                         unsigned int len,
1019                                         gfp_t gfpmask)
1020 {
1021         struct bio *bi = *bio_src;
1022         unsigned int off = *offset;
1023         struct bio *chain = NULL;
1024         struct bio **end;
1025
1026         /* Build up a chain of clone bios up to the limit */
1027
1028         if (!bi || off >= bi->bi_size || !len)
1029                 return NULL;            /* Nothing to clone */
1030
1031         end = &chain;
1032         while (len) {
1033                 unsigned int bi_size;
1034                 struct bio *bio;
1035
1036                 if (!bi) {
1037                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1038                         goto out_err;   /* EINVAL; ran out of bio's */
1039                 }
1040                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1041                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1042                 if (!bio)
1043                         goto out_err;   /* ENOMEM */
1044
1045                 *end = bio;
1046                 end = &bio->bi_next;
1047
1048                 off += bi_size;
1049                 if (off == bi->bi_size) {
1050                         bi = bi->bi_next;
1051                         off = 0;
1052                 }
1053                 len -= bi_size;
1054         }
1055         *bio_src = bi;
1056         *offset = off;
1057
1058         return chain;
1059 out_err:
1060         bio_chain_put(chain);
1061
1062         return NULL;
1063 }
1064
1065 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1066 {
1067         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1068                 atomic_read(&obj_request->kref.refcount));
1069         kref_get(&obj_request->kref);
1070 }
1071
1072 static void rbd_obj_request_destroy(struct kref *kref);
1073 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1074 {
1075         rbd_assert(obj_request != NULL);
1076         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1077                 atomic_read(&obj_request->kref.refcount));
1078         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1079 }
1080
1081 static void rbd_img_request_get(struct rbd_img_request *img_request)
1082 {
1083         dout("%s: img %p (was %d)\n", __func__, img_request,
1084                 atomic_read(&img_request->kref.refcount));
1085         kref_get(&img_request->kref);
1086 }
1087
1088 static void rbd_img_request_destroy(struct kref *kref);
1089 static void rbd_img_request_put(struct rbd_img_request *img_request)
1090 {
1091         rbd_assert(img_request != NULL);
1092         dout("%s: img %p (was %d)\n", __func__, img_request,
1093                 atomic_read(&img_request->kref.refcount));
1094         kref_put(&img_request->kref, rbd_img_request_destroy);
1095 }
1096
1097 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1098                                         struct rbd_obj_request *obj_request)
1099 {
1100         rbd_assert(obj_request->img_request == NULL);
1101
1102         rbd_obj_request_get(obj_request);
1103         obj_request->img_request = img_request;
1104         obj_request->which = img_request->obj_request_count;
1105         rbd_assert(obj_request->which != BAD_WHICH);
1106         img_request->obj_request_count++;
1107         list_add_tail(&obj_request->links, &img_request->obj_requests);
1108         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1109                 obj_request->which);
1110 }
1111
1112 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1113                                         struct rbd_obj_request *obj_request)
1114 {
1115         rbd_assert(obj_request->which != BAD_WHICH);
1116
1117         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1118                 obj_request->which);
1119         list_del(&obj_request->links);
1120         rbd_assert(img_request->obj_request_count > 0);
1121         img_request->obj_request_count--;
1122         rbd_assert(obj_request->which == img_request->obj_request_count);
1123         obj_request->which = BAD_WHICH;
1124         rbd_assert(obj_request->img_request == img_request);
1125         obj_request->img_request = NULL;
1126         obj_request->callback = NULL;
1127         rbd_obj_request_put(obj_request);
1128 }
1129
1130 static bool obj_request_type_valid(enum obj_request_type type)
1131 {
1132         switch (type) {
1133         case OBJ_REQUEST_NODATA:
1134         case OBJ_REQUEST_BIO:
1135         case OBJ_REQUEST_PAGES:
1136                 return true;
1137         default:
1138                 return false;
1139         }
1140 }
1141
1142 static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1143 {
1144         struct ceph_osd_req_op *op;
1145         va_list args;
1146         size_t size;
1147
1148         op = kzalloc(sizeof (*op), GFP_NOIO);
1149         if (!op)
1150                 return NULL;
1151         op->op = opcode;
1152         va_start(args, opcode);
1153         switch (opcode) {
1154         case CEPH_OSD_OP_READ:
1155         case CEPH_OSD_OP_WRITE:
1156                 /* rbd_osd_req_op_create(READ, offset, length) */
1157                 /* rbd_osd_req_op_create(WRITE, offset, length) */
1158                 op->extent.offset = va_arg(args, u64);
1159                 op->extent.length = va_arg(args, u64);
1160                 if (opcode == CEPH_OSD_OP_WRITE)
1161                         op->payload_len = op->extent.length;
1162                 break;
1163         case CEPH_OSD_OP_STAT:
1164                 break;
1165         case CEPH_OSD_OP_CALL:
1166                 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1167                 op->cls.class_name = va_arg(args, char *);
1168                 size = strlen(op->cls.class_name);
1169                 rbd_assert(size <= (size_t) U8_MAX);
1170                 op->cls.class_len = size;
1171                 op->payload_len = size;
1172
1173                 op->cls.method_name = va_arg(args, char *);
1174                 size = strlen(op->cls.method_name);
1175                 rbd_assert(size <= (size_t) U8_MAX);
1176                 op->cls.method_len = size;
1177                 op->payload_len += size;
1178
1179                 op->cls.argc = 0;
1180                 op->cls.indata = va_arg(args, void *);
1181                 size = va_arg(args, size_t);
1182                 rbd_assert(size <= (size_t) U32_MAX);
1183                 op->cls.indata_len = (u32) size;
1184                 op->payload_len += size;
1185                 break;
1186         case CEPH_OSD_OP_NOTIFY_ACK:
1187         case CEPH_OSD_OP_WATCH:
1188                 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1189                 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1190                 op->watch.cookie = va_arg(args, u64);
1191                 op->watch.ver = va_arg(args, u64);
1192                 op->watch.ver = cpu_to_le64(op->watch.ver);
1193                 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1194                         op->watch.flag = (u8) 1;
1195                 break;
1196         default:
1197                 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1198                 kfree(op);
1199                 op = NULL;
1200                 break;
1201         }
1202         va_end(args);
1203
1204         return op;
1205 }
1206
/*
 * Free an op allocated by rbd_osd_req_op_create().  The op is simply
 * handed to kfree(); nothing it points to is released.
 */
static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
	kfree(op);
}
1211
1212 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1213                                 struct rbd_obj_request *obj_request)
1214 {
1215         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1216
1217         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1218 }
1219
1220 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1221 {
1222         dout("%s: img %p\n", __func__, img_request);
1223         if (img_request->callback)
1224                 img_request->callback(img_request);
1225         else
1226                 rbd_img_request_put(img_request);
1227 }
1228
1229 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1230
1231 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1232 {
1233         dout("%s: obj %p\n", __func__, obj_request);
1234
1235         return wait_for_completion_interruptible(&obj_request->completion);
1236 }
1237
1238 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1239 {
1240         atomic_set(&obj_request->done, 0);
1241         smp_wmb();
1242 }
1243
1244 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1245 {
1246         int done;
1247
1248         done = atomic_inc_return(&obj_request->done);
1249         if (done > 1) {
1250                 struct rbd_img_request *img_request = obj_request->img_request;
1251                 struct rbd_device *rbd_dev;
1252
1253                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1254                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1255                         obj_request);
1256         }
1257 }
1258
1259 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1260 {
1261         smp_mb();
1262         return atomic_read(&obj_request->done) != 0;
1263 }
1264
1265 static void
1266 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1267 {
1268         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1269                 obj_request, obj_request->img_request, obj_request->result,
1270                 obj_request->xferred, obj_request->length);
1271         /*
1272          * ENOENT means a hole in the image.  We zero-fill the
1273          * entire length of the request.  A short read also implies
1274          * zero-fill to the end of the request.  Either way we
1275          * update the xferred count to indicate the whole request
1276          * was satisfied.
1277          */
1278         BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1279         if (obj_request->result == -ENOENT) {
1280                 zero_bio_chain(obj_request->bio_list, 0);
1281                 obj_request->result = 0;
1282                 obj_request->xferred = obj_request->length;
1283         } else if (obj_request->xferred < obj_request->length &&
1284                         !obj_request->result) {
1285                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1286                 obj_request->xferred = obj_request->length;
1287         }
1288         obj_request_done_set(obj_request);
1289 }
1290
1291 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1292 {
1293         dout("%s: obj %p cb %p\n", __func__, obj_request,
1294                 obj_request->callback);
1295         if (obj_request->callback)
1296                 obj_request->callback(obj_request);
1297         else
1298                 complete_all(&obj_request->completion);
1299 }
1300
/*
 * Callback for ops that need no processing of their own when the
 * osd request finishes: the object request is just marked done.
 */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1306
1307 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1308 {
1309         dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1310                 obj_request->result, obj_request->xferred, obj_request->length);
1311         if (obj_request->img_request)
1312                 rbd_img_obj_request_read_callback(obj_request);
1313         else
1314                 obj_request_done_set(obj_request);
1315 }
1316
1317 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1318 {
1319         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1320                 obj_request->result, obj_request->length);
1321         /*
1322          * There is no such thing as a successful short write.
1323          * Our xferred value is the number of bytes transferred
1324          * back.  Set it to our originally-requested length.
1325          */
1326         obj_request->xferred = obj_request->length;
1327         obj_request_done_set(obj_request);
1328 }
1329
/*
 * Handle a finished stat op.  Nothing needs doing for a simple stat
 * call beyond marking the object request done.  More will be done
 * here when a stat is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1339
1340 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1341                                 struct ceph_msg *msg)
1342 {
1343         struct rbd_obj_request *obj_request = osd_req->r_priv;
1344         u16 opcode;
1345
1346         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1347         rbd_assert(osd_req == obj_request->osd_req);
1348         rbd_assert(!!obj_request->img_request ^
1349                                 (obj_request->which == BAD_WHICH));
1350
1351         if (osd_req->r_result < 0)
1352                 obj_request->result = osd_req->r_result;
1353         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1354
1355         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1356
1357         /*
1358          * We support a 64-bit length, but ultimately it has to be
1359          * passed to blk_end_request(), which takes an unsigned int.
1360          */
1361         obj_request->xferred = osd_req->r_reply_op_len[0];
1362         rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1363         opcode = osd_req->r_request_ops[0].op;
1364         switch (opcode) {
1365         case CEPH_OSD_OP_READ:
1366                 rbd_osd_read_callback(obj_request);
1367                 break;
1368         case CEPH_OSD_OP_WRITE:
1369                 rbd_osd_write_callback(obj_request);
1370                 break;
1371         case CEPH_OSD_OP_STAT:
1372                 rbd_osd_stat_callback(obj_request);
1373                 break;
1374         case CEPH_OSD_OP_CALL:
1375         case CEPH_OSD_OP_NOTIFY_ACK:
1376         case CEPH_OSD_OP_WATCH:
1377                 rbd_osd_trivial_callback(obj_request);
1378                 break;
1379         default:
1380                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1381                         obj_request->object_name, (unsigned short) opcode);
1382                 break;
1383         }
1384
1385         if (obj_request_done_test(obj_request))
1386                 rbd_obj_request_complete(obj_request);
1387 }
1388
1389 static struct ceph_osd_request *rbd_osd_req_create(
1390                                         struct rbd_device *rbd_dev,
1391                                         bool write_request,
1392                                         struct rbd_obj_request *obj_request,
1393                                         struct ceph_osd_req_op *op)
1394 {
1395         struct rbd_img_request *img_request = obj_request->img_request;
1396         struct ceph_snap_context *snapc = NULL;
1397         struct ceph_osd_client *osdc;
1398         struct ceph_osd_request *osd_req;
1399         struct timespec now;
1400         struct timespec *mtime;
1401         u64 snap_id = CEPH_NOSNAP;
1402         u64 offset = obj_request->offset;
1403         u64 length = obj_request->length;
1404
1405         if (img_request) {
1406                 rbd_assert(img_request->write_request == write_request);
1407                 if (img_request->write_request)
1408                         snapc = img_request->snapc;
1409                 else
1410                         snap_id = img_request->snap_id;
1411         }
1412
1413         /* Allocate and initialize the request, for the single op */
1414
1415         osdc = &rbd_dev->rbd_client->client->osdc;
1416         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1417         if (!osd_req)
1418                 return NULL;    /* ENOMEM */
1419
1420         rbd_assert(obj_request_type_valid(obj_request->type));
1421         switch (obj_request->type) {
1422         case OBJ_REQUEST_NODATA:
1423                 break;          /* Nothing to do */
1424         case OBJ_REQUEST_BIO:
1425                 rbd_assert(obj_request->bio_list != NULL);
1426                 osd_req->r_bio = obj_request->bio_list;
1427                 break;
1428         case OBJ_REQUEST_PAGES:
1429                 osd_req->r_pages = obj_request->pages;
1430                 osd_req->r_num_pages = obj_request->page_count;
1431                 osd_req->r_page_alignment = offset & ~PAGE_MASK;
1432                 break;
1433         }
1434
1435         if (write_request) {
1436                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1437                 now = CURRENT_TIME;
1438                 mtime = &now;
1439         } else {
1440                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1441                 mtime = NULL;   /* not needed for reads */
1442                 offset = 0;     /* These are not used... */
1443                 length = 0;     /* ...for osd read requests */
1444         }
1445
1446         osd_req->r_callback = rbd_osd_req_callback;
1447         osd_req->r_priv = obj_request;
1448
1449         osd_req->r_oid_len = strlen(obj_request->object_name);
1450         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1451         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1452
1453         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1454
1455         /* osd_req will get its own reference to snapc (if non-null) */
1456
1457         ceph_osdc_build_request(osd_req, offset, length, 1, op,
1458                                 snapc, snap_id, mtime);
1459
1460         return osd_req;
1461 }
1462
/*
 * Release an osd request by dropping a reference to it with
 * ceph_osdc_put_request().
 */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1467
1468 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1469
1470 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1471                                                 u64 offset, u64 length,
1472                                                 enum obj_request_type type)
1473 {
1474         struct rbd_obj_request *obj_request;
1475         size_t size;
1476         char *name;
1477
1478         rbd_assert(obj_request_type_valid(type));
1479
1480         size = strlen(object_name) + 1;
1481         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1482         if (!obj_request)
1483                 return NULL;
1484
1485         name = (char *)(obj_request + 1);
1486         obj_request->object_name = memcpy(name, object_name, size);
1487         obj_request->offset = offset;
1488         obj_request->length = length;
1489         obj_request->which = BAD_WHICH;
1490         obj_request->type = type;
1491         INIT_LIST_HEAD(&obj_request->links);
1492         obj_request_done_init(obj_request);
1493         init_completion(&obj_request->completion);
1494         kref_init(&obj_request->kref);
1495
1496         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1497                 offset, length, (int)type, obj_request);
1498
1499         return obj_request;
1500 }
1501
1502 static void rbd_obj_request_destroy(struct kref *kref)
1503 {
1504         struct rbd_obj_request *obj_request;
1505
1506         obj_request = container_of(kref, struct rbd_obj_request, kref);
1507
1508         dout("%s: obj %p\n", __func__, obj_request);
1509
1510         rbd_assert(obj_request->img_request == NULL);
1511         rbd_assert(obj_request->which == BAD_WHICH);
1512
1513         if (obj_request->osd_req)
1514                 rbd_osd_req_destroy(obj_request->osd_req);
1515
1516         rbd_assert(obj_request_type_valid(obj_request->type));
1517         switch (obj_request->type) {
1518         case OBJ_REQUEST_NODATA:
1519                 break;          /* Nothing to do */
1520         case OBJ_REQUEST_BIO:
1521                 if (obj_request->bio_list)
1522                         bio_chain_put(obj_request->bio_list);
1523                 break;
1524         case OBJ_REQUEST_PAGES:
1525                 if (obj_request->pages)
1526                         ceph_release_page_vector(obj_request->pages,
1527                                                 obj_request->page_count);
1528                 break;
1529         }
1530
1531         kfree(obj_request);
1532 }
1533
1534 /*
1535  * Caller is responsible for filling in the list of object requests
1536  * that comprises the image request, and the Linux request pointer
1537  * (if there is one).
1538  */
1539 static struct rbd_img_request *rbd_img_request_create(
1540                                         struct rbd_device *rbd_dev,
1541                                         u64 offset, u64 length,
1542                                         bool write_request)
1543 {
1544         struct rbd_img_request *img_request;
1545         struct ceph_snap_context *snapc = NULL;
1546
1547         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1548         if (!img_request)
1549                 return NULL;
1550
1551         if (write_request) {
1552                 down_read(&rbd_dev->header_rwsem);
1553                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1554                 up_read(&rbd_dev->header_rwsem);
1555                 if (WARN_ON(!snapc)) {
1556                         kfree(img_request);
1557                         return NULL;    /* Shouldn't happen */
1558                 }
1559         }
1560
1561         img_request->rq = NULL;
1562         img_request->rbd_dev = rbd_dev;
1563         img_request->offset = offset;
1564         img_request->length = length;
1565         img_request->write_request = write_request;
1566         if (write_request)
1567                 img_request->snapc = snapc;
1568         else
1569                 img_request->snap_id = rbd_dev->spec->snap_id;
1570         spin_lock_init(&img_request->completion_lock);
1571         img_request->next_completion = 0;
1572         img_request->callback = NULL;
1573         img_request->obj_request_count = 0;
1574         INIT_LIST_HEAD(&img_request->obj_requests);
1575         kref_init(&img_request->kref);
1576
1577         rbd_img_request_get(img_request);       /* Avoid a warning */
1578         rbd_img_request_put(img_request);       /* TEMPORARY */
1579
1580         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1581                 write_request ? "write" : "read", offset, length,
1582                 img_request);
1583
1584         return img_request;
1585 }
1586
1587 static void rbd_img_request_destroy(struct kref *kref)
1588 {
1589         struct rbd_img_request *img_request;
1590         struct rbd_obj_request *obj_request;
1591         struct rbd_obj_request *next_obj_request;
1592
1593         img_request = container_of(kref, struct rbd_img_request, kref);
1594
1595         dout("%s: img %p\n", __func__, img_request);
1596
1597         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1598                 rbd_img_obj_request_del(img_request, obj_request);
1599         rbd_assert(img_request->obj_request_count == 0);
1600
1601         if (img_request->write_request)
1602                 ceph_put_snap_context(img_request->snapc);
1603
1604         kfree(img_request);
1605 }
1606
1607 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1608                                         struct bio *bio_list)
1609 {
1610         struct rbd_device *rbd_dev = img_request->rbd_dev;
1611         struct rbd_obj_request *obj_request = NULL;
1612         struct rbd_obj_request *next_obj_request;
1613         unsigned int bio_offset;
1614         u64 image_offset;
1615         u64 resid;
1616         u16 opcode;
1617
1618         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1619
1620         opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
1621                                               : CEPH_OSD_OP_READ;
1622         bio_offset = 0;
1623         image_offset = img_request->offset;
1624         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1625         resid = img_request->length;
1626         rbd_assert(resid > 0);
1627         while (resid) {
1628                 const char *object_name;
1629                 unsigned int clone_size;
1630                 struct ceph_osd_req_op *op;
1631                 u64 offset;
1632                 u64 length;
1633
1634                 object_name = rbd_segment_name(rbd_dev, image_offset);
1635                 if (!object_name)
1636                         goto out_unwind;
1637                 offset = rbd_segment_offset(rbd_dev, image_offset);
1638                 length = rbd_segment_length(rbd_dev, image_offset, resid);
1639                 obj_request = rbd_obj_request_create(object_name,
1640                                                 offset, length,
1641                                                 OBJ_REQUEST_BIO);
1642                 kfree(object_name);     /* object request has its own copy */
1643                 if (!obj_request)
1644                         goto out_unwind;
1645
1646                 rbd_assert(length <= (u64) UINT_MAX);
1647                 clone_size = (unsigned int) length;
1648                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1649                                                 &bio_offset, clone_size,
1650                                                 GFP_ATOMIC);
1651                 if (!obj_request->bio_list)
1652                         goto out_partial;
1653
1654                 /*
1655                  * Build up the op to use in building the osd
1656                  * request.  Note that the contents of the op are
1657                  * copied by rbd_osd_req_create().
1658                  */
1659                 op = rbd_osd_req_op_create(opcode, offset, length);
1660                 if (!op)
1661                         goto out_partial;
1662                 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1663                                                 img_request->write_request,
1664                                                 obj_request, op);
1665                 rbd_osd_req_op_destroy(op);
1666                 if (!obj_request->osd_req)
1667                         goto out_partial;
1668                 /* status and version are initially zero-filled */
1669
1670                 rbd_img_obj_request_add(img_request, obj_request);
1671
1672                 image_offset += length;
1673                 resid -= length;
1674         }
1675
1676         return 0;
1677
1678 out_partial:
1679         rbd_obj_request_put(obj_request);
1680 out_unwind:
1681         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1682                 rbd_obj_request_put(obj_request);
1683
1684         return -ENOMEM;
1685 }
1686
1687 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1688 {
1689         struct rbd_img_request *img_request;
1690         u32 which = obj_request->which;
1691         bool more = true;
1692
1693         img_request = obj_request->img_request;
1694
1695         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1696         rbd_assert(img_request != NULL);
1697         rbd_assert(img_request->rq != NULL);
1698         rbd_assert(img_request->obj_request_count > 0);
1699         rbd_assert(which != BAD_WHICH);
1700         rbd_assert(which < img_request->obj_request_count);
1701         rbd_assert(which >= img_request->next_completion);
1702
1703         spin_lock_irq(&img_request->completion_lock);
1704         if (which != img_request->next_completion)
1705                 goto out;
1706
1707         for_each_obj_request_from(img_request, obj_request) {
1708                 unsigned int xferred;
1709                 int result;
1710
1711                 rbd_assert(more);
1712                 rbd_assert(which < img_request->obj_request_count);
1713
1714                 if (!obj_request_done_test(obj_request))
1715                         break;
1716
1717                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1718                 xferred = (unsigned int) obj_request->xferred;
1719                 result = (int) obj_request->result;
1720                 if (result)
1721                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1722                                 img_request->write_request ? "write" : "read",
1723                                 result, xferred);
1724
1725                 more = blk_end_request(img_request->rq, result, xferred);
1726                 which++;
1727         }
1728
1729         rbd_assert(more ^ (which == img_request->obj_request_count));
1730         img_request->next_completion = which;
1731 out:
1732         spin_unlock_irq(&img_request->completion_lock);
1733
1734         if (!more)
1735                 rbd_img_request_complete(img_request);
1736 }
1737
1738 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1739 {
1740         struct rbd_device *rbd_dev = img_request->rbd_dev;
1741         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1742         struct rbd_obj_request *obj_request;
1743         struct rbd_obj_request *next_obj_request;
1744
1745         dout("%s: img %p\n", __func__, img_request);
1746         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1747                 int ret;
1748
1749                 obj_request->callback = rbd_img_obj_callback;
1750                 ret = rbd_obj_request_submit(osdc, obj_request);
1751                 if (ret)
1752                         return ret;
1753                 /*
1754                  * The image request has its own reference to each
1755                  * of its object requests, so we can safely drop the
1756                  * initial one here.
1757                  */
1758                 rbd_obj_request_put(obj_request);
1759         }
1760
1761         return 0;
1762 }
1763
1764 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1765                                    u64 ver, u64 notify_id)
1766 {
1767         struct rbd_obj_request *obj_request;
1768         struct ceph_osd_req_op *op;
1769         struct ceph_osd_client *osdc;
1770         int ret;
1771
1772         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1773                                                         OBJ_REQUEST_NODATA);
1774         if (!obj_request)
1775                 return -ENOMEM;
1776
1777         ret = -ENOMEM;
1778         op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1779         if (!op)
1780                 goto out;
1781         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1782                                                 obj_request, op);
1783         rbd_osd_req_op_destroy(op);
1784         if (!obj_request->osd_req)
1785                 goto out;
1786
1787         osdc = &rbd_dev->rbd_client->client->osdc;
1788         obj_request->callback = rbd_obj_request_put;
1789         ret = rbd_obj_request_submit(osdc, obj_request);
1790 out:
1791         if (ret)
1792                 rbd_obj_request_put(obj_request);
1793
1794         return ret;
1795 }
1796
1797 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1798 {
1799         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1800         u64 hver;
1801         int rc;
1802
1803         if (!rbd_dev)
1804                 return;
1805
1806         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1807                 rbd_dev->header_name, (unsigned long long) notify_id,
1808                 (unsigned int) opcode);
1809         rc = rbd_dev_refresh(rbd_dev, &hver);
1810         if (rc)
1811                 rbd_warn(rbd_dev, "got notification but failed to "
1812                            " update snaps: %d\n", rc);
1813
1814         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1815 }
1816
1817 /*
1818  * Request sync osd watch/unwatch.  The value of "start" determines
1819  * whether a watch request is being initiated or torn down.
1820  */
1821 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1822 {
1823         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1824         struct rbd_obj_request *obj_request;
1825         struct ceph_osd_req_op *op;
1826         int ret;
1827
1828         rbd_assert(start ^ !!rbd_dev->watch_event);
1829         rbd_assert(start ^ !!rbd_dev->watch_request);
1830
1831         if (start) {
1832                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1833                                                 &rbd_dev->watch_event);
1834                 if (ret < 0)
1835                         return ret;
1836                 rbd_assert(rbd_dev->watch_event != NULL);
1837         }
1838
1839         ret = -ENOMEM;
1840         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1841                                                         OBJ_REQUEST_NODATA);
1842         if (!obj_request)
1843                 goto out_cancel;
1844
1845         op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1846                                 rbd_dev->watch_event->cookie,
1847                                 rbd_dev->header.obj_version, start);
1848         if (!op)
1849                 goto out_cancel;
1850         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
1851                                                         obj_request, op);
1852         rbd_osd_req_op_destroy(op);
1853         if (!obj_request->osd_req)
1854                 goto out_cancel;
1855
1856         if (start)
1857                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1858         else
1859                 ceph_osdc_unregister_linger_request(osdc,
1860                                         rbd_dev->watch_request->osd_req);
1861         ret = rbd_obj_request_submit(osdc, obj_request);
1862         if (ret)
1863                 goto out_cancel;
1864         ret = rbd_obj_request_wait(obj_request);
1865         if (ret)
1866                 goto out_cancel;
1867         ret = obj_request->result;
1868         if (ret)
1869                 goto out_cancel;
1870
1871         /*
1872          * A watch request is set to linger, so the underlying osd
1873          * request won't go away until we unregister it.  We retain
1874          * a pointer to the object request during that time (in
1875          * rbd_dev->watch_request), so we'll keep a reference to
1876          * it.  We'll drop that reference (below) after we've
1877          * unregistered it.
1878          */
1879         if (start) {
1880                 rbd_dev->watch_request = obj_request;
1881
1882                 return 0;
1883         }
1884
1885         /* We have successfully torn down the watch request */
1886
1887         rbd_obj_request_put(rbd_dev->watch_request);
1888         rbd_dev->watch_request = NULL;
1889 out_cancel:
1890         /* Cancel the event if we're tearing down, or on error */
1891         ceph_osdc_cancel_event(rbd_dev->watch_event);
1892         rbd_dev->watch_event = NULL;
1893         if (obj_request)
1894                 rbd_obj_request_put(obj_request);
1895
1896         return ret;
1897 }
1898
1899 /*
1900  * Synchronous osd object method call
1901  */
1902 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1903                              const char *object_name,
1904                              const char *class_name,
1905                              const char *method_name,
1906                              const char *outbound,
1907                              size_t outbound_size,
1908                              char *inbound,
1909                              size_t inbound_size,
1910                              u64 *version)
1911 {
1912         struct rbd_obj_request *obj_request;
1913         struct ceph_osd_client *osdc;
1914         struct ceph_osd_req_op *op;
1915         struct page **pages;
1916         u32 page_count;
1917         int ret;
1918
1919         /*
1920          * Method calls are ultimately read operations but they
1921          * don't involve object data (so no offset or length).
1922          * The result should placed into the inbound buffer
1923          * provided.  They also supply outbound data--parameters for
1924          * the object method.  Currently if this is present it will
1925          * be a snapshot id.
1926          */
1927         page_count = (u32) calc_pages_for(0, inbound_size);
1928         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1929         if (IS_ERR(pages))
1930                 return PTR_ERR(pages);
1931
1932         ret = -ENOMEM;
1933         obj_request = rbd_obj_request_create(object_name, 0, 0,
1934                                                         OBJ_REQUEST_PAGES);
1935         if (!obj_request)
1936                 goto out;
1937
1938         obj_request->pages = pages;
1939         obj_request->page_count = page_count;
1940
1941         op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1942                                         method_name, outbound, outbound_size);
1943         if (!op)
1944                 goto out;
1945         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1946                                                 obj_request, op);
1947         rbd_osd_req_op_destroy(op);
1948         if (!obj_request->osd_req)
1949                 goto out;
1950
1951         osdc = &rbd_dev->rbd_client->client->osdc;
1952         ret = rbd_obj_request_submit(osdc, obj_request);
1953         if (ret)
1954                 goto out;
1955         ret = rbd_obj_request_wait(obj_request);
1956         if (ret)
1957                 goto out;
1958
1959         ret = obj_request->result;
1960         if (ret < 0)
1961                 goto out;
1962         ret = 0;
1963         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1964         if (version)
1965                 *version = obj_request->version;
1966 out:
1967         if (obj_request)
1968                 rbd_obj_request_put(obj_request);
1969         else
1970                 ceph_release_page_vector(pages, page_count);
1971
1972         return ret;
1973 }
1974
1975 static void rbd_request_fn(struct request_queue *q)
1976                 __releases(q->queue_lock) __acquires(q->queue_lock)
1977 {
1978         struct rbd_device *rbd_dev = q->queuedata;
1979         bool read_only = rbd_dev->mapping.read_only;
1980         struct request *rq;
1981         int result;
1982
1983         while ((rq = blk_fetch_request(q))) {
1984                 bool write_request = rq_data_dir(rq) == WRITE;
1985                 struct rbd_img_request *img_request;
1986                 u64 offset;
1987                 u64 length;
1988
1989                 /* Ignore any non-FS requests that filter through. */
1990
1991                 if (rq->cmd_type != REQ_TYPE_FS) {
1992                         dout("%s: non-fs request type %d\n", __func__,
1993                                 (int) rq->cmd_type);
1994                         __blk_end_request_all(rq, 0);
1995                         continue;
1996                 }
1997
1998                 /* Ignore/skip any zero-length requests */
1999
2000                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2001                 length = (u64) blk_rq_bytes(rq);
2002
2003                 if (!length) {
2004                         dout("%s: zero-length request\n", __func__);
2005                         __blk_end_request_all(rq, 0);
2006                         continue;
2007                 }
2008
2009                 spin_unlock_irq(q->queue_lock);
2010
2011                 /* Disallow writes to a read-only device */
2012
2013                 if (write_request) {
2014                         result = -EROFS;
2015                         if (read_only)
2016                                 goto end_request;
2017                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2018                 }
2019
2020                 /*
2021                  * Quit early if the mapped snapshot no longer
2022                  * exists.  It's still possible the snapshot will
2023                  * have disappeared by the time our request arrives
2024                  * at the osd, but there's no sense in sending it if
2025                  * we already know.
2026                  */
2027                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2028                         dout("request for non-existent snapshot");
2029                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2030                         result = -ENXIO;
2031                         goto end_request;
2032                 }
2033
2034                 result = -EINVAL;
2035                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2036                         goto end_request;       /* Shouldn't happen */
2037
2038                 result = -ENOMEM;
2039                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2040                                                         write_request);
2041                 if (!img_request)
2042                         goto end_request;
2043
2044                 img_request->rq = rq;
2045
2046                 result = rbd_img_request_fill_bio(img_request, rq->bio);
2047                 if (!result)
2048                         result = rbd_img_request_submit(img_request);
2049                 if (result)
2050                         rbd_img_request_put(img_request);
2051 end_request:
2052                 spin_lock_irq(q->queue_lock);
2053                 if (result < 0) {
2054                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
2055                                 write_request ? "write" : "read", result);
2056                         __blk_end_request_all(rq, result);
2057                 }
2058         }
2059 }
2060
2061 /*
2062  * a queue callback. Makes sure that we don't create a bio that spans across
2063  * multiple osd objects. One exception would be with a single page bios,
2064  * which we handle later at bio_chain_clone_range()
2065  */
2066 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2067                           struct bio_vec *bvec)
2068 {
2069         struct rbd_device *rbd_dev = q->queuedata;
2070         sector_t sector_offset;
2071         sector_t sectors_per_obj;
2072         sector_t obj_sector_offset;
2073         int ret;
2074
2075         /*
2076          * Find how far into its rbd object the partition-relative
2077          * bio start sector is to offset relative to the enclosing
2078          * device.
2079          */
2080         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2081         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2082         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2083
2084         /*
2085          * Compute the number of bytes from that offset to the end
2086          * of the object.  Account for what's already used by the bio.
2087          */
2088         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2089         if (ret > bmd->bi_size)
2090                 ret -= bmd->bi_size;
2091         else
2092                 ret = 0;
2093
2094         /*
2095          * Don't send back more than was asked for.  And if the bio
2096          * was empty, let the whole thing through because:  "Note
2097          * that a block device *must* allow a single page to be
2098          * added to an empty bio."
2099          */
2100         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2101         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2102                 ret = (int) bvec->bv_len;
2103
2104         return ret;
2105 }
2106
2107 static void rbd_free_disk(struct rbd_device *rbd_dev)
2108 {
2109         struct gendisk *disk = rbd_dev->disk;
2110
2111         if (!disk)
2112                 return;
2113
2114         if (disk->flags & GENHD_FL_UP)
2115                 del_gendisk(disk);
2116         if (disk->queue)
2117                 blk_cleanup_queue(disk->queue);
2118         put_disk(disk);
2119 }
2120
2121 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2122                                 const char *object_name,
2123                                 u64 offset, u64 length,
2124                                 char *buf, u64 *version)
2125
2126 {
2127         struct ceph_osd_req_op *op;
2128         struct rbd_obj_request *obj_request;
2129         struct ceph_osd_client *osdc;
2130         struct page **pages = NULL;
2131         u32 page_count;
2132         size_t size;
2133         int ret;
2134
2135         page_count = (u32) calc_pages_for(offset, length);
2136         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2137         if (IS_ERR(pages))
2138                 ret = PTR_ERR(pages);
2139
2140         ret = -ENOMEM;
2141         obj_request = rbd_obj_request_create(object_name, offset, length,
2142                                                         OBJ_REQUEST_PAGES);
2143         if (!obj_request)
2144                 goto out;
2145
2146         obj_request->pages = pages;
2147         obj_request->page_count = page_count;
2148
2149         op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2150         if (!op)
2151                 goto out;
2152         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2153                                                 obj_request, op);
2154         rbd_osd_req_op_destroy(op);
2155         if (!obj_request->osd_req)
2156                 goto out;
2157
2158         osdc = &rbd_dev->rbd_client->client->osdc;
2159         ret = rbd_obj_request_submit(osdc, obj_request);
2160         if (ret)
2161                 goto out;
2162         ret = rbd_obj_request_wait(obj_request);
2163         if (ret)
2164                 goto out;
2165
2166         ret = obj_request->result;
2167         if (ret < 0)
2168                 goto out;
2169
2170         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2171         size = (size_t) obj_request->xferred;
2172         ceph_copy_from_page_vector(pages, buf, 0, size);
2173         rbd_assert(size <= (size_t) INT_MAX);
2174         ret = (int) size;
2175         if (version)
2176                 *version = obj_request->version;
2177 out:
2178         if (obj_request)
2179                 rbd_obj_request_put(obj_request);
2180         else
2181                 ceph_release_page_vector(pages, page_count);
2182
2183         return ret;
2184 }
2185
2186 /*
2187  * Read the complete header for the given rbd device.
2188  *
2189  * Returns a pointer to a dynamically-allocated buffer containing
2190  * the complete and validated header.  Caller can pass the address
2191  * of a variable that will be filled in with the version of the
2192  * header object at the time it was read.
2193  *
2194  * Returns a pointer-coded errno if a failure occurs.
2195  */
2196 static struct rbd_image_header_ondisk *
2197 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2198 {
2199         struct rbd_image_header_ondisk *ondisk = NULL;
2200         u32 snap_count = 0;
2201         u64 names_size = 0;
2202         u32 want_count;
2203         int ret;
2204
2205         /*
2206          * The complete header will include an array of its 64-bit
2207          * snapshot ids, followed by the names of those snapshots as
2208          * a contiguous block of NUL-terminated strings.  Note that
2209          * the number of snapshots could change by the time we read
2210          * it in, in which case we re-read it.
2211          */
2212         do {
2213                 size_t size;
2214
2215                 kfree(ondisk);
2216
2217                 size = sizeof (*ondisk);
2218                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2219                 size += names_size;
2220                 ondisk = kmalloc(size, GFP_KERNEL);
2221                 if (!ondisk)
2222                         return ERR_PTR(-ENOMEM);
2223
2224                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2225                                        0, size,
2226                                        (char *) ondisk, version);
2227                 if (ret < 0)
2228                         goto out_err;
2229                 if (WARN_ON((size_t) ret < size)) {
2230                         ret = -ENXIO;
2231                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2232                                 size, ret);
2233                         goto out_err;
2234                 }
2235                 if (!rbd_dev_ondisk_valid(ondisk)) {
2236                         ret = -ENXIO;
2237                         rbd_warn(rbd_dev, "invalid header");
2238                         goto out_err;
2239                 }
2240
2241                 names_size = le64_to_cpu(ondisk->snap_names_len);
2242                 want_count = snap_count;
2243                 snap_count = le32_to_cpu(ondisk->snap_count);
2244         } while (snap_count != want_count);
2245
2246         return ondisk;
2247
2248 out_err:
2249         kfree(ondisk);
2250
2251         return ERR_PTR(ret);
2252 }
2253
2254 /*
2255  * reload the ondisk the header
2256  */
2257 static int rbd_read_header(struct rbd_device *rbd_dev,
2258                            struct rbd_image_header *header)
2259 {
2260         struct rbd_image_header_ondisk *ondisk;
2261         u64 ver = 0;
2262         int ret;
2263
2264         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2265         if (IS_ERR(ondisk))
2266                 return PTR_ERR(ondisk);
2267         ret = rbd_header_from_disk(header, ondisk);
2268         if (ret >= 0)
2269                 header->obj_version = ver;
2270         kfree(ondisk);
2271
2272         return ret;
2273 }
2274
2275 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2276 {
2277         struct rbd_snap *snap;
2278         struct rbd_snap *next;
2279
2280         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2281                 rbd_remove_snap_dev(snap);
2282 }
2283
2284 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2285 {
2286         sector_t size;
2287
2288         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2289                 return;
2290
2291         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2292         dout("setting size to %llu sectors", (unsigned long long) size);
2293         rbd_dev->mapping.size = (u64) size;
2294         set_capacity(rbd_dev->disk, size);
2295 }
2296
2297 /*
2298  * only read the first part of the ondisk header, without the snaps info
2299  */
2300 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2301 {
2302         int ret;
2303         struct rbd_image_header h;
2304
2305         ret = rbd_read_header(rbd_dev, &h);
2306         if (ret < 0)
2307                 return ret;
2308
2309         down_write(&rbd_dev->header_rwsem);
2310
2311         /* Update image size, and check for resize of mapped image */
2312         rbd_dev->header.image_size = h.image_size;
2313         rbd_update_mapping_size(rbd_dev);
2314
2315         /* rbd_dev->header.object_prefix shouldn't change */
2316         kfree(rbd_dev->header.snap_sizes);
2317         kfree(rbd_dev->header.snap_names);
2318         /* osd requests may still refer to snapc */
2319         ceph_put_snap_context(rbd_dev->header.snapc);
2320
2321         if (hver)
2322                 *hver = h.obj_version;
2323         rbd_dev->header.obj_version = h.obj_version;
2324         rbd_dev->header.image_size = h.image_size;
2325         rbd_dev->header.snapc = h.snapc;
2326         rbd_dev->header.snap_names = h.snap_names;
2327         rbd_dev->header.snap_sizes = h.snap_sizes;
2328         /* Free the extra copy of the object prefix */
2329         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2330         kfree(h.object_prefix);
2331
2332         ret = rbd_dev_snaps_update(rbd_dev);
2333         if (!ret)
2334                 ret = rbd_dev_snaps_register(rbd_dev);
2335
2336         up_write(&rbd_dev->header_rwsem);
2337
2338         return ret;
2339 }
2340
2341 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2342 {
2343         int ret;
2344
2345         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2346         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2347         if (rbd_dev->image_format == 1)
2348                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2349         else
2350                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2351         mutex_unlock(&ctl_mutex);
2352
2353         return ret;
2354 }
2355
2356 static int rbd_init_disk(struct rbd_device *rbd_dev)
2357 {
2358         struct gendisk *disk;
2359         struct request_queue *q;
2360         u64 segment_size;
2361
2362         /* create gendisk info */
2363         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2364         if (!disk)
2365                 return -ENOMEM;
2366
2367         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2368                  rbd_dev->dev_id);
2369         disk->major = rbd_dev->major;
2370         disk->first_minor = 0;
2371         disk->fops = &rbd_bd_ops;
2372         disk->private_data = rbd_dev;
2373
2374         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2375         if (!q)
2376                 goto out_disk;
2377
2378         /* We use the default size, but let's be explicit about it. */
2379         blk_queue_physical_block_size(q, SECTOR_SIZE);
2380
2381         /* set io sizes to object size */
2382         segment_size = rbd_obj_bytes(&rbd_dev->header);
2383         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2384         blk_queue_max_segment_size(q, segment_size);
2385         blk_queue_io_min(q, segment_size);
2386         blk_queue_io_opt(q, segment_size);
2387
2388         blk_queue_merge_bvec(q, rbd_merge_bvec);
2389         disk->queue = q;
2390
2391         q->queuedata = rbd_dev;
2392
2393         rbd_dev->disk = disk;
2394
2395         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2396
2397         return 0;
2398 out_disk:
2399         put_disk(disk);
2400
2401         return -ENOMEM;
2402 }
2403
2404 /*
2405   sysfs
2406 */
2407
2408 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2409 {
2410         return container_of(dev, struct rbd_device, dev);
2411 }
2412
2413 static ssize_t rbd_size_show(struct device *dev,
2414                              struct device_attribute *attr, char *buf)
2415 {
2416         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2417         sector_t size;
2418
2419         down_read(&rbd_dev->header_rwsem);
2420         size = get_capacity(rbd_dev->disk);
2421         up_read(&rbd_dev->header_rwsem);
2422
2423         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2424 }
2425
2426 /*
2427  * Note this shows the features for whatever's mapped, which is not
2428  * necessarily the base image.
2429  */
2430 static ssize_t rbd_features_show(struct device *dev,
2431                              struct device_attribute *attr, char *buf)
2432 {
2433         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2434
2435         return sprintf(buf, "0x%016llx\n",
2436                         (unsigned long long) rbd_dev->mapping.features);
2437 }
2438
2439 static ssize_t rbd_major_show(struct device *dev,
2440                               struct device_attribute *attr, char *buf)
2441 {
2442         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2443
2444         return sprintf(buf, "%d\n", rbd_dev->major);
2445 }
2446
2447 static ssize_t rbd_client_id_show(struct device *dev,
2448                                   struct device_attribute *attr, char *buf)
2449 {
2450         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2451
2452         return sprintf(buf, "client%lld\n",
2453                         ceph_client_id(rbd_dev->rbd_client->client));
2454 }
2455
2456 static ssize_t rbd_pool_show(struct device *dev,
2457                              struct device_attribute *attr, char *buf)
2458 {
2459         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2460
2461         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2462 }
2463
2464 static ssize_t rbd_pool_id_show(struct device *dev,
2465                              struct device_attribute *attr, char *buf)
2466 {
2467         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2468
2469         return sprintf(buf, "%llu\n",
2470                 (unsigned long long) rbd_dev->spec->pool_id);
2471 }
2472
2473 static ssize_t rbd_name_show(struct device *dev,
2474                              struct device_attribute *attr, char *buf)
2475 {
2476         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2477
2478         if (rbd_dev->spec->image_name)
2479                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2480
2481         return sprintf(buf, "(unknown)\n");
2482 }
2483
2484 static ssize_t rbd_image_id_show(struct device *dev,
2485                              struct device_attribute *attr, char *buf)
2486 {
2487         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2488
2489         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2490 }
2491
2492 /*
2493  * Shows the name of the currently-mapped snapshot (or
2494  * RBD_SNAP_HEAD_NAME for the base image).
2495  */
2496 static ssize_t rbd_snap_show(struct device *dev,
2497                              struct device_attribute *attr,
2498                              char *buf)
2499 {
2500         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2501
2502         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2503 }
2504
2505 /*
2506  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2507  * for the parent image.  If there is no parent, simply shows
2508  * "(no parent image)".
2509  */
2510 static ssize_t rbd_parent_show(struct device *dev,
2511                              struct device_attribute *attr,
2512                              char *buf)
2513 {
2514         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2515         struct rbd_spec *spec = rbd_dev->parent_spec;
2516         int count;
2517         char *bufp = buf;
2518
2519         if (!spec)
2520                 return sprintf(buf, "(no parent image)\n");
2521
2522         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2523                         (unsigned long long) spec->pool_id, spec->pool_name);
2524         if (count < 0)
2525                 return count;
2526         bufp += count;
2527
2528         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2529                         spec->image_name ? spec->image_name : "(unknown)");
2530         if (count < 0)
2531                 return count;
2532         bufp += count;
2533
2534         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2535                         (unsigned long long) spec->snap_id, spec->snap_name);
2536         if (count < 0)
2537                 return count;
2538         bufp += count;
2539
2540         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2541         if (count < 0)
2542                 return count;
2543         bufp += count;
2544
2545         return (ssize_t) (bufp - buf);
2546 }
2547
2548 static ssize_t rbd_image_refresh(struct device *dev,
2549                                  struct device_attribute *attr,
2550                                  const char *buf,
2551                                  size_t size)
2552 {
2553         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2554         int ret;
2555
2556         ret = rbd_dev_refresh(rbd_dev, NULL);
2557
2558         return ret < 0 ? ret : size;
2559 }
2560
2561 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2562 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2563 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2564 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2565 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2566 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2567 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2568 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2569 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2570 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2571 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2572
2573 static struct attribute *rbd_attrs[] = {
2574         &dev_attr_size.attr,
2575         &dev_attr_features.attr,
2576         &dev_attr_major.attr,
2577         &dev_attr_client_id.attr,
2578         &dev_attr_pool.attr,
2579         &dev_attr_pool_id.attr,
2580         &dev_attr_name.attr,
2581         &dev_attr_image_id.attr,
2582         &dev_attr_current_snap.attr,
2583         &dev_attr_parent.attr,
2584         &dev_attr_refresh.attr,
2585         NULL
2586 };
2587
2588 static struct attribute_group rbd_attr_group = {
2589         .attrs = rbd_attrs,
2590 };
2591
2592 static const struct attribute_group *rbd_attr_groups[] = {
2593         &rbd_attr_group,
2594         NULL
2595 };
2596
/*
 * Release callback for rbd_device_type.  Intentionally does nothing;
 * no memory is freed here.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
        /* nothing to release */
}
2600
2601 static struct device_type rbd_device_type = {
2602         .name           = "rbd",
2603         .groups         = rbd_attr_groups,
2604         .release        = rbd_sysfs_dev_release,
2605 };
2606
2607
2608 /*
2609   sysfs - snapshots
2610 */
2611
2612 static ssize_t rbd_snap_size_show(struct device *dev,
2613                                   struct device_attribute *attr,
2614                                   char *buf)
2615 {
2616         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2617
2618         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2619 }
2620
2621 static ssize_t rbd_snap_id_show(struct device *dev,
2622                                 struct device_attribute *attr,
2623                                 char *buf)
2624 {
2625         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2626
2627         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2628 }
2629
2630 static ssize_t rbd_snap_features_show(struct device *dev,
2631                                 struct device_attribute *attr,
2632                                 char *buf)
2633 {
2634         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2635
2636         return sprintf(buf, "0x%016llx\n",
2637                         (unsigned long long) snap->features);
2638 }
2639
2640 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2641 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2642 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2643
2644 static struct attribute *rbd_snap_attrs[] = {
2645         &dev_attr_snap_size.attr,
2646         &dev_attr_snap_id.attr,
2647         &dev_attr_snap_features.attr,
2648         NULL,
2649 };
2650
2651 static struct attribute_group rbd_snap_attr_group = {
2652         .attrs = rbd_snap_attrs,
2653 };
2654
2655 static void rbd_snap_dev_release(struct device *dev)
2656 {
2657         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2658         kfree(snap->name);
2659         kfree(snap);
2660 }
2661
2662 static const struct attribute_group *rbd_snap_attr_groups[] = {
2663         &rbd_snap_attr_group,
2664         NULL
2665 };
2666
2667 static struct device_type rbd_snap_device_type = {
2668         .groups         = rbd_snap_attr_groups,
2669         .release        = rbd_snap_dev_release,
2670 };
2671
2672 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2673 {
2674         kref_get(&spec->kref);
2675
2676         return spec;
2677 }
2678
2679 static void rbd_spec_free(struct kref *kref);
2680 static void rbd_spec_put(struct rbd_spec *spec)
2681 {
2682         if (spec)
2683                 kref_put(&spec->kref, rbd_spec_free);
2684 }
2685
2686 static struct rbd_spec *rbd_spec_alloc(void)
2687 {
2688         struct rbd_spec *spec;
2689
2690         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2691         if (!spec)
2692                 return NULL;
2693         kref_init(&spec->kref);
2694
2695         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2696
2697         return spec;
2698 }
2699
2700 static void rbd_spec_free(struct kref *kref)
2701 {
2702         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2703
2704         kfree(spec->pool_name);
2705         kfree(spec->image_id);
2706         kfree(spec->image_name);
2707         kfree(spec->snap_name);
2708         kfree(spec);
2709 }
2710
2711 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2712                                 struct rbd_spec *spec)
2713 {
2714         struct rbd_device *rbd_dev;
2715
2716         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2717         if (!rbd_dev)
2718                 return NULL;
2719
2720         spin_lock_init(&rbd_dev->lock);
2721         rbd_dev->flags = 0;
2722         INIT_LIST_HEAD(&rbd_dev->node);
2723         INIT_LIST_HEAD(&rbd_dev->snaps);
2724         init_rwsem(&rbd_dev->header_rwsem);
2725
2726         rbd_dev->spec = spec;
2727         rbd_dev->rbd_client = rbdc;
2728
2729         /* Initialize the layout used for all rbd requests */
2730
2731         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2732         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2733         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2734         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2735
2736         return rbd_dev;
2737 }
2738
2739 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2740 {
2741         rbd_spec_put(rbd_dev->parent_spec);
2742         kfree(rbd_dev->header_name);
2743         rbd_put_client(rbd_dev->rbd_client);
2744         rbd_spec_put(rbd_dev->spec);
2745         kfree(rbd_dev);
2746 }
2747
2748 static bool rbd_snap_registered(struct rbd_snap *snap)
2749 {
2750         bool ret = snap->dev.type == &rbd_snap_device_type;
2751         bool reg = device_is_registered(&snap->dev);
2752
2753         rbd_assert(!ret ^ reg);
2754
2755         return ret;
2756 }
2757
2758 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2759 {
2760         list_del(&snap->node);
2761         if (device_is_registered(&snap->dev))
2762                 device_unregister(&snap->dev);
2763 }
2764
2765 static int rbd_register_snap_dev(struct rbd_snap *snap,
2766                                   struct device *parent)
2767 {
2768         struct device *dev = &snap->dev;
2769         int ret;
2770
2771         dev->type = &rbd_snap_device_type;
2772         dev->parent = parent;
2773         dev->release = rbd_snap_dev_release;
2774         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2775         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2776
2777         ret = device_register(dev);
2778
2779         return ret;
2780 }
2781
2782 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2783                                                 const char *snap_name,
2784                                                 u64 snap_id, u64 snap_size,
2785                                                 u64 snap_features)
2786 {
2787         struct rbd_snap *snap;
2788         int ret;
2789
2790         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2791         if (!snap)
2792                 return ERR_PTR(-ENOMEM);
2793
2794         ret = -ENOMEM;
2795         snap->name = kstrdup(snap_name, GFP_KERNEL);
2796         if (!snap->name)
2797                 goto err;
2798
2799         snap->id = snap_id;
2800         snap->size = snap_size;
2801         snap->features = snap_features;
2802
2803         return snap;
2804
2805 err:
2806         kfree(snap->name);
2807         kfree(snap);
2808
2809         return ERR_PTR(ret);
2810 }
2811
2812 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2813                 u64 *snap_size, u64 *snap_features)
2814 {
2815         char *snap_name;
2816
2817         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2818
2819         *snap_size = rbd_dev->header.snap_sizes[which];
2820         *snap_features = 0;     /* No features for v1 */
2821
2822         /* Skip over names until we find the one we are looking for */
2823
2824         snap_name = rbd_dev->header.snap_names;
2825         while (which--)
2826                 snap_name += strlen(snap_name) + 1;
2827
2828         return snap_name;
2829 }
2830
2831 /*
2832  * Get the size and object order for an image snapshot, or if
2833  * snap_id is CEPH_NOSNAP, gets this information for the base
2834  * image.
2835  */
2836 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2837                                 u8 *order, u64 *snap_size)
2838 {
2839         __le64 snapid = cpu_to_le64(snap_id);
2840         int ret;
2841         struct {
2842                 u8 order;
2843                 __le64 size;
2844         } __attribute__ ((packed)) size_buf = { 0 };
2845
2846         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2847                                 "rbd", "get_size",
2848                                 (char *) &snapid, sizeof (snapid),
2849                                 (char *) &size_buf, sizeof (size_buf), NULL);
2850         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2851         if (ret < 0)
2852                 return ret;
2853
2854         *order = size_buf.order;
2855         *snap_size = le64_to_cpu(size_buf.size);
2856
2857         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2858                 (unsigned long long) snap_id, (unsigned int) *order,
2859                 (unsigned long long) *snap_size);
2860
2861         return 0;
2862 }
2863
2864 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2865 {
2866         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2867                                         &rbd_dev->header.obj_order,
2868                                         &rbd_dev->header.image_size);
2869 }
2870
2871 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2872 {
2873         void *reply_buf;
2874         int ret;
2875         void *p;
2876
2877         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2878         if (!reply_buf)
2879                 return -ENOMEM;
2880
2881         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2882                                 "rbd", "get_object_prefix",
2883                                 NULL, 0,
2884                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2885         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2886         if (ret < 0)
2887                 goto out;
2888
2889         p = reply_buf;
2890         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2891                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2892                                                 NULL, GFP_NOIO);
2893
2894         if (IS_ERR(rbd_dev->header.object_prefix)) {
2895                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2896                 rbd_dev->header.object_prefix = NULL;
2897         } else {
2898                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2899         }
2900
2901 out:
2902         kfree(reply_buf);
2903
2904         return ret;
2905 }
2906
2907 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2908                 u64 *snap_features)
2909 {
2910         __le64 snapid = cpu_to_le64(snap_id);
2911         struct {
2912                 __le64 features;
2913                 __le64 incompat;
2914         } features_buf = { 0 };
2915         u64 incompat;
2916         int ret;
2917
2918         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2919                                 "rbd", "get_features",
2920                                 (char *) &snapid, sizeof (snapid),
2921                                 (char *) &features_buf, sizeof (features_buf),
2922                                 NULL);
2923         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2924         if (ret < 0)
2925                 return ret;
2926
2927         incompat = le64_to_cpu(features_buf.incompat);
2928         if (incompat & ~RBD_FEATURES_ALL)
2929                 return -ENXIO;
2930
2931         *snap_features = le64_to_cpu(features_buf.features);
2932
2933         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2934                 (unsigned long long) snap_id,
2935                 (unsigned long long) *snap_features,
2936                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2937
2938         return 0;
2939 }
2940
2941 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2942 {
2943         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2944                                                 &rbd_dev->header.features);
2945 }
2946
2947 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2948 {
2949         struct rbd_spec *parent_spec;
2950         size_t size;
2951         void *reply_buf = NULL;
2952         __le64 snapid;
2953         void *p;
2954         void *end;
2955         char *image_id;
2956         u64 overlap;
2957         int ret;
2958
2959         parent_spec = rbd_spec_alloc();
2960         if (!parent_spec)
2961                 return -ENOMEM;
2962
2963         size = sizeof (__le64) +                                /* pool_id */
2964                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2965                 sizeof (__le64) +                               /* snap_id */
2966                 sizeof (__le64);                                /* overlap */
2967         reply_buf = kmalloc(size, GFP_KERNEL);
2968         if (!reply_buf) {
2969                 ret = -ENOMEM;
2970                 goto out_err;
2971         }
2972
2973         snapid = cpu_to_le64(CEPH_NOSNAP);
2974         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2975                                 "rbd", "get_parent",
2976                                 (char *) &snapid, sizeof (snapid),
2977                                 (char *) reply_buf, size, NULL);
2978         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2979         if (ret < 0)
2980                 goto out_err;
2981
2982         ret = -ERANGE;
2983         p = reply_buf;
2984         end = (char *) reply_buf + size;
2985         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2986         if (parent_spec->pool_id == CEPH_NOPOOL)
2987                 goto out;       /* No parent?  No problem. */
2988
2989         /* The ceph file layout needs to fit pool id in 32 bits */
2990
2991         ret = -EIO;
2992         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2993                 goto out;
2994
2995         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2996         if (IS_ERR(image_id)) {
2997                 ret = PTR_ERR(image_id);
2998                 goto out_err;
2999         }
3000         parent_spec->image_id = image_id;
3001         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3002         ceph_decode_64_safe(&p, end, overlap, out_err);
3003
3004         rbd_dev->parent_overlap = overlap;
3005         rbd_dev->parent_spec = parent_spec;
3006         parent_spec = NULL;     /* rbd_dev now owns this */
3007 out:
3008         ret = 0;
3009 out_err:
3010         kfree(reply_buf);
3011         rbd_spec_put(parent_spec);
3012
3013         return ret;
3014 }
3015
3016 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3017 {
3018         size_t image_id_size;
3019         char *image_id;
3020         void *p;
3021         void *end;
3022         size_t size;
3023         void *reply_buf = NULL;
3024         size_t len = 0;
3025         char *image_name = NULL;
3026         int ret;
3027
3028         rbd_assert(!rbd_dev->spec->image_name);
3029
3030         len = strlen(rbd_dev->spec->image_id);
3031         image_id_size = sizeof (__le32) + len;
3032         image_id = kmalloc(image_id_size, GFP_KERNEL);
3033         if (!image_id)
3034                 return NULL;
3035
3036         p = image_id;
3037         end = (char *) image_id + image_id_size;
3038         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3039
3040         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3041         reply_buf = kmalloc(size, GFP_KERNEL);
3042         if (!reply_buf)
3043                 goto out;
3044
3045         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3046                                 "rbd", "dir_get_name",
3047                                 image_id, image_id_size,
3048                                 (char *) reply_buf, size, NULL);
3049         if (ret < 0)
3050                 goto out;
3051         p = reply_buf;
3052         end = (char *) reply_buf + size;
3053         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3054         if (IS_ERR(image_name))
3055                 image_name = NULL;
3056         else
3057                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3058 out:
3059         kfree(reply_buf);
3060         kfree(image_id);
3061
3062         return image_name;
3063 }
3064
3065 /*
3066  * When a parent image gets probed, we only have the pool, image,
3067  * and snapshot ids but not the names of any of them.  This call
3068  * is made later to fill in those names.  It has to be done after
3069  * rbd_dev_snaps_update() has completed because some of the
3070  * information (in particular, snapshot name) is not available
3071  * until then.
3072  */
3073 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3074 {
3075         struct ceph_osd_client *osdc;
3076         const char *name;
3077         void *reply_buf = NULL;
3078         int ret;
3079
3080         if (rbd_dev->spec->pool_name)
3081                 return 0;       /* Already have the names */
3082
3083         /* Look up the pool name */
3084
3085         osdc = &rbd_dev->rbd_client->client->osdc;
3086         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3087         if (!name) {
3088                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3089                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3090                 return -EIO;
3091         }
3092
3093         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3094         if (!rbd_dev->spec->pool_name)
3095                 return -ENOMEM;
3096
3097         /* Fetch the image name; tolerate failure here */
3098
3099         name = rbd_dev_image_name(rbd_dev);
3100         if (name)
3101                 rbd_dev->spec->image_name = (char *) name;
3102         else
3103                 rbd_warn(rbd_dev, "unable to get image name");
3104
3105         /* Look up the snapshot name. */
3106
3107         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3108         if (!name) {
3109                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3110                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3111                 ret = -EIO;
3112                 goto out_err;
3113         }
3114         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3115         if(!rbd_dev->spec->snap_name)
3116                 goto out_err;
3117
3118         return 0;
3119 out_err:
3120         kfree(reply_buf);
3121         kfree(rbd_dev->spec->pool_name);
3122         rbd_dev->spec->pool_name = NULL;
3123
3124         return ret;
3125 }
3126
3127 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3128 {
3129         size_t size;
3130         int ret;
3131         void *reply_buf;
3132         void *p;
3133         void *end;
3134         u64 seq;
3135         u32 snap_count;
3136         struct ceph_snap_context *snapc;
3137         u32 i;
3138
3139         /*
3140          * We'll need room for the seq value (maximum snapshot id),
3141          * snapshot count, and array of that many snapshot ids.
3142          * For now we have a fixed upper limit on the number we're
3143          * prepared to receive.
3144          */
3145         size = sizeof (__le64) + sizeof (__le32) +
3146                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3147         reply_buf = kzalloc(size, GFP_KERNEL);
3148         if (!reply_buf)
3149                 return -ENOMEM;
3150
3151         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3152                                 "rbd", "get_snapcontext",
3153                                 NULL, 0,
3154                                 reply_buf, size, ver);
3155         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3156         if (ret < 0)
3157                 goto out;
3158
3159         ret = -ERANGE;
3160         p = reply_buf;
3161         end = (char *) reply_buf + size;
3162         ceph_decode_64_safe(&p, end, seq, out);
3163         ceph_decode_32_safe(&p, end, snap_count, out);
3164
3165         /*
3166          * Make sure the reported number of snapshot ids wouldn't go
3167          * beyond the end of our buffer.  But before checking that,
3168          * make sure the computed size of the snapshot context we
3169          * allocate is representable in a size_t.
3170          */
3171         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3172                                  / sizeof (u64)) {
3173                 ret = -EINVAL;
3174                 goto out;
3175         }
3176         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3177                 goto out;
3178
3179         size = sizeof (struct ceph_snap_context) +
3180                                 snap_count * sizeof (snapc->snaps[0]);
3181         snapc = kmalloc(size, GFP_KERNEL);
3182         if (!snapc) {
3183                 ret = -ENOMEM;
3184                 goto out;
3185         }
3186
3187         atomic_set(&snapc->nref, 1);
3188         snapc->seq = seq;
3189         snapc->num_snaps = snap_count;
3190         for (i = 0; i < snap_count; i++)
3191                 snapc->snaps[i] = ceph_decode_64(&p);
3192
3193         rbd_dev->header.snapc = snapc;
3194
3195         dout("  snap context seq = %llu, snap_count = %u\n",
3196                 (unsigned long long) seq, (unsigned int) snap_count);
3197
3198 out:
3199         kfree(reply_buf);
3200
3201         return 0;
3202 }
3203
3204 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3205 {
3206         size_t size;
3207         void *reply_buf;
3208         __le64 snap_id;
3209         int ret;
3210         void *p;
3211         void *end;
3212         char *snap_name;
3213
3214         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3215         reply_buf = kmalloc(size, GFP_KERNEL);
3216         if (!reply_buf)
3217                 return ERR_PTR(-ENOMEM);
3218
3219         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3220         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3221                                 "rbd", "get_snapshot_name",
3222                                 (char *) &snap_id, sizeof (snap_id),
3223                                 reply_buf, size, NULL);
3224         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3225         if (ret < 0)
3226                 goto out;
3227
3228         p = reply_buf;
3229         end = (char *) reply_buf + size;
3230         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3231         if (IS_ERR(snap_name)) {
3232                 ret = PTR_ERR(snap_name);
3233                 goto out;
3234         } else {
3235                 dout("  snap_id 0x%016llx snap_name = %s\n",
3236                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3237         }
3238         kfree(reply_buf);
3239
3240         return snap_name;
3241 out:
3242         kfree(reply_buf);
3243
3244         return ERR_PTR(ret);
3245 }
3246
3247 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3248                 u64 *snap_size, u64 *snap_features)
3249 {
3250         u64 snap_id;
3251         u8 order;
3252         int ret;
3253
3254         snap_id = rbd_dev->header.snapc->snaps[which];
3255         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3256         if (ret)
3257                 return ERR_PTR(ret);
3258         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3259         if (ret)
3260                 return ERR_PTR(ret);
3261
3262         return rbd_dev_v2_snap_name(rbd_dev, which);
3263 }
3264
3265 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3266                 u64 *snap_size, u64 *snap_features)
3267 {
3268         if (rbd_dev->image_format == 1)
3269                 return rbd_dev_v1_snap_info(rbd_dev, which,
3270                                         snap_size, snap_features);
3271         if (rbd_dev->image_format == 2)
3272                 return rbd_dev_v2_snap_info(rbd_dev, which,
3273                                         snap_size, snap_features);
3274         return ERR_PTR(-EINVAL);
3275 }
3276
3277 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3278 {
3279         int ret;
3280         __u8 obj_order;
3281
3282         down_write(&rbd_dev->header_rwsem);
3283
3284         /* Grab old order first, to see if it changes */
3285
3286         obj_order = rbd_dev->header.obj_order,
3287         ret = rbd_dev_v2_image_size(rbd_dev);
3288         if (ret)
3289                 goto out;
3290         if (rbd_dev->header.obj_order != obj_order) {
3291                 ret = -EIO;
3292                 goto out;
3293         }
3294         rbd_update_mapping_size(rbd_dev);
3295
3296         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3297         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3298         if (ret)
3299                 goto out;
3300         ret = rbd_dev_snaps_update(rbd_dev);
3301         dout("rbd_dev_snaps_update returned %d\n", ret);
3302         if (ret)
3303                 goto out;
3304         ret = rbd_dev_snaps_register(rbd_dev);
3305         dout("rbd_dev_snaps_register returned %d\n", ret);
3306 out:
3307         up_write(&rbd_dev->header_rwsem);
3308
3309         return ret;
3310 }
3311
3312 /*
3313  * Scan the rbd device's current snapshot list and compare it to the
3314  * newly-received snapshot context.  Remove any existing snapshots
3315  * not present in the new snapshot context.  Add a new snapshot for
3316  * any snaphots in the snapshot context not in the current list.
3317  * And verify there are no changes to snapshots we already know
3318  * about.
3319  *
3320  * Assumes the snapshots in the snapshot context are sorted by
3321  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3322  * are also maintained in that order.)
3323  */
3324 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3325 {
3326         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3327         const u32 snap_count = snapc->num_snaps;
3328         struct list_head *head = &rbd_dev->snaps;
3329         struct list_head *links = head->next;
3330         u32 index = 0;
3331
3332         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3333         while (index < snap_count || links != head) {
3334                 u64 snap_id;
3335                 struct rbd_snap *snap;
3336                 char *snap_name;
3337                 u64 snap_size = 0;
3338                 u64 snap_features = 0;
3339
3340                 snap_id = index < snap_count ? snapc->snaps[index]
3341                                              : CEPH_NOSNAP;
3342                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3343                                      : NULL;
3344                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3345
3346                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3347                         struct list_head *next = links->next;
3348
3349                         /*
3350                          * A previously-existing snapshot is not in
3351                          * the new snap context.
3352                          *
3353                          * If the now missing snapshot is the one the
3354                          * image is mapped to, clear its exists flag
3355                          * so we can avoid sending any more requests
3356                          * to it.
3357                          */
3358                         if (rbd_dev->spec->snap_id == snap->id)
3359                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3360                         rbd_remove_snap_dev(snap);
3361                         dout("%ssnap id %llu has been removed\n",
3362                                 rbd_dev->spec->snap_id == snap->id ?
3363                                                         "mapped " : "",
3364                                 (unsigned long long) snap->id);
3365
3366                         /* Done with this list entry; advance */
3367
3368                         links = next;
3369                         continue;
3370                 }
3371
3372                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3373                                         &snap_size, &snap_features);
3374                 if (IS_ERR(snap_name))
3375                         return PTR_ERR(snap_name);
3376
3377                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3378                         (unsigned long long) snap_id);
3379                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3380                         struct rbd_snap *new_snap;
3381
3382                         /* We haven't seen this snapshot before */
3383
3384                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3385                                         snap_id, snap_size, snap_features);
3386                         if (IS_ERR(new_snap)) {
3387                                 int err = PTR_ERR(new_snap);
3388
3389                                 dout("  failed to add dev, error %d\n", err);
3390
3391                                 return err;
3392                         }
3393
3394                         /* New goes before existing, or at end of list */
3395
3396                         dout("  added dev%s\n", snap ? "" : " at end\n");
3397                         if (snap)
3398                                 list_add_tail(&new_snap->node, &snap->node);
3399                         else
3400                                 list_add_tail(&new_snap->node, head);
3401                 } else {
3402                         /* Already have this one */
3403
3404                         dout("  already present\n");
3405
3406                         rbd_assert(snap->size == snap_size);
3407                         rbd_assert(!strcmp(snap->name, snap_name));
3408                         rbd_assert(snap->features == snap_features);
3409
3410                         /* Done with this list entry; advance */
3411
3412                         links = links->next;
3413                 }
3414
3415                 /* Advance to the next entry in the snapshot context */
3416
3417                 index++;
3418         }
3419         dout("%s: done\n", __func__);
3420
3421         return 0;
3422 }
3423
3424 /*
3425  * Scan the list of snapshots and register the devices for any that
3426  * have not already been registered.
3427  */
3428 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3429 {
3430         struct rbd_snap *snap;
3431         int ret = 0;
3432
3433         dout("%s:\n", __func__);
3434         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3435                 return -EIO;
3436
3437         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3438                 if (!rbd_snap_registered(snap)) {
3439                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3440                         if (ret < 0)
3441                                 break;
3442                 }
3443         }
3444         dout("%s: returning %d\n", __func__, ret);
3445
3446         return ret;
3447 }
3448
3449 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3450 {
3451         struct device *dev;
3452         int ret;
3453
3454         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3455
3456         dev = &rbd_dev->dev;
3457         dev->bus = &rbd_bus_type;
3458         dev->type = &rbd_device_type;
3459         dev->parent = &rbd_root_dev;
3460         dev->release = rbd_dev_release;
3461         dev_set_name(dev, "%d", rbd_dev->dev_id);
3462         ret = device_register(dev);
3463
3464         mutex_unlock(&ctl_mutex);
3465
3466         return ret;
3467 }
3468
3469 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3470 {
3471         device_unregister(&rbd_dev->dev);
3472 }
3473
3474 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3475
3476 /*
3477  * Get a unique rbd identifier for the given new rbd_dev, and add
3478  * the rbd_dev to the global list.  The minimum rbd id is 1.
3479  */
3480 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3481 {
3482         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3483
3484         spin_lock(&rbd_dev_list_lock);
3485         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3486         spin_unlock(&rbd_dev_list_lock);
3487         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3488                 (unsigned long long) rbd_dev->dev_id);
3489 }
3490
3491 /*
3492  * Remove an rbd_dev from the global list, and record that its
3493  * identifier is no longer in use.
3494  */
3495 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3496 {
3497         struct list_head *tmp;
3498         int rbd_id = rbd_dev->dev_id;
3499         int max_id;
3500
3501         rbd_assert(rbd_id > 0);
3502
3503         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3504                 (unsigned long long) rbd_dev->dev_id);
3505         spin_lock(&rbd_dev_list_lock);
3506         list_del_init(&rbd_dev->node);
3507
3508         /*
3509          * If the id being "put" is not the current maximum, there
3510          * is nothing special we need to do.
3511          */
3512         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3513                 spin_unlock(&rbd_dev_list_lock);
3514                 return;
3515         }
3516
3517         /*
3518          * We need to update the current maximum id.  Search the
3519          * list to find out what it is.  We're more likely to find
3520          * the maximum at the end, so search the list backward.
3521          */
3522         max_id = 0;
3523         list_for_each_prev(tmp, &rbd_dev_list) {
3524                 struct rbd_device *rbd_dev;
3525
3526                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3527                 if (rbd_dev->dev_id > max_id)
3528                         max_id = rbd_dev->dev_id;
3529         }
3530         spin_unlock(&rbd_dev_list_lock);
3531
3532         /*
3533          * The max id could have been updated by rbd_dev_id_get(), in
3534          * which case it now accurately reflects the new maximum.
3535          * Be careful not to overwrite the maximum value in that
3536          * case.
3537          */
3538         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3539         dout("  max dev id has been reset\n");
3540 }
3541
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none).  Returns the number of consecutive non-space characters
 * found there, i.e. the token length.  *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * Exactly the characters for which isspace() is nonzero in
	 * the "C" and "POSIX" locales.
	 */
	static const char blanks[] = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, blanks);		/* skip to the token */
	*buf = start;

	return strcspn(start, blanks);		/* measure the token */
}
3560
/*
 * Locate the next token in *buf and, provided it fits, copy it into
 * the caller's token buffer followed by a terminating '\0'.  *buf
 * must be '\0'-terminated on entry.
 *
 * Returns the token length, not counting the '\0'.  A result of 0
 * means no token was found; a result >= token_size means the token
 * was too large and nothing was written to the token buffer.
 *
 * In every case *buf is moved to just past the token, even when the
 * token did not fit.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *start = *buf;

	*buf = start + token_len;

	/* Too big (no room for the terminator): report length only */
	if (token_len >= token_size)
		return token_len;

	memcpy(token, start, token_len);
	token[token_len] = '\0';

	return token_len;
}
3590
3591 /*
3592  * Finds the next token in *buf, dynamically allocates a buffer big
3593  * enough to hold a copy of it, and copies the token into the new
3594  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3595  * that a duplicate buffer is created even for a zero-length token.
3596  *
3597  * Returns a pointer to the newly-allocated duplicate, or a null
3598  * pointer if memory for the duplicate was not available.  If
3599  * the lenp argument is a non-null pointer, the length of the token
3600  * (not including the '\0') is returned in *lenp.
3601  *
3602  * If successful, the *buf pointer will be updated to point beyond
3603  * the end of the found token.
3604  *
3605  * Note: uses GFP_KERNEL for allocation.
3606  */
3607 static inline char *dup_token(const char **buf, size_t *lenp)
3608 {
3609         char *dup;
3610         size_t len;
3611
3612         len = next_token(buf);
3613         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3614         if (!dup)
3615                 return NULL;
3616         *(dup + len) = '\0';
3617         *buf += len;
3618
3619         if (lenp)
3620                 *lenp = len;
3621
3622         return dup;
3623 }
3624
3625 /*
3626  * Parse the options provided for an "rbd add" (i.e., rbd image
3627  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3628  * and the data written is passed here via a NUL-terminated buffer.
3629  * Returns 0 if successful or an error code otherwise.
3630  *
3631  * The information extracted from these options is recorded in
3632  * the other parameters which return dynamically-allocated
3633  * structures:
3634  *  ceph_opts
3635  *      The address of a pointer that will refer to a ceph options
3636  *      structure.  Caller must release the returned pointer using
3637  *      ceph_destroy_options() when it is no longer needed.
3638  *  rbd_opts
3639  *      Address of an rbd options pointer.  Fully initialized by
3640  *      this function; caller must release with kfree().
3641  *  spec
3642  *      Address of an rbd image specification pointer.  Fully
3643  *      initialized by this function based on parsed options.
3644  *      Caller must release with rbd_spec_put().
3645  *
3646  * The options passed take this form:
3647  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3648  * where:
3649  *  <mon_addrs>
3650  *      A comma-separated list of one or more monitor addresses.
3651  *      A monitor address is an ip address, optionally followed
3652  *      by a port number (separated by a colon).
3653  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3654  *  <options>
3655  *      A comma-separated list of ceph and/or rbd options.
3656  *  <pool_name>
3657  *      The name of the rados pool containing the rbd image.
3658  *  <image_name>
3659  *      The name of the image in that pool to map.
3660  *  <snap_id>
3661  *      An optional snapshot id.  If provided, the mapping will
3662  *      present data from the image at the time that snapshot was
3663  *      created.  The image head is used if no snapshot id is
3664  *      provided.  Snapshot mappings are always read-only.
3665  */
3666 static int rbd_add_parse_args(const char *buf,
3667                                 struct ceph_options **ceph_opts,
3668                                 struct rbd_options **opts,
3669                                 struct rbd_spec **rbd_spec)
3670 {
3671         size_t len;
3672         char *options;
3673         const char *mon_addrs;
3674         size_t mon_addrs_size;
3675         struct rbd_spec *spec = NULL;
3676         struct rbd_options *rbd_opts = NULL;
3677         struct ceph_options *copts;
3678         int ret;
3679
3680         /* The first four tokens are required */
3681
3682         len = next_token(&buf);
3683         if (!len) {
3684                 rbd_warn(NULL, "no monitor address(es) provided");
3685                 return -EINVAL;
3686         }
3687         mon_addrs = buf;
3688         mon_addrs_size = len + 1;
3689         buf += len;
3690
3691         ret = -EINVAL;
3692         options = dup_token(&buf, NULL);
3693         if (!options)
3694                 return -ENOMEM;
3695         if (!*options) {
3696                 rbd_warn(NULL, "no options provided");
3697                 goto out_err;
3698         }
3699
3700         spec = rbd_spec_alloc();
3701         if (!spec)
3702                 goto out_mem;
3703
3704         spec->pool_name = dup_token(&buf, NULL);
3705         if (!spec->pool_name)
3706                 goto out_mem;
3707         if (!*spec->pool_name) {
3708                 rbd_warn(NULL, "no pool name provided");
3709                 goto out_err;
3710         }
3711
3712         spec->image_name = dup_token(&buf, NULL);
3713         if (!spec->image_name)
3714                 goto out_mem;
3715         if (!*spec->image_name) {
3716                 rbd_warn(NULL, "no image name provided");
3717                 goto out_err;
3718         }
3719
3720         /*
3721          * Snapshot name is optional; default is to use "-"
3722          * (indicating the head/no snapshot).
3723          */
3724         len = next_token(&buf);
3725         if (!len) {
3726                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3727                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3728         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3729                 ret = -ENAMETOOLONG;
3730                 goto out_err;
3731         }
3732         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3733         if (!spec->snap_name)
3734                 goto out_mem;
3735         *(spec->snap_name + len) = '\0';
3736
3737         /* Initialize all rbd options to the defaults */
3738
3739         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3740         if (!rbd_opts)
3741                 goto out_mem;
3742
3743         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3744
3745         copts = ceph_parse_options(options, mon_addrs,
3746                                         mon_addrs + mon_addrs_size - 1,
3747                                         parse_rbd_opts_token, rbd_opts);
3748         if (IS_ERR(copts)) {
3749                 ret = PTR_ERR(copts);
3750                 goto out_err;
3751         }
3752         kfree(options);
3753
3754         *ceph_opts = copts;
3755         *opts = rbd_opts;
3756         *rbd_spec = spec;
3757
3758         return 0;
3759 out_mem:
3760         ret = -ENOMEM;
3761 out_err:
3762         kfree(rbd_opts);
3763         rbd_spec_put(spec);
3764         kfree(options);
3765
3766         return ret;
3767 }
3768
3769 /*
3770  * An rbd format 2 image has a unique identifier, distinct from the
3771  * name given to it by the user.  Internally, that identifier is
3772  * what's used to specify the names of objects related to the image.
3773  *
3774  * A special "rbd id" object is used to map an rbd image name to its
3775  * id.  If that object doesn't exist, then there is no v2 rbd image
3776  * with the supplied name.
3777  *
3778  * This function will record the given rbd_dev's image_id field if
3779  * it can be determined, and in that case will return 0.  If any
3780  * errors occur a negative errno will be returned and the rbd_dev's
3781  * image_id field will be unchanged (and should be NULL).
3782  */
3783 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3784 {
3785         int ret;
3786         size_t size;
3787         char *object_name;
3788         void *response;
3789         void *p;
3790
3791         /*
3792          * When probing a parent image, the image id is already
3793          * known (and the image name likely is not).  There's no
3794          * need to fetch the image id again in this case.
3795          */
3796         if (rbd_dev->spec->image_id)
3797                 return 0;
3798
3799         /*
3800          * First, see if the format 2 image id file exists, and if
3801          * so, get the image's persistent id from it.
3802          */
3803         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3804         object_name = kmalloc(size, GFP_NOIO);
3805         if (!object_name)
3806                 return -ENOMEM;
3807         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3808         dout("rbd id object name is %s\n", object_name);
3809
3810         /* Response will be an encoded string, which includes a length */
3811
3812         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3813         response = kzalloc(size, GFP_NOIO);
3814         if (!response) {
3815                 ret = -ENOMEM;
3816                 goto out;
3817         }
3818
3819         ret = rbd_obj_method_sync(rbd_dev, object_name,
3820                                 "rbd", "get_id",
3821                                 NULL, 0,
3822                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3823         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3824         if (ret < 0)
3825                 goto out;
3826
3827         p = response;
3828         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3829                                                 p + RBD_IMAGE_ID_LEN_MAX,
3830                                                 NULL, GFP_NOIO);
3831         if (IS_ERR(rbd_dev->spec->image_id)) {
3832                 ret = PTR_ERR(rbd_dev->spec->image_id);
3833                 rbd_dev->spec->image_id = NULL;
3834         } else {
3835                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3836         }
3837 out:
3838         kfree(response);
3839         kfree(object_name);
3840
3841         return ret;
3842 }
3843
3844 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3845 {
3846         int ret;
3847         size_t size;
3848
3849         /* Version 1 images have no id; empty string is used */
3850
3851         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3852         if (!rbd_dev->spec->image_id)
3853                 return -ENOMEM;
3854
3855         /* Record the header object name for this rbd image. */
3856
3857         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3858         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3859         if (!rbd_dev->header_name) {
3860                 ret = -ENOMEM;
3861                 goto out_err;
3862         }
3863         sprintf(rbd_dev->header_name, "%s%s",
3864                 rbd_dev->spec->image_name, RBD_SUFFIX);
3865
3866         /* Populate rbd image metadata */
3867
3868         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3869         if (ret < 0)
3870                 goto out_err;
3871
3872         /* Version 1 images have no parent (no layering) */
3873
3874         rbd_dev->parent_spec = NULL;
3875         rbd_dev->parent_overlap = 0;
3876
3877         rbd_dev->image_format = 1;
3878
3879         dout("discovered version 1 image, header name is %s\n",
3880                 rbd_dev->header_name);
3881
3882         return 0;
3883
3884 out_err:
3885         kfree(rbd_dev->header_name);
3886         rbd_dev->header_name = NULL;
3887         kfree(rbd_dev->spec->image_id);
3888         rbd_dev->spec->image_id = NULL;
3889
3890         return ret;
3891 }
3892
3893 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3894 {
3895         size_t size;
3896         int ret;
3897         u64 ver = 0;
3898
3899         /*
3900          * Image id was filled in by the caller.  Record the header
3901          * object name for this rbd image.
3902          */
3903         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3904         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3905         if (!rbd_dev->header_name)
3906                 return -ENOMEM;
3907         sprintf(rbd_dev->header_name, "%s%s",
3908                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3909
3910         /* Get the size and object order for the image */
3911
3912         ret = rbd_dev_v2_image_size(rbd_dev);
3913         if (ret < 0)
3914                 goto out_err;
3915
3916         /* Get the object prefix (a.k.a. block_name) for the image */
3917
3918         ret = rbd_dev_v2_object_prefix(rbd_dev);
3919         if (ret < 0)
3920                 goto out_err;
3921
3922         /* Get the and check features for the image */
3923
3924         ret = rbd_dev_v2_features(rbd_dev);
3925         if (ret < 0)
3926                 goto out_err;
3927
3928         /* If the image supports layering, get the parent info */
3929
3930         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3931                 ret = rbd_dev_v2_parent_info(rbd_dev);
3932                 if (ret < 0)
3933                         goto out_err;
3934         }
3935
3936         /* crypto and compression type aren't (yet) supported for v2 images */
3937
3938         rbd_dev->header.crypt_type = 0;
3939         rbd_dev->header.comp_type = 0;
3940
3941         /* Get the snapshot context, plus the header version */
3942
3943         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3944         if (ret)
3945                 goto out_err;
3946         rbd_dev->header.obj_version = ver;
3947
3948         rbd_dev->image_format = 2;
3949
3950         dout("discovered version 2 image, header name is %s\n",
3951                 rbd_dev->header_name);
3952
3953         return 0;
3954 out_err:
3955         rbd_dev->parent_overlap = 0;
3956         rbd_spec_put(rbd_dev->parent_spec);
3957         rbd_dev->parent_spec = NULL;
3958         kfree(rbd_dev->header_name);
3959         rbd_dev->header_name = NULL;
3960         kfree(rbd_dev->header.object_prefix);
3961         rbd_dev->header.object_prefix = NULL;
3962
3963         return ret;
3964 }
3965
3966 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3967 {
3968         int ret;
3969
3970         /* no need to lock here, as rbd_dev is not registered yet */
3971         ret = rbd_dev_snaps_update(rbd_dev);
3972         if (ret)
3973                 return ret;
3974
3975         ret = rbd_dev_probe_update_spec(rbd_dev);
3976         if (ret)
3977                 goto err_out_snaps;
3978
3979         ret = rbd_dev_set_mapping(rbd_dev);
3980         if (ret)
3981                 goto err_out_snaps;
3982
3983         /* generate unique id: find highest unique id, add one */
3984         rbd_dev_id_get(rbd_dev);
3985
3986         /* Fill in the device name, now that we have its id. */
3987         BUILD_BUG_ON(DEV_NAME_LEN
3988                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3989         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3990
3991         /* Get our block major device number. */
3992
3993         ret = register_blkdev(0, rbd_dev->name);
3994         if (ret < 0)
3995                 goto err_out_id;
3996         rbd_dev->major = ret;
3997
3998         /* Set up the blkdev mapping. */
3999
4000         ret = rbd_init_disk(rbd_dev);
4001         if (ret)
4002                 goto err_out_blkdev;
4003
4004         ret = rbd_bus_add_dev(rbd_dev);
4005         if (ret)
4006                 goto err_out_disk;
4007
4008         /*
4009          * At this point cleanup in the event of an error is the job
4010          * of the sysfs code (initiated by rbd_bus_del_dev()).
4011          */
4012         down_write(&rbd_dev->header_rwsem);
4013         ret = rbd_dev_snaps_register(rbd_dev);
4014         up_write(&rbd_dev->header_rwsem);
4015         if (ret)
4016                 goto err_out_bus;
4017
4018         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4019         if (ret)
4020                 goto err_out_bus;
4021
4022         /* Everything's ready.  Announce the disk to the world. */
4023
4024         add_disk(rbd_dev->disk);
4025
4026         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4027                 (unsigned long long) rbd_dev->mapping.size);
4028
4029         return ret;
4030 err_out_bus:
4031         /* this will also clean up rest of rbd_dev stuff */
4032
4033         rbd_bus_del_dev(rbd_dev);
4034
4035         return ret;
4036 err_out_disk:
4037         rbd_free_disk(rbd_dev);
4038 err_out_blkdev:
4039         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4040 err_out_id:
4041         rbd_dev_id_put(rbd_dev);
4042 err_out_snaps:
4043         rbd_remove_all_snaps(rbd_dev);
4044
4045         return ret;
4046 }
4047
4048 /*
4049  * Probe for the existence of the header object for the given rbd
4050  * device.  For format 2 images this includes determining the image
4051  * id.
4052  */
4053 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4054 {
4055         int ret;
4056
4057         /*
4058          * Get the id from the image id object.  If it's not a
4059          * format 2 image, we'll get ENOENT back, and we'll assume
4060          * it's a format 1 image.
4061          */
4062         ret = rbd_dev_image_id(rbd_dev);
4063         if (ret)
4064                 ret = rbd_dev_v1_probe(rbd_dev);
4065         else
4066                 ret = rbd_dev_v2_probe(rbd_dev);
4067         if (ret) {
4068                 dout("probe failed, returning %d\n", ret);
4069
4070                 return ret;
4071         }
4072
4073         ret = rbd_dev_probe_finish(rbd_dev);
4074         if (ret)
4075                 rbd_header_free(&rbd_dev->header);
4076
4077         return ret;
4078 }
4079
4080 static ssize_t rbd_add(struct bus_type *bus,
4081                        const char *buf,
4082                        size_t count)
4083 {
4084         struct rbd_device *rbd_dev = NULL;
4085         struct ceph_options *ceph_opts = NULL;
4086         struct rbd_options *rbd_opts = NULL;
4087         struct rbd_spec *spec = NULL;
4088         struct rbd_client *rbdc;
4089         struct ceph_osd_client *osdc;
4090         int rc = -ENOMEM;
4091
4092         if (!try_module_get(THIS_MODULE))
4093                 return -ENODEV;
4094
4095         /* parse add command */
4096         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4097         if (rc < 0)
4098                 goto err_out_module;
4099
4100         rbdc = rbd_get_client(ceph_opts);
4101         if (IS_ERR(rbdc)) {
4102                 rc = PTR_ERR(rbdc);
4103                 goto err_out_args;
4104         }
4105         ceph_opts = NULL;       /* rbd_dev client now owns this */
4106
4107         /* pick the pool */
4108         osdc = &rbdc->client->osdc;
4109         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4110         if (rc < 0)
4111                 goto err_out_client;
4112         spec->pool_id = (u64) rc;
4113
4114         /* The ceph file layout needs to fit pool id in 32 bits */
4115
4116         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4117                 rc = -EIO;
4118                 goto err_out_client;
4119         }
4120
4121         rbd_dev = rbd_dev_create(rbdc, spec);
4122         if (!rbd_dev)
4123                 goto err_out_client;
4124         rbdc = NULL;            /* rbd_dev now owns this */
4125         spec = NULL;            /* rbd_dev now owns this */
4126
4127         rbd_dev->mapping.read_only = rbd_opts->read_only;
4128         kfree(rbd_opts);
4129         rbd_opts = NULL;        /* done with this */
4130
4131         rc = rbd_dev_probe(rbd_dev);
4132         if (rc < 0)
4133                 goto err_out_rbd_dev;
4134
4135         return count;
4136 err_out_rbd_dev:
4137         rbd_dev_destroy(rbd_dev);
4138 err_out_client:
4139         rbd_put_client(rbdc);
4140 err_out_args:
4141         if (ceph_opts)
4142                 ceph_destroy_options(ceph_opts);
4143         kfree(rbd_opts);
4144         rbd_spec_put(spec);
4145 err_out_module:
4146         module_put(THIS_MODULE);
4147
4148         dout("Error adding device %s\n", buf);
4149
4150         return (ssize_t) rc;
4151 }
4152
4153 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4154 {
4155         struct list_head *tmp;
4156         struct rbd_device *rbd_dev;
4157
4158         spin_lock(&rbd_dev_list_lock);
4159         list_for_each(tmp, &rbd_dev_list) {
4160                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4161                 if (rbd_dev->dev_id == dev_id) {
4162                         spin_unlock(&rbd_dev_list_lock);
4163                         return rbd_dev;
4164                 }
4165         }
4166         spin_unlock(&rbd_dev_list_lock);
4167         return NULL;
4168 }
4169
4170 static void rbd_dev_release(struct device *dev)
4171 {
4172         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4173
4174         if (rbd_dev->watch_event)
4175                 rbd_dev_header_watch_sync(rbd_dev, 0);
4176
4177         /* clean up and free blkdev */
4178         rbd_free_disk(rbd_dev);
4179         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4180
4181         /* release allocated disk header fields */
4182         rbd_header_free(&rbd_dev->header);
4183
4184         /* done with the id, and with the rbd_dev */
4185         rbd_dev_id_put(rbd_dev);
4186         rbd_assert(rbd_dev->rbd_client != NULL);
4187         rbd_dev_destroy(rbd_dev);
4188
4189         /* release module ref */
4190         module_put(THIS_MODULE);
4191 }
4192
4193 static ssize_t rbd_remove(struct bus_type *bus,
4194                           const char *buf,
4195                           size_t count)
4196 {
4197         struct rbd_device *rbd_dev = NULL;
4198         int target_id, rc;
4199         unsigned long ul;
4200         int ret = count;
4201
4202         rc = strict_strtoul(buf, 10, &ul);
4203         if (rc)
4204                 return rc;
4205
4206         /* convert to int; abort if we lost anything in the conversion */
4207         target_id = (int) ul;
4208         if (target_id != ul)
4209                 return -EINVAL;
4210
4211         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4212
4213         rbd_dev = __rbd_get_dev(target_id);
4214         if (!rbd_dev) {
4215                 ret = -ENOENT;
4216                 goto done;
4217         }
4218
4219         spin_lock_irq(&rbd_dev->lock);
4220         if (rbd_dev->open_count)
4221                 ret = -EBUSY;
4222         else
4223                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4224         spin_unlock_irq(&rbd_dev->lock);
4225         if (ret < 0)
4226                 goto done;
4227
4228         rbd_remove_all_snaps(rbd_dev);
4229         rbd_bus_del_dev(rbd_dev);
4230
4231 done:
4232         mutex_unlock(&ctl_mutex);
4233
4234         return ret;
4235 }
4236
4237 /*
4238  * create control files in sysfs
4239  * /sys/bus/rbd/...
4240  */
4241 static int rbd_sysfs_init(void)
4242 {
4243         int ret;
4244
4245         ret = device_register(&rbd_root_dev);
4246         if (ret < 0)
4247                 return ret;
4248
4249         ret = bus_register(&rbd_bus_type);
4250         if (ret < 0)
4251                 device_unregister(&rbd_root_dev);
4252
4253         return ret;
4254 }
4255
4256 static void rbd_sysfs_cleanup(void)
4257 {
4258         bus_unregister(&rbd_bus_type);
4259         device_unregister(&rbd_root_dev);
4260 }
4261
4262 static int __init rbd_init(void)
4263 {
4264         int rc;
4265
4266         if (!libceph_compatible(NULL)) {
4267                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4268
4269                 return -EINVAL;
4270         }
4271         rc = rbd_sysfs_init();
4272         if (rc)
4273                 return rc;
4274         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4275         return 0;
4276 }
4277
4278 static void __exit rbd_exit(void)
4279 {
4280         rbd_sysfs_cleanup();
4281 }
4282
4283 module_init(rbd_init);
4284 module_exit(rbd_exit);
4285
4286 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4287 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4288 MODULE_DESCRIPTION("rados block device");
4289
4290 /* following authorship retained from original osdblk.c */
4291 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4292
4293 MODULE_LICENSE("GPL");