drbd: The new, smarter resync speed controller
[linux-2.6.git] / drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46 /* defined here:
47    drbd_md_io_complete
48    drbd_endio_sec
49    drbd_endio_pri
50
51  * more endio handlers:
52    atodb_endio in drbd_actlog.c
53    drbd_bm_async_io_complete in drbd_bitmap.c
54
55  * For all these callbacks, note the following:
56  * The callbacks will be called in irq context by the IDE drivers,
57  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58  * Try to get the locking right :)
59  *
60  */
61
62
63 /* About the global_state_lock
64    Each state transition on a device holds a read lock. In case we have
65    to evaluate the sync-after dependencies, we grab a write lock, because
66    we need stable states on all devices for that.  */
67 rwlock_t global_state_lock;
68
69 /* used for synchronous meta data and bitmap IO
70  * submitted by drbd_md_sync_page_io()
71  */
72 void drbd_md_io_complete(struct bio *bio, int error)
73 {
74         struct drbd_md_io *md_io;
75
76         md_io = (struct drbd_md_io *)bio->bi_private;
77         md_io->error = error;
78
79         complete(&md_io->event);
80 }
81
82 /* reads on behalf of the partner,
83  * "submitted" by the receiver
84  */
85 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86 {
87         unsigned long flags = 0;
88         struct drbd_conf *mdev = e->mdev;
89
90         D_ASSERT(e->block_id != ID_VACANT);
91
92         spin_lock_irqsave(&mdev->req_lock, flags);
93         mdev->read_cnt += e->size >> 9;
94         list_del(&e->w.list);
95         if (list_empty(&mdev->read_ee))
96                 wake_up(&mdev->ee_wait);
97         if (test_bit(__EE_WAS_ERROR, &e->flags))
98                 __drbd_chk_io_error(mdev, FALSE);
99         spin_unlock_irqrestore(&mdev->req_lock, flags);
100
101         drbd_queue_work(&mdev->data.work, &e->w);
102         put_ldev(mdev);
103 }
104
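/* A write that was submitted as a barrier, failed, and has not been
 * resubmitted yet; EE_RESUBMITTED must still be clear so that we
 * reissue it only once. */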
105 static int is_failed_barrier(int ee_flags)
106 {
107         return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108                         == (EE_IS_BARRIER|EE_WAS_ERROR);
109 }
110
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
114 {
115         unsigned long flags = 0;
116         struct drbd_conf *mdev = e->mdev;
117         sector_t e_sector;
118         int do_wake;
119         int is_syncer_req;
120         int do_al_complete_io;
121
122         /* if this is a failed barrier request, disable use of barriers,
123          * and schedule for resubmission */
124         if (is_failed_barrier(e->flags)) {
125                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
126                 spin_lock_irqsave(&mdev->req_lock, flags);
127                 list_del(&e->w.list);
128                 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129                 e->w.cb = w_e_reissue;
130                 /* put_ldev actually happens below, once we come here again. */
131                 __release(local);
132                 spin_unlock_irqrestore(&mdev->req_lock, flags);
133                 drbd_queue_work(&mdev->data.work, &e->w);
134                 return;
135         }
136
137         D_ASSERT(e->block_id != ID_VACANT);
138
139         /* after we moved e to done_ee,
140          * we may no longer access it,
141          * it may be freed/reused already!
142          * (as soon as we release the req_lock) */
143         e_sector = e->sector;
144         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145         is_syncer_req = is_syncer_block_id(e->block_id);
146
147         spin_lock_irqsave(&mdev->req_lock, flags);
148         mdev->writ_cnt += e->size >> 9;
149         list_del(&e->w.list); /* has been on active_ee or sync_ee */
150         list_add_tail(&e->w.list, &mdev->done_ee);
151
152         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153          * neither did we wake possibly waiting conflicting requests.
154          * done from "drbd_process_done_ee" within the appropriate w.cb
155          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156
157         do_wake = is_syncer_req
158                 ? list_empty(&mdev->sync_ee)
159                 : list_empty(&mdev->active_ee);
160
161         if (test_bit(__EE_WAS_ERROR, &e->flags))
162                 __drbd_chk_io_error(mdev, FALSE);
163         spin_unlock_irqrestore(&mdev->req_lock, flags);
164
165         if (is_syncer_req)
166                 drbd_rs_complete_io(mdev, e_sector);
167
168         if (do_wake)
169                 wake_up(&mdev->ee_wait);
170
171         if (do_al_complete_io)
172                 drbd_al_complete_io(mdev, e_sector);
173
174         wake_asender(mdev);
175         put_ldev(mdev);
176 }
177
178 /* writes on behalf of the partner, or resync writes,
179  * "submitted" by the receiver.
180  */
181 void drbd_endio_sec(struct bio *bio, int error)
182 {
183         struct drbd_epoch_entry *e = bio->bi_private;
184         struct drbd_conf *mdev = e->mdev;
185         int uptodate = bio_flagged(bio, BIO_UPTODATE);
186         int is_write = bio_data_dir(bio) == WRITE;
187
188         if (error)
189                 dev_warn(DEV, "%s: error=%d s=%llus\n",
190                                 is_write ? "write" : "read", error,
191                                 (unsigned long long)e->sector);
192         if (!error && !uptodate) {
193                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194                                 is_write ? "write" : "read",
195                                 (unsigned long long)e->sector);
196                 /* strange behavior of some lower level drivers...
197                  * fail the request by clearing the uptodate flag,
198                  * but do not return any error?! */
199                 error = -EIO;
200         }
201
202         if (error)
203                 set_bit(__EE_WAS_ERROR, &e->flags);
204
205         bio_put(bio); /* no need for the bio anymore */
206         if (atomic_dec_and_test(&e->pending_bios)) {
207                 if (is_write)
208                         drbd_endio_write_sec_final(e);
209                 else
210                         drbd_endio_read_sec_final(e);
211         }
212 }
213
214 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215  */
216 void drbd_endio_pri(struct bio *bio, int error)
217 {
218         unsigned long flags;
219         struct drbd_request *req = bio->bi_private;
220         struct drbd_conf *mdev = req->mdev;
221         struct bio_and_error m;
222         enum drbd_req_event what;
223         int uptodate = bio_flagged(bio, BIO_UPTODATE);
224
225         if (!error && !uptodate) {
226                 dev_warn(DEV, "p %s: setting error to -EIO\n",
227                          bio_data_dir(bio) == WRITE ? "write" : "read");
228                 /* strange behavior of some lower level drivers...
229                  * fail the request by clearing the uptodate flag,
230                  * but do not return any error?! */
231                 error = -EIO;
232         }
233
234         /* to avoid recursion in __req_mod */
235         if (unlikely(error)) {
236                 what = (bio_data_dir(bio) == WRITE)
237                         ? write_completed_with_error
238                         : (bio_rw(bio) == READ)
239                           ? read_completed_with_error
240                           : read_ahead_completed_with_error;
241         } else
242                 what = completed_ok;
243
244         bio_put(req->private_bio);
245         req->private_bio = ERR_PTR(error);
246
247         spin_lock_irqsave(&mdev->req_lock, flags);
248         __req_mod(req, what, &m);
249         spin_unlock_irqrestore(&mdev->req_lock, flags);
250
251         if (m.bio)
252                 complete_master_bio(mdev, &m);
253 }
254
255 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
256 {
257         struct drbd_request *req = container_of(w, struct drbd_request, w);
258
259         /* We should not detach for read io-error,
260          * but try to WRITE the P_DATA_REPLY to the failed location,
261          * to give the disk the chance to relocate that block */
262
263         spin_lock_irq(&mdev->req_lock);
264         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
265                 _req_mod(req, read_retry_remote_canceled);
266                 spin_unlock_irq(&mdev->req_lock);
267                 return 1;
268         }
269         spin_unlock_irq(&mdev->req_lock);
270
271         return w_send_read_req(mdev, w, 0);
272 }
273
274 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
275 {
276         ERR_IF(cancel) return 1;
277         dev_err(DEV, "resync inactive, but callback triggered??\n");
278         return 1; /* Simply ignore this! */
279 }
280
281 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
282 {
283         struct hash_desc desc;
284         struct scatterlist sg;
285         struct page *page = e->pages;
286         struct page *tmp;
287         unsigned len;
288
289         desc.tfm = tfm;
290         desc.flags = 0;
291
292         sg_init_table(&sg, 1);
293         crypto_hash_init(&desc);
294
295         while ((tmp = page_chain_next(page))) {
296                 /* all but the last page will be fully used */
297                 sg_set_page(&sg, page, PAGE_SIZE, 0);
298                 crypto_hash_update(&desc, &sg, sg.length);
299                 page = tmp;
300         }
301         /* and now the last, possibly only partially used page */
302         len = e->size & (PAGE_SIZE - 1);
303         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
304         crypto_hash_update(&desc, &sg, sg.length);
305         crypto_hash_final(&desc, digest);
306 }
307
308 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
309 {
310         struct hash_desc desc;
311         struct scatterlist sg;
312         struct bio_vec *bvec;
313         int i;
314
315         desc.tfm = tfm;
316         desc.flags = 0;
317
318         sg_init_table(&sg, 1);
319         crypto_hash_init(&desc);
320
321         __bio_for_each_segment(bvec, bio, i, 0) {
322                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
323                 crypto_hash_update(&desc, &sg, sg.length);
324         }
325         crypto_hash_final(&desc, digest);
326 }
327
328 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
329 {
330         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
331         int digest_size;
332         void *digest;
333         int ok;
334
335         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
336
337         if (unlikely(cancel)) {
338                 drbd_free_ee(mdev, e);
339                 return 1;
340         }
341
342         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
343                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
344                 digest = kmalloc(digest_size, GFP_NOIO);
345                 if (digest) {
346                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
347
348                         inc_rs_pending(mdev);
349                         ok = drbd_send_drequest_csum(mdev,
350                                                      e->sector,
351                                                      e->size,
352                                                      digest,
353                                                      digest_size,
354                                                      P_CSUM_RS_REQUEST);
355                         kfree(digest);
356                 } else {
357                         dev_err(DEV, "kmalloc() of digest failed.\n");
358                         ok = 0;
359                 }
360         } else
361                 ok = 1;
362
363         drbd_free_ee(mdev, e);
364
365         if (unlikely(!ok))
366                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
367         return ok;
368 }
369
370 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
371
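/* Read the block locally so its checksum can be sent in a P_CSUM_RS_REQUEST.
 * Returns 1 if the read was submitted, 0 on local disk failure, and 2 if
 * allocation or submission failed and the caller should retry later
 * (see the switch in w_make_resync_request()). */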
372 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
373 {
374         struct drbd_epoch_entry *e;
375
376         if (!get_ldev(mdev))
377                 return 0;
378
379         /* GFP_TRY, because if there is no memory available right now, this may
380          * be rescheduled for later. It is "only" background resync, after all. */
381         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
382         if (!e)
383                 goto fail;
384
385         spin_lock_irq(&mdev->req_lock);
386         list_add(&e->w.list, &mdev->read_ee);
387         spin_unlock_irq(&mdev->req_lock);
388
389         e->w.cb = w_e_send_csum;
390         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
391                 return 1;
392
393         drbd_free_ee(mdev, e);
394 fail:
395         put_ldev(mdev);
396         return 2;
397 }
398
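/* Resync/online-verify pacing timer.  Unless STOP_SYNC_TIMER was set, pick
 * the matching work callback (w_make_ov_request while in C_VERIFY_S,
 * otherwise w_make_resync_request) and queue mdev->resync_work on the
 * worker's work queue. */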
399 void resync_timer_fn(unsigned long data)
400 {
401         unsigned long flags;
402         struct drbd_conf *mdev = (struct drbd_conf *) data;
403         int queue;
404
405         spin_lock_irqsave(&mdev->req_lock, flags);
406
407         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
408                 queue = 1;
409                 if (mdev->state.conn == C_VERIFY_S)
410                         mdev->resync_work.cb = w_make_ov_request;
411                 else
412                         mdev->resync_work.cb = w_make_resync_request;
413         } else {
414                 queue = 0;
415                 mdev->resync_work.cb = w_resync_inactive;
416         }
417
418         spin_unlock_irqrestore(&mdev->req_lock, flags);
419
420         /* harmless race: list_empty outside data.work.q_lock */
421         if (list_empty(&mdev->resync_work.list) && queue)
422                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
423 }
424
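/* Helpers for the resync plan FIFO (mdev->rs_plan_s) used by the controller
 * below: fifo_set() overwrites every slot (used to reset the plan),
 * fifo_push() stores a new value at the head and returns the value that
 * drops out of the window, and fifo_add_val() adds a value to every slot. */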
425 static void fifo_set(struct fifo_buffer *fb, int value)
426 {
427         int i;
428
429         for (i = 0; i < fb->size; i++)
430                 fb->values[i] = value;
431 }
432
433 static int fifo_push(struct fifo_buffer *fb, int value)
434 {
435         int ov;
436
437         ov = fb->values[fb->head_index];
438         fb->values[fb->head_index++] = value;
439
440         if (fb->head_index >= fb->size)
441                 fb->head_index = 0;
442
443         return ov;
444 }
445
446 static void fifo_add_val(struct fifo_buffer *fb, int value)
447 {
448         int i;
449
450         for (i = 0; i < fb->size; i++)
451                 fb->values[i] += value;
452 }
453
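/* The dynamic resync speed controller, invoked once per SLEEP_TIME tick from
 * w_make_resync_request().  Rough idea: look at how many sectors of resync
 * data arrived since the last tick (rs_sect_in), compute how many sectors we
 * want to have "in the pipe" towards the peer (the configured c_fill_target,
 * or enough to cover the configured delay target at the observed rate), and
 * spread the difference between that and "in flight + already planned" over
 * the plan-ahead window (rs_plan_s).  The request budget for this tick is
 * what just came in plus the correction popped off the FIFO, clamped to
 * c_max_rate. */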
454 int drbd_rs_controller(struct drbd_conf *mdev)
455 {
456         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
457         unsigned int want;     /* The number of sectors we want in the proxy */
458         int req_sect; /* Number of sectors to request in this turn */
459         int correction; /* Number of sectors more we need in the proxy*/
460         int cps; /* correction per invocation of drbd_rs_controller() */
461         int steps; /* Number of time steps to plan ahead */
462         int curr_corr;
463         int max_sect;
464
465         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
466         mdev->rs_in_flight -= sect_in;
467
468         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
469
470         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
471
472         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
473                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
474         } else { /* normal path */
475                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
476                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
477         }
478
479         correction = want - mdev->rs_in_flight - mdev->rs_planed;
480
481         /* Plan ahead */
482         cps = correction / steps;
483         fifo_add_val(&mdev->rs_plan_s, cps);
484         mdev->rs_planed += cps * steps;
485
486         /* What we do in this step */
487         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
488         spin_unlock(&mdev->peer_seq_lock);
489         mdev->rs_planed -= curr_corr;
490
491         req_sect = sect_in + curr_corr;
492         if (req_sect < 0)
493                 req_sect = 0;
494
495         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
496         if (req_sect > max_sect)
497                 req_sect = max_sect;
498
499         /*
500         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
501                  sect_in, mdev->rs_in_flight, want, correction,
502                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
503         */
504
505         return req_sect;
506 }
507
508 int w_make_resync_request(struct drbd_conf *mdev,
509                 struct drbd_work *w, int cancel)
510 {
511         unsigned long bit;
512         sector_t sector;
513         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
514         int max_segment_size;
515         int number, i, size, pe, mx;
516         int align, queued, sndbuf;
517
518         if (unlikely(cancel))
519                 return 1;
520
521         if (unlikely(mdev->state.conn < C_CONNECTED)) {
522                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
523                 return 0;
524         }
525
526         if (mdev->state.conn != C_SYNC_TARGET)
527                 dev_err(DEV, "%s in w_make_resync_request\n",
528                         drbd_conn_str(mdev->state.conn));
529
530         if (!get_ldev(mdev)) {
531                 /* Since we only need to access mdev->rsync, a
532                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
533                    continuing resync with a broken disk makes no sense at
534                    all */
535                 dev_err(DEV, "Disk broke down during resync!\n");
536                 mdev->resync_work.cb = w_resync_inactive;
537                 return 1;
538         }
539
540         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
541          * if it should be necessary */
542         max_segment_size = mdev->agreed_pro_version < 94 ?
543                 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
544
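        /* Work out how many resync requests to generate in this SLEEP_TIME
         * tick.  With a plan FIFO configured (c_plan_ahead), ask the dynamic
         * controller for a sector budget and convert it into BM_BLOCK_SIZE
         * sized requests (BM_BLOCK_SHIFT - 9 turns sectors into bitmap
         * blocks); c_sync_rate then records the resulting effective rate in
         * KiB/s.  Otherwise use the fixed sync_conf.rate. */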
545         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
546                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
547                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
548         } else {
549                 mdev->c_sync_rate = mdev->sync_conf.rate;
550                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
551         }
552         pe = atomic_read(&mdev->rs_pending_cnt);
553
554         mutex_lock(&mdev->data.mutex);
555         if (mdev->data.socket)
556                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
557         else
558                 mx = 1;
559         mutex_unlock(&mdev->data.mutex);
560
561         /* For resync rates >160MB/sec, allow more pending RS requests */
562         if (number > mx)
563                 mx = number;
564
565         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
566         if ((pe + number) > mx) {
567                 number = mx - pe;
568         }
569
570         for (i = 0; i < number; i++) {
571                 /* Stop generating RS requests when half of the send buffer is filled */
572                 mutex_lock(&mdev->data.mutex);
573                 if (mdev->data.socket) {
574                         queued = mdev->data.socket->sk->sk_wmem_queued;
575                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
576                 } else {
577                         queued = 1;
578                         sndbuf = 0;
579                 }
580                 mutex_unlock(&mdev->data.mutex);
581                 if (queued > sndbuf / 2)
582                         goto requeue;
583
584 next_sector:
585                 size = BM_BLOCK_SIZE;
586                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
587
588                 if (bit == -1UL) {
589                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
590                         mdev->resync_work.cb = w_resync_inactive;
591                         put_ldev(mdev);
592                         return 1;
593                 }
594
595                 sector = BM_BIT_TO_SECT(bit);
596
597                 if (drbd_try_rs_begin_io(mdev, sector)) {
598                         mdev->bm_resync_fo = bit;
599                         goto requeue;
600                 }
601                 mdev->bm_resync_fo = bit + 1;
602
603                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
604                         drbd_rs_complete_io(mdev, sector);
605                         goto next_sector;
606                 }
607
608 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
609                 /* try to find some adjacent bits.
610                  * we stop if we have already the maximum req size.
611                  *
612                  * Additionally always align bigger requests, in order to
613                  * be prepared for all stripe sizes of software RAIDs.
614                  */
615                 align = 1;
616                 for (;;) {
617                         if (size + BM_BLOCK_SIZE > max_segment_size)
618                                 break;
619
620                         /* Be always aligned */
621                         if (sector & ((1<<(align+3))-1))
622                                 break;
623
624                         /* do not cross extent boundaries */
625                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
626                                 break;
627                         /* now, is it actually dirty, after all?
628                          * caution, drbd_bm_test_bit is tri-state for some
629                          * obscure reason; ( b == 0 ) would get the out-of-band
630                          * only accidentally right because of the "oddly sized"
631                          * adjustment below */
632                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
633                                 break;
634                         bit++;
635                         size += BM_BLOCK_SIZE;
636                         if ((BM_BLOCK_SIZE << align) <= size)
637                                 align++;
638                         i++;
639                 }
640                 /* if we merged some,
641                  * reset the offset to start the next drbd_bm_find_next from */
642                 if (size > BM_BLOCK_SIZE)
643                         mdev->bm_resync_fo = bit + 1;
644 #endif
645
646                 /* adjust very last sectors, in case we are oddly sized */
647                 if (sector + (size>>9) > capacity)
648                         size = (capacity-sector)<<9;
649                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
650                         switch (read_for_csum(mdev, sector, size)) {
651                         case 0: /* Disk failure*/
652                                 put_ldev(mdev);
653                                 return 0;
654                         case 2: /* Allocation failed */
655                                 drbd_rs_complete_io(mdev, sector);
656                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
657                                 goto requeue;
658                         /* case 1: everything ok */
659                         }
660                 } else {
661                         inc_rs_pending(mdev);
662                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
663                                                sector, size, ID_SYNCER)) {
664                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
665                                 dec_rs_pending(mdev);
666                                 put_ldev(mdev);
667                                 return 0;
668                         }
669                 }
670         }
671
672         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
673                 /* last syncer _request_ was sent,
674                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
675                  * next sync group will resume), as soon as we receive the last
676                  * resync data block, and the last bit is cleared.
677                  * until then resync "work" is "inactive" ...
678                  */
679                 mdev->resync_work.cb = w_resync_inactive;
680                 put_ldev(mdev);
681                 return 1;
682         }
683
684  requeue:
685         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
686         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
687         put_ldev(mdev);
688         return 1;
689 }
690
691 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
692 {
693         int number, i, size;
694         sector_t sector;
695         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
696
697         if (unlikely(cancel))
698                 return 1;
699
700         if (unlikely(mdev->state.conn < C_CONNECTED)) {
701                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
702                 return 0;
703         }
704
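        /* Online verify is paced by the static sync rate only: convert the
         * configured rate (KiB/s) into BM_BLOCK_SIZE requests per SLEEP_TIME
         * tick and subtract what is still pending. */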
705         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
706         if (atomic_read(&mdev->rs_pending_cnt) > number)
707                 goto requeue;
708
709         number -= atomic_read(&mdev->rs_pending_cnt);
710
711         sector = mdev->ov_position;
712         for (i = 0; i < number; i++) {
713                 if (sector >= capacity) {
714                         mdev->resync_work.cb = w_resync_inactive;
715                         return 1;
716                 }
717
718                 size = BM_BLOCK_SIZE;
719
720                 if (drbd_try_rs_begin_io(mdev, sector)) {
721                         mdev->ov_position = sector;
722                         goto requeue;
723                 }
724
725                 if (sector + (size>>9) > capacity)
726                         size = (capacity-sector)<<9;
727
728                 inc_rs_pending(mdev);
729                 if (!drbd_send_ov_request(mdev, sector, size)) {
730                         dec_rs_pending(mdev);
731                         return 0;
732                 }
733                 sector += BM_SECT_PER_BIT;
734         }
735         mdev->ov_position = sector;
736
737  requeue:
738         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
739         return 1;
740 }
741
742
743 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
744 {
745         kfree(w);
746         ov_oos_print(mdev);
747         drbd_resync_finished(mdev);
748
749         return 1;
750 }
751
752 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
753 {
754         kfree(w);
755
756         drbd_resync_finished(mdev);
757
758         return 1;
759 }
760
761 int drbd_resync_finished(struct drbd_conf *mdev)
762 {
763         unsigned long db, dt, dbdt;
764         unsigned long n_oos;
765         union drbd_state os, ns;
766         struct drbd_work *w;
767         char *khelper_cmd = NULL;
768
769         /* Remove all elements from the resync LRU. Since future actions
770          * might set bits in the (main) bitmap, then the entries in the
771          * resync LRU would be wrong. */
772         if (drbd_rs_del_all(mdev)) {
773                 /* In case this is not possible now, most probably because
774                  * there are P_RS_DATA_REPLY packets lingering on the worker's
775                  * queue (or the read operations for those packets
776                  * have not finished yet).  Retry in 100ms. */
777
778                 drbd_kick_lo(mdev);
779                 __set_current_state(TASK_INTERRUPTIBLE);
780                 schedule_timeout(HZ / 10);
781                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
782                 if (w) {
783                         w->cb = w_resync_finished;
784                         drbd_queue_work(&mdev->data.work, w);
785                         return 1;
786                 }
787                 dev_err(DEV, "Warning: both drbd_rs_del_all() and kmalloc(w) failed.\n");
788         }
789
790         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
791         if (dt <= 0)
792                 dt = 1;
793         db = mdev->rs_total;
794         dbdt = Bit2KB(db/dt);
795         mdev->rs_paused /= HZ;
796
797         if (!get_ldev(mdev))
798                 goto out;
799
800         spin_lock_irq(&mdev->req_lock);
801         os = mdev->state;
802
803         /* This protects us against multiple calls (that can happen in the presence
804            of application IO), and against connectivity loss just before we arrive here. */
805         if (os.conn <= C_CONNECTED)
806                 goto out_unlock;
807
808         ns = os;
809         ns.conn = C_CONNECTED;
810
811         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
812              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
813              "Online verify " : "Resync",
814              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
815
816         n_oos = drbd_bm_total_weight(mdev);
817
818         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
819                 if (n_oos) {
820                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
821                               n_oos, Bit2KB(1));
822                         khelper_cmd = "out-of-sync";
823                 }
824         } else {
825                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
826
827                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
828                         khelper_cmd = "after-resync-target";
829
830                 if (mdev->csums_tfm && mdev->rs_total) {
831                         const unsigned long s = mdev->rs_same_csum;
832                         const unsigned long t = mdev->rs_total;
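                        /* Integer percentage of blocks that had equal
                         * checksums; for large totals scale the divisor
                         * instead of the numerator so s * 100 cannot
                         * overflow. */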
833                         const int ratio =
834                                 (t == 0)     ? 0 :
835                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
836                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
837                              "transferred %luK total %luK\n",
838                              ratio,
839                              Bit2KB(mdev->rs_same_csum),
840                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
841                              Bit2KB(mdev->rs_total));
842                 }
843         }
844
845         if (mdev->rs_failed) {
846                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
847
848                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
849                         ns.disk = D_INCONSISTENT;
850                         ns.pdsk = D_UP_TO_DATE;
851                 } else {
852                         ns.disk = D_UP_TO_DATE;
853                         ns.pdsk = D_INCONSISTENT;
854                 }
855         } else {
856                 ns.disk = D_UP_TO_DATE;
857                 ns.pdsk = D_UP_TO_DATE;
858
859                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
860                         if (mdev->p_uuid) {
861                                 int i;
862                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
863                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
864                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
865                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
866                         } else {
867                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
868                         }
869                 }
870
871                 drbd_uuid_set_bm(mdev, 0UL);
872
873                 if (mdev->p_uuid) {
874                         /* Now the two UUID sets are equal, update what we
875                          * know of the peer. */
876                         int i;
877                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
878                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
879                 }
880         }
881
882         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
883 out_unlock:
884         spin_unlock_irq(&mdev->req_lock);
885         put_ldev(mdev);
886 out:
887         mdev->rs_total  = 0;
888         mdev->rs_failed = 0;
889         mdev->rs_paused = 0;
890         mdev->ov_start_sector = 0;
891
892         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
893                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
894                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
895         }
896
897         if (khelper_cmd)
898                 drbd_khelper(mdev, khelper_cmd);
899
900         return 1;
901 }
902
903 /* helper */
904 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
905 {
906         if (drbd_ee_has_active_page(e)) {
907                 /* This might happen if sendpage() has not finished */
908                 spin_lock_irq(&mdev->req_lock);
909                 list_add_tail(&e->w.list, &mdev->net_ee);
910                 spin_unlock_irq(&mdev->req_lock);
911         } else
912                 drbd_free_ee(mdev, e);
913 }
914
915 /**
916  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
917  * @mdev:       DRBD device.
918  * @w:          work object.
919  * @cancel:     The connection will be closed anyways
920  */
921 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
922 {
923         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
924         int ok;
925
926         if (unlikely(cancel)) {
927                 drbd_free_ee(mdev, e);
928                 dec_unacked(mdev);
929                 return 1;
930         }
931
932         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
933                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
934         } else {
935                 if (__ratelimit(&drbd_ratelimit_state))
936                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
937                             (unsigned long long)e->sector);
938
939                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
940         }
941
942         dec_unacked(mdev);
943
944         move_to_net_ee_or_free(mdev, e);
945
946         if (unlikely(!ok))
947                 dev_err(DEV, "drbd_send_block() failed\n");
948         return ok;
949 }
950
951 /**
952  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
953  * @mdev:       DRBD device.
954  * @w:          work object.
955  * @cancel:     The connection will be closed anyways
956  */
957 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
958 {
959         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
960         int ok;
961
962         if (unlikely(cancel)) {
963                 drbd_free_ee(mdev, e);
964                 dec_unacked(mdev);
965                 return 1;
966         }
967
968         if (get_ldev_if_state(mdev, D_FAILED)) {
969                 drbd_rs_complete_io(mdev, e->sector);
970                 put_ldev(mdev);
971         }
972
973         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
974                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
975                         inc_rs_pending(mdev);
976                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
977                 } else {
978                         if (__ratelimit(&drbd_ratelimit_state))
979                                 dev_err(DEV, "Not sending RSDataReply, "
980                                     "partner DISKLESS!\n");
981                         ok = 1;
982                 }
983         } else {
984                 if (__ratelimit(&drbd_ratelimit_state))
985                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
986                             (unsigned long long)e->sector);
987
988                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
989
990                 /* update resync data with failure */
991                 drbd_rs_failed_io(mdev, e->sector, e->size);
992         }
993
994         dec_unacked(mdev);
995
996         move_to_net_ee_or_free(mdev, e);
997
998         if (unlikely(!ok))
999                 dev_err(DEV, "drbd_send_block() failed\n");
1000         return ok;
1001 }
1002
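/* Reply to a P_CSUM_RS_REQUEST: recompute the digest over the locally read
 * block and compare it with the digest the peer sent.  On a match, only a
 * P_RS_IS_IN_SYNC ack goes out and the block is marked in sync (this is what
 * lets checksum-based resync save bandwidth); otherwise the full block is
 * sent as P_RS_DATA_REPLY. */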
1003 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1004 {
1005         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1006         struct digest_info *di;
1007         int digest_size;
1008         void *digest = NULL;
1009         int ok, eq = 0;
1010
1011         if (unlikely(cancel)) {
1012                 drbd_free_ee(mdev, e);
1013                 dec_unacked(mdev);
1014                 return 1;
1015         }
1016
1017         drbd_rs_complete_io(mdev, e->sector);
1018
1019         di = (struct digest_info *)(unsigned long)e->block_id;
1020
1021         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1022                 /* quick hack to try to avoid a race against reconfiguration.
1023                  * a real fix would be much more involved,
1024                  * introducing more locking mechanisms */
1025                 if (mdev->csums_tfm) {
1026                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1027                         D_ASSERT(digest_size == di->digest_size);
1028                         digest = kmalloc(digest_size, GFP_NOIO);
1029                 }
1030                 if (digest) {
1031                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1032                         eq = !memcmp(digest, di->digest, digest_size);
1033                         kfree(digest);
1034                 }
1035
1036                 if (eq) {
1037                         drbd_set_in_sync(mdev, e->sector, e->size);
1038                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1039                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1040                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1041                 } else {
1042                         inc_rs_pending(mdev);
1043                         e->block_id = ID_SYNCER;
1044                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1045                 }
1046         } else {
1047                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1048                 if (__ratelimit(&drbd_ratelimit_state))
1049                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1050         }
1051
1052         dec_unacked(mdev);
1053
1054         kfree(di);
1055
1056         move_to_net_ee_or_free(mdev, e);
1057
1058         if (unlikely(!ok))
1059                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1060         return ok;
1061 }
1062
1063 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1064 {
1065         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1066         int digest_size;
1067         void *digest;
1068         int ok = 1;
1069
1070         if (unlikely(cancel))
1071                 goto out;
1072
1073         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1074                 goto out;
1075
1076         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1077         /* FIXME if this allocation fails, online verify will not terminate! */
1078         digest = kmalloc(digest_size, GFP_NOIO);
1079         if (digest) {
1080                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1081                 inc_rs_pending(mdev);
1082                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1083                                              digest, digest_size, P_OV_REPLY);
1084                 if (!ok)
1085                         dec_rs_pending(mdev);
1086                 kfree(digest);
1087         }
1088
1089 out:
1090         drbd_free_ee(mdev, e);
1091
1092         dec_unacked(mdev);
1093
1094         return ok;
1095 }
1096
1097 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1098 {
1099         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1100                 mdev->ov_last_oos_size += size>>9;
1101         } else {
1102                 mdev->ov_last_oos_start = sector;
1103                 mdev->ov_last_oos_size = size>>9;
1104         }
1105         drbd_set_out_of_sync(mdev, sector, size);
1106         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1107 }
1108
1109 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1110 {
1111         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1112         struct digest_info *di;
1113         int digest_size;
1114         void *digest;
1115         int ok, eq = 0;
1116
1117         if (unlikely(cancel)) {
1118                 drbd_free_ee(mdev, e);
1119                 dec_unacked(mdev);
1120                 return 1;
1121         }
1122
1123         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1124          * the resync lru has been cleaned up already */
1125         drbd_rs_complete_io(mdev, e->sector);
1126
1127         di = (struct digest_info *)(unsigned long)e->block_id;
1128
1129         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1130                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1131                 digest = kmalloc(digest_size, GFP_NOIO);
1132                 if (digest) {
1133                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1134
1135                         D_ASSERT(digest_size == di->digest_size);
1136                         eq = !memcmp(digest, di->digest, digest_size);
1137                         kfree(digest);
1138                 }
1139         } else {
1140                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1141                 if (__ratelimit(&drbd_ratelimit_state))
1142                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1143         }
1144
1145         dec_unacked(mdev);
1146
1147         kfree(di);
1148
1149         if (!eq)
1150                 drbd_ov_oos_found(mdev, e->sector, e->size);
1151         else
1152                 ov_oos_print(mdev);
1153
1154         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1155                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1156
1157         drbd_free_ee(mdev, e);
1158
1159         if (--mdev->ov_left == 0) {
1160                 ov_oos_print(mdev);
1161                 drbd_resync_finished(mdev);
1162         }
1163
1164         return ok;
1165 }
1166
1167 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1168 {
1169         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1170         complete(&b->done);
1171         return 1;
1172 }
1173
1174 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1175 {
1176         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1177         struct p_barrier *p = &mdev->data.sbuf.barrier;
1178         int ok = 1;
1179
1180         /* really avoid racing with tl_clear.  w.cb may have been referenced
1181          * just before it was reassigned and re-queued, so double check that.
1182          * actually, this race was harmless, since we only try to send the
1183          * barrier packet here, and otherwise do nothing with the object.
1184          * but compare with the head of w_clear_epoch */
1185         spin_lock_irq(&mdev->req_lock);
1186         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1187                 cancel = 1;
1188         spin_unlock_irq(&mdev->req_lock);
1189         if (cancel)
1190                 return 1;
1191
1192         if (!drbd_get_data_sock(mdev))
1193                 return 0;
1194         p->barrier = b->br_number;
1195         /* inc_ap_pending was done where this was queued.
1196          * dec_ap_pending will be done in got_BarrierAck
1197          * or (on connection loss) in w_clear_epoch.  */
1198         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1199                                 (struct p_header *)p, sizeof(*p), 0);
1200         drbd_put_data_sock(mdev);
1201
1202         return ok;
1203 }
1204
1205 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1206 {
1207         if (cancel)
1208                 return 1;
1209         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1210 }
1211
1212 /**
1213  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1214  * @mdev:       DRBD device.
1215  * @w:          work object.
1216  * @cancel:     The connection will be closed anyways
1217  */
1218 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1219 {
1220         struct drbd_request *req = container_of(w, struct drbd_request, w);
1221         int ok;
1222
1223         if (unlikely(cancel)) {
1224                 req_mod(req, send_canceled);
1225                 return 1;
1226         }
1227
1228         ok = drbd_send_dblock(mdev, req);
1229         req_mod(req, ok ? handed_over_to_network : send_failed);
1230
1231         return ok;
1232 }
1233
1234 /**
1235  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1236  * @mdev:       DRBD device.
1237  * @w:          work object.
1238  * @cancel:     The connection will be closed anyways
1239  */
1240 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1241 {
1242         struct drbd_request *req = container_of(w, struct drbd_request, w);
1243         int ok;
1244
1245         if (unlikely(cancel)) {
1246                 req_mod(req, send_canceled);
1247                 return 1;
1248         }
1249
1250         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1251                                 (unsigned long)req);
1252
1253         if (!ok) {
1254                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1255                  * so this is probably redundant */
1256                 if (mdev->state.conn >= C_CONNECTED)
1257                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1258         }
1259         req_mod(req, ok ? handed_over_to_network : send_failed);
1260
1261         return ok;
1262 }
1263
1264 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1265 {
1266         struct drbd_request *req = container_of(w, struct drbd_request, w);
1267
1268         if (bio_data_dir(req->master_bio) == WRITE)
1269                 drbd_al_begin_io(mdev, req->sector);
1270         /* Calling drbd_al_begin_io() out of the worker could theoretically
1271            deadlock. In practice it cannot, since this is only used when
1272            unfreezing IO. All the extents of the requests that made it
1273            into the TL are already active */
1274
1275         drbd_req_make_private_bio(req, req->master_bio);
1276         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1277         generic_make_request(req->private_bio);
1278
1279         return 1;
1280 }
1281
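/* Follow the sync-after dependency chain (sync_conf.after names the minor we
 * depend on).  Returns 0 if any device along the chain is currently resyncing
 * or has a suspend-sync flag set, 1 if this device may resync now. */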
1282 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1283 {
1284         struct drbd_conf *odev = mdev;
1285
1286         while (1) {
1287                 if (odev->sync_conf.after == -1)
1288                         return 1;
1289                 odev = minor_to_mdev(odev->sync_conf.after);
1290                 ERR_IF(!odev) return 1;
1291                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1292                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1293                     odev->state.aftr_isp || odev->state.peer_isp ||
1294                     odev->state.user_isp)
1295                         return 0;
1296         }
1297 }
1298
1299 /**
1300  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1301  * @mdev:       DRBD device.
1302  *
1303  * Called from process context only (admin command and after_state_ch).
1304  */
1305 static int _drbd_pause_after(struct drbd_conf *mdev)
1306 {
1307         struct drbd_conf *odev;
1308         int i, rv = 0;
1309
1310         for (i = 0; i < minor_count; i++) {
1311                 odev = minor_to_mdev(i);
1312                 if (!odev)
1313                         continue;
1314                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1315                         continue;
1316                 if (!_drbd_may_sync_now(odev))
1317                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1318                                != SS_NOTHING_TO_DO);
1319         }
1320
1321         return rv;
1322 }
1323
1324 /**
1325  * _drbd_resume_next() - Resume resync on all devices that may resync now
1326  * @mdev:       DRBD device.
1327  *
1328  * Called from process context only (admin command and worker).
1329  */
1330 static int _drbd_resume_next(struct drbd_conf *mdev)
1331 {
1332         struct drbd_conf *odev;
1333         int i, rv = 0;
1334
1335         for (i = 0; i < minor_count; i++) {
1336                 odev = minor_to_mdev(i);
1337                 if (!odev)
1338                         continue;
1339                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1340                         continue;
1341                 if (odev->state.aftr_isp) {
1342                         if (_drbd_may_sync_now(odev))
1343                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1344                                                         CS_HARD, NULL)
1345                                        != SS_NOTHING_TO_DO) ;
1346                 }
1347         }
1348         return rv;
1349 }
1350
1351 void resume_next_sg(struct drbd_conf *mdev)
1352 {
1353         write_lock_irq(&global_state_lock);
1354         _drbd_resume_next(mdev);
1355         write_unlock_irq(&global_state_lock);
1356 }
1357
1358 void suspend_other_sg(struct drbd_conf *mdev)
1359 {
1360         write_lock_irq(&global_state_lock);
1361         _drbd_pause_after(mdev);
1362         write_unlock_irq(&global_state_lock);
1363 }
1364
1365 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1366 {
1367         struct drbd_conf *odev;
1368
1369         if (o_minor == -1)
1370                 return NO_ERROR;
1371         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1372                 return ERR_SYNC_AFTER;
1373
1374         /* check for loops */
1375         odev = minor_to_mdev(o_minor);
1376         while (1) {
1377                 if (odev == mdev)
1378                         return ERR_SYNC_AFTER_CYCLE;
1379
1380                 /* dependency chain ends here, no cycles. */
1381                 if (odev->sync_conf.after == -1)
1382                         return NO_ERROR;
1383
1384                 /* follow the dependency chain */
1385                 odev = minor_to_mdev(odev->sync_conf.after);
1386         }
1387 }
1388
1389 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1390 {
1391         int changes;
1392         int retcode;
1393
1394         write_lock_irq(&global_state_lock);
1395         retcode = sync_after_error(mdev, na);
1396         if (retcode == NO_ERROR) {
1397                 mdev->sync_conf.after = na;
1398                 do {
1399                         changes  = _drbd_pause_after(mdev);
1400                         changes |= _drbd_resume_next(mdev);
1401                 } while (changes);
1402         }
1403         write_unlock_irq(&global_state_lock);
1404         return retcode;
1405 }
1406
1407 static void ping_peer(struct drbd_conf *mdev)
1408 {
1409         clear_bit(GOT_PING_ACK, &mdev->flags);
1410         request_ping(mdev);
1411         wait_event(mdev->misc_wait,
1412                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1413 }
1414
1415 /**
1416  * drbd_start_resync() - Start the resync process
1417  * @mdev:       DRBD device.
1418  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1419  *
1420  * This function might bring you directly into one of the
1421  * C_PAUSED_SYNC_* states.
1422  */
1423 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1424 {
1425         union drbd_state ns;
1426         int r;
1427
1428         if (mdev->state.conn >= C_SYNC_SOURCE) {
1429                 dev_err(DEV, "Resync already running!\n");
1430                 return;
1431         }
1432
1433         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1434         drbd_rs_cancel_all(mdev);
1435
1436         if (side == C_SYNC_TARGET) {
1437                 /* Since application IO was locked out during C_WF_BITMAP_T and
1438                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1439                    ask the before-resync-target handler whether we may make the data inconsistent. */
1440                 r = drbd_khelper(mdev, "before-resync-target");
1441                 r = (r >> 8) & 0xff;
1442                 if (r > 0) {
1443                         dev_info(DEV, "before-resync-target handler returned %d, "
1444                              "dropping connection.\n", r);
1445                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1446                         return;
1447                 }
1448         }
1449
1450         drbd_state_lock(mdev);
1451
1452         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1453                 drbd_state_unlock(mdev);
1454                 return;
1455         }
1456
1457         if (side == C_SYNC_TARGET) {
1458                 mdev->bm_resync_fo = 0;
1459         } else /* side == C_SYNC_SOURCE */ {
1460                 u64 uuid;
1461
1462                 get_random_bytes(&uuid, sizeof(u64));
1463                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1464                 drbd_send_sync_uuid(mdev, uuid);
1465
1466                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1467         }
1468
1469         write_lock_irq(&global_state_lock);
1470         ns = mdev->state;
1471
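        /* If a sync-after dependency is not satisfied yet, start with the
         * "paused because of after-dependency" bit set; that is what lands
         * us in one of the C_PAUSED_SYNC_* states mentioned above. */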
1472         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1473
1474         ns.conn = side;
1475
1476         if (side == C_SYNC_TARGET)
1477                 ns.disk = D_INCONSISTENT;
1478         else /* side == C_SYNC_SOURCE */
1479                 ns.pdsk = D_INCONSISTENT;
1480
1481         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1482         ns = mdev->state;
1483
1484         if (ns.conn < C_CONNECTED)
1485                 r = SS_UNKNOWN_ERROR;
1486
1487         if (r == SS_SUCCESS) {
1488                 mdev->rs_total     =
1489                 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1490                 mdev->rs_failed    = 0;
1491                 mdev->rs_paused    = 0;
1492                 mdev->rs_start     =
1493                 mdev->rs_mark_time = jiffies;
1494                 mdev->rs_same_csum = 0;
1495                 _drbd_pause_after(mdev);
1496         }
1497         write_unlock_irq(&global_state_lock);
1498         put_ldev(mdev);
1499
1500         if (r == SS_SUCCESS) {
1501                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1502                      drbd_conn_str(ns.conn),
1503                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1504                      (unsigned long) mdev->rs_total);
1505
1506                 if (mdev->rs_total == 0) {
1507                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1508                         ping_peer(mdev);
1509                         drbd_resync_finished(mdev);
1510                 }
1511
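                /* Reset the state of the dynamic resync speed controller:
                 * rs_sect_in counts resync sectors that came in since the
                 * last adjustment step, rs_in_flight is what is currently on
                 * the wire, and rs_plan_s (with rs_planed as its running sum)
                 * is the FIFO of per-step request plans. */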
1512                 atomic_set(&mdev->rs_sect_in, 0);
1513                 mdev->rs_in_flight = 0;
1514                 mdev->rs_planed = 0;
1515                 spin_lock(&mdev->peer_seq_lock);
1516                 fifo_set(&mdev->rs_plan_s, 0);
1517                 spin_unlock(&mdev->peer_seq_lock);
1518                 /* ns.conn may already be != mdev->state.conn,
1519                  * we may have been paused in between, or become paused until
1520                  * the timer triggers.
1521                  * No matter, that is handled in resync_timer_fn() */
1522                 if (ns.conn == C_SYNC_TARGET)
1523                         mod_timer(&mdev->resync_timer, jiffies);
1524
1525                 drbd_md_sync(mdev);
1526         }
1527         drbd_state_unlock(mdev);
1528 }
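
/* A minimal usage sketch, assuming the usual state flow (the real call
 * sites live elsewhere, e.g. in the receiver paths):
 *
 *      /+ after the bitmap exchange and sync UUID handshake,
 *         on the node that wants the peer's data: +/
 *      drbd_start_resync(mdev, C_SYNC_TARGET);
 *
 *      /+ on the node holding the good data: +/
 *      drbd_start_resync(mdev, C_SYNC_SOURCE);
 *
 * Whether the resync actually runs or starts out paused depends on the
 * sync-after dependencies evaluated above.
 */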
1529
1530 int drbd_worker(struct drbd_thread *thi)
1531 {
1532         struct drbd_conf *mdev = thi->mdev;
1533         struct drbd_work *w = NULL;
1534         LIST_HEAD(work_list);
1535         int intr = 0, i;
1536
1537         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1538
1539         while (get_t_state(thi) == Running) {
1540                 drbd_thread_current_set_cpu(mdev);
1541
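                /* If no work is immediately available (down_trylock() fails),
                 * uncork the data socket so whatever TCP already has queued
                 * gets pushed out while we sleep on the work semaphore, and
                 * re-cork after waking so further packets can be batched. */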
1542                 if (down_trylock(&mdev->data.work.s)) {
1543                         mutex_lock(&mdev->data.mutex);
1544                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1545                                 drbd_tcp_uncork(mdev->data.socket);
1546                         mutex_unlock(&mdev->data.mutex);
1547
1548                         intr = down_interruptible(&mdev->data.work.s);
1549
1550                         mutex_lock(&mdev->data.mutex);
1551                 if (mdev->data.socket && !mdev->net_conf->no_cork)
1552                                 drbd_tcp_cork(mdev->data.socket);
1553                         mutex_unlock(&mdev->data.mutex);
1554                 }
1555
1556                 if (intr) {
1557                         D_ASSERT(intr == -EINTR);
1558                         flush_signals(current);
1559                         ERR_IF(get_t_state(thi) == Running)
1560                                 continue;
1561                         break;
1562                 }
1563
1564                 if (get_t_state(thi) != Running)
1565                         break;
1566                 /* With this break, we have done a down() on the semaphore
1567                    without consuming the corresponding entry from the list.
1568                    The cleanup code below takes care of that. */
1569
1570                 w = NULL;
1571                 spin_lock_irq(&mdev->data.work.q_lock);
1572                 ERR_IF(list_empty(&mdev->data.work.q)) {
1573                         /* Something is terribly wrong in our logic:
1574                          * we were able to down() the semaphore,
1575                          * but the list is empty... doh.
1576                          *
1577                          * What is the best thing to do now?
1578                          * Restarting the receiver, asender, or the like from
1579                          * scratch could break things even worse, e.g. when we
1580                          * are primary but have no good local data.
1581                          *
1582                          * So try to get away with just starting over this loop.
1583                          */
1584                         spin_unlock_irq(&mdev->data.work.q_lock);
1585                         continue;
1586                 }
1587                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1588                 list_del_init(&w->list);
1589                 spin_unlock_irq(&mdev->data.work.q_lock);
1590
1591                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1592                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1593                         if (mdev->state.conn >= C_CONNECTED)
1594                                 drbd_force_state(mdev,
1595                                                 NS(conn, C_NETWORK_FAILURE));
1596                 }
1597         }
1598         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1599         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1600
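        /* Drain whatever is still queued: each remaining callback is invoked
         * with its cancel argument set to 1, so it only cleans up after
         * itself instead of doing real work. */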
1601         spin_lock_irq(&mdev->data.work.q_lock);
1602         i = 0;
1603         while (!list_empty(&mdev->data.work.q)) {
1604                 list_splice_init(&mdev->data.work.q, &work_list);
1605                 spin_unlock_irq(&mdev->data.work.q_lock);
1606
1607                 while (!list_empty(&work_list)) {
1608                         w = list_entry(work_list.next, struct drbd_work, list);
1609                         list_del_init(&w->list);
1610                         w->cb(mdev, w, 1);
1611                         i++; /* dead debugging code */
1612                 }
1613
1614                 spin_lock_irq(&mdev->data.work.q_lock);
1615         }
1616         sema_init(&mdev->data.work.s, 0);
1617         /* DANGEROUS race: if someone queued work while holding the spinlock,
1618          * but called up() only after dropping it, we could see an up() on the
1619          * semaphore without a corresponding list entry.
1620          * So don't do that.
1621          */
1622         spin_unlock_irq(&mdev->data.work.q_lock);
1623
1624         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1625         /* _drbd_set_state only uses stop_nowait.
1626          * wait here for the Exiting receiver. */
1627         drbd_thread_stop(&mdev->receiver);
1628         drbd_mdev_cleanup(mdev);
1629
1630         dev_info(DEV, "worker terminated\n");
1631
1632         clear_bit(DEVICE_DYING, &mdev->flags);
1633         clear_bit(CONFIG_PENDING, &mdev->flags);
1634         wake_up(&mdev->state_wait);
1635
1636         return 0;
1637 }