floppy: use del_timer_sync() in init cleanup
[linux-2.6.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
46
47 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
48
49 #define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN   64
51 #define RBD_MAX_SNAP_NAME_LEN   32
52 #define RBD_MAX_OPT_LEN         1024
53
54 #define RBD_SNAP_HEAD_NAME      "-"
55
56 #define DEV_NAME_LEN            32
57
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
60 /*
61  * block device image metadata (in-memory version)
62  */
63 struct rbd_image_header {
64         u64 image_size;
65         char block_name[32];
66         __u8 obj_order;
67         __u8 crypt_type;
68         __u8 comp_type;
69         struct rw_semaphore snap_rwsem;
70         struct ceph_snap_context *snapc;
71         size_t snap_names_len;
72         u64 snap_seq;
73         u32 total_snaps;
74
75         char *snap_names;
76         u64 *snap_sizes;
77
78         u64 obj_version;
79 };
80
81 struct rbd_options {
82         int     notify_timeout;
83 };
84
85 /*
86  * an instance of the client.  multiple devices may share a client.
87  */
88 struct rbd_client {
89         struct ceph_client      *client;
90         struct rbd_options      *rbd_opts;
91         struct kref             kref;
92         struct list_head        node;
93 };
94
95 struct rbd_req_coll;
96
97 /*
98  * a single io request
99  */
100 struct rbd_request {
101         struct request          *rq;            /* blk layer request */
102         struct bio              *bio;           /* cloned bio */
103         struct page             **pages;        /* list of used pages */
104         u64                     len;
105         int                     coll_index;
106         struct rbd_req_coll     *coll;
107 };
108
109 struct rbd_req_status {
110         int done;
111         int rc;
112         u64 bytes;
113 };
114
115 /*
116  * a collection of requests
117  */
118 struct rbd_req_coll {
119         int                     total;
120         int                     num_done;
121         struct kref             kref;
122         struct rbd_req_status   status[0];
123 };
124
125 struct rbd_snap {
126         struct  device          dev;
127         const char              *name;
128         size_t                  size;
129         struct list_head        node;
130         u64                     id;
131 };
132
133 /*
134  * a single device
135  */
136 struct rbd_device {
137         int                     id;             /* blkdev unique id */
138
139         int                     major;          /* blkdev assigned major */
140         struct gendisk          *disk;          /* blkdev's gendisk and rq */
141         struct request_queue    *q;
142
143         struct ceph_client      *client;
144         struct rbd_client       *rbd_client;
145
146         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
147
148         spinlock_t              lock;           /* queue lock */
149
150         struct rbd_image_header header;
151         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
152         int                     obj_len;
153         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
154         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
155         int                     poolid;
156
157         struct ceph_osd_event   *watch_event;
158         struct ceph_osd_request *watch_request;
159
160         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
161         u32 cur_snap;   /* index+1 of current snapshot within snap context
162                            0 - for the head */
163         int read_only;
164
165         struct list_head        node;
166
167         /* list of snapshots */
168         struct list_head        snaps;
169
170         /* sysfs related */
171         struct device           dev;
172 };
173
174 static struct bus_type rbd_bus_type = {
175         .name           = "rbd",
176 };
177
178 static spinlock_t node_lock;      /* protects client get/put */
179
180 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
181 static LIST_HEAD(rbd_dev_list);    /* devices */
182 static LIST_HEAD(rbd_client_list);      /* clients */
183
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_rollback(struct device *dev,
187                                  struct device_attribute *attr,
188                                  const char *buf,
189                                  size_t size);
190 static ssize_t rbd_snap_add(struct device *dev,
191                             struct device_attribute *attr,
192                             const char *buf,
193                             size_t count);
194 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
195                                   struct rbd_snap *snap);;
196
197
198 static struct rbd_device *dev_to_rbd(struct device *dev)
199 {
200         return container_of(dev, struct rbd_device, dev);
201 }
202
203 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
204 {
205         return get_device(&rbd_dev->dev);
206 }
207
208 static void rbd_put_dev(struct rbd_device *rbd_dev)
209 {
210         put_device(&rbd_dev->dev);
211 }
212
213 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
214
215 static int rbd_open(struct block_device *bdev, fmode_t mode)
216 {
217         struct gendisk *disk = bdev->bd_disk;
218         struct rbd_device *rbd_dev = disk->private_data;
219
220         rbd_get_dev(rbd_dev);
221
222         set_device_ro(bdev, rbd_dev->read_only);
223
224         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
225                 return -EROFS;
226
227         return 0;
228 }
229
230 static int rbd_release(struct gendisk *disk, fmode_t mode)
231 {
232         struct rbd_device *rbd_dev = disk->private_data;
233
234         rbd_put_dev(rbd_dev);
235
236         return 0;
237 }
238
239 static const struct block_device_operations rbd_bd_ops = {
240         .owner                  = THIS_MODULE,
241         .open                   = rbd_open,
242         .release                = rbd_release,
243 };
244
245 /*
246  * Initialize an rbd client instance.
247  * We own *opt.
248  */
249 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
250                                             struct rbd_options *rbd_opts)
251 {
252         struct rbd_client *rbdc;
253         int ret = -ENOMEM;
254
255         dout("rbd_client_create\n");
256         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
257         if (!rbdc)
258                 goto out_opt;
259
260         kref_init(&rbdc->kref);
261         INIT_LIST_HEAD(&rbdc->node);
262
263         rbdc->client = ceph_create_client(opt, rbdc);
264         if (IS_ERR(rbdc->client))
265                 goto out_rbdc;
266         opt = NULL; /* Now rbdc->client is responsible for opt */
267
268         ret = ceph_open_session(rbdc->client);
269         if (ret < 0)
270                 goto out_err;
271
272         rbdc->rbd_opts = rbd_opts;
273
274         spin_lock(&node_lock);
275         list_add_tail(&rbdc->node, &rbd_client_list);
276         spin_unlock(&node_lock);
277
278         dout("rbd_client_create created %p\n", rbdc);
279         return rbdc;
280
281 out_err:
282         ceph_destroy_client(rbdc->client);
283 out_rbdc:
284         kfree(rbdc);
285 out_opt:
286         if (opt)
287                 ceph_destroy_options(opt);
288         return ERR_PTR(ret);
289 }
290
291 /*
292  * Find a ceph client with specific addr and configuration.
293  */
294 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
295 {
296         struct rbd_client *client_node;
297
298         if (opt->flags & CEPH_OPT_NOSHARE)
299                 return NULL;
300
301         list_for_each_entry(client_node, &rbd_client_list, node)
302                 if (ceph_compare_options(opt, client_node->client) == 0)
303                         return client_node;
304         return NULL;
305 }
306
307 /*
308  * mount options
309  */
310 enum {
311         Opt_notify_timeout,
312         Opt_last_int,
313         /* int args above */
314         Opt_last_string,
315         /* string args above */
316 };
317
318 static match_table_t rbdopt_tokens = {
319         {Opt_notify_timeout, "notify_timeout=%d"},
320         /* int args above */
321         /* string args above */
322         {-1, NULL}
323 };
324
325 static int parse_rbd_opts_token(char *c, void *private)
326 {
327         struct rbd_options *rbdopt = private;
328         substring_t argstr[MAX_OPT_ARGS];
329         int token, intval, ret;
330
331         token = match_token((char *)c, rbdopt_tokens, argstr);
332         if (token < 0)
333                 return -EINVAL;
334
335         if (token < Opt_last_int) {
336                 ret = match_int(&argstr[0], &intval);
337                 if (ret < 0) {
338                         pr_err("bad mount option arg (not int) "
339                                "at '%s'\n", c);
340                         return ret;
341                 }
342                 dout("got int token %d val %d\n", token, intval);
343         } else if (token > Opt_last_int && token < Opt_last_string) {
344                 dout("got string token %d val %s\n", token,
345                      argstr[0].from);
346         } else {
347                 dout("got token %d\n", token);
348         }
349
350         switch (token) {
351         case Opt_notify_timeout:
352                 rbdopt->notify_timeout = intval;
353                 break;
354         default:
355                 BUG_ON(token);
356         }
357         return 0;
358 }
359
360 /*
361  * Get a ceph client with specific addr and configuration, if one does
362  * not exist create it.
363  */
364 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
365                           char *options)
366 {
367         struct rbd_client *rbdc;
368         struct ceph_options *opt;
369         int ret;
370         struct rbd_options *rbd_opts;
371
372         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
373         if (!rbd_opts)
374                 return -ENOMEM;
375
376         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
377
378         ret = ceph_parse_options(&opt, options, mon_addr,
379                                  mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
380         if (ret < 0)
381                 goto done_err;
382
383         spin_lock(&node_lock);
384         rbdc = __rbd_client_find(opt);
385         if (rbdc) {
386                 ceph_destroy_options(opt);
387
388                 /* using an existing client */
389                 kref_get(&rbdc->kref);
390                 rbd_dev->rbd_client = rbdc;
391                 rbd_dev->client = rbdc->client;
392                 spin_unlock(&node_lock);
393                 return 0;
394         }
395         spin_unlock(&node_lock);
396
397         rbdc = rbd_client_create(opt, rbd_opts);
398         if (IS_ERR(rbdc)) {
399                 ret = PTR_ERR(rbdc);
400                 goto done_err;
401         }
402
403         rbd_dev->rbd_client = rbdc;
404         rbd_dev->client = rbdc->client;
405         return 0;
406 done_err:
407         kfree(rbd_opts);
408         return ret;
409 }
410
411 /*
412  * Destroy ceph client
413  */
414 static void rbd_client_release(struct kref *kref)
415 {
416         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
417
418         dout("rbd_release_client %p\n", rbdc);
419         spin_lock(&node_lock);
420         list_del(&rbdc->node);
421         spin_unlock(&node_lock);
422
423         ceph_destroy_client(rbdc->client);
424         kfree(rbdc->rbd_opts);
425         kfree(rbdc);
426 }
427
428 /*
429  * Drop reference to ceph client node. If it's not referenced anymore, release
430  * it.
431  */
432 static void rbd_put_client(struct rbd_device *rbd_dev)
433 {
434         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
435         rbd_dev->rbd_client = NULL;
436         rbd_dev->client = NULL;
437 }
438
439 /*
440  * Destroy requests collection
441  */
442 static void rbd_coll_release(struct kref *kref)
443 {
444         struct rbd_req_coll *coll =
445                 container_of(kref, struct rbd_req_coll, kref);
446
447         dout("rbd_coll_release %p\n", coll);
448         kfree(coll);
449 }
450
451 /*
452  * Create a new header structure, translate header format from the on-disk
453  * header.
454  */
455 static int rbd_header_from_disk(struct rbd_image_header *header,
456                                  struct rbd_image_header_ondisk *ondisk,
457                                  int allocated_snaps,
458                                  gfp_t gfp_flags)
459 {
460         int i;
461         u32 snap_count = le32_to_cpu(ondisk->snap_count);
462         int ret = -ENOMEM;
463
464         init_rwsem(&header->snap_rwsem);
465         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
467                                 snap_count *
468                                  sizeof(struct rbd_image_snap_ondisk),
469                                 gfp_flags);
470         if (!header->snapc)
471                 return -ENOMEM;
472         if (snap_count) {
473                 header->snap_names = kmalloc(header->snap_names_len,
474                                              GFP_KERNEL);
475                 if (!header->snap_names)
476                         goto err_snapc;
477                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478                                              GFP_KERNEL);
479                 if (!header->snap_sizes)
480                         goto err_names;
481         } else {
482                 header->snap_names = NULL;
483                 header->snap_sizes = NULL;
484         }
485         memcpy(header->block_name, ondisk->block_name,
486                sizeof(ondisk->block_name));
487
488         header->image_size = le64_to_cpu(ondisk->image_size);
489         header->obj_order = ondisk->options.order;
490         header->crypt_type = ondisk->options.crypt_type;
491         header->comp_type = ondisk->options.comp_type;
492
493         atomic_set(&header->snapc->nref, 1);
494         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495         header->snapc->num_snaps = snap_count;
496         header->total_snaps = snap_count;
497
498         if (snap_count &&
499             allocated_snaps == snap_count) {
500                 for (i = 0; i < snap_count; i++) {
501                         header->snapc->snaps[i] =
502                                 le64_to_cpu(ondisk->snaps[i].id);
503                         header->snap_sizes[i] =
504                                 le64_to_cpu(ondisk->snaps[i].image_size);
505                 }
506
507                 /* copy snapshot names */
508                 memcpy(header->snap_names, &ondisk->snaps[i],
509                         header->snap_names_len);
510         }
511
512         return 0;
513
514 err_names:
515         kfree(header->snap_names);
516 err_snapc:
517         kfree(header->snapc);
518         return ret;
519 }
520
521 static int snap_index(struct rbd_image_header *header, int snap_num)
522 {
523         return header->total_snaps - snap_num;
524 }
525
526 static u64 cur_snap_id(struct rbd_device *rbd_dev)
527 {
528         struct rbd_image_header *header = &rbd_dev->header;
529
530         if (!rbd_dev->cur_snap)
531                 return 0;
532
533         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
534 }
535
536 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
537                         u64 *seq, u64 *size)
538 {
539         int i;
540         char *p = header->snap_names;
541
542         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543                 if (strcmp(snap_name, p) == 0)
544                         break;
545         }
546         if (i == header->total_snaps)
547                 return -ENOENT;
548         if (seq)
549                 *seq = header->snapc->snaps[i];
550
551         if (size)
552                 *size = header->snap_sizes[i];
553
554         return i;
555 }
556
557 static int rbd_header_set_snap(struct rbd_device *dev,
558                                const char *snap_name,
559                                u64 *size)
560 {
561         struct rbd_image_header *header = &dev->header;
562         struct ceph_snap_context *snapc = header->snapc;
563         int ret = -ENOENT;
564
565         down_write(&header->snap_rwsem);
566
567         if (!snap_name ||
568             !*snap_name ||
569             strcmp(snap_name, "-") == 0 ||
570             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571                 if (header->total_snaps)
572                         snapc->seq = header->snap_seq;
573                 else
574                         snapc->seq = 0;
575                 dev->cur_snap = 0;
576                 dev->read_only = 0;
577                 if (size)
578                         *size = header->image_size;
579         } else {
580                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
581                 if (ret < 0)
582                         goto done;
583
584                 dev->cur_snap = header->total_snaps - ret;
585                 dev->read_only = 1;
586         }
587
588         ret = 0;
589 done:
590         up_write(&header->snap_rwsem);
591         return ret;
592 }
593
594 static void rbd_header_free(struct rbd_image_header *header)
595 {
596         kfree(header->snapc);
597         kfree(header->snap_names);
598         kfree(header->snap_sizes);
599 }
600
601 /*
602  * get the actual striped segment name, offset and length
603  */
604 static u64 rbd_get_segment(struct rbd_image_header *header,
605                            const char *block_name,
606                            u64 ofs, u64 len,
607                            char *seg_name, u64 *segofs)
608 {
609         u64 seg = ofs >> header->obj_order;
610
611         if (seg_name)
612                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613                          "%s.%012llx", block_name, seg);
614
615         ofs = ofs & ((1 << header->obj_order) - 1);
616         len = min_t(u64, len, (1 << header->obj_order) - ofs);
617
618         if (segofs)
619                 *segofs = ofs;
620
621         return len;
622 }
623
624 static int rbd_get_num_segments(struct rbd_image_header *header,
625                                 u64 ofs, u64 len)
626 {
627         u64 start_seg = ofs >> header->obj_order;
628         u64 end_seg = (ofs + len - 1) >> header->obj_order;
629         return end_seg - start_seg + 1;
630 }
631
632 /*
633  * returns the size of an object in the image
634  */
635 static u64 rbd_obj_bytes(struct rbd_image_header *header)
636 {
637         return 1 << header->obj_order;
638 }
639
640 /*
641  * bio helpers
642  */
643
644 static void bio_chain_put(struct bio *chain)
645 {
646         struct bio *tmp;
647
648         while (chain) {
649                 tmp = chain;
650                 chain = chain->bi_next;
651                 bio_put(tmp);
652         }
653 }
654
655 /*
656  * zeros a bio chain, starting at specific offset
657  */
658 static void zero_bio_chain(struct bio *chain, int start_ofs)
659 {
660         struct bio_vec *bv;
661         unsigned long flags;
662         void *buf;
663         int i;
664         int pos = 0;
665
666         while (chain) {
667                 bio_for_each_segment(bv, chain, i) {
668                         if (pos + bv->bv_len > start_ofs) {
669                                 int remainder = max(start_ofs - pos, 0);
670                                 buf = bvec_kmap_irq(bv, &flags);
671                                 memset(buf + remainder, 0,
672                                        bv->bv_len - remainder);
673                                 bvec_kunmap_irq(buf, &flags);
674                         }
675                         pos += bv->bv_len;
676                 }
677
678                 chain = chain->bi_next;
679         }
680 }
681
682 /*
683  * bio_chain_clone - clone a chain of bios up to a certain length.
684  * might return a bio_pair that will need to be released.
685  */
686 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
687                                    struct bio_pair **bp,
688                                    int len, gfp_t gfpmask)
689 {
690         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
691         int total = 0;
692
693         if (*bp) {
694                 bio_pair_release(*bp);
695                 *bp = NULL;
696         }
697
698         while (old_chain && (total < len)) {
699                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
700                 if (!tmp)
701                         goto err_out;
702
703                 if (total + old_chain->bi_size > len) {
704                         struct bio_pair *bp;
705
706                         /*
707                          * this split can only happen with a single paged bio,
708                          * split_bio will BUG_ON if this is not the case
709                          */
710                         dout("bio_chain_clone split! total=%d remaining=%d"
711                              "bi_size=%d\n",
712                              (int)total, (int)len-total,
713                              (int)old_chain->bi_size);
714
715                         /* split the bio. We'll release it either in the next
716                            call, or it will have to be released outside */
717                         bp = bio_split(old_chain, (len - total) / 512ULL);
718                         if (!bp)
719                                 goto err_out;
720
721                         __bio_clone(tmp, &bp->bio1);
722
723                         *next = &bp->bio2;
724                 } else {
725                         __bio_clone(tmp, old_chain);
726                         *next = old_chain->bi_next;
727                 }
728
729                 tmp->bi_bdev = NULL;
730                 gfpmask &= ~__GFP_WAIT;
731                 tmp->bi_next = NULL;
732
733                 if (!new_chain) {
734                         new_chain = tail = tmp;
735                 } else {
736                         tail->bi_next = tmp;
737                         tail = tmp;
738                 }
739                 old_chain = old_chain->bi_next;
740
741                 total += tmp->bi_size;
742         }
743
744         BUG_ON(total < len);
745
746         if (tail)
747                 tail->bi_next = NULL;
748
749         *old = old_chain;
750
751         return new_chain;
752
753 err_out:
754         dout("bio_chain_clone with err\n");
755         bio_chain_put(new_chain);
756         return NULL;
757 }
758
759 /*
760  * helpers for osd request op vectors.
761  */
762 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
763                             int num_ops,
764                             int opcode,
765                             u32 payload_len)
766 {
767         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
768                        GFP_NOIO);
769         if (!*ops)
770                 return -ENOMEM;
771         (*ops)[0].op = opcode;
772         /*
773          * op extent offset and length will be set later on
774          * in calc_raw_layout()
775          */
776         (*ops)[0].payload_len = payload_len;
777         return 0;
778 }
779
780 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
781 {
782         kfree(ops);
783 }
784
785 static void rbd_coll_end_req_index(struct request *rq,
786                                    struct rbd_req_coll *coll,
787                                    int index,
788                                    int ret, u64 len)
789 {
790         struct request_queue *q;
791         int min, max, i;
792
793         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
794              coll, index, ret, len);
795
796         if (!rq)
797                 return;
798
799         if (!coll) {
800                 blk_end_request(rq, ret, len);
801                 return;
802         }
803
804         q = rq->q;
805
806         spin_lock_irq(q->queue_lock);
807         coll->status[index].done = 1;
808         coll->status[index].rc = ret;
809         coll->status[index].bytes = len;
810         max = min = coll->num_done;
811         while (max < coll->total && coll->status[max].done)
812                 max++;
813
814         for (i = min; i<max; i++) {
815                 __blk_end_request(rq, coll->status[i].rc,
816                                   coll->status[i].bytes);
817                 coll->num_done++;
818                 kref_put(&coll->kref, rbd_coll_release);
819         }
820         spin_unlock_irq(q->queue_lock);
821 }
822
823 static void rbd_coll_end_req(struct rbd_request *req,
824                              int ret, u64 len)
825 {
826         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
827 }
828
829 /*
830  * Send ceph osd request
831  */
832 static int rbd_do_request(struct request *rq,
833                           struct rbd_device *dev,
834                           struct ceph_snap_context *snapc,
835                           u64 snapid,
836                           const char *obj, u64 ofs, u64 len,
837                           struct bio *bio,
838                           struct page **pages,
839                           int num_pages,
840                           int flags,
841                           struct ceph_osd_req_op *ops,
842                           int num_reply,
843                           struct rbd_req_coll *coll,
844                           int coll_index,
845                           void (*rbd_cb)(struct ceph_osd_request *req,
846                                          struct ceph_msg *msg),
847                           struct ceph_osd_request **linger_req,
848                           u64 *ver)
849 {
850         struct ceph_osd_request *req;
851         struct ceph_file_layout *layout;
852         int ret;
853         u64 bno;
854         struct timespec mtime = CURRENT_TIME;
855         struct rbd_request *req_data;
856         struct ceph_osd_request_head *reqhead;
857         struct rbd_image_header *header = &dev->header;
858
859         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
860         if (!req_data) {
861                 if (coll)
862                         rbd_coll_end_req_index(rq, coll, coll_index,
863                                                -ENOMEM, len);
864                 return -ENOMEM;
865         }
866
867         if (coll) {
868                 req_data->coll = coll;
869                 req_data->coll_index = coll_index;
870         }
871
872         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
873
874         down_read(&header->snap_rwsem);
875
876         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
877                                       snapc,
878                                       ops,
879                                       false,
880                                       GFP_NOIO, pages, bio);
881         if (!req) {
882                 up_read(&header->snap_rwsem);
883                 ret = -ENOMEM;
884                 goto done_pages;
885         }
886
887         req->r_callback = rbd_cb;
888
889         req_data->rq = rq;
890         req_data->bio = bio;
891         req_data->pages = pages;
892         req_data->len = len;
893
894         req->r_priv = req_data;
895
896         reqhead = req->r_request->front.iov_base;
897         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
898
899         strncpy(req->r_oid, obj, sizeof(req->r_oid));
900         req->r_oid_len = strlen(req->r_oid);
901
902         layout = &req->r_file_layout;
903         memset(layout, 0, sizeof(*layout));
904         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
905         layout->fl_stripe_count = cpu_to_le32(1);
906         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
907         layout->fl_pg_preferred = cpu_to_le32(-1);
908         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
909         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
910                              ofs, &len, &bno, req, ops);
911
912         ceph_osdc_build_request(req, ofs, &len,
913                                 ops,
914                                 snapc,
915                                 &mtime,
916                                 req->r_oid, req->r_oid_len);
917         up_read(&header->snap_rwsem);
918
919         if (linger_req) {
920                 ceph_osdc_set_request_linger(&dev->client->osdc, req);
921                 *linger_req = req;
922         }
923
924         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
925         if (ret < 0)
926                 goto done_err;
927
928         if (!rbd_cb) {
929                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
930                 if (ver)
931                         *ver = le64_to_cpu(req->r_reassert_version.version);
932                 dout("reassert_ver=%lld\n",
933                      le64_to_cpu(req->r_reassert_version.version));
934                 ceph_osdc_put_request(req);
935         }
936         return ret;
937
938 done_err:
939         bio_chain_put(req_data->bio);
940         ceph_osdc_put_request(req);
941 done_pages:
942         rbd_coll_end_req(req_data, ret, len);
943         kfree(req_data);
944         return ret;
945 }
946
947 /*
948  * Ceph osd op callback
949  */
950 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
951 {
952         struct rbd_request *req_data = req->r_priv;
953         struct ceph_osd_reply_head *replyhead;
954         struct ceph_osd_op *op;
955         __s32 rc;
956         u64 bytes;
957         int read_op;
958
959         /* parse reply */
960         replyhead = msg->front.iov_base;
961         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
962         op = (void *)(replyhead + 1);
963         rc = le32_to_cpu(replyhead->result);
964         bytes = le64_to_cpu(op->extent.length);
965         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
966
967         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
968
969         if (rc == -ENOENT && read_op) {
970                 zero_bio_chain(req_data->bio, 0);
971                 rc = 0;
972         } else if (rc == 0 && read_op && bytes < req_data->len) {
973                 zero_bio_chain(req_data->bio, bytes);
974                 bytes = req_data->len;
975         }
976
977         rbd_coll_end_req(req_data, rc, bytes);
978
979         if (req_data->bio)
980                 bio_chain_put(req_data->bio);
981
982         ceph_osdc_put_request(req);
983         kfree(req_data);
984 }
985
986 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
987 {
988         ceph_osdc_put_request(req);
989 }
990
991 /*
992  * Do a synchronous ceph osd operation
993  */
994 static int rbd_req_sync_op(struct rbd_device *dev,
995                            struct ceph_snap_context *snapc,
996                            u64 snapid,
997                            int opcode,
998                            int flags,
999                            struct ceph_osd_req_op *orig_ops,
1000                            int num_reply,
1001                            const char *obj,
1002                            u64 ofs, u64 len,
1003                            char *buf,
1004                            struct ceph_osd_request **linger_req,
1005                            u64 *ver)
1006 {
1007         int ret;
1008         struct page **pages;
1009         int num_pages;
1010         struct ceph_osd_req_op *ops = orig_ops;
1011         u32 payload_len;
1012
1013         num_pages = calc_pages_for(ofs , len);
1014         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1015         if (IS_ERR(pages))
1016                 return PTR_ERR(pages);
1017
1018         if (!orig_ops) {
1019                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1020                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1021                 if (ret < 0)
1022                         goto done;
1023
1024                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1025                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1026                         if (ret < 0)
1027                                 goto done_ops;
1028                 }
1029         }
1030
1031         ret = rbd_do_request(NULL, dev, snapc, snapid,
1032                           obj, ofs, len, NULL,
1033                           pages, num_pages,
1034                           flags,
1035                           ops,
1036                           2,
1037                           NULL, 0,
1038                           NULL,
1039                           linger_req, ver);
1040         if (ret < 0)
1041                 goto done_ops;
1042
1043         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1044                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1045
1046 done_ops:
1047         if (!orig_ops)
1048                 rbd_destroy_ops(ops);
1049 done:
1050         ceph_release_page_vector(pages, num_pages);
1051         return ret;
1052 }
1053
1054 /*
1055  * Do an asynchronous ceph osd operation
1056  */
1057 static int rbd_do_op(struct request *rq,
1058                      struct rbd_device *rbd_dev ,
1059                      struct ceph_snap_context *snapc,
1060                      u64 snapid,
1061                      int opcode, int flags, int num_reply,
1062                      u64 ofs, u64 len,
1063                      struct bio *bio,
1064                      struct rbd_req_coll *coll,
1065                      int coll_index)
1066 {
1067         char *seg_name;
1068         u64 seg_ofs;
1069         u64 seg_len;
1070         int ret;
1071         struct ceph_osd_req_op *ops;
1072         u32 payload_len;
1073
1074         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1075         if (!seg_name)
1076                 return -ENOMEM;
1077
1078         seg_len = rbd_get_segment(&rbd_dev->header,
1079                                   rbd_dev->header.block_name,
1080                                   ofs, len,
1081                                   seg_name, &seg_ofs);
1082
1083         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1084
1085         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1086         if (ret < 0)
1087                 goto done;
1088
1089         /* we've taken care of segment sizes earlier when we
1090            cloned the bios. We should never have a segment
1091            truncated at this point */
1092         BUG_ON(seg_len < len);
1093
1094         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1095                              seg_name, seg_ofs, seg_len,
1096                              bio,
1097                              NULL, 0,
1098                              flags,
1099                              ops,
1100                              num_reply,
1101                              coll, coll_index,
1102                              rbd_req_cb, 0, NULL);
1103
1104         rbd_destroy_ops(ops);
1105 done:
1106         kfree(seg_name);
1107         return ret;
1108 }
1109
1110 /*
1111  * Request async osd write
1112  */
1113 static int rbd_req_write(struct request *rq,
1114                          struct rbd_device *rbd_dev,
1115                          struct ceph_snap_context *snapc,
1116                          u64 ofs, u64 len,
1117                          struct bio *bio,
1118                          struct rbd_req_coll *coll,
1119                          int coll_index)
1120 {
1121         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1122                          CEPH_OSD_OP_WRITE,
1123                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1124                          2,
1125                          ofs, len, bio, coll, coll_index);
1126 }
1127
1128 /*
1129  * Request async osd read
1130  */
1131 static int rbd_req_read(struct request *rq,
1132                          struct rbd_device *rbd_dev,
1133                          u64 snapid,
1134                          u64 ofs, u64 len,
1135                          struct bio *bio,
1136                          struct rbd_req_coll *coll,
1137                          int coll_index)
1138 {
1139         return rbd_do_op(rq, rbd_dev, NULL,
1140                          (snapid ? snapid : CEPH_NOSNAP),
1141                          CEPH_OSD_OP_READ,
1142                          CEPH_OSD_FLAG_READ,
1143                          2,
1144                          ofs, len, bio, coll, coll_index);
1145 }
1146
1147 /*
1148  * Request sync osd read
1149  */
1150 static int rbd_req_sync_read(struct rbd_device *dev,
1151                           struct ceph_snap_context *snapc,
1152                           u64 snapid,
1153                           const char *obj,
1154                           u64 ofs, u64 len,
1155                           char *buf,
1156                           u64 *ver)
1157 {
1158         return rbd_req_sync_op(dev, NULL,
1159                                (snapid ? snapid : CEPH_NOSNAP),
1160                                CEPH_OSD_OP_READ,
1161                                CEPH_OSD_FLAG_READ,
1162                                NULL,
1163                                1, obj, ofs, len, buf, NULL, ver);
1164 }
1165
1166 /*
1167  * Request sync osd watch
1168  */
1169 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1170                                    u64 ver,
1171                                    u64 notify_id,
1172                                    const char *obj)
1173 {
1174         struct ceph_osd_req_op *ops;
1175         struct page **pages = NULL;
1176         int ret;
1177
1178         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1179         if (ret < 0)
1180                 return ret;
1181
1182         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1183         ops[0].watch.cookie = notify_id;
1184         ops[0].watch.flag = 0;
1185
1186         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1187                           obj, 0, 0, NULL,
1188                           pages, 0,
1189                           CEPH_OSD_FLAG_READ,
1190                           ops,
1191                           1,
1192                           NULL, 0,
1193                           rbd_simple_req_cb, 0, NULL);
1194
1195         rbd_destroy_ops(ops);
1196         return ret;
1197 }
1198
1199 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1200 {
1201         struct rbd_device *dev = (struct rbd_device *)data;
1202         int rc;
1203
1204         if (!dev)
1205                 return;
1206
1207         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1208                 notify_id, (int)opcode);
1209         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1210         rc = __rbd_update_snaps(dev);
1211         mutex_unlock(&ctl_mutex);
1212         if (rc)
1213                 pr_warning(DRV_NAME "%d got notification but failed to update"
1214                            " snaps: %d\n", dev->major, rc);
1215
1216         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1217 }
1218
1219 /*
1220  * Request sync osd watch
1221  */
1222 static int rbd_req_sync_watch(struct rbd_device *dev,
1223                               const char *obj,
1224                               u64 ver)
1225 {
1226         struct ceph_osd_req_op *ops;
1227         struct ceph_osd_client *osdc = &dev->client->osdc;
1228
1229         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1230         if (ret < 0)
1231                 return ret;
1232
1233         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1234                                      (void *)dev, &dev->watch_event);
1235         if (ret < 0)
1236                 goto fail;
1237
1238         ops[0].watch.ver = cpu_to_le64(ver);
1239         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1240         ops[0].watch.flag = 1;
1241
1242         ret = rbd_req_sync_op(dev, NULL,
1243                               CEPH_NOSNAP,
1244                               0,
1245                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1246                               ops,
1247                               1, obj, 0, 0, NULL,
1248                               &dev->watch_request, NULL);
1249
1250         if (ret < 0)
1251                 goto fail_event;
1252
1253         rbd_destroy_ops(ops);
1254         return 0;
1255
1256 fail_event:
1257         ceph_osdc_cancel_event(dev->watch_event);
1258         dev->watch_event = NULL;
1259 fail:
1260         rbd_destroy_ops(ops);
1261         return ret;
1262 }
1263
1264 /*
1265  * Request sync osd unwatch
1266  */
1267 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1268                                 const char *obj)
1269 {
1270         struct ceph_osd_req_op *ops;
1271
1272         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1273         if (ret < 0)
1274                 return ret;
1275
1276         ops[0].watch.ver = 0;
1277         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1278         ops[0].watch.flag = 0;
1279
1280         ret = rbd_req_sync_op(dev, NULL,
1281                               CEPH_NOSNAP,
1282                               0,
1283                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1284                               ops,
1285                               1, obj, 0, 0, NULL, NULL, NULL);
1286
1287         rbd_destroy_ops(ops);
1288         ceph_osdc_cancel_event(dev->watch_event);
1289         dev->watch_event = NULL;
1290         return ret;
1291 }
1292
1293 struct rbd_notify_info {
1294         struct rbd_device *dev;
1295 };
1296
1297 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1298 {
1299         struct rbd_device *dev = (struct rbd_device *)data;
1300         if (!dev)
1301                 return;
1302
1303         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1304                 notify_id, (int)opcode);
1305 }
1306
1307 /*
1308  * Request sync osd notify
1309  */
1310 static int rbd_req_sync_notify(struct rbd_device *dev,
1311                           const char *obj)
1312 {
1313         struct ceph_osd_req_op *ops;
1314         struct ceph_osd_client *osdc = &dev->client->osdc;
1315         struct ceph_osd_event *event;
1316         struct rbd_notify_info info;
1317         int payload_len = sizeof(u32) + sizeof(u32);
1318         int ret;
1319
1320         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1321         if (ret < 0)
1322                 return ret;
1323
1324         info.dev = dev;
1325
1326         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1327                                      (void *)&info, &event);
1328         if (ret < 0)
1329                 goto fail;
1330
1331         ops[0].watch.ver = 1;
1332         ops[0].watch.flag = 1;
1333         ops[0].watch.cookie = event->cookie;
1334         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1335         ops[0].watch.timeout = 12;
1336
1337         ret = rbd_req_sync_op(dev, NULL,
1338                                CEPH_NOSNAP,
1339                                0,
1340                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1341                                ops,
1342                                1, obj, 0, 0, NULL, NULL, NULL);
1343         if (ret < 0)
1344                 goto fail_event;
1345
1346         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1347         dout("ceph_osdc_wait_event returned %d\n", ret);
1348         rbd_destroy_ops(ops);
1349         return 0;
1350
1351 fail_event:
1352         ceph_osdc_cancel_event(event);
1353 fail:
1354         rbd_destroy_ops(ops);
1355         return ret;
1356 }
1357
1358 /*
1359  * Request sync osd rollback
1360  */
1361 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1362                                      u64 snapid,
1363                                      const char *obj)
1364 {
1365         struct ceph_osd_req_op *ops;
1366         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1367         if (ret < 0)
1368                 return ret;
1369
1370         ops[0].snap.snapid = snapid;
1371
1372         ret = rbd_req_sync_op(dev, NULL,
1373                                CEPH_NOSNAP,
1374                                0,
1375                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1376                                ops,
1377                                1, obj, 0, 0, NULL, NULL, NULL);
1378
1379         rbd_destroy_ops(ops);
1380
1381         return ret;
1382 }
1383
1384 /*
1385  * Request sync osd read
1386  */
1387 static int rbd_req_sync_exec(struct rbd_device *dev,
1388                              const char *obj,
1389                              const char *cls,
1390                              const char *method,
1391                              const char *data,
1392                              int len,
1393                              u64 *ver)
1394 {
1395         struct ceph_osd_req_op *ops;
1396         int cls_len = strlen(cls);
1397         int method_len = strlen(method);
1398         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1399                                     cls_len + method_len + len);
1400         if (ret < 0)
1401                 return ret;
1402
1403         ops[0].cls.class_name = cls;
1404         ops[0].cls.class_len = (__u8)cls_len;
1405         ops[0].cls.method_name = method;
1406         ops[0].cls.method_len = (__u8)method_len;
1407         ops[0].cls.argc = 0;
1408         ops[0].cls.indata = data;
1409         ops[0].cls.indata_len = len;
1410
1411         ret = rbd_req_sync_op(dev, NULL,
1412                                CEPH_NOSNAP,
1413                                0,
1414                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1415                                ops,
1416                                1, obj, 0, 0, NULL, NULL, ver);
1417
1418         rbd_destroy_ops(ops);
1419
1420         dout("cls_exec returned %d\n", ret);
1421         return ret;
1422 }
1423
1424 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1425 {
1426         struct rbd_req_coll *coll =
1427                         kzalloc(sizeof(struct rbd_req_coll) +
1428                                 sizeof(struct rbd_req_status) * num_reqs,
1429                                 GFP_ATOMIC);
1430
1431         if (!coll)
1432                 return NULL;
1433         coll->total = num_reqs;
1434         kref_init(&coll->kref);
1435         return coll;
1436 }
1437
1438 /*
1439  * block device queue callback
1440  */
1441 static void rbd_rq_fn(struct request_queue *q)
1442 {
1443         struct rbd_device *rbd_dev = q->queuedata;
1444         struct request *rq;
1445         struct bio_pair *bp = NULL;
1446
1447         rq = blk_fetch_request(q);
1448
1449         while (1) {
1450                 struct bio *bio;
1451                 struct bio *rq_bio, *next_bio = NULL;
1452                 bool do_write;
1453                 int size, op_size = 0;
1454                 u64 ofs;
1455                 int num_segs, cur_seg = 0;
1456                 struct rbd_req_coll *coll;
1457
1458                 /* peek at request from block layer */
1459                 if (!rq)
1460                         break;
1461
1462                 dout("fetched request\n");
1463
1464                 /* filter out block requests we don't understand */
1465                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1466                         __blk_end_request_all(rq, 0);
1467                         goto next;
1468                 }
1469
1470                 /* deduce our operation (read, write) */
1471                 do_write = (rq_data_dir(rq) == WRITE);
1472
1473                 size = blk_rq_bytes(rq);
1474                 ofs = blk_rq_pos(rq) * 512ULL;
1475                 rq_bio = rq->bio;
1476                 if (do_write && rbd_dev->read_only) {
1477                         __blk_end_request_all(rq, -EROFS);
1478                         goto next;
1479                 }
1480
1481                 spin_unlock_irq(q->queue_lock);
1482
1483                 dout("%s 0x%x bytes at 0x%llx\n",
1484                      do_write ? "write" : "read",
1485                      size, blk_rq_pos(rq) * 512ULL);
1486
1487                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1488                 coll = rbd_alloc_coll(num_segs);
1489                 if (!coll) {
1490                         spin_lock_irq(q->queue_lock);
1491                         __blk_end_request_all(rq, -ENOMEM);
1492                         goto next;
1493                 }
1494
1495                 do {
1496                         /* a bio clone to be passed down to OSD req */
1497                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1498                         op_size = rbd_get_segment(&rbd_dev->header,
1499                                                   rbd_dev->header.block_name,
1500                                                   ofs, size,
1501                                                   NULL, NULL);
1502                         kref_get(&coll->kref);
1503                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1504                                               op_size, GFP_ATOMIC);
1505                         if (!bio) {
1506                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1507                                                        -ENOMEM, op_size);
1508                                 goto next_seg;
1509                         }
1510
1511
1512                         /* init OSD command: write or read */
1513                         if (do_write)
1514                                 rbd_req_write(rq, rbd_dev,
1515                                               rbd_dev->header.snapc,
1516                                               ofs,
1517                                               op_size, bio,
1518                                               coll, cur_seg);
1519                         else
1520                                 rbd_req_read(rq, rbd_dev,
1521                                              cur_snap_id(rbd_dev),
1522                                              ofs,
1523                                              op_size, bio,
1524                                              coll, cur_seg);
1525
1526 next_seg:
1527                         size -= op_size;
1528                         ofs += op_size;
1529
1530                         cur_seg++;
1531                         rq_bio = next_bio;
1532                 } while (size > 0);
1533                 kref_put(&coll->kref, rbd_coll_release);
1534
1535                 if (bp)
1536                         bio_pair_release(bp);
1537                 spin_lock_irq(q->queue_lock);
1538 next:
1539                 rq = blk_fetch_request(q);
1540         }
1541 }
1542
1543 /*
1544  * a queue callback. Makes sure that we don't create a bio that spans across
1545  * multiple osd objects. One exception would be with a single page bios,
1546  * which we handle later at bio_chain_clone
1547  */
1548 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1549                           struct bio_vec *bvec)
1550 {
1551         struct rbd_device *rbd_dev = q->queuedata;
1552         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1553         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1554         unsigned int bio_sectors = bmd->bi_size >> 9;
1555         int max;
1556
1557         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1558                                  + bio_sectors)) << 9;
1559         if (max < 0)
1560                 max = 0; /* bio_add cannot handle a negative return */
1561         if (max <= bvec->bv_len && bio_sectors == 0)
1562                 return bvec->bv_len;
1563         return max;
1564 }
1565
1566 static void rbd_free_disk(struct rbd_device *rbd_dev)
1567 {
1568         struct gendisk *disk = rbd_dev->disk;
1569
1570         if (!disk)
1571                 return;
1572
1573         rbd_header_free(&rbd_dev->header);
1574
1575         if (disk->flags & GENHD_FL_UP)
1576                 del_gendisk(disk);
1577         if (disk->queue)
1578                 blk_cleanup_queue(disk->queue);
1579         put_disk(disk);
1580 }
1581
1582 /*
1583  * reload the ondisk the header 
1584  */
1585 static int rbd_read_header(struct rbd_device *rbd_dev,
1586                            struct rbd_image_header *header)
1587 {
1588         ssize_t rc;
1589         struct rbd_image_header_ondisk *dh;
1590         int snap_count = 0;
1591         u64 snap_names_len = 0;
1592         u64 ver;
1593
1594         while (1) {
1595                 int len = sizeof(*dh) +
1596                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1597                           snap_names_len;
1598
1599                 rc = -ENOMEM;
1600                 dh = kmalloc(len, GFP_KERNEL);
1601                 if (!dh)
1602                         return -ENOMEM;
1603
1604                 rc = rbd_req_sync_read(rbd_dev,
1605                                        NULL, CEPH_NOSNAP,
1606                                        rbd_dev->obj_md_name,
1607                                        0, len,
1608                                        (char *)dh, &ver);
1609                 if (rc < 0)
1610                         goto out_dh;
1611
1612                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1613                 if (rc < 0)
1614                         goto out_dh;
1615
1616                 if (snap_count != header->total_snaps) {
1617                         snap_count = header->total_snaps;
1618                         snap_names_len = header->snap_names_len;
1619                         rbd_header_free(header);
1620                         kfree(dh);
1621                         continue;
1622                 }
1623                 break;
1624         }
1625         header->obj_version = ver;
1626
1627 out_dh:
1628         kfree(dh);
1629         return rc;
1630 }
1631
1632 /*
1633  * create a snapshot
1634  */
1635 static int rbd_header_add_snap(struct rbd_device *dev,
1636                                const char *snap_name,
1637                                gfp_t gfp_flags)
1638 {
1639         int name_len = strlen(snap_name);
1640         u64 new_snapid;
1641         int ret;
1642         void *data, *p, *e;
1643         u64 ver;
1644
1645         /* we should create a snapshot only if we're pointing at the head */
1646         if (dev->cur_snap)
1647                 return -EINVAL;
1648
1649         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1650                                       &new_snapid);
1651         dout("created snapid=%lld\n", new_snapid);
1652         if (ret < 0)
1653                 return ret;
1654
1655         data = kmalloc(name_len + 16, gfp_flags);
1656         if (!data)
1657                 return -ENOMEM;
1658
1659         p = data;
1660         e = data + name_len + 16;
1661
1662         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1663         ceph_encode_64_safe(&p, e, new_snapid, bad);
1664
1665         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1666                                 data, p - data, &ver);
1667
1668         kfree(data);
1669
1670         if (ret < 0)
1671                 return ret;
1672
1673         dev->header.snapc->seq =  new_snapid;
1674
1675         return 0;
1676 bad:
1677         return -ERANGE;
1678 }
1679
1680 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1681 {
1682         struct rbd_snap *snap;
1683
1684         while (!list_empty(&rbd_dev->snaps)) {
1685                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1686                 __rbd_remove_snap_dev(rbd_dev, snap);
1687         }
1688 }
1689
1690 /*
1691  * only read the first part of the ondisk header, without the snaps info
1692  */
1693 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1694 {
1695         int ret;
1696         struct rbd_image_header h;
1697         u64 snap_seq;
1698         int follow_seq = 0;
1699
1700         ret = rbd_read_header(rbd_dev, &h);
1701         if (ret < 0)
1702                 return ret;
1703
1704         /* resized? */
1705         set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1706
1707         down_write(&rbd_dev->header.snap_rwsem);
1708
1709         snap_seq = rbd_dev->header.snapc->seq;
1710         if (rbd_dev->header.total_snaps &&
1711             rbd_dev->header.snapc->snaps[0] == snap_seq)
1712                 /* pointing at the head, will need to follow that
1713                    if head moves */
1714                 follow_seq = 1;
1715
1716         kfree(rbd_dev->header.snapc);
1717         kfree(rbd_dev->header.snap_names);
1718         kfree(rbd_dev->header.snap_sizes);
1719
1720         rbd_dev->header.total_snaps = h.total_snaps;
1721         rbd_dev->header.snapc = h.snapc;
1722         rbd_dev->header.snap_names = h.snap_names;
1723         rbd_dev->header.snap_names_len = h.snap_names_len;
1724         rbd_dev->header.snap_sizes = h.snap_sizes;
1725         if (follow_seq)
1726                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1727         else
1728                 rbd_dev->header.snapc->seq = snap_seq;
1729
1730         ret = __rbd_init_snaps_header(rbd_dev);
1731
1732         up_write(&rbd_dev->header.snap_rwsem);
1733
1734         return ret;
1735 }
1736
1737 static int rbd_init_disk(struct rbd_device *rbd_dev)
1738 {
1739         struct gendisk *disk;
1740         struct request_queue *q;
1741         int rc;
1742         u64 total_size = 0;
1743
1744         /* contact OSD, request size info about the object being mapped */
1745         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1746         if (rc)
1747                 return rc;
1748
1749         /* no need to lock here, as rbd_dev is not registered yet */
1750         rc = __rbd_init_snaps_header(rbd_dev);
1751         if (rc)
1752                 return rc;
1753
1754         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1755         if (rc)
1756                 return rc;
1757
1758         /* create gendisk info */
1759         rc = -ENOMEM;
1760         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1761         if (!disk)
1762                 goto out;
1763
1764         snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1765                  rbd_dev->id);
1766         disk->major = rbd_dev->major;
1767         disk->first_minor = 0;
1768         disk->fops = &rbd_bd_ops;
1769         disk->private_data = rbd_dev;
1770
1771         /* init rq */
1772         rc = -ENOMEM;
1773         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1774         if (!q)
1775                 goto out_disk;
1776
1777         /* set io sizes to object size */
1778         blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1779         blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1780         blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1781         blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1782
1783         blk_queue_merge_bvec(q, rbd_merge_bvec);
1784         disk->queue = q;
1785
1786         q->queuedata = rbd_dev;
1787
1788         rbd_dev->disk = disk;
1789         rbd_dev->q = q;
1790
1791         /* finally, announce the disk to the world */
1792         set_capacity(disk, total_size / 512ULL);
1793         add_disk(disk);
1794
1795         pr_info("%s: added with size 0x%llx\n",
1796                 disk->disk_name, (unsigned long long)total_size);
1797         return 0;
1798
1799 out_disk:
1800         put_disk(disk);
1801 out:
1802         return rc;
1803 }
1804
1805 /*
1806   sysfs
1807 */
1808
1809 static ssize_t rbd_size_show(struct device *dev,
1810                              struct device_attribute *attr, char *buf)
1811 {
1812         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1813
1814         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1815 }
1816
1817 static ssize_t rbd_major_show(struct device *dev,
1818                               struct device_attribute *attr, char *buf)
1819 {
1820         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1821
1822         return sprintf(buf, "%d\n", rbd_dev->major);
1823 }
1824
1825 static ssize_t rbd_client_id_show(struct device *dev,
1826                                   struct device_attribute *attr, char *buf)
1827 {
1828         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1829
1830         return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1831 }
1832
1833 static ssize_t rbd_pool_show(struct device *dev,
1834                              struct device_attribute *attr, char *buf)
1835 {
1836         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1837
1838         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1839 }
1840
1841 static ssize_t rbd_name_show(struct device *dev,
1842                              struct device_attribute *attr, char *buf)
1843 {
1844         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1845
1846         return sprintf(buf, "%s\n", rbd_dev->obj);
1847 }
1848
1849 static ssize_t rbd_snap_show(struct device *dev,
1850                              struct device_attribute *attr,
1851                              char *buf)
1852 {
1853         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1854
1855         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1856 }
1857
1858 static ssize_t rbd_image_refresh(struct device *dev,
1859                                  struct device_attribute *attr,
1860                                  const char *buf,
1861                                  size_t size)
1862 {
1863         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1864         int rc;
1865         int ret = size;
1866
1867         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1868
1869         rc = __rbd_update_snaps(rbd_dev);
1870         if (rc < 0)
1871                 ret = rc;
1872
1873         mutex_unlock(&ctl_mutex);
1874         return ret;
1875 }
1876
1877 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1878 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1879 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1880 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1881 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1882 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1883 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1884 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1885 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1886
1887 static struct attribute *rbd_attrs[] = {
1888         &dev_attr_size.attr,
1889         &dev_attr_major.attr,
1890         &dev_attr_client_id.attr,
1891         &dev_attr_pool.attr,
1892         &dev_attr_name.attr,
1893         &dev_attr_current_snap.attr,
1894         &dev_attr_refresh.attr,
1895         &dev_attr_create_snap.attr,
1896         &dev_attr_rollback_snap.attr,
1897         NULL
1898 };
1899
1900 static struct attribute_group rbd_attr_group = {
1901         .attrs = rbd_attrs,
1902 };
1903
1904 static const struct attribute_group *rbd_attr_groups[] = {
1905         &rbd_attr_group,
1906         NULL
1907 };
1908
1909 static void rbd_sysfs_dev_release(struct device *dev)
1910 {
1911 }
1912
1913 static struct device_type rbd_device_type = {
1914         .name           = "rbd",
1915         .groups         = rbd_attr_groups,
1916         .release        = rbd_sysfs_dev_release,
1917 };
1918
1919
1920 /*
1921   sysfs - snapshots
1922 */
1923
1924 static ssize_t rbd_snap_size_show(struct device *dev,
1925                                   struct device_attribute *attr,
1926                                   char *buf)
1927 {
1928         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1929
1930         return sprintf(buf, "%lld\n", (long long)snap->size);
1931 }
1932
1933 static ssize_t rbd_snap_id_show(struct device *dev,
1934                                 struct device_attribute *attr,
1935                                 char *buf)
1936 {
1937         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1938
1939         return sprintf(buf, "%lld\n", (long long)snap->id);
1940 }
1941
1942 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1943 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1944
1945 static struct attribute *rbd_snap_attrs[] = {
1946         &dev_attr_snap_size.attr,
1947         &dev_attr_snap_id.attr,
1948         NULL,
1949 };
1950
1951 static struct attribute_group rbd_snap_attr_group = {
1952         .attrs = rbd_snap_attrs,
1953 };
1954
1955 static void rbd_snap_dev_release(struct device *dev)
1956 {
1957         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1958         kfree(snap->name);
1959         kfree(snap);
1960 }
1961
1962 static const struct attribute_group *rbd_snap_attr_groups[] = {
1963         &rbd_snap_attr_group,
1964         NULL
1965 };
1966
1967 static struct device_type rbd_snap_device_type = {
1968         .groups         = rbd_snap_attr_groups,
1969         .release        = rbd_snap_dev_release,
1970 };
1971
1972 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1973                                   struct rbd_snap *snap)
1974 {
1975         list_del(&snap->node);
1976         device_unregister(&snap->dev);
1977 }
1978
1979 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1980                                   struct rbd_snap *snap,
1981                                   struct device *parent)
1982 {
1983         struct device *dev = &snap->dev;
1984         int ret;
1985
1986         dev->type = &rbd_snap_device_type;
1987         dev->parent = parent;
1988         dev->release = rbd_snap_dev_release;
1989         dev_set_name(dev, "snap_%s", snap->name);
1990         ret = device_register(dev);
1991
1992         return ret;
1993 }
1994
1995 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1996                               int i, const char *name,
1997                               struct rbd_snap **snapp)
1998 {
1999         int ret;
2000         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2001         if (!snap)
2002                 return -ENOMEM;
2003         snap->name = kstrdup(name, GFP_KERNEL);
2004         snap->size = rbd_dev->header.snap_sizes[i];
2005         snap->id = rbd_dev->header.snapc->snaps[i];
2006         if (device_is_registered(&rbd_dev->dev)) {
2007                 ret = rbd_register_snap_dev(rbd_dev, snap,
2008                                              &rbd_dev->dev);
2009                 if (ret < 0)
2010                         goto err;
2011         }
2012         *snapp = snap;
2013         return 0;
2014 err:
2015         kfree(snap->name);
2016         kfree(snap);
2017         return ret;
2018 }
2019
2020 /*
2021  * search for the previous snap in a null delimited string list
2022  */
2023 const char *rbd_prev_snap_name(const char *name, const char *start)
2024 {
2025         if (name < start + 2)
2026                 return NULL;
2027
2028         name -= 2;
2029         while (*name) {
2030                 if (name == start)
2031                         return start;
2032                 name--;
2033         }
2034         return name + 1;
2035 }
2036
2037 /*
2038  * compare the old list of snapshots that we have to what's in the header
2039  * and update it accordingly. Note that the header holds the snapshots
2040  * in a reverse order (from newest to oldest) and we need to go from
2041  * older to new so that we don't get a duplicate snap name when
2042  * doing the process (e.g., removed snapshot and recreated a new
2043  * one with the same name.
2044  */
2045 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2046 {
2047         const char *name, *first_name;
2048         int i = rbd_dev->header.total_snaps;
2049         struct rbd_snap *snap, *old_snap = NULL;
2050         int ret;
2051         struct list_head *p, *n;
2052
2053         first_name = rbd_dev->header.snap_names;
2054         name = first_name + rbd_dev->header.snap_names_len;
2055
2056         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2057                 u64 cur_id;
2058
2059                 old_snap = list_entry(p, struct rbd_snap, node);
2060
2061                 if (i)
2062                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2063
2064                 if (!i || old_snap->id < cur_id) {
2065                         /* old_snap->id was skipped, thus was removed */
2066                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2067                         continue;
2068                 }
2069                 if (old_snap->id == cur_id) {
2070                         /* we have this snapshot already */
2071                         i--;
2072                         name = rbd_prev_snap_name(name, first_name);
2073                         continue;
2074                 }
2075                 for (; i > 0;
2076                      i--, name = rbd_prev_snap_name(name, first_name)) {
2077                         if (!name) {
2078                                 WARN_ON(1);
2079                                 return -EINVAL;
2080                         }
2081                         cur_id = rbd_dev->header.snapc->snaps[i];
2082                         /* snapshot removal? handle it above */
2083                         if (cur_id >= old_snap->id)
2084                                 break;
2085                         /* a new snapshot */
2086                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2087                         if (ret < 0)
2088                                 return ret;
2089
2090                         /* note that we add it backward so using n and not p */
2091                         list_add(&snap->node, n);
2092                         p = &snap->node;
2093                 }
2094         }
2095         /* we're done going over the old snap list, just add what's left */
2096         for (; i > 0; i--) {
2097                 name = rbd_prev_snap_name(name, first_name);
2098                 if (!name) {
2099                         WARN_ON(1);
2100                         return -EINVAL;
2101                 }
2102                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2103                 if (ret < 0)
2104                         return ret;
2105                 list_add(&snap->node, &rbd_dev->snaps);
2106         }
2107
2108         return 0;
2109 }
2110
2111
2112 static void rbd_root_dev_release(struct device *dev)
2113 {
2114 }
2115
2116 static struct device rbd_root_dev = {
2117         .init_name =    "rbd",
2118         .release =      rbd_root_dev_release,
2119 };
2120
2121 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2122 {
2123         int ret = -ENOMEM;
2124         struct device *dev;
2125         struct rbd_snap *snap;
2126
2127         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2128         dev = &rbd_dev->dev;
2129
2130         dev->bus = &rbd_bus_type;
2131         dev->type = &rbd_device_type;
2132         dev->parent = &rbd_root_dev;
2133         dev->release = rbd_dev_release;
2134         dev_set_name(dev, "%d", rbd_dev->id);
2135         ret = device_register(dev);
2136         if (ret < 0)
2137                 goto done_free;
2138
2139         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2140                 ret = rbd_register_snap_dev(rbd_dev, snap,
2141                                              &rbd_dev->dev);
2142                 if (ret < 0)
2143                         break;
2144         }
2145
2146         mutex_unlock(&ctl_mutex);
2147         return 0;
2148 done_free:
2149         mutex_unlock(&ctl_mutex);
2150         return ret;
2151 }
2152
2153 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2154 {
2155         device_unregister(&rbd_dev->dev);
2156 }
2157
2158 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2159 {
2160         int ret, rc;
2161
2162         do {
2163                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2164                                          rbd_dev->header.obj_version);
2165                 if (ret == -ERANGE) {
2166                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2167                         rc = __rbd_update_snaps(rbd_dev);
2168                         mutex_unlock(&ctl_mutex);
2169                         if (rc < 0)
2170                                 return rc;
2171                 }
2172         } while (ret == -ERANGE);
2173
2174         return ret;
2175 }
2176
2177 static ssize_t rbd_add(struct bus_type *bus,
2178                        const char *buf,
2179                        size_t count)
2180 {
2181         struct ceph_osd_client *osdc;
2182         struct rbd_device *rbd_dev;
2183         ssize_t rc = -ENOMEM;
2184         int irc, new_id = 0;
2185         struct list_head *tmp;
2186         char *mon_dev_name;
2187         char *options;
2188
2189         if (!try_module_get(THIS_MODULE))
2190                 return -ENODEV;
2191
2192         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2193         if (!mon_dev_name)
2194                 goto err_out_mod;
2195
2196         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2197         if (!options)
2198                 goto err_mon_dev;
2199
2200         /* new rbd_device object */
2201         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2202         if (!rbd_dev)
2203                 goto err_out_opt;
2204
2205         /* static rbd_device initialization */
2206         spin_lock_init(&rbd_dev->lock);
2207         INIT_LIST_HEAD(&rbd_dev->node);
2208         INIT_LIST_HEAD(&rbd_dev->snaps);
2209
2210         /* generate unique id: find highest unique id, add one */
2211         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2212
2213         list_for_each(tmp, &rbd_dev_list) {
2214                 struct rbd_device *rbd_dev;
2215
2216                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2217                 if (rbd_dev->id >= new_id)
2218                         new_id = rbd_dev->id + 1;
2219         }
2220
2221         rbd_dev->id = new_id;
2222
2223         /* add to global list */
2224         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2225
2226         /* parse add command */
2227         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2228                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
2229                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2230                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2231                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2232                    mon_dev_name, options, rbd_dev->pool_name,
2233                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
2234                 rc = -EINVAL;
2235                 goto err_out_slot;
2236         }
2237
2238         if (rbd_dev->snap_name[0] == 0)
2239                 rbd_dev->snap_name[0] = '-';
2240
2241         rbd_dev->obj_len = strlen(rbd_dev->obj);
2242         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2243                  rbd_dev->obj, RBD_SUFFIX);
2244
2245         /* initialize rest of new object */
2246         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2247         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2248         if (rc < 0)
2249                 goto err_out_slot;
2250
2251         mutex_unlock(&ctl_mutex);
2252
2253         /* pick the pool */
2254         osdc = &rbd_dev->client->osdc;
2255         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2256         if (rc < 0)
2257                 goto err_out_client;
2258         rbd_dev->poolid = rc;
2259
2260         /* register our block device */
2261         irc = register_blkdev(0, rbd_dev->name);
2262         if (irc < 0) {
2263                 rc = irc;
2264                 goto err_out_client;
2265         }
2266         rbd_dev->major = irc;
2267
2268         rc = rbd_bus_add_dev(rbd_dev);
2269         if (rc)
2270                 goto err_out_blkdev;
2271
2272         /* set up and announce blkdev mapping */
2273         rc = rbd_init_disk(rbd_dev);
2274         if (rc)
2275                 goto err_out_bus;
2276
2277         rc = rbd_init_watch_dev(rbd_dev);
2278         if (rc)
2279                 goto err_out_bus;
2280
2281         return count;
2282
2283 err_out_bus:
2284         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2285         list_del_init(&rbd_dev->node);
2286         mutex_unlock(&ctl_mutex);
2287
2288         /* this will also clean up rest of rbd_dev stuff */
2289
2290         rbd_bus_del_dev(rbd_dev);
2291         kfree(options);
2292         kfree(mon_dev_name);
2293         return rc;
2294
2295 err_out_blkdev:
2296         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2297 err_out_client:
2298         rbd_put_client(rbd_dev);
2299         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2300 err_out_slot:
2301         list_del_init(&rbd_dev->node);
2302         mutex_unlock(&ctl_mutex);
2303
2304         kfree(rbd_dev);
2305 err_out_opt:
2306         kfree(options);
2307 err_mon_dev:
2308         kfree(mon_dev_name);
2309 err_out_mod:
2310         dout("Error adding device %s\n", buf);
2311         module_put(THIS_MODULE);
2312         return rc;
2313 }
2314
2315 static struct rbd_device *__rbd_get_dev(unsigned long id)
2316 {
2317         struct list_head *tmp;
2318         struct rbd_device *rbd_dev;
2319
2320         list_for_each(tmp, &rbd_dev_list) {
2321                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2322                 if (rbd_dev->id == id)
2323                         return rbd_dev;
2324         }
2325         return NULL;
2326 }
2327
2328 static void rbd_dev_release(struct device *dev)
2329 {
2330         struct rbd_device *rbd_dev =
2331                         container_of(dev, struct rbd_device, dev);
2332
2333         if (rbd_dev->watch_request)
2334                 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2335                                                     rbd_dev->watch_request);
2336         if (rbd_dev->watch_event)
2337                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2338
2339         rbd_put_client(rbd_dev);
2340
2341         /* clean up and free blkdev */
2342         rbd_free_disk(rbd_dev);
2343         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2344         kfree(rbd_dev);
2345
2346         /* release module ref */
2347         module_put(THIS_MODULE);
2348 }
2349
2350 static ssize_t rbd_remove(struct bus_type *bus,
2351                           const char *buf,
2352                           size_t count)
2353 {
2354         struct rbd_device *rbd_dev = NULL;
2355         int target_id, rc;
2356         unsigned long ul;
2357         int ret = count;
2358
2359         rc = strict_strtoul(buf, 10, &ul);
2360         if (rc)
2361                 return rc;
2362
2363         /* convert to int; abort if we lost anything in the conversion */
2364         target_id = (int) ul;
2365         if (target_id != ul)
2366                 return -EINVAL;
2367
2368         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2369
2370         rbd_dev = __rbd_get_dev(target_id);
2371         if (!rbd_dev) {
2372                 ret = -ENOENT;
2373                 goto done;
2374         }
2375
2376         list_del_init(&rbd_dev->node);
2377
2378         __rbd_remove_all_snaps(rbd_dev);
2379         rbd_bus_del_dev(rbd_dev);
2380
2381 done:
2382         mutex_unlock(&ctl_mutex);
2383         return ret;
2384 }
2385
2386 static ssize_t rbd_snap_add(struct device *dev,
2387                             struct device_attribute *attr,
2388                             const char *buf,
2389                             size_t count)
2390 {
2391         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2392         int ret;
2393         char *name = kmalloc(count + 1, GFP_KERNEL);
2394         if (!name)
2395                 return -ENOMEM;
2396
2397         snprintf(name, count, "%s", buf);
2398
2399         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2400
2401         ret = rbd_header_add_snap(rbd_dev,
2402                                   name, GFP_KERNEL);
2403         if (ret < 0)
2404                 goto err_unlock;
2405
2406         ret = __rbd_update_snaps(rbd_dev);
2407         if (ret < 0)
2408                 goto err_unlock;
2409
2410         /* shouldn't hold ctl_mutex when notifying.. notify might
2411            trigger a watch callback that would need to get that mutex */
2412         mutex_unlock(&ctl_mutex);
2413
2414         /* make a best effort, don't error if failed */
2415         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2416
2417         ret = count;
2418         kfree(name);
2419         return ret;
2420
2421 err_unlock:
2422         mutex_unlock(&ctl_mutex);
2423         kfree(name);
2424         return ret;
2425 }
2426
2427 static ssize_t rbd_snap_rollback(struct device *dev,
2428                                  struct device_attribute *attr,
2429                                  const char *buf,
2430                                  size_t count)
2431 {
2432         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2433         int ret;
2434         u64 snapid;
2435         u64 cur_ofs;
2436         char *seg_name = NULL;
2437         char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2438         ret = -ENOMEM;
2439         if (!snap_name)
2440                 return ret;
2441
2442         /* parse snaps add command */
2443         snprintf(snap_name, count, "%s", buf);
2444         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2445         if (!seg_name)
2446                 goto done;
2447
2448         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2449
2450         ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2451         if (ret < 0)
2452                 goto done_unlock;
2453
2454         dout("snapid=%lld\n", snapid);
2455
2456         cur_ofs = 0;
2457         while (cur_ofs < rbd_dev->header.image_size) {
2458                 cur_ofs += rbd_get_segment(&rbd_dev->header,
2459                                            rbd_dev->obj,
2460                                            cur_ofs, (u64)-1,
2461                                            seg_name, NULL);
2462                 dout("seg_name=%s\n", seg_name);
2463
2464                 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2465                 if (ret < 0)
2466                         pr_warning("could not roll back obj %s err=%d\n",
2467                                    seg_name, ret);
2468         }
2469
2470         ret = __rbd_update_snaps(rbd_dev);
2471         if (ret < 0)
2472                 goto done_unlock;
2473
2474         ret = count;
2475
2476 done_unlock:
2477         mutex_unlock(&ctl_mutex);
2478 done:
2479         kfree(seg_name);
2480         kfree(snap_name);
2481
2482         return ret;
2483 }
2484
2485 static struct bus_attribute rbd_bus_attrs[] = {
2486         __ATTR(add, S_IWUSR, NULL, rbd_add),
2487         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2488         __ATTR_NULL
2489 };
2490
2491 /*
2492  * create control files in sysfs
2493  * /sys/bus/rbd/...
2494  */
2495 static int rbd_sysfs_init(void)
2496 {
2497         int ret;
2498
2499         rbd_bus_type.bus_attrs = rbd_bus_attrs;
2500
2501         ret = bus_register(&rbd_bus_type);
2502          if (ret < 0)
2503                 return ret;
2504
2505         ret = device_register(&rbd_root_dev);
2506
2507         return ret;
2508 }
2509
2510 static void rbd_sysfs_cleanup(void)
2511 {
2512         device_unregister(&rbd_root_dev);
2513         bus_unregister(&rbd_bus_type);
2514 }
2515
2516 int __init rbd_init(void)
2517 {
2518         int rc;
2519
2520         rc = rbd_sysfs_init();
2521         if (rc)
2522                 return rc;
2523         spin_lock_init(&node_lock);
2524         pr_info("loaded " DRV_NAME_LONG "\n");
2525         return 0;
2526 }
2527
2528 void __exit rbd_exit(void)
2529 {
2530         rbd_sysfs_cleanup();
2531 }
2532
2533 module_init(rbd_init);
2534 module_exit(rbd_exit);
2535
2536 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2537 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2538 MODULE_DESCRIPTION("rados block device");
2539
2540 /* following authorship retained from original osdblk.c */
2541 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2542
2543 MODULE_LICENSE("GPL");