2146cab1c61be3769c5b5792bffad0da922703db
[linux-2.6.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/* Short and long driver names. */
#define DRV_NAME                "rbd"
#define DRV_NAME_LONG           "rbd (rados block device)"

/* Upper bound on the minors available under one block major. */
#define RBD_MINORS_PER_MAJOR    256

/* Sizes of the fixed name/option buffers used by this driver. */
#define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN   64
#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_OPT_LEN         1024

/* Snapshot name that selects the head (non-snapshot) image. */
#define RBD_SNAP_HEAD_NAME      "-"

/* Size of the buffer holding the block device name (e.g. "rbd3"). */
#define DEV_NAME_LEN            32

/* Value given to rbd_options.notify_timeout when none is specified. */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
60 /*
61  * block device image metadata (in-memory version)
62  */
63 struct rbd_image_header {
64         u64 image_size;
65         char block_name[32];
66         __u8 obj_order;
67         __u8 crypt_type;
68         __u8 comp_type;
69         struct rw_semaphore snap_rwsem;
70         struct ceph_snap_context *snapc;
71         size_t snap_names_len;
72         u64 snap_seq;
73         u32 total_snaps;
74
75         char *snap_names;
76         u64 *snap_sizes;
77
78         u64 obj_version;
79 };
80
/* rbd-specific options filled in by parse_rbd_opts_token(). */
struct rbd_options {
        /* value of the "notify_timeout=%d" option */
        int     notify_timeout;
};
84
85 /*
86  * an instance of the client.  multiple devices may share a client.
87  */
88 struct rbd_client {
89         struct ceph_client      *client;
90         struct rbd_options      *rbd_opts;
91         struct kref             kref;
92         struct list_head        node;
93 };
94
95 /*
96  * a single io request
97  */
98 struct rbd_request {
99         struct request          *rq;            /* blk layer request */
100         struct bio              *bio;           /* cloned bio */
101         struct page             **pages;        /* list of used pages */
102         u64                     len;
103 };
104
105 struct rbd_snap {
106         struct  device          dev;
107         const char              *name;
108         size_t                  size;
109         struct list_head        node;
110         u64                     id;
111 };
112
113 /*
114  * a single device
115  */
116 struct rbd_device {
117         int                     id;             /* blkdev unique id */
118
119         int                     major;          /* blkdev assigned major */
120         struct gendisk          *disk;          /* blkdev's gendisk and rq */
121         struct request_queue    *q;
122
123         struct ceph_client      *client;
124         struct rbd_client       *rbd_client;
125
126         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
127
128         spinlock_t              lock;           /* queue lock */
129
130         struct rbd_image_header header;
131         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
132         int                     obj_len;
133         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
134         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
135         int                     poolid;
136
137         struct ceph_osd_event   *watch_event;
138         struct ceph_osd_request *watch_request;
139
140         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
141         u32 cur_snap;   /* index+1 of current snapshot within snap context
142                            0 - for the head */
143         int read_only;
144
145         struct list_head        node;
146
147         /* list of snapshots */
148         struct list_head        snaps;
149
150         /* sysfs related */
151         struct device           dev;
152 };
153
154 static struct bus_type rbd_bus_type = {
155         .name           = "rbd",
156 };
157
158 static spinlock_t node_lock;      /* protects client get/put */
159
160 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
161 static LIST_HEAD(rbd_dev_list);    /* devices */
162 static LIST_HEAD(rbd_client_list);      /* clients */
163
164 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
165 static void rbd_dev_release(struct device *dev);
166 static ssize_t rbd_snap_rollback(struct device *dev,
167                                  struct device_attribute *attr,
168                                  const char *buf,
169                                  size_t size);
170 static ssize_t rbd_snap_add(struct device *dev,
171                             struct device_attribute *attr,
172                             const char *buf,
173                             size_t count);
174 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
175                                   struct rbd_snap *snap);;
176
177
178 static struct rbd_device *dev_to_rbd(struct device *dev)
179 {
180         return container_of(dev, struct rbd_device, dev);
181 }
182
183 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
184 {
185         return get_device(&rbd_dev->dev);
186 }
187
188 static void rbd_put_dev(struct rbd_device *rbd_dev)
189 {
190         put_device(&rbd_dev->dev);
191 }
192
/* Defined later in this file. */
static int
__rbd_update_snaps(struct rbd_device *rbd_dev);
194
195 static int rbd_open(struct block_device *bdev, fmode_t mode)
196 {
197         struct gendisk *disk = bdev->bd_disk;
198         struct rbd_device *rbd_dev = disk->private_data;
199
200         rbd_get_dev(rbd_dev);
201
202         set_device_ro(bdev, rbd_dev->read_only);
203
204         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
205                 return -EROFS;
206
207         return 0;
208 }
209
210 static int rbd_release(struct gendisk *disk, fmode_t mode)
211 {
212         struct rbd_device *rbd_dev = disk->private_data;
213
214         rbd_put_dev(rbd_dev);
215
216         return 0;
217 }
218
219 static const struct block_device_operations rbd_bd_ops = {
220         .owner                  = THIS_MODULE,
221         .open                   = rbd_open,
222         .release                = rbd_release,
223 };
224
225 /*
226  * Initialize an rbd client instance.
227  * We own *opt.
228  */
229 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
230                                             struct rbd_options *rbd_opts)
231 {
232         struct rbd_client *rbdc;
233         int ret = -ENOMEM;
234
235         dout("rbd_client_create\n");
236         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
237         if (!rbdc)
238                 goto out_opt;
239
240         kref_init(&rbdc->kref);
241         INIT_LIST_HEAD(&rbdc->node);
242
243         rbdc->client = ceph_create_client(opt, rbdc);
244         if (IS_ERR(rbdc->client))
245                 goto out_rbdc;
246         opt = NULL; /* Now rbdc->client is responsible for opt */
247
248         ret = ceph_open_session(rbdc->client);
249         if (ret < 0)
250                 goto out_err;
251
252         rbdc->rbd_opts = rbd_opts;
253
254         spin_lock(&node_lock);
255         list_add_tail(&rbdc->node, &rbd_client_list);
256         spin_unlock(&node_lock);
257
258         dout("rbd_client_create created %p\n", rbdc);
259         return rbdc;
260
261 out_err:
262         ceph_destroy_client(rbdc->client);
263 out_rbdc:
264         kfree(rbdc);
265 out_opt:
266         if (opt)
267                 ceph_destroy_options(opt);
268         return ERR_PTR(ret);
269 }
270
271 /*
272  * Find a ceph client with specific addr and configuration.
273  */
274 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
275 {
276         struct rbd_client *client_node;
277
278         if (opt->flags & CEPH_OPT_NOSHARE)
279                 return NULL;
280
281         list_for_each_entry(client_node, &rbd_client_list, node)
282                 if (ceph_compare_options(opt, client_node->client) == 0)
283                         return client_node;
284         return NULL;
285 }
286
/*
 * Tokens for the rbd-specific mount options.  Values below
 * Opt_last_int take an int argument; values between Opt_last_int and
 * Opt_last_string take a string argument.
 */
enum {
        Opt_notify_timeout,
        Opt_last_int,           /* int args above */
        Opt_last_string,        /* string args above */
};
297
298 static match_table_t rbdopt_tokens = {
299         {Opt_notify_timeout, "notify_timeout=%d"},
300         /* int args above */
301         /* string args above */
302         {-1, NULL}
303 };
304
305 static int parse_rbd_opts_token(char *c, void *private)
306 {
307         struct rbd_options *rbdopt = private;
308         substring_t argstr[MAX_OPT_ARGS];
309         int token, intval, ret;
310
311         token = match_token((char *)c, rbdopt_tokens, argstr);
312         if (token < 0)
313                 return -EINVAL;
314
315         if (token < Opt_last_int) {
316                 ret = match_int(&argstr[0], &intval);
317                 if (ret < 0) {
318                         pr_err("bad mount option arg (not int) "
319                                "at '%s'\n", c);
320                         return ret;
321                 }
322                 dout("got int token %d val %d\n", token, intval);
323         } else if (token > Opt_last_int && token < Opt_last_string) {
324                 dout("got string token %d val %s\n", token,
325                      argstr[0].from);
326         } else {
327                 dout("got token %d\n", token);
328         }
329
330         switch (token) {
331         case Opt_notify_timeout:
332                 rbdopt->notify_timeout = intval;
333                 break;
334         default:
335                 BUG_ON(token);
336         }
337         return 0;
338 }
339
340 /*
341  * Get a ceph client with specific addr and configuration, if one does
342  * not exist create it.
343  */
344 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
345                           char *options)
346 {
347         struct rbd_client *rbdc;
348         struct ceph_options *opt;
349         int ret;
350         struct rbd_options *rbd_opts;
351
352         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
353         if (!rbd_opts)
354                 return -ENOMEM;
355
356         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
357
358         ret = ceph_parse_options(&opt, options, mon_addr,
359                                  mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
360         if (ret < 0)
361                 goto done_err;
362
363         spin_lock(&node_lock);
364         rbdc = __rbd_client_find(opt);
365         if (rbdc) {
366                 ceph_destroy_options(opt);
367
368                 /* using an existing client */
369                 kref_get(&rbdc->kref);
370                 rbd_dev->rbd_client = rbdc;
371                 rbd_dev->client = rbdc->client;
372                 spin_unlock(&node_lock);
373                 return 0;
374         }
375         spin_unlock(&node_lock);
376
377         rbdc = rbd_client_create(opt, rbd_opts);
378         if (IS_ERR(rbdc)) {
379                 ret = PTR_ERR(rbdc);
380                 goto done_err;
381         }
382
383         rbd_dev->rbd_client = rbdc;
384         rbd_dev->client = rbdc->client;
385         return 0;
386 done_err:
387         kfree(rbd_opts);
388         return ret;
389 }
390
391 /*
392  * Destroy ceph client
393  */
394 static void rbd_client_release(struct kref *kref)
395 {
396         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
397
398         dout("rbd_release_client %p\n", rbdc);
399         spin_lock(&node_lock);
400         list_del(&rbdc->node);
401         spin_unlock(&node_lock);
402
403         ceph_destroy_client(rbdc->client);
404         kfree(rbdc->rbd_opts);
405         kfree(rbdc);
406 }
407
408 /*
409  * Drop reference to ceph client node. If it's not referenced anymore, release
410  * it.
411  */
412 static void rbd_put_client(struct rbd_device *rbd_dev)
413 {
414         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
415         rbd_dev->rbd_client = NULL;
416         rbd_dev->client = NULL;
417 }
418
419
420 /*
421  * Create a new header structure, translate header format from the on-disk
422  * header.
423  */
424 static int rbd_header_from_disk(struct rbd_image_header *header,
425                                  struct rbd_image_header_ondisk *ondisk,
426                                  int allocated_snaps,
427                                  gfp_t gfp_flags)
428 {
429         int i;
430         u32 snap_count = le32_to_cpu(ondisk->snap_count);
431         int ret = -ENOMEM;
432
433         init_rwsem(&header->snap_rwsem);
434         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
435         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
436                                 snap_count *
437                                  sizeof(struct rbd_image_snap_ondisk),
438                                 gfp_flags);
439         if (!header->snapc)
440                 return -ENOMEM;
441         if (snap_count) {
442                 header->snap_names = kmalloc(header->snap_names_len,
443                                              GFP_KERNEL);
444                 if (!header->snap_names)
445                         goto err_snapc;
446                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
447                                              GFP_KERNEL);
448                 if (!header->snap_sizes)
449                         goto err_names;
450         } else {
451                 header->snap_names = NULL;
452                 header->snap_sizes = NULL;
453         }
454         memcpy(header->block_name, ondisk->block_name,
455                sizeof(ondisk->block_name));
456
457         header->image_size = le64_to_cpu(ondisk->image_size);
458         header->obj_order = ondisk->options.order;
459         header->crypt_type = ondisk->options.crypt_type;
460         header->comp_type = ondisk->options.comp_type;
461
462         atomic_set(&header->snapc->nref, 1);
463         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
464         header->snapc->num_snaps = snap_count;
465         header->total_snaps = snap_count;
466
467         if (snap_count &&
468             allocated_snaps == snap_count) {
469                 for (i = 0; i < snap_count; i++) {
470                         header->snapc->snaps[i] =
471                                 le64_to_cpu(ondisk->snaps[i].id);
472                         header->snap_sizes[i] =
473                                 le64_to_cpu(ondisk->snaps[i].image_size);
474                 }
475
476                 /* copy snapshot names */
477                 memcpy(header->snap_names, &ondisk->snaps[i],
478                         header->snap_names_len);
479         }
480
481         return 0;
482
483 err_names:
484         kfree(header->snap_names);
485 err_snapc:
486         kfree(header->snapc);
487         return ret;
488 }
489
490 static int snap_index(struct rbd_image_header *header, int snap_num)
491 {
492         return header->total_snaps - snap_num;
493 }
494
495 static u64 cur_snap_id(struct rbd_device *rbd_dev)
496 {
497         struct rbd_image_header *header = &rbd_dev->header;
498
499         if (!rbd_dev->cur_snap)
500                 return 0;
501
502         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
503 }
504
505 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
506                         u64 *seq, u64 *size)
507 {
508         int i;
509         char *p = header->snap_names;
510
511         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
512                 if (strcmp(snap_name, p) == 0)
513                         break;
514         }
515         if (i == header->total_snaps)
516                 return -ENOENT;
517         if (seq)
518                 *seq = header->snapc->snaps[i];
519
520         if (size)
521                 *size = header->snap_sizes[i];
522
523         return i;
524 }
525
526 static int rbd_header_set_snap(struct rbd_device *dev,
527                                const char *snap_name,
528                                u64 *size)
529 {
530         struct rbd_image_header *header = &dev->header;
531         struct ceph_snap_context *snapc = header->snapc;
532         int ret = -ENOENT;
533
534         down_write(&header->snap_rwsem);
535
536         if (!snap_name ||
537             !*snap_name ||
538             strcmp(snap_name, "-") == 0 ||
539             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
540                 if (header->total_snaps)
541                         snapc->seq = header->snap_seq;
542                 else
543                         snapc->seq = 0;
544                 dev->cur_snap = 0;
545                 dev->read_only = 0;
546                 if (size)
547                         *size = header->image_size;
548         } else {
549                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
550                 if (ret < 0)
551                         goto done;
552
553                 dev->cur_snap = header->total_snaps - ret;
554                 dev->read_only = 1;
555         }
556
557         ret = 0;
558 done:
559         up_write(&header->snap_rwsem);
560         return ret;
561 }
562
563 static void rbd_header_free(struct rbd_image_header *header)
564 {
565         kfree(header->snapc);
566         kfree(header->snap_names);
567         kfree(header->snap_sizes);
568 }
569
570 /*
571  * get the actual striped segment name, offset and length
572  */
573 static u64 rbd_get_segment(struct rbd_image_header *header,
574                            const char *block_name,
575                            u64 ofs, u64 len,
576                            char *seg_name, u64 *segofs)
577 {
578         u64 seg = ofs >> header->obj_order;
579
580         if (seg_name)
581                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
582                          "%s.%012llx", block_name, seg);
583
584         ofs = ofs & ((1 << header->obj_order) - 1);
585         len = min_t(u64, len, (1 << header->obj_order) - ofs);
586
587         if (segofs)
588                 *segofs = ofs;
589
590         return len;
591 }
592
593 /*
594  * bio helpers
595  */
596
597 static void bio_chain_put(struct bio *chain)
598 {
599         struct bio *tmp;
600
601         while (chain) {
602                 tmp = chain;
603                 chain = chain->bi_next;
604                 bio_put(tmp);
605         }
606 }
607
608 /*
609  * zeros a bio chain, starting at specific offset
610  */
611 static void zero_bio_chain(struct bio *chain, int start_ofs)
612 {
613         struct bio_vec *bv;
614         unsigned long flags;
615         void *buf;
616         int i;
617         int pos = 0;
618
619         while (chain) {
620                 bio_for_each_segment(bv, chain, i) {
621                         if (pos + bv->bv_len > start_ofs) {
622                                 int remainder = max(start_ofs - pos, 0);
623                                 buf = bvec_kmap_irq(bv, &flags);
624                                 memset(buf + remainder, 0,
625                                        bv->bv_len - remainder);
626                                 bvec_kunmap_irq(buf, &flags);
627                         }
628                         pos += bv->bv_len;
629                 }
630
631                 chain = chain->bi_next;
632         }
633 }
634
635 /*
636  * bio_chain_clone - clone a chain of bios up to a certain length.
637  * might return a bio_pair that will need to be released.
638  */
639 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
640                                    struct bio_pair **bp,
641                                    int len, gfp_t gfpmask)
642 {
643         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
644         int total = 0;
645
646         if (*bp) {
647                 bio_pair_release(*bp);
648                 *bp = NULL;
649         }
650
651         while (old_chain && (total < len)) {
652                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
653                 if (!tmp)
654                         goto err_out;
655
656                 if (total + old_chain->bi_size > len) {
657                         struct bio_pair *bp;
658
659                         /*
660                          * this split can only happen with a single paged bio,
661                          * split_bio will BUG_ON if this is not the case
662                          */
663                         dout("bio_chain_clone split! total=%d remaining=%d"
664                              "bi_size=%d\n",
665                              (int)total, (int)len-total,
666                              (int)old_chain->bi_size);
667
668                         /* split the bio. We'll release it either in the next
669                            call, or it will have to be released outside */
670                         bp = bio_split(old_chain, (len - total) / 512ULL);
671                         if (!bp)
672                                 goto err_out;
673
674                         __bio_clone(tmp, &bp->bio1);
675
676                         *next = &bp->bio2;
677                 } else {
678                         __bio_clone(tmp, old_chain);
679                         *next = old_chain->bi_next;
680                 }
681
682                 tmp->bi_bdev = NULL;
683                 gfpmask &= ~__GFP_WAIT;
684                 tmp->bi_next = NULL;
685
686                 if (!new_chain) {
687                         new_chain = tail = tmp;
688                 } else {
689                         tail->bi_next = tmp;
690                         tail = tmp;
691                 }
692                 old_chain = old_chain->bi_next;
693
694                 total += tmp->bi_size;
695         }
696
697         BUG_ON(total < len);
698
699         if (tail)
700                 tail->bi_next = NULL;
701
702         *old = old_chain;
703
704         return new_chain;
705
706 err_out:
707         dout("bio_chain_clone with err\n");
708         bio_chain_put(new_chain);
709         return NULL;
710 }
711
712 /*
713  * helpers for osd request op vectors.
714  */
715 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
716                             int num_ops,
717                             int opcode,
718                             u32 payload_len)
719 {
720         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
721                        GFP_NOIO);
722         if (!*ops)
723                 return -ENOMEM;
724         (*ops)[0].op = opcode;
725         /*
726          * op extent offset and length will be set later on
727          * in calc_raw_layout()
728          */
729         (*ops)[0].payload_len = payload_len;
730         return 0;
731 }
732
/* Free an op vector obtained from rbd_create_rw_ops(); NULL is fine. */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
737
738 /*
739  * Send ceph osd request
740  */
741 static int rbd_do_request(struct request *rq,
742                           struct rbd_device *dev,
743                           struct ceph_snap_context *snapc,
744                           u64 snapid,
745                           const char *obj, u64 ofs, u64 len,
746                           struct bio *bio,
747                           struct page **pages,
748                           int num_pages,
749                           int flags,
750                           struct ceph_osd_req_op *ops,
751                           int num_reply,
752                           void (*rbd_cb)(struct ceph_osd_request *req,
753                                          struct ceph_msg *msg),
754                           struct ceph_osd_request **linger_req,
755                           u64 *ver)
756 {
757         struct ceph_osd_request *req;
758         struct ceph_file_layout *layout;
759         int ret;
760         u64 bno;
761         struct timespec mtime = CURRENT_TIME;
762         struct rbd_request *req_data;
763         struct ceph_osd_request_head *reqhead;
764         struct rbd_image_header *header = &dev->header;
765
766         ret = -ENOMEM;
767         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
768         if (!req_data)
769                 goto done;
770
771         dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
772
773         down_read(&header->snap_rwsem);
774
775         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
776                                       snapc,
777                                       ops,
778                                       false,
779                                       GFP_NOIO, pages, bio);
780         if (!req) {
781                 up_read(&header->snap_rwsem);
782                 ret = -ENOMEM;
783                 goto done_pages;
784         }
785
786         req->r_callback = rbd_cb;
787
788         req_data->rq = rq;
789         req_data->bio = bio;
790         req_data->pages = pages;
791         req_data->len = len;
792
793         req->r_priv = req_data;
794
795         reqhead = req->r_request->front.iov_base;
796         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
797
798         strncpy(req->r_oid, obj, sizeof(req->r_oid));
799         req->r_oid_len = strlen(req->r_oid);
800
801         layout = &req->r_file_layout;
802         memset(layout, 0, sizeof(*layout));
803         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
804         layout->fl_stripe_count = cpu_to_le32(1);
805         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
806         layout->fl_pg_preferred = cpu_to_le32(-1);
807         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
808         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
809                              ofs, &len, &bno, req, ops);
810
811         ceph_osdc_build_request(req, ofs, &len,
812                                 ops,
813                                 snapc,
814                                 &mtime,
815                                 req->r_oid, req->r_oid_len);
816         up_read(&header->snap_rwsem);
817
818         if (linger_req) {
819                 ceph_osdc_set_request_linger(&dev->client->osdc, req);
820                 *linger_req = req;
821         }
822
823         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
824         if (ret < 0)
825                 goto done_err;
826
827         if (!rbd_cb) {
828                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
829                 if (ver)
830                         *ver = le64_to_cpu(req->r_reassert_version.version);
831                 dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
832                 ceph_osdc_put_request(req);
833         }
834         return ret;
835
836 done_err:
837         bio_chain_put(req_data->bio);
838         ceph_osdc_put_request(req);
839 done_pages:
840         kfree(req_data);
841 done:
842         if (rq)
843                 blk_end_request(rq, ret, len);
844         return ret;
845 }
846
847 /*
848  * Ceph osd op callback
849  */
850 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
851 {
852         struct rbd_request *req_data = req->r_priv;
853         struct ceph_osd_reply_head *replyhead;
854         struct ceph_osd_op *op;
855         __s32 rc;
856         u64 bytes;
857         int read_op;
858
859         /* parse reply */
860         replyhead = msg->front.iov_base;
861         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
862         op = (void *)(replyhead + 1);
863         rc = le32_to_cpu(replyhead->result);
864         bytes = le64_to_cpu(op->extent.length);
865         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
866
867         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
868
869         if (rc == -ENOENT && read_op) {
870                 zero_bio_chain(req_data->bio, 0);
871                 rc = 0;
872         } else if (rc == 0 && read_op && bytes < req_data->len) {
873                 zero_bio_chain(req_data->bio, bytes);
874                 bytes = req_data->len;
875         }
876
877         blk_end_request(req_data->rq, rc, bytes);
878
879         if (req_data->bio)
880                 bio_chain_put(req_data->bio);
881
882         ceph_osdc_put_request(req);
883         kfree(req_data);
884 }
885
/*
 * Minimal osd callback: only drops the request reference.
 * NOTE(review): req->r_priv is not freed here -- check whether callers
 * that install this callback leave an rbd_request attached to it.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
890
891 /*
892  * Do a synchronous ceph osd operation
893  */
894 static int rbd_req_sync_op(struct rbd_device *dev,
895                            struct ceph_snap_context *snapc,
896                            u64 snapid,
897                            int opcode,
898                            int flags,
899                            struct ceph_osd_req_op *orig_ops,
900                            int num_reply,
901                            const char *obj,
902                            u64 ofs, u64 len,
903                            char *buf,
904                            struct ceph_osd_request **linger_req,
905                            u64 *ver)
906 {
907         int ret;
908         struct page **pages;
909         int num_pages;
910         struct ceph_osd_req_op *ops = orig_ops;
911         u32 payload_len;
912
913         num_pages = calc_pages_for(ofs , len);
914         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
915         if (IS_ERR(pages))
916                 return PTR_ERR(pages);
917
918         if (!orig_ops) {
919                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
920                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
921                 if (ret < 0)
922                         goto done;
923
924                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
925                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
926                         if (ret < 0)
927                                 goto done_ops;
928                 }
929         }
930
931         ret = rbd_do_request(NULL, dev, snapc, snapid,
932                           obj, ofs, len, NULL,
933                           pages, num_pages,
934                           flags,
935                           ops,
936                           2,
937                           NULL,
938                           linger_req, ver);
939         if (ret < 0)
940                 goto done_ops;
941
942         if ((flags & CEPH_OSD_FLAG_READ) && buf)
943                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
944
945 done_ops:
946         if (!orig_ops)
947                 rbd_destroy_ops(ops);
948 done:
949         ceph_release_page_vector(pages, num_pages);
950         return ret;
951 }
952
953 /*
954  * Do an asynchronous ceph osd operation
955  */
956 static int rbd_do_op(struct request *rq,
957                      struct rbd_device *rbd_dev ,
958                      struct ceph_snap_context *snapc,
959                      u64 snapid,
960                      int opcode, int flags, int num_reply,
961                      u64 ofs, u64 len,
962                      struct bio *bio)
963 {
964         char *seg_name;
965         u64 seg_ofs;
966         u64 seg_len;
967         int ret;
968         struct ceph_osd_req_op *ops;
969         u32 payload_len;
970
971         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
972         if (!seg_name)
973                 return -ENOMEM;
974
975         seg_len = rbd_get_segment(&rbd_dev->header,
976                                   rbd_dev->header.block_name,
977                                   ofs, len,
978                                   seg_name, &seg_ofs);
979
980         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
981
982         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
983         if (ret < 0)
984                 goto done;
985
986         /* we've taken care of segment sizes earlier when we
987            cloned the bios. We should never have a segment
988            truncated at this point */
989         BUG_ON(seg_len < len);
990
991         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
992                              seg_name, seg_ofs, seg_len,
993                              bio,
994                              NULL, 0,
995                              flags,
996                              ops,
997                              num_reply,
998                              rbd_req_cb, 0, NULL);
999
1000         rbd_destroy_ops(ops);
1001 done:
1002         kfree(seg_name);
1003         return ret;
1004 }
1005
1006 /*
1007  * Request async osd write
1008  */
1009 static int rbd_req_write(struct request *rq,
1010                          struct rbd_device *rbd_dev,
1011                          struct ceph_snap_context *snapc,
1012                          u64 ofs, u64 len,
1013                          struct bio *bio)
1014 {
1015         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1016                          CEPH_OSD_OP_WRITE,
1017                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1018                          2,
1019                          ofs, len, bio);
1020 }
1021
1022 /*
1023  * Request async osd read
1024  */
1025 static int rbd_req_read(struct request *rq,
1026                          struct rbd_device *rbd_dev,
1027                          u64 snapid,
1028                          u64 ofs, u64 len,
1029                          struct bio *bio)
1030 {
1031         return rbd_do_op(rq, rbd_dev, NULL,
1032                          (snapid ? snapid : CEPH_NOSNAP),
1033                          CEPH_OSD_OP_READ,
1034                          CEPH_OSD_FLAG_READ,
1035                          2,
1036                          ofs, len, bio);
1037 }
1038
1039 /*
1040  * Request sync osd read
1041  */
1042 static int rbd_req_sync_read(struct rbd_device *dev,
1043                           struct ceph_snap_context *snapc,
1044                           u64 snapid,
1045                           const char *obj,
1046                           u64 ofs, u64 len,
1047                           char *buf,
1048                           u64 *ver)
1049 {
1050         return rbd_req_sync_op(dev, NULL,
1051                                (snapid ? snapid : CEPH_NOSNAP),
1052                                CEPH_OSD_OP_READ,
1053                                CEPH_OSD_FLAG_READ,
1054                                NULL,
1055                                1, obj, ofs, len, buf, NULL, ver);
1056 }
1057
1058 /*
1059  * Request sync osd watch
1060  */
1061 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1062                                    u64 ver,
1063                                    u64 notify_id,
1064                                    const char *obj)
1065 {
1066         struct ceph_osd_req_op *ops;
1067         struct page **pages = NULL;
1068         int ret;
1069
1070         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1071         if (ret < 0)
1072                 return ret;
1073
1074         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1075         ops[0].watch.cookie = notify_id;
1076         ops[0].watch.flag = 0;
1077
1078         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1079                           obj, 0, 0, NULL,
1080                           pages, 0,
1081                           CEPH_OSD_FLAG_READ,
1082                           ops,
1083                           1,
1084                           rbd_simple_req_cb, 0, NULL);
1085
1086         rbd_destroy_ops(ops);
1087         return ret;
1088 }
1089
1090 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1091 {
1092         struct rbd_device *dev = (struct rbd_device *)data;
1093         if (!dev)
1094                 return;
1095
1096         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1097                 notify_id, (int)opcode);
1098         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1099         __rbd_update_snaps(dev);
1100         mutex_unlock(&ctl_mutex);
1101
1102         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1103 }
1104
1105 /*
1106  * Request sync osd watch
1107  */
1108 static int rbd_req_sync_watch(struct rbd_device *dev,
1109                               const char *obj,
1110                               u64 ver)
1111 {
1112         struct ceph_osd_req_op *ops;
1113         struct ceph_osd_client *osdc = &dev->client->osdc;
1114
1115         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1116         if (ret < 0)
1117                 return ret;
1118
1119         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1120                                      (void *)dev, &dev->watch_event);
1121         if (ret < 0)
1122                 goto fail;
1123
1124         ops[0].watch.ver = cpu_to_le64(ver);
1125         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1126         ops[0].watch.flag = 1;
1127
1128         ret = rbd_req_sync_op(dev, NULL,
1129                               CEPH_NOSNAP,
1130                               0,
1131                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1132                               ops,
1133                               1, obj, 0, 0, NULL,
1134                               &dev->watch_request, NULL);
1135
1136         if (ret < 0)
1137                 goto fail_event;
1138
1139         rbd_destroy_ops(ops);
1140         return 0;
1141
1142 fail_event:
1143         ceph_osdc_cancel_event(dev->watch_event);
1144         dev->watch_event = NULL;
1145 fail:
1146         rbd_destroy_ops(ops);
1147         return ret;
1148 }
1149
/* Context for a notify request: the device the notify refers to. */
struct rbd_notify_info {
	struct rbd_device *dev;		/* device being notified about */
};
1153
1154 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1155 {
1156         struct rbd_device *dev = (struct rbd_device *)data;
1157         if (!dev)
1158                 return;
1159
1160         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1161                 notify_id, (int)opcode);
1162 }
1163
1164 /*
1165  * Request sync osd notify
1166  */
1167 static int rbd_req_sync_notify(struct rbd_device *dev,
1168                           const char *obj)
1169 {
1170         struct ceph_osd_req_op *ops;
1171         struct ceph_osd_client *osdc = &dev->client->osdc;
1172         struct ceph_osd_event *event;
1173         struct rbd_notify_info info;
1174         int payload_len = sizeof(u32) + sizeof(u32);
1175         int ret;
1176
1177         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1178         if (ret < 0)
1179                 return ret;
1180
1181         info.dev = dev;
1182
1183         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1184                                      (void *)&info, &event);
1185         if (ret < 0)
1186                 goto fail;
1187
1188         ops[0].watch.ver = 1;
1189         ops[0].watch.flag = 1;
1190         ops[0].watch.cookie = event->cookie;
1191         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1192         ops[0].watch.timeout = 12;
1193
1194         ret = rbd_req_sync_op(dev, NULL,
1195                                CEPH_NOSNAP,
1196                                0,
1197                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1198                                ops,
1199                                1, obj, 0, 0, NULL, NULL, NULL);
1200         if (ret < 0)
1201                 goto fail_event;
1202
1203         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1204         dout("ceph_osdc_wait_event returned %d\n", ret);
1205         rbd_destroy_ops(ops);
1206         return 0;
1207
1208 fail_event:
1209         ceph_osdc_cancel_event(event);
1210 fail:
1211         rbd_destroy_ops(ops);
1212         return ret;
1213 }
1214
1215 /*
1216  * Request sync osd rollback
1217  */
1218 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1219                                      u64 snapid,
1220                                      const char *obj)
1221 {
1222         struct ceph_osd_req_op *ops;
1223         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1224         if (ret < 0)
1225                 return ret;
1226
1227         ops[0].snap.snapid = snapid;
1228
1229         ret = rbd_req_sync_op(dev, NULL,
1230                                CEPH_NOSNAP,
1231                                0,
1232                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1233                                ops,
1234                                1, obj, 0, 0, NULL, NULL, NULL);
1235
1236         rbd_destroy_ops(ops);
1237
1238         return ret;
1239 }
1240
1241 /*
1242  * Request sync osd read
1243  */
1244 static int rbd_req_sync_exec(struct rbd_device *dev,
1245                              const char *obj,
1246                              const char *cls,
1247                              const char *method,
1248                              const char *data,
1249                              int len,
1250                              u64 *ver)
1251 {
1252         struct ceph_osd_req_op *ops;
1253         int cls_len = strlen(cls);
1254         int method_len = strlen(method);
1255         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1256                                     cls_len + method_len + len);
1257         if (ret < 0)
1258                 return ret;
1259
1260         ops[0].cls.class_name = cls;
1261         ops[0].cls.class_len = (__u8)cls_len;
1262         ops[0].cls.method_name = method;
1263         ops[0].cls.method_len = (__u8)method_len;
1264         ops[0].cls.argc = 0;
1265         ops[0].cls.indata = data;
1266         ops[0].cls.indata_len = len;
1267
1268         ret = rbd_req_sync_op(dev, NULL,
1269                                CEPH_NOSNAP,
1270                                0,
1271                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1272                                ops,
1273                                1, obj, 0, 0, NULL, NULL, ver);
1274
1275         rbd_destroy_ops(ops);
1276
1277         dout("cls_exec returned %d\n", ret);
1278         return ret;
1279 }
1280
1281 /*
1282  * block device queue callback
1283  */
1284 static void rbd_rq_fn(struct request_queue *q)
1285 {
1286         struct rbd_device *rbd_dev = q->queuedata;
1287         struct request *rq;
1288         struct bio_pair *bp = NULL;
1289
1290         rq = blk_fetch_request(q);
1291
1292         while (1) {
1293                 struct bio *bio;
1294                 struct bio *rq_bio, *next_bio = NULL;
1295                 bool do_write;
1296                 int size, op_size = 0;
1297                 u64 ofs;
1298
1299                 /* peek at request from block layer */
1300                 if (!rq)
1301                         break;
1302
1303                 dout("fetched request\n");
1304
1305                 /* filter out block requests we don't understand */
1306                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1307                         __blk_end_request_all(rq, 0);
1308                         goto next;
1309                 }
1310
1311                 /* deduce our operation (read, write) */
1312                 do_write = (rq_data_dir(rq) == WRITE);
1313
1314                 size = blk_rq_bytes(rq);
1315                 ofs = blk_rq_pos(rq) * 512ULL;
1316                 rq_bio = rq->bio;
1317                 if (do_write && rbd_dev->read_only) {
1318                         __blk_end_request_all(rq, -EROFS);
1319                         goto next;
1320                 }
1321
1322                 spin_unlock_irq(q->queue_lock);
1323
1324                 dout("%s 0x%x bytes at 0x%llx\n",
1325                      do_write ? "write" : "read",
1326                      size, blk_rq_pos(rq) * 512ULL);
1327
1328                 do {
1329                         /* a bio clone to be passed down to OSD req */
1330                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1331                         op_size = rbd_get_segment(&rbd_dev->header,
1332                                                   rbd_dev->header.block_name,
1333                                                   ofs, size,
1334                                                   NULL, NULL);
1335                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1336                                               op_size, GFP_ATOMIC);
1337                         if (!bio) {
1338                                 spin_lock_irq(q->queue_lock);
1339                                 __blk_end_request_all(rq, -ENOMEM);
1340                                 goto next;
1341                         }
1342
1343                         /* init OSD command: write or read */
1344                         if (do_write)
1345                                 rbd_req_write(rq, rbd_dev,
1346                                               rbd_dev->header.snapc,
1347                                               ofs,
1348                                               op_size, bio);
1349                         else
1350                                 rbd_req_read(rq, rbd_dev,
1351                                              cur_snap_id(rbd_dev),
1352                                              ofs,
1353                                              op_size, bio);
1354
1355                         size -= op_size;
1356                         ofs += op_size;
1357
1358                         rq_bio = next_bio;
1359                 } while (size > 0);
1360
1361                 if (bp)
1362                         bio_pair_release(bp);
1363
1364                 spin_lock_irq(q->queue_lock);
1365 next:
1366                 rq = blk_fetch_request(q);
1367         }
1368 }
1369
1370 /*
1371  * a queue callback. Makes sure that we don't create a bio that spans across
1372  * multiple osd objects. One exception would be with a single page bios,
1373  * which we handle later at bio_chain_clone
1374  */
1375 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1376                           struct bio_vec *bvec)
1377 {
1378         struct rbd_device *rbd_dev = q->queuedata;
1379         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1380         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1381         unsigned int bio_sectors = bmd->bi_size >> 9;
1382         int max;
1383
1384         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1385                                  + bio_sectors)) << 9;
1386         if (max < 0)
1387                 max = 0; /* bio_add cannot handle a negative return */
1388         if (max <= bvec->bv_len && bio_sectors == 0)
1389                 return bvec->bv_len;
1390         return max;
1391 }
1392
1393 static void rbd_free_disk(struct rbd_device *rbd_dev)
1394 {
1395         struct gendisk *disk = rbd_dev->disk;
1396
1397         if (!disk)
1398                 return;
1399
1400         rbd_header_free(&rbd_dev->header);
1401
1402         if (disk->flags & GENHD_FL_UP)
1403                 del_gendisk(disk);
1404         if (disk->queue)
1405                 blk_cleanup_queue(disk->queue);
1406         put_disk(disk);
1407 }
1408
1409 /*
1410  * reload the ondisk the header 
1411  */
1412 static int rbd_read_header(struct rbd_device *rbd_dev,
1413                            struct rbd_image_header *header)
1414 {
1415         ssize_t rc;
1416         struct rbd_image_header_ondisk *dh;
1417         int snap_count = 0;
1418         u64 snap_names_len = 0;
1419         u64 ver;
1420
1421         while (1) {
1422                 int len = sizeof(*dh) +
1423                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1424                           snap_names_len;
1425
1426                 rc = -ENOMEM;
1427                 dh = kmalloc(len, GFP_KERNEL);
1428                 if (!dh)
1429                         return -ENOMEM;
1430
1431                 rc = rbd_req_sync_read(rbd_dev,
1432                                        NULL, CEPH_NOSNAP,
1433                                        rbd_dev->obj_md_name,
1434                                        0, len,
1435                                        (char *)dh, &ver);
1436                 if (rc < 0)
1437                         goto out_dh;
1438
1439                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1440                 if (rc < 0)
1441                         goto out_dh;
1442
1443                 if (snap_count != header->total_snaps) {
1444                         snap_count = header->total_snaps;
1445                         snap_names_len = header->snap_names_len;
1446                         rbd_header_free(header);
1447                         kfree(dh);
1448                         continue;
1449                 }
1450                 break;
1451         }
1452         header->obj_version = ver;
1453
1454 out_dh:
1455         kfree(dh);
1456         return rc;
1457 }
1458
1459 /*
1460  * create a snapshot
1461  */
1462 static int rbd_header_add_snap(struct rbd_device *dev,
1463                                const char *snap_name,
1464                                gfp_t gfp_flags)
1465 {
1466         int name_len = strlen(snap_name);
1467         u64 new_snapid;
1468         int ret;
1469         void *data, *data_start, *data_end;
1470         u64 ver;
1471
1472         /* we should create a snapshot only if we're pointing at the head */
1473         if (dev->cur_snap)
1474                 return -EINVAL;
1475
1476         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1477                                       &new_snapid);
1478         dout("created snapid=%lld\n", new_snapid);
1479         if (ret < 0)
1480                 return ret;
1481
1482         data = kmalloc(name_len + 16, gfp_flags);
1483         if (!data)
1484                 return -ENOMEM;
1485
1486         data_start = data;
1487         data_end = data + name_len + 16;
1488
1489         ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1490         ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1491
1492         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1493                                 data_start, data - data_start, &ver);
1494
1495         kfree(data_start);
1496
1497         if (ret < 0)
1498                 return ret;
1499
1500         dev->header.snapc->seq =  new_snapid;
1501
1502         return 0;
1503 bad:
1504         return -ERANGE;
1505 }
1506
1507 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1508 {
1509         struct rbd_snap *snap;
1510
1511         while (!list_empty(&rbd_dev->snaps)) {
1512                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1513                 __rbd_remove_snap_dev(rbd_dev, snap);
1514         }
1515 }
1516
1517 /*
1518  * only read the first part of the ondisk header, without the snaps info
1519  */
1520 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1521 {
1522         int ret;
1523         struct rbd_image_header h;
1524         u64 snap_seq;
1525         int follow_seq = 0;
1526
1527         ret = rbd_read_header(rbd_dev, &h);
1528         if (ret < 0)
1529                 return ret;
1530
1531         down_write(&rbd_dev->header.snap_rwsem);
1532
1533         snap_seq = rbd_dev->header.snapc->seq;
1534         if (rbd_dev->header.total_snaps &&
1535             rbd_dev->header.snapc->snaps[0] == snap_seq)
1536                 /* pointing at the head, will need to follow that
1537                    if head moves */
1538                 follow_seq = 1;
1539
1540         kfree(rbd_dev->header.snapc);
1541         kfree(rbd_dev->header.snap_names);
1542         kfree(rbd_dev->header.snap_sizes);
1543
1544         rbd_dev->header.total_snaps = h.total_snaps;
1545         rbd_dev->header.snapc = h.snapc;
1546         rbd_dev->header.snap_names = h.snap_names;
1547         rbd_dev->header.snap_names_len = h.snap_names_len;
1548         rbd_dev->header.snap_sizes = h.snap_sizes;
1549         if (follow_seq)
1550                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1551         else
1552                 rbd_dev->header.snapc->seq = snap_seq;
1553
1554         ret = __rbd_init_snaps_header(rbd_dev);
1555
1556         up_write(&rbd_dev->header.snap_rwsem);
1557
1558         return ret;
1559 }
1560
1561 static int rbd_init_disk(struct rbd_device *rbd_dev)
1562 {
1563         struct gendisk *disk;
1564         struct request_queue *q;
1565         int rc;
1566         u64 total_size = 0;
1567
1568         /* contact OSD, request size info about the object being mapped */
1569         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1570         if (rc)
1571                 return rc;
1572
1573         /* no need to lock here, as rbd_dev is not registered yet */
1574         rc = __rbd_init_snaps_header(rbd_dev);
1575         if (rc)
1576                 return rc;
1577
1578         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1579         if (rc)
1580                 return rc;
1581
1582         /* create gendisk info */
1583         rc = -ENOMEM;
1584         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1585         if (!disk)
1586                 goto out;
1587
1588         sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1589         disk->major = rbd_dev->major;
1590         disk->first_minor = 0;
1591         disk->fops = &rbd_bd_ops;
1592         disk->private_data = rbd_dev;
1593
1594         /* init rq */
1595         rc = -ENOMEM;
1596         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1597         if (!q)
1598                 goto out_disk;
1599         blk_queue_merge_bvec(q, rbd_merge_bvec);
1600         disk->queue = q;
1601
1602         q->queuedata = rbd_dev;
1603
1604         rbd_dev->disk = disk;
1605         rbd_dev->q = q;
1606
1607         /* finally, announce the disk to the world */
1608         set_capacity(disk, total_size / 512ULL);
1609         add_disk(disk);
1610
1611         pr_info("%s: added with size 0x%llx\n",
1612                 disk->disk_name, (unsigned long long)total_size);
1613         return 0;
1614
1615 out_disk:
1616         put_disk(disk);
1617 out:
1618         return rc;
1619 }
1620
1621 /*
1622   sysfs
1623 */
1624
1625 static ssize_t rbd_size_show(struct device *dev,
1626                              struct device_attribute *attr, char *buf)
1627 {
1628         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1629
1630         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1631 }
1632
1633 static ssize_t rbd_major_show(struct device *dev,
1634                               struct device_attribute *attr, char *buf)
1635 {
1636         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1637
1638         return sprintf(buf, "%d\n", rbd_dev->major);
1639 }
1640
1641 static ssize_t rbd_client_id_show(struct device *dev,
1642                                   struct device_attribute *attr, char *buf)
1643 {
1644         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1645
1646         return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1647 }
1648
1649 static ssize_t rbd_pool_show(struct device *dev,
1650                              struct device_attribute *attr, char *buf)
1651 {
1652         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1653
1654         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1655 }
1656
1657 static ssize_t rbd_name_show(struct device *dev,
1658                              struct device_attribute *attr, char *buf)
1659 {
1660         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1661
1662         return sprintf(buf, "%s\n", rbd_dev->obj);
1663 }
1664
1665 static ssize_t rbd_snap_show(struct device *dev,
1666                              struct device_attribute *attr,
1667                              char *buf)
1668 {
1669         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1670
1671         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1672 }
1673
1674 static ssize_t rbd_image_refresh(struct device *dev,
1675                                  struct device_attribute *attr,
1676                                  const char *buf,
1677                                  size_t size)
1678 {
1679         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1680         int rc;
1681         int ret = size;
1682
1683         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1684
1685         rc = __rbd_update_snaps(rbd_dev);
1686         if (rc < 0)
1687                 ret = rc;
1688
1689         mutex_unlock(&ctl_mutex);
1690         return ret;
1691 }
1692
1693 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1694 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1695 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1696 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1697 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1698 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1699 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1700 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1701 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1702
1703 static struct attribute *rbd_attrs[] = {
1704         &dev_attr_size.attr,
1705         &dev_attr_major.attr,
1706         &dev_attr_client_id.attr,
1707         &dev_attr_pool.attr,
1708         &dev_attr_name.attr,
1709         &dev_attr_current_snap.attr,
1710         &dev_attr_refresh.attr,
1711         &dev_attr_create_snap.attr,
1712         &dev_attr_rollback_snap.attr,
1713         NULL
1714 };
1715
1716 static struct attribute_group rbd_attr_group = {
1717         .attrs = rbd_attrs,
1718 };
1719
1720 static const struct attribute_group *rbd_attr_groups[] = {
1721         &rbd_attr_group,
1722         NULL
1723 };
1724
/* Release callback of rbd_device_type; intentionally does nothing. */
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* nothing to release here */
}
1728
1729 static struct device_type rbd_device_type = {
1730         .name           = "rbd",
1731         .groups         = rbd_attr_groups,
1732         .release        = rbd_sysfs_dev_release,
1733 };
1734
1735
1736 /*
1737   sysfs - snapshots
1738 */
1739
1740 static ssize_t rbd_snap_size_show(struct device *dev,
1741                                   struct device_attribute *attr,
1742                                   char *buf)
1743 {
1744         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1745
1746         return sprintf(buf, "%lld\n", (long long)snap->size);
1747 }
1748
1749 static ssize_t rbd_snap_id_show(struct device *dev,
1750                                 struct device_attribute *attr,
1751                                 char *buf)
1752 {
1753         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1754
1755         return sprintf(buf, "%lld\n", (long long)snap->id);
1756 }
1757
1758 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1759 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1760
1761 static struct attribute *rbd_snap_attrs[] = {
1762         &dev_attr_snap_size.attr,
1763         &dev_attr_snap_id.attr,
1764         NULL,
1765 };
1766
1767 static struct attribute_group rbd_snap_attr_group = {
1768         .attrs = rbd_snap_attrs,
1769 };
1770
1771 static void rbd_snap_dev_release(struct device *dev)
1772 {
1773         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1774         kfree(snap->name);
1775         kfree(snap);
1776 }
1777
1778 static const struct attribute_group *rbd_snap_attr_groups[] = {
1779         &rbd_snap_attr_group,
1780         NULL
1781 };
1782
1783 static struct device_type rbd_snap_device_type = {
1784         .groups         = rbd_snap_attr_groups,
1785         .release        = rbd_snap_dev_release,
1786 };
1787
1788 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1789                                   struct rbd_snap *snap)
1790 {
1791         list_del(&snap->node);
1792         device_unregister(&snap->dev);
1793 }
1794
1795 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1796                                   struct rbd_snap *snap,
1797                                   struct device *parent)
1798 {
1799         struct device *dev = &snap->dev;
1800         int ret;
1801
1802         dev->type = &rbd_snap_device_type;
1803         dev->parent = parent;
1804         dev->release = rbd_snap_dev_release;
1805         dev_set_name(dev, "snap_%s", snap->name);
1806         ret = device_register(dev);
1807
1808         return ret;
1809 }
1810
1811 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1812                               int i, const char *name,
1813                               struct rbd_snap **snapp)
1814 {
1815         int ret;
1816         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1817         if (!snap)
1818                 return -ENOMEM;
1819         snap->name = kstrdup(name, GFP_KERNEL);
1820         snap->size = rbd_dev->header.snap_sizes[i];
1821         snap->id = rbd_dev->header.snapc->snaps[i];
1822         if (device_is_registered(&rbd_dev->dev)) {
1823                 ret = rbd_register_snap_dev(rbd_dev, snap,
1824                                              &rbd_dev->dev);
1825                 if (ret < 0)
1826                         goto err;
1827         }
1828         *snapp = snap;
1829         return 0;
1830 err:
1831         kfree(snap->name);
1832         kfree(snap);
1833         return ret;
1834 }
1835
/*
 * search for the previous snap in a null delimited string list
 *
 * @name points at the beginning of an entry of the list (or just past
 * the terminator of the last entry) and @start at the beginning of the
 * list.  Returns a pointer to the beginning of the entry that precedes
 * @name, or NULL if @name is less than two bytes past @start, i.e. there
 * is no room for a preceding entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	if (name < start + 2)
		return NULL;

	/*
	 * name[-1] is the terminator of the previous entry; walk back from
	 * the byte before it until the terminator of the entry before that
	 * one, or the start of the list, is reached.
	 */
	for (p = name - 2; *p != '\0'; p--) {
		if (p == start)
			return start;
	}

	return p + 1;
}
1852
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in reverse order (from newest to oldest) and we need to go from
 * oldest to newest so that we don't get a duplicate snap name while
 * doing the process (e.g., a snapshot was removed and a new one was
 * created with the same name).
 */
1861 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1862 {
1863         const char *name, *first_name;
1864         int i = rbd_dev->header.total_snaps;
1865         struct rbd_snap *snap, *old_snap = NULL;
1866         int ret;
1867         struct list_head *p, *n;
1868
1869         first_name = rbd_dev->header.snap_names;
1870         name = first_name + rbd_dev->header.snap_names_len;
1871
1872         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
1873                 u64 cur_id;
1874
1875                 old_snap = list_entry(p, struct rbd_snap, node);
1876
1877                 if (i)
1878                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
1879
1880                 if (!i || old_snap->id < cur_id) {
1881                         /* old_snap->id was skipped, thus was removed */
1882                         __rbd_remove_snap_dev(rbd_dev, old_snap);
1883                         continue;
1884                 }
1885                 if (old_snap->id == cur_id) {
1886                         /* we have this snapshot already */
1887                         i--;
1888                         name = rbd_prev_snap_name(name, first_name);
1889                         continue;
1890                 }
1891                 for (; i > 0;
1892                      i--, name = rbd_prev_snap_name(name, first_name)) {
1893                         if (!name) {
1894                                 WARN_ON(1);
1895                                 return -EINVAL;
1896                         }
1897                         cur_id = rbd_dev->header.snapc->snaps[i];
1898                         /* snapshot removal? handle it above */
1899                         if (cur_id >= old_snap->id)
1900                                 break;
1901                         /* a new snapshot */
1902                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1903                         if (ret < 0)
1904                                 return ret;
1905
1906                         /* note that we add it backward so using n and not p */
1907                         list_add(&snap->node, n);
1908                         p = &snap->node;
1909                 }
1910         }
1911         /* we're done going over the old snap list, just add what's left */
1912         for (; i > 0; i--) {
1913                 name = rbd_prev_snap_name(name, first_name);
1914                 if (!name) {
1915                         WARN_ON(1);
1916                         return -EINVAL;
1917                 }
1918                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1919                 if (ret < 0)
1920                         return ret;
1921                 list_add(&snap->node, &rbd_dev->snaps);
1922         }
1923
1924         return 0;
1925 }
1926
1927
/*
 * Release callback for rbd_root_dev.  The root device is a static
 * object, so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
        /* intentionally empty */
}
1931
1932 static struct device rbd_root_dev = {
1933         .init_name =    "rbd",
1934         .release =      rbd_root_dev_release,
1935 };
1936
1937 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
1938 {
1939         int ret = -ENOMEM;
1940         struct device *dev;
1941         struct rbd_snap *snap;
1942
1943         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1944         dev = &rbd_dev->dev;
1945
1946         dev->bus = &rbd_bus_type;
1947         dev->type = &rbd_device_type;
1948         dev->parent = &rbd_root_dev;
1949         dev->release = rbd_dev_release;
1950         dev_set_name(dev, "%d", rbd_dev->id);
1951         ret = device_register(dev);
1952         if (ret < 0)
1953                 goto done_free;
1954
1955         list_for_each_entry(snap, &rbd_dev->snaps, node) {
1956                 ret = rbd_register_snap_dev(rbd_dev, snap,
1957                                              &rbd_dev->dev);
1958                 if (ret < 0)
1959                         break;
1960         }
1961
1962         mutex_unlock(&ctl_mutex);
1963         return 0;
1964 done_free:
1965         mutex_unlock(&ctl_mutex);
1966         return ret;
1967 }
1968
1969 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
1970 {
1971         device_unregister(&rbd_dev->dev);
1972 }
1973
1974 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
1975 {
1976         int ret, rc;
1977
1978         do {
1979                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
1980                                          rbd_dev->header.obj_version);
1981                 if (ret == -ERANGE) {
1982                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1983                         rc = __rbd_update_snaps(rbd_dev);
1984                         mutex_unlock(&ctl_mutex);
1985                         if (rc < 0)
1986                                 return rc;
1987                 }
1988         } while (ret == -ERANGE);
1989
1990         return ret;
1991 }
1992
1993 static ssize_t rbd_add(struct bus_type *bus,
1994                        const char *buf,
1995                        size_t count)
1996 {
1997         struct ceph_osd_client *osdc;
1998         struct rbd_device *rbd_dev;
1999         ssize_t rc = -ENOMEM;
2000         int irc, new_id = 0;
2001         struct list_head *tmp;
2002         char *mon_dev_name;
2003         char *options;
2004
2005         if (!try_module_get(THIS_MODULE))
2006                 return -ENODEV;
2007
2008         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2009         if (!mon_dev_name)
2010                 goto err_out_mod;
2011
2012         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2013         if (!options)
2014                 goto err_mon_dev;
2015
2016         /* new rbd_device object */
2017         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2018         if (!rbd_dev)
2019                 goto err_out_opt;
2020
2021         /* static rbd_device initialization */
2022         spin_lock_init(&rbd_dev->lock);
2023         INIT_LIST_HEAD(&rbd_dev->node);
2024         INIT_LIST_HEAD(&rbd_dev->snaps);
2025
2026         /* generate unique id: find highest unique id, add one */
2027         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2028
2029         list_for_each(tmp, &rbd_dev_list) {
2030                 struct rbd_device *rbd_dev;
2031
2032                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2033                 if (rbd_dev->id >= new_id)
2034                         new_id = rbd_dev->id + 1;
2035         }
2036
2037         rbd_dev->id = new_id;
2038
2039         /* add to global list */
2040         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2041
2042         /* parse add command */
2043         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2044                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
2045                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2046                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2047                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2048                    mon_dev_name, options, rbd_dev->pool_name,
2049                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
2050                 rc = -EINVAL;
2051                 goto err_out_slot;
2052         }
2053
2054         if (rbd_dev->snap_name[0] == 0)
2055                 rbd_dev->snap_name[0] = '-';
2056
2057         rbd_dev->obj_len = strlen(rbd_dev->obj);
2058         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2059                  rbd_dev->obj, RBD_SUFFIX);
2060
2061         /* initialize rest of new object */
2062         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2063         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2064         if (rc < 0)
2065                 goto err_out_slot;
2066
2067         mutex_unlock(&ctl_mutex);
2068
2069         /* pick the pool */
2070         osdc = &rbd_dev->client->osdc;
2071         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2072         if (rc < 0)
2073                 goto err_out_client;
2074         rbd_dev->poolid = rc;
2075
2076         /* register our block device */
2077         irc = register_blkdev(0, rbd_dev->name);
2078         if (irc < 0) {
2079                 rc = irc;
2080                 goto err_out_client;
2081         }
2082         rbd_dev->major = irc;
2083
2084         rc = rbd_bus_add_dev(rbd_dev);
2085         if (rc)
2086                 goto err_out_blkdev;
2087
2088         /* set up and announce blkdev mapping */
2089         rc = rbd_init_disk(rbd_dev);
2090         if (rc)
2091                 goto err_out_bus;
2092
2093         rc = rbd_init_watch_dev(rbd_dev);
2094         if (rc)
2095                 goto err_out_bus;
2096
2097         return count;
2098
2099 err_out_bus:
2100         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2101         list_del_init(&rbd_dev->node);
2102         mutex_unlock(&ctl_mutex);
2103
2104         /* this will also clean up rest of rbd_dev stuff */
2105
2106         rbd_bus_del_dev(rbd_dev);
2107         kfree(options);
2108         kfree(mon_dev_name);
2109         return rc;
2110
2111 err_out_blkdev:
2112         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2113 err_out_client:
2114         rbd_put_client(rbd_dev);
2115         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2116 err_out_slot:
2117         list_del_init(&rbd_dev->node);
2118         mutex_unlock(&ctl_mutex);
2119
2120         kfree(rbd_dev);
2121 err_out_opt:
2122         kfree(options);
2123 err_mon_dev:
2124         kfree(mon_dev_name);
2125 err_out_mod:
2126         dout("Error adding device %s\n", buf);
2127         module_put(THIS_MODULE);
2128         return rc;
2129 }
2130
2131 static struct rbd_device *__rbd_get_dev(unsigned long id)
2132 {
2133         struct list_head *tmp;
2134         struct rbd_device *rbd_dev;
2135
2136         list_for_each(tmp, &rbd_dev_list) {
2137                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2138                 if (rbd_dev->id == id)
2139                         return rbd_dev;
2140         }
2141         return NULL;
2142 }
2143
2144 static void rbd_dev_release(struct device *dev)
2145 {
2146         struct rbd_device *rbd_dev =
2147                         container_of(dev, struct rbd_device, dev);
2148
2149         if (rbd_dev->watch_request)
2150                 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2151                                                     rbd_dev->watch_request);
2152         if (rbd_dev->watch_event)
2153                 ceph_osdc_cancel_event(rbd_dev->watch_event);
2154
2155         rbd_put_client(rbd_dev);
2156
2157         /* clean up and free blkdev */
2158         rbd_free_disk(rbd_dev);
2159         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2160         kfree(rbd_dev);
2161
2162         /* release module ref */
2163         module_put(THIS_MODULE);
2164 }
2165
2166 static ssize_t rbd_remove(struct bus_type *bus,
2167                           const char *buf,
2168                           size_t count)
2169 {
2170         struct rbd_device *rbd_dev = NULL;
2171         int target_id, rc;
2172         unsigned long ul;
2173         int ret = count;
2174
2175         rc = strict_strtoul(buf, 10, &ul);
2176         if (rc)
2177                 return rc;
2178
2179         /* convert to int; abort if we lost anything in the conversion */
2180         target_id = (int) ul;
2181         if (target_id != ul)
2182                 return -EINVAL;
2183
2184         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2185
2186         rbd_dev = __rbd_get_dev(target_id);
2187         if (!rbd_dev) {
2188                 ret = -ENOENT;
2189                 goto done;
2190         }
2191
2192         list_del_init(&rbd_dev->node);
2193
2194         __rbd_remove_all_snaps(rbd_dev);
2195         rbd_bus_del_dev(rbd_dev);
2196
2197 done:
2198         mutex_unlock(&ctl_mutex);
2199         return ret;
2200 }
2201
2202 static ssize_t rbd_snap_add(struct device *dev,
2203                             struct device_attribute *attr,
2204                             const char *buf,
2205                             size_t count)
2206 {
2207         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2208         int ret;
2209         char *name = kmalloc(count + 1, GFP_KERNEL);
2210         if (!name)
2211                 return -ENOMEM;
2212
2213         snprintf(name, count, "%s", buf);
2214
2215         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2216
2217         ret = rbd_header_add_snap(rbd_dev,
2218                                   name, GFP_KERNEL);
2219         if (ret < 0)
2220                 goto err_unlock;
2221
2222         ret = __rbd_update_snaps(rbd_dev);
2223         if (ret < 0)
2224                 goto err_unlock;
2225
2226         /* shouldn't hold ctl_mutex when notifying.. notify might
2227            trigger a watch callback that would need to get that mutex */
2228         mutex_unlock(&ctl_mutex);
2229
2230         /* make a best effort, don't error if failed */
2231         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2232
2233         ret = count;
2234         kfree(name);
2235         return ret;
2236
2237 err_unlock:
2238         mutex_unlock(&ctl_mutex);
2239         kfree(name);
2240         return ret;
2241 }
2242
2243 static ssize_t rbd_snap_rollback(struct device *dev,
2244                                  struct device_attribute *attr,
2245                                  const char *buf,
2246                                  size_t count)
2247 {
2248         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2249         int ret;
2250         u64 snapid;
2251         u64 cur_ofs;
2252         char *seg_name = NULL;
2253         char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2254         ret = -ENOMEM;
2255         if (!snap_name)
2256                 return ret;
2257
2258         /* parse snaps add command */
2259         snprintf(snap_name, count, "%s", buf);
2260         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2261         if (!seg_name)
2262                 goto done;
2263
2264         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2265
2266         ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2267         if (ret < 0)
2268                 goto done_unlock;
2269
2270         dout("snapid=%lld\n", snapid);
2271
2272         cur_ofs = 0;
2273         while (cur_ofs < rbd_dev->header.image_size) {
2274                 cur_ofs += rbd_get_segment(&rbd_dev->header,
2275                                            rbd_dev->obj,
2276                                            cur_ofs, (u64)-1,
2277                                            seg_name, NULL);
2278                 dout("seg_name=%s\n", seg_name);
2279
2280                 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2281                 if (ret < 0)
2282                         pr_warning("could not roll back obj %s err=%d\n",
2283                                    seg_name, ret);
2284         }
2285
2286         ret = __rbd_update_snaps(rbd_dev);
2287         if (ret < 0)
2288                 goto done_unlock;
2289
2290         ret = count;
2291
2292 done_unlock:
2293         mutex_unlock(&ctl_mutex);
2294 done:
2295         kfree(seg_name);
2296         kfree(snap_name);
2297
2298         return ret;
2299 }
2300
2301 static struct bus_attribute rbd_bus_attrs[] = {
2302         __ATTR(add, S_IWUSR, NULL, rbd_add),
2303         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2304         __ATTR_NULL
2305 };
2306
2307 /*
2308  * create control files in sysfs
2309  * /sys/bus/rbd/...
2310  */
2311 static int rbd_sysfs_init(void)
2312 {
2313         int ret;
2314
2315         rbd_bus_type.bus_attrs = rbd_bus_attrs;
2316
2317         ret = bus_register(&rbd_bus_type);
2318          if (ret < 0)
2319                 return ret;
2320
2321         ret = device_register(&rbd_root_dev);
2322
2323         return ret;
2324 }
2325
2326 static void rbd_sysfs_cleanup(void)
2327 {
2328         device_unregister(&rbd_root_dev);
2329         bus_unregister(&rbd_bus_type);
2330 }
2331
2332 int __init rbd_init(void)
2333 {
2334         int rc;
2335
2336         rc = rbd_sysfs_init();
2337         if (rc)
2338                 return rc;
2339         spin_lock_init(&node_lock);
2340         pr_info("loaded " DRV_NAME_LONG "\n");
2341         return 0;
2342 }
2343
2344 void __exit rbd_exit(void)
2345 {
2346         rbd_sysfs_cleanup();
2347 }
2348
2349 module_init(rbd_init);
2350 module_exit(rbd_exit);
2351
2352 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2353 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2354 MODULE_DESCRIPTION("rados block device");
2355
2356 /* following authorship retained from original osdblk.c */
2357 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2358
2359 MODULE_LICENSE("GPL");