b0551ba7ad0c9355a36621d3a6405ba76a8ab9d0
[linux-3.10.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46 /* defined here:
47    drbd_md_io_complete
48    drbd_endio_sec
49    drbd_endio_pri
50
51  * more endio handlers:
52    atodb_endio in drbd_actlog.c
53    drbd_bm_async_io_complete in drbd_bitmap.c
54
55  * For all these callbacks, note the following:
56  * The callbacks will be called in irq context by the IDE drivers,
57  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58  * Try to get the locking right :)
59  *
60  */
61
62
63 /* About the global_state_lock
64    Each state transition on an device holds a read lock. In case we have
65    to evaluate the sync after dependencies, we grab a write lock, because
66    we need stable states on all devices for that.  */
67 rwlock_t global_state_lock;
68
69 /* used for synchronous meta data and bitmap IO
70  * submitted by drbd_md_sync_page_io()
71  */
72 void drbd_md_io_complete(struct bio *bio, int error)
73 {
74         struct drbd_md_io *md_io;
75
76         md_io = (struct drbd_md_io *)bio->bi_private;
77         md_io->error = error;
78
79         complete(&md_io->event);
80 }
81
82 /* reads on behalf of the partner,
83  * "submitted" by the receiver
84  */
85 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86 {
87         unsigned long flags = 0;
88         struct drbd_conf *mdev = e->mdev;
89
90         D_ASSERT(e->block_id != ID_VACANT);
91
92         spin_lock_irqsave(&mdev->req_lock, flags);
93         mdev->read_cnt += e->size >> 9;
94         list_del(&e->w.list);
95         if (list_empty(&mdev->read_ee))
96                 wake_up(&mdev->ee_wait);
97         if (test_bit(__EE_WAS_ERROR, &e->flags))
98                 __drbd_chk_io_error(mdev, FALSE);
99         spin_unlock_irqrestore(&mdev->req_lock, flags);
100
101         drbd_queue_work(&mdev->data.work, &e->w);
102         put_ldev(mdev);
103 }
104
105 /* writes on behalf of the partner, or resync writes,
106  * "submitted" by the receiver, final stage.  */
107 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
108 {
109         unsigned long flags = 0;
110         struct drbd_conf *mdev = e->mdev;
111         sector_t e_sector;
112         int do_wake;
113         int is_syncer_req;
114         int do_al_complete_io;
115
116         D_ASSERT(e->block_id != ID_VACANT);
117
118         /* after we moved e to done_ee,
119          * we may no longer access it,
120          * it may be freed/reused already!
121          * (as soon as we release the req_lock) */
122         e_sector = e->sector;
123         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
124         is_syncer_req = is_syncer_block_id(e->block_id);
125
126         spin_lock_irqsave(&mdev->req_lock, flags);
127         mdev->writ_cnt += e->size >> 9;
128         list_del(&e->w.list); /* has been on active_ee or sync_ee */
129         list_add_tail(&e->w.list, &mdev->done_ee);
130
131         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
132          * neither did we wake possibly waiting conflicting requests.
133          * done from "drbd_process_done_ee" within the appropriate w.cb
134          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
135
136         do_wake = is_syncer_req
137                 ? list_empty(&mdev->sync_ee)
138                 : list_empty(&mdev->active_ee);
139
140         if (test_bit(__EE_WAS_ERROR, &e->flags))
141                 __drbd_chk_io_error(mdev, FALSE);
142         spin_unlock_irqrestore(&mdev->req_lock, flags);
143
144         if (is_syncer_req)
145                 drbd_rs_complete_io(mdev, e_sector);
146
147         if (do_wake)
148                 wake_up(&mdev->ee_wait);
149
150         if (do_al_complete_io)
151                 drbd_al_complete_io(mdev, e_sector);
152
153         wake_asender(mdev);
154         put_ldev(mdev);
155 }
156
157 /* writes on behalf of the partner, or resync writes,
158  * "submitted" by the receiver.
159  */
160 void drbd_endio_sec(struct bio *bio, int error)
161 {
162         struct drbd_epoch_entry *e = bio->bi_private;
163         struct drbd_conf *mdev = e->mdev;
164         int uptodate = bio_flagged(bio, BIO_UPTODATE);
165         int is_write = bio_data_dir(bio) == WRITE;
166
167         if (error)
168                 dev_warn(DEV, "%s: error=%d s=%llus\n",
169                                 is_write ? "write" : "read", error,
170                                 (unsigned long long)e->sector);
171         if (!error && !uptodate) {
172                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
173                                 is_write ? "write" : "read",
174                                 (unsigned long long)e->sector);
175                 /* strange behavior of some lower level drivers...
176                  * fail the request by clearing the uptodate flag,
177                  * but do not return any error?! */
178                 error = -EIO;
179         }
180
181         if (error)
182                 set_bit(__EE_WAS_ERROR, &e->flags);
183
184         bio_put(bio); /* no need for the bio anymore */
185         if (atomic_dec_and_test(&e->pending_bios)) {
186                 if (is_write)
187                         drbd_endio_write_sec_final(e);
188                 else
189                         drbd_endio_read_sec_final(e);
190         }
191 }
192
193 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
194  */
195 void drbd_endio_pri(struct bio *bio, int error)
196 {
197         struct drbd_request *req = bio->bi_private;
198         struct drbd_conf *mdev = req->mdev;
199         enum drbd_req_event what;
200         int uptodate = bio_flagged(bio, BIO_UPTODATE);
201
202         if (!error && !uptodate) {
203                 dev_warn(DEV, "p %s: setting error to -EIO\n",
204                          bio_data_dir(bio) == WRITE ? "write" : "read");
205                 /* strange behavior of some lower level drivers...
206                  * fail the request by clearing the uptodate flag,
207                  * but do not return any error?! */
208                 error = -EIO;
209         }
210
211         /* to avoid recursion in __req_mod */
212         if (unlikely(error)) {
213                 what = (bio_data_dir(bio) == WRITE)
214                         ? write_completed_with_error
215                         : (bio_rw(bio) == READ)
216                           ? read_completed_with_error
217                           : read_ahead_completed_with_error;
218         } else
219                 what = completed_ok;
220
221         bio_put(req->private_bio);
222         req->private_bio = ERR_PTR(error);
223
224         req_mod(req, what);
225 }
226
227 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
228 {
229         struct drbd_request *req = container_of(w, struct drbd_request, w);
230
231         /* We should not detach for read io-error,
232          * but try to WRITE the P_DATA_REPLY to the failed location,
233          * to give the disk the chance to relocate that block */
234
235         spin_lock_irq(&mdev->req_lock);
236         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
237                 _req_mod(req, read_retry_remote_canceled);
238                 spin_unlock_irq(&mdev->req_lock);
239                 return 1;
240         }
241         spin_unlock_irq(&mdev->req_lock);
242
243         return w_send_read_req(mdev, w, 0);
244 }
245
246 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
247 {
248         ERR_IF(cancel) return 1;
249         dev_err(DEV, "resync inactive, but callback triggered??\n");
250         return 1; /* Simply ignore this! */
251 }
252
253 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
254 {
255         struct hash_desc desc;
256         struct scatterlist sg;
257         struct page *page = e->pages;
258         struct page *tmp;
259         unsigned len;
260
261         desc.tfm = tfm;
262         desc.flags = 0;
263
264         sg_init_table(&sg, 1);
265         crypto_hash_init(&desc);
266
267         while ((tmp = page_chain_next(page))) {
268                 /* all but the last page will be fully used */
269                 sg_set_page(&sg, page, PAGE_SIZE, 0);
270                 crypto_hash_update(&desc, &sg, sg.length);
271                 page = tmp;
272         }
273         /* and now the last, possibly only partially used page */
274         len = e->size & (PAGE_SIZE - 1);
275         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
276         crypto_hash_update(&desc, &sg, sg.length);
277         crypto_hash_final(&desc, digest);
278 }
279
280 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
281 {
282         struct hash_desc desc;
283         struct scatterlist sg;
284         struct bio_vec *bvec;
285         int i;
286
287         desc.tfm = tfm;
288         desc.flags = 0;
289
290         sg_init_table(&sg, 1);
291         crypto_hash_init(&desc);
292
293         __bio_for_each_segment(bvec, bio, i, 0) {
294                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
295                 crypto_hash_update(&desc, &sg, sg.length);
296         }
297         crypto_hash_final(&desc, digest);
298 }
299
300 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
301 {
302         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
303         int digest_size;
304         void *digest;
305         int ok;
306
307         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
308
309         if (unlikely(cancel)) {
310                 drbd_free_ee(mdev, e);
311                 return 1;
312         }
313
314         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
315                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
316                 digest = kmalloc(digest_size, GFP_NOIO);
317                 if (digest) {
318                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
319
320                         inc_rs_pending(mdev);
321                         ok = drbd_send_drequest_csum(mdev,
322                                                      e->sector,
323                                                      e->size,
324                                                      digest,
325                                                      digest_size,
326                                                      P_CSUM_RS_REQUEST);
327                         kfree(digest);
328                 } else {
329                         dev_err(DEV, "kmalloc() of digest failed.\n");
330                         ok = 0;
331                 }
332         } else
333                 ok = 1;
334
335         drbd_free_ee(mdev, e);
336
337         if (unlikely(!ok))
338                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
339         return ok;
340 }
341
342 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
343
344 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
345 {
346         struct drbd_epoch_entry *e;
347
348         if (!get_ldev(mdev))
349                 return -EIO;
350
351         if (drbd_rs_should_slow_down(mdev))
352                 goto defer;
353
354         /* GFP_TRY, because if there is no memory available right now, this may
355          * be rescheduled for later. It is "only" background resync, after all. */
356         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
357         if (!e)
358                 goto defer;
359
360         e->w.cb = w_e_send_csum;
361         spin_lock_irq(&mdev->req_lock);
362         list_add(&e->w.list, &mdev->read_ee);
363         spin_unlock_irq(&mdev->req_lock);
364
365         atomic_add(size >> 9, &mdev->rs_sect_ev);
366         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
367                 return 0;
368
369         /* drbd_submit_ee currently fails for one reason only:
370          * not being able to allocate enough bios.
371          * Is dropping the connection going to help? */
372         spin_lock_irq(&mdev->req_lock);
373         list_del(&e->w.list);
374         spin_unlock_irq(&mdev->req_lock);
375
376         drbd_free_ee(mdev, e);
377 defer:
378         put_ldev(mdev);
379         return -EAGAIN;
380 }
381
382 void resync_timer_fn(unsigned long data)
383 {
384         struct drbd_conf *mdev = (struct drbd_conf *) data;
385         int queue;
386
387         queue = 1;
388         switch (mdev->state.conn) {
389         case C_VERIFY_S:
390                 mdev->resync_work.cb = w_make_ov_request;
391                 break;
392         case C_SYNC_TARGET:
393                 mdev->resync_work.cb = w_make_resync_request;
394                 break;
395         default:
396                 queue = 0;
397                 mdev->resync_work.cb = w_resync_inactive;
398         }
399
400         /* harmless race: list_empty outside data.work.q_lock */
401         if (list_empty(&mdev->resync_work.list) && queue)
402                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
403 }
404
405 static void fifo_set(struct fifo_buffer *fb, int value)
406 {
407         int i;
408
409         for (i = 0; i < fb->size; i++)
410                 fb->values[i] = value;
411 }
412
413 static int fifo_push(struct fifo_buffer *fb, int value)
414 {
415         int ov;
416
417         ov = fb->values[fb->head_index];
418         fb->values[fb->head_index++] = value;
419
420         if (fb->head_index >= fb->size)
421                 fb->head_index = 0;
422
423         return ov;
424 }
425
426 static void fifo_add_val(struct fifo_buffer *fb, int value)
427 {
428         int i;
429
430         for (i = 0; i < fb->size; i++)
431                 fb->values[i] += value;
432 }
433
434 int drbd_rs_controller(struct drbd_conf *mdev)
435 {
436         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
437         unsigned int want;     /* The number of sectors we want in the proxy */
438         int req_sect; /* Number of sectors to request in this turn */
439         int correction; /* Number of sectors more we need in the proxy*/
440         int cps; /* correction per invocation of drbd_rs_controller() */
441         int steps; /* Number of time steps to plan ahead */
442         int curr_corr;
443         int max_sect;
444
445         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
446         mdev->rs_in_flight -= sect_in;
447
448         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
449
450         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
451
452         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
453                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
454         } else { /* normal path */
455                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
456                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
457         }
458
459         correction = want - mdev->rs_in_flight - mdev->rs_planed;
460
461         /* Plan ahead */
462         cps = correction / steps;
463         fifo_add_val(&mdev->rs_plan_s, cps);
464         mdev->rs_planed += cps * steps;
465
466         /* What we do in this step */
467         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
468         spin_unlock(&mdev->peer_seq_lock);
469         mdev->rs_planed -= curr_corr;
470
471         req_sect = sect_in + curr_corr;
472         if (req_sect < 0)
473                 req_sect = 0;
474
475         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
476         if (req_sect > max_sect)
477                 req_sect = max_sect;
478
479         /*
480         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
481                  sect_in, mdev->rs_in_flight, want, correction,
482                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
483         */
484
485         return req_sect;
486 }
487
488 int w_make_resync_request(struct drbd_conf *mdev,
489                 struct drbd_work *w, int cancel)
490 {
491         unsigned long bit;
492         sector_t sector;
493         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
494         int max_segment_size;
495         int number, rollback_i, size, pe, mx;
496         int align, queued, sndbuf;
497         int i = 0;
498
499         if (unlikely(cancel))
500                 return 1;
501
502         if (unlikely(mdev->state.conn < C_CONNECTED)) {
503                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
504                 return 0;
505         }
506
507         if (mdev->state.conn != C_SYNC_TARGET)
508                 dev_err(DEV, "%s in w_make_resync_request\n",
509                         drbd_conn_str(mdev->state.conn));
510
511         if (mdev->rs_total == 0) {
512                 /* empty resync? */
513                 drbd_resync_finished(mdev);
514                 return 1;
515         }
516
517         if (!get_ldev(mdev)) {
518                 /* Since we only need to access mdev->rsync a
519                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
520                    to continue resync with a broken disk makes no sense at
521                    all */
522                 dev_err(DEV, "Disk broke down during resync!\n");
523                 mdev->resync_work.cb = w_resync_inactive;
524                 return 1;
525         }
526
527         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
528          * if it should be necessary */
529         max_segment_size =
530                 mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
531                 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
532
533         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
534                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
535                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
536         } else {
537                 mdev->c_sync_rate = mdev->sync_conf.rate;
538                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
539         }
540
541         /* Throttle resync on lower level disk activity, which may also be
542          * caused by application IO on Primary/SyncTarget.
543          * Keep this after the call to drbd_rs_controller, as that assumes
544          * to be called as precisely as possible every SLEEP_TIME,
545          * and would be confused otherwise. */
546         if (drbd_rs_should_slow_down(mdev))
547                 goto requeue;
548
549         mutex_lock(&mdev->data.mutex);
550         if (mdev->data.socket)
551                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
552         else
553                 mx = 1;
554         mutex_unlock(&mdev->data.mutex);
555
556         /* For resync rates >160MB/sec, allow more pending RS requests */
557         if (number > mx)
558                 mx = number;
559
560         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
561         pe = atomic_read(&mdev->rs_pending_cnt);
562         if ((pe + number) > mx) {
563                 number = mx - pe;
564         }
565
566         for (i = 0; i < number; i++) {
567                 /* Stop generating RS requests, when half of the send buffer is filled */
568                 mutex_lock(&mdev->data.mutex);
569                 if (mdev->data.socket) {
570                         queued = mdev->data.socket->sk->sk_wmem_queued;
571                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
572                 } else {
573                         queued = 1;
574                         sndbuf = 0;
575                 }
576                 mutex_unlock(&mdev->data.mutex);
577                 if (queued > sndbuf / 2)
578                         goto requeue;
579
580 next_sector:
581                 size = BM_BLOCK_SIZE;
582                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
583
584                 if (bit == -1UL) {
585                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
586                         mdev->resync_work.cb = w_resync_inactive;
587                         put_ldev(mdev);
588                         return 1;
589                 }
590
591                 sector = BM_BIT_TO_SECT(bit);
592
593                 if (drbd_try_rs_begin_io(mdev, sector)) {
594                         mdev->bm_resync_fo = bit;
595                         goto requeue;
596                 }
597                 mdev->bm_resync_fo = bit + 1;
598
599                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
600                         drbd_rs_complete_io(mdev, sector);
601                         goto next_sector;
602                 }
603
604 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
605                 /* try to find some adjacent bits.
606                  * we stop if we have already the maximum req size.
607                  *
608                  * Additionally always align bigger requests, in order to
609                  * be prepared for all stripe sizes of software RAIDs.
610                  */
611                 align = 1;
612                 rollback_i = i;
613                 for (;;) {
614                         if (size + BM_BLOCK_SIZE > max_segment_size)
615                                 break;
616
617                         /* Be always aligned */
618                         if (sector & ((1<<(align+3))-1))
619                                 break;
620
621                         /* do not cross extent boundaries */
622                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
623                                 break;
624                         /* now, is it actually dirty, after all?
625                          * caution, drbd_bm_test_bit is tri-state for some
626                          * obscure reason; ( b == 0 ) would get the out-of-band
627                          * only accidentally right because of the "oddly sized"
628                          * adjustment below */
629                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
630                                 break;
631                         bit++;
632                         size += BM_BLOCK_SIZE;
633                         if ((BM_BLOCK_SIZE << align) <= size)
634                                 align++;
635                         i++;
636                 }
637                 /* if we merged some,
638                  * reset the offset to start the next drbd_bm_find_next from */
639                 if (size > BM_BLOCK_SIZE)
640                         mdev->bm_resync_fo = bit + 1;
641 #endif
642
643                 /* adjust very last sectors, in case we are oddly sized */
644                 if (sector + (size>>9) > capacity)
645                         size = (capacity-sector)<<9;
646                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
647                         switch (read_for_csum(mdev, sector, size)) {
648                         case -EIO: /* Disk failure */
649                                 put_ldev(mdev);
650                                 return 0;
651                         case -EAGAIN: /* allocation failed, or ldev busy */
652                                 drbd_rs_complete_io(mdev, sector);
653                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
654                                 i = rollback_i;
655                                 goto requeue;
656                         case 0:
657                                 /* everything ok */
658                                 break;
659                         default:
660                                 BUG();
661                         }
662                 } else {
663                         inc_rs_pending(mdev);
664                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
665                                                sector, size, ID_SYNCER)) {
666                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
667                                 dec_rs_pending(mdev);
668                                 put_ldev(mdev);
669                                 return 0;
670                         }
671                 }
672         }
673
674         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
675                 /* last syncer _request_ was sent,
676                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
677                  * next sync group will resume), as soon as we receive the last
678                  * resync data block, and the last bit is cleared.
679                  * until then resync "work" is "inactive" ...
680                  */
681                 mdev->resync_work.cb = w_resync_inactive;
682                 put_ldev(mdev);
683                 return 1;
684         }
685
686  requeue:
687         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
688         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
689         put_ldev(mdev);
690         return 1;
691 }
692
693 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
694 {
695         int number, i, size;
696         sector_t sector;
697         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
698
699         if (unlikely(cancel))
700                 return 1;
701
702         if (unlikely(mdev->state.conn < C_CONNECTED)) {
703                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
704                 return 0;
705         }
706
707         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
708         if (atomic_read(&mdev->rs_pending_cnt) > number)
709                 goto requeue;
710
711         number -= atomic_read(&mdev->rs_pending_cnt);
712
713         sector = mdev->ov_position;
714         for (i = 0; i < number; i++) {
715                 if (sector >= capacity) {
716                         mdev->resync_work.cb = w_resync_inactive;
717                         return 1;
718                 }
719
720                 size = BM_BLOCK_SIZE;
721
722                 if (drbd_try_rs_begin_io(mdev, sector)) {
723                         mdev->ov_position = sector;
724                         goto requeue;
725                 }
726
727                 if (sector + (size>>9) > capacity)
728                         size = (capacity-sector)<<9;
729
730                 inc_rs_pending(mdev);
731                 if (!drbd_send_ov_request(mdev, sector, size)) {
732                         dec_rs_pending(mdev);
733                         return 0;
734                 }
735                 sector += BM_SECT_PER_BIT;
736         }
737         mdev->ov_position = sector;
738
739  requeue:
740         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
741         return 1;
742 }
743
744
745 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
746 {
747         kfree(w);
748         ov_oos_print(mdev);
749         drbd_resync_finished(mdev);
750
751         return 1;
752 }
753
754 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
755 {
756         kfree(w);
757
758         drbd_resync_finished(mdev);
759
760         return 1;
761 }
762
763 static void ping_peer(struct drbd_conf *mdev)
764 {
765         clear_bit(GOT_PING_ACK, &mdev->flags);
766         request_ping(mdev);
767         wait_event(mdev->misc_wait,
768                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
769 }
770
771 int drbd_resync_finished(struct drbd_conf *mdev)
772 {
773         unsigned long db, dt, dbdt;
774         unsigned long n_oos;
775         union drbd_state os, ns;
776         struct drbd_work *w;
777         char *khelper_cmd = NULL;
778
779         /* Remove all elements from the resync LRU. Since future actions
780          * might set bits in the (main) bitmap, then the entries in the
781          * resync LRU would be wrong. */
782         if (drbd_rs_del_all(mdev)) {
783                 /* In case this is not possible now, most probably because
784                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
785                  * queue (or even the read operations for those packets
786                  * is not finished by now).   Retry in 100ms. */
787
788                 drbd_kick_lo(mdev);
789                 __set_current_state(TASK_INTERRUPTIBLE);
790                 schedule_timeout(HZ / 10);
791                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
792                 if (w) {
793                         w->cb = w_resync_finished;
794                         drbd_queue_work(&mdev->data.work, w);
795                         return 1;
796                 }
797                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
798         }
799
800         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
801         if (dt <= 0)
802                 dt = 1;
803         db = mdev->rs_total;
804         dbdt = Bit2KB(db/dt);
805         mdev->rs_paused /= HZ;
806
807         if (!get_ldev(mdev))
808                 goto out;
809
810         ping_peer(mdev);
811
812         spin_lock_irq(&mdev->req_lock);
813         os = mdev->state;
814
815         /* This protects us against multiple calls (that can happen in the presence
816            of application IO), and against connectivity loss just before we arrive here. */
817         if (os.conn <= C_CONNECTED)
818                 goto out_unlock;
819
820         ns = os;
821         ns.conn = C_CONNECTED;
822
823         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
824              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
825              "Online verify " : "Resync",
826              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
827
828         n_oos = drbd_bm_total_weight(mdev);
829
830         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
831                 if (n_oos) {
832                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
833                               n_oos, Bit2KB(1));
834                         khelper_cmd = "out-of-sync";
835                 }
836         } else {
837                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
838
839                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
840                         khelper_cmd = "after-resync-target";
841
842                 if (mdev->csums_tfm && mdev->rs_total) {
843                         const unsigned long s = mdev->rs_same_csum;
844                         const unsigned long t = mdev->rs_total;
845                         const int ratio =
846                                 (t == 0)     ? 0 :
847                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
848                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
849                              "transferred %luK total %luK\n",
850                              ratio,
851                              Bit2KB(mdev->rs_same_csum),
852                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
853                              Bit2KB(mdev->rs_total));
854                 }
855         }
856
857         if (mdev->rs_failed) {
858                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
859
860                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
861                         ns.disk = D_INCONSISTENT;
862                         ns.pdsk = D_UP_TO_DATE;
863                 } else {
864                         ns.disk = D_UP_TO_DATE;
865                         ns.pdsk = D_INCONSISTENT;
866                 }
867         } else {
868                 ns.disk = D_UP_TO_DATE;
869                 ns.pdsk = D_UP_TO_DATE;
870
871                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
872                         if (mdev->p_uuid) {
873                                 int i;
874                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
875                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
876                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
877                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
878                         } else {
879                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
880                         }
881                 }
882
883                 drbd_uuid_set_bm(mdev, 0UL);
884
885                 if (mdev->p_uuid) {
886                         /* Now the two UUID sets are equal, update what we
887                          * know of the peer. */
888                         int i;
889                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
890                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
891                 }
892         }
893
894         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
895 out_unlock:
896         spin_unlock_irq(&mdev->req_lock);
897         put_ldev(mdev);
898 out:
899         mdev->rs_total  = 0;
900         mdev->rs_failed = 0;
901         mdev->rs_paused = 0;
902         mdev->ov_start_sector = 0;
903
904         drbd_md_sync(mdev);
905
906         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
907                 dev_info(DEV, "Writing the whole bitmap\n");
908                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
909         }
910
911         if (khelper_cmd)
912                 drbd_khelper(mdev, khelper_cmd);
913
914         return 1;
915 }
916
917 /* helper */
918 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
919 {
920         if (drbd_ee_has_active_page(e)) {
921                 /* This might happen if sendpage() has not finished */
922                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
923                 atomic_add(i, &mdev->pp_in_use_by_net);
924                 atomic_sub(i, &mdev->pp_in_use);
925                 spin_lock_irq(&mdev->req_lock);
926                 list_add_tail(&e->w.list, &mdev->net_ee);
927                 spin_unlock_irq(&mdev->req_lock);
928                 wake_up(&drbd_pp_wait);
929         } else
930                 drbd_free_ee(mdev, e);
931 }
932
933 /**
934  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
935  * @mdev:       DRBD device.
936  * @w:          work object.
937  * @cancel:     The connection will be closed anyways
938  */
939 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
940 {
941         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
942         int ok;
943
944         if (unlikely(cancel)) {
945                 drbd_free_ee(mdev, e);
946                 dec_unacked(mdev);
947                 return 1;
948         }
949
950         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
951                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
952         } else {
953                 if (__ratelimit(&drbd_ratelimit_state))
954                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
955                             (unsigned long long)e->sector);
956
957                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
958         }
959
960         dec_unacked(mdev);
961
962         move_to_net_ee_or_free(mdev, e);
963
964         if (unlikely(!ok))
965                 dev_err(DEV, "drbd_send_block() failed\n");
966         return ok;
967 }
968
969 /**
970  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
971  * @mdev:       DRBD device.
972  * @w:          work object.
973  * @cancel:     The connection will be closed anyways
974  */
975 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
976 {
977         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
978         int ok;
979
980         if (unlikely(cancel)) {
981                 drbd_free_ee(mdev, e);
982                 dec_unacked(mdev);
983                 return 1;
984         }
985
986         if (get_ldev_if_state(mdev, D_FAILED)) {
987                 drbd_rs_complete_io(mdev, e->sector);
988                 put_ldev(mdev);
989         }
990
991         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
992                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
993                         inc_rs_pending(mdev);
994                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
995                 } else {
996                         if (__ratelimit(&drbd_ratelimit_state))
997                                 dev_err(DEV, "Not sending RSDataReply, "
998                                     "partner DISKLESS!\n");
999                         ok = 1;
1000                 }
1001         } else {
1002                 if (__ratelimit(&drbd_ratelimit_state))
1003                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1004                             (unsigned long long)e->sector);
1005
1006                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1007
1008                 /* update resync data with failure */
1009                 drbd_rs_failed_io(mdev, e->sector, e->size);
1010         }
1011
1012         dec_unacked(mdev);
1013
1014         move_to_net_ee_or_free(mdev, e);
1015
1016         if (unlikely(!ok))
1017                 dev_err(DEV, "drbd_send_block() failed\n");
1018         return ok;
1019 }
1020
1021 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1022 {
1023         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1024         struct digest_info *di;
1025         int digest_size;
1026         void *digest = NULL;
1027         int ok, eq = 0;
1028
1029         if (unlikely(cancel)) {
1030                 drbd_free_ee(mdev, e);
1031                 dec_unacked(mdev);
1032                 return 1;
1033         }
1034
1035         if (get_ldev(mdev)) {
1036                 drbd_rs_complete_io(mdev, e->sector);
1037                 put_ldev(mdev);
1038         }
1039
1040         di = e->digest;
1041
1042         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1043                 /* quick hack to try to avoid a race against reconfiguration.
1044                  * a real fix would be much more involved,
1045                  * introducing more locking mechanisms */
1046                 if (mdev->csums_tfm) {
1047                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1048                         D_ASSERT(digest_size == di->digest_size);
1049                         digest = kmalloc(digest_size, GFP_NOIO);
1050                 }
1051                 if (digest) {
1052                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1053                         eq = !memcmp(digest, di->digest, digest_size);
1054                         kfree(digest);
1055                 }
1056
1057                 if (eq) {
1058                         drbd_set_in_sync(mdev, e->sector, e->size);
1059                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1060                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1061                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1062                 } else {
1063                         inc_rs_pending(mdev);
1064                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1065                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1066                         kfree(di);
1067                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1068                 }
1069         } else {
1070                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1071                 if (__ratelimit(&drbd_ratelimit_state))
1072                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1073         }
1074
1075         dec_unacked(mdev);
1076         move_to_net_ee_or_free(mdev, e);
1077
1078         if (unlikely(!ok))
1079                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1080         return ok;
1081 }
1082
1083 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1084 {
1085         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1086         int digest_size;
1087         void *digest;
1088         int ok = 1;
1089
1090         if (unlikely(cancel))
1091                 goto out;
1092
1093         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1094                 goto out;
1095
1096         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1097         /* FIXME if this allocation fails, online verify will not terminate! */
1098         digest = kmalloc(digest_size, GFP_NOIO);
1099         if (digest) {
1100                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1101                 inc_rs_pending(mdev);
1102                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1103                                              digest, digest_size, P_OV_REPLY);
1104                 if (!ok)
1105                         dec_rs_pending(mdev);
1106                 kfree(digest);
1107         }
1108
1109 out:
1110         drbd_free_ee(mdev, e);
1111
1112         dec_unacked(mdev);
1113
1114         return ok;
1115 }
1116
1117 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1118 {
1119         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1120                 mdev->ov_last_oos_size += size>>9;
1121         } else {
1122                 mdev->ov_last_oos_start = sector;
1123                 mdev->ov_last_oos_size = size>>9;
1124         }
1125         drbd_set_out_of_sync(mdev, sector, size);
1126         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1127 }
1128
1129 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1130 {
1131         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1132         struct digest_info *di;
1133         int digest_size;
1134         void *digest;
1135         int ok, eq = 0;
1136
1137         if (unlikely(cancel)) {
1138                 drbd_free_ee(mdev, e);
1139                 dec_unacked(mdev);
1140                 return 1;
1141         }
1142
1143         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1144          * the resync lru has been cleaned up already */
1145         if (get_ldev(mdev)) {
1146                 drbd_rs_complete_io(mdev, e->sector);
1147                 put_ldev(mdev);
1148         }
1149
1150         di = e->digest;
1151
1152         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1153                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1154                 digest = kmalloc(digest_size, GFP_NOIO);
1155                 if (digest) {
1156                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1157
1158                         D_ASSERT(digest_size == di->digest_size);
1159                         eq = !memcmp(digest, di->digest, digest_size);
1160                         kfree(digest);
1161                 }
1162         } else {
1163                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1164                 if (__ratelimit(&drbd_ratelimit_state))
1165                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1166         }
1167
1168         dec_unacked(mdev);
1169         if (!eq)
1170                 drbd_ov_oos_found(mdev, e->sector, e->size);
1171         else
1172                 ov_oos_print(mdev);
1173
1174         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1175                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1176
1177         drbd_free_ee(mdev, e);
1178
1179         if (--mdev->ov_left == 0) {
1180                 ov_oos_print(mdev);
1181                 drbd_resync_finished(mdev);
1182         }
1183
1184         return ok;
1185 }
1186
1187 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1188 {
1189         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1190         complete(&b->done);
1191         return 1;
1192 }
1193
1194 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1195 {
1196         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1197         struct p_barrier *p = &mdev->data.sbuf.barrier;
1198         int ok = 1;
1199
1200         /* really avoid racing with tl_clear.  w.cb may have been referenced
1201          * just before it was reassigned and re-queued, so double check that.
1202          * actually, this race was harmless, since we only try to send the
1203          * barrier packet here, and otherwise do nothing with the object.
1204          * but compare with the head of w_clear_epoch */
1205         spin_lock_irq(&mdev->req_lock);
1206         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1207                 cancel = 1;
1208         spin_unlock_irq(&mdev->req_lock);
1209         if (cancel)
1210                 return 1;
1211
1212         if (!drbd_get_data_sock(mdev))
1213                 return 0;
1214         p->barrier = b->br_number;
1215         /* inc_ap_pending was done where this was queued.
1216          * dec_ap_pending will be done in got_BarrierAck
1217          * or (on connection loss) in w_clear_epoch.  */
1218         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1219                                 (struct p_header80 *)p, sizeof(*p), 0);
1220         drbd_put_data_sock(mdev);
1221
1222         return ok;
1223 }
1224
1225 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1226 {
1227         if (cancel)
1228                 return 1;
1229         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1230 }
1231
1232 /**
1233  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1234  * @mdev:       DRBD device.
1235  * @w:          work object.
1236  * @cancel:     The connection will be closed anyways
1237  */
1238 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1239 {
1240         struct drbd_request *req = container_of(w, struct drbd_request, w);
1241         int ok;
1242
1243         if (unlikely(cancel)) {
1244                 req_mod(req, send_canceled);
1245                 return 1;
1246         }
1247
1248         ok = drbd_send_dblock(mdev, req);
1249         req_mod(req, ok ? handed_over_to_network : send_failed);
1250
1251         return ok;
1252 }
1253
1254 /**
1255  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1256  * @mdev:       DRBD device.
1257  * @w:          work object.
1258  * @cancel:     The connection will be closed anyways
1259  */
1260 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1261 {
1262         struct drbd_request *req = container_of(w, struct drbd_request, w);
1263         int ok;
1264
1265         if (unlikely(cancel)) {
1266                 req_mod(req, send_canceled);
1267                 return 1;
1268         }
1269
1270         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1271                                 (unsigned long)req);
1272
1273         if (!ok) {
1274                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1275                  * so this is probably redundant */
1276                 if (mdev->state.conn >= C_CONNECTED)
1277                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1278         }
1279         req_mod(req, ok ? handed_over_to_network : send_failed);
1280
1281         return ok;
1282 }
1283
1284 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1285 {
1286         struct drbd_request *req = container_of(w, struct drbd_request, w);
1287
1288         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1289                 drbd_al_begin_io(mdev, req->sector);
1290         /* Calling drbd_al_begin_io() out of the worker might deadlocks
1291            theoretically. Practically it can not deadlock, since this is
1292            only used when unfreezing IOs. All the extents of the requests
1293            that made it into the TL are already active */
1294
1295         drbd_req_make_private_bio(req, req->master_bio);
1296         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1297         generic_make_request(req->private_bio);
1298
1299         return 1;
1300 }
1301
1302 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1303 {
1304         struct drbd_conf *odev = mdev;
1305
1306         while (1) {
1307                 if (odev->sync_conf.after == -1)
1308                         return 1;
1309                 odev = minor_to_mdev(odev->sync_conf.after);
1310                 ERR_IF(!odev) return 1;
1311                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1312                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1313                     odev->state.aftr_isp || odev->state.peer_isp ||
1314                     odev->state.user_isp)
1315                         return 0;
1316         }
1317 }
1318
1319 /**
1320  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1321  * @mdev:       DRBD device.
1322  *
1323  * Called from process context only (admin command and after_state_ch).
1324  */
1325 static int _drbd_pause_after(struct drbd_conf *mdev)
1326 {
1327         struct drbd_conf *odev;
1328         int i, rv = 0;
1329
1330         for (i = 0; i < minor_count; i++) {
1331                 odev = minor_to_mdev(i);
1332                 if (!odev)
1333                         continue;
1334                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1335                         continue;
1336                 if (!_drbd_may_sync_now(odev))
1337                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1338                                != SS_NOTHING_TO_DO);
1339         }
1340
1341         return rv;
1342 }
1343
1344 /**
1345  * _drbd_resume_next() - Resume resync on all devices that may resync now
1346  * @mdev:       DRBD device.
1347  *
1348  * Called from process context only (admin command and worker).
1349  */
1350 static int _drbd_resume_next(struct drbd_conf *mdev)
1351 {
1352         struct drbd_conf *odev;
1353         int i, rv = 0;
1354
1355         for (i = 0; i < minor_count; i++) {
1356                 odev = minor_to_mdev(i);
1357                 if (!odev)
1358                         continue;
1359                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1360                         continue;
1361                 if (odev->state.aftr_isp) {
1362                         if (_drbd_may_sync_now(odev))
1363                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1364                                                         CS_HARD, NULL)
1365                                        != SS_NOTHING_TO_DO) ;
1366                 }
1367         }
1368         return rv;
1369 }
1370
1371 void resume_next_sg(struct drbd_conf *mdev)
1372 {
1373         write_lock_irq(&global_state_lock);
1374         _drbd_resume_next(mdev);
1375         write_unlock_irq(&global_state_lock);
1376 }
1377
1378 void suspend_other_sg(struct drbd_conf *mdev)
1379 {
1380         write_lock_irq(&global_state_lock);
1381         _drbd_pause_after(mdev);
1382         write_unlock_irq(&global_state_lock);
1383 }
1384
1385 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1386 {
1387         struct drbd_conf *odev;
1388
1389         if (o_minor == -1)
1390                 return NO_ERROR;
1391         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1392                 return ERR_SYNC_AFTER;
1393
1394         /* check for loops */
1395         odev = minor_to_mdev(o_minor);
1396         while (1) {
1397                 if (odev == mdev)
1398                         return ERR_SYNC_AFTER_CYCLE;
1399
1400                 /* dependency chain ends here, no cycles. */
1401                 if (odev->sync_conf.after == -1)
1402                         return NO_ERROR;
1403
1404                 /* follow the dependency chain */
1405                 odev = minor_to_mdev(odev->sync_conf.after);
1406         }
1407 }
1408
1409 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1410 {
1411         int changes;
1412         int retcode;
1413
1414         write_lock_irq(&global_state_lock);
1415         retcode = sync_after_error(mdev, na);
1416         if (retcode == NO_ERROR) {
1417                 mdev->sync_conf.after = na;
1418                 do {
1419                         changes  = _drbd_pause_after(mdev);
1420                         changes |= _drbd_resume_next(mdev);
1421                 } while (changes);
1422         }
1423         write_unlock_irq(&global_state_lock);
1424         return retcode;
1425 }
1426
1427 /**
1428  * drbd_start_resync() - Start the resync process
1429  * @mdev:       DRBD device.
1430  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1431  *
1432  * This function might bring you directly into one of the
1433  * C_PAUSED_SYNC_* states.
1434  */
1435 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1436 {
1437         union drbd_state ns;
1438         int r;
1439
1440         if (mdev->state.conn >= C_SYNC_SOURCE) {
1441                 dev_err(DEV, "Resync already running!\n");
1442                 return;
1443         }
1444
1445         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1446         drbd_rs_cancel_all(mdev);
1447
1448         if (side == C_SYNC_TARGET) {
1449                 /* Since application IO was locked out during C_WF_BITMAP_T and
1450                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1451                    we check that we might make the data inconsistent. */
1452                 r = drbd_khelper(mdev, "before-resync-target");
1453                 r = (r >> 8) & 0xff;
1454                 if (r > 0) {
1455                         dev_info(DEV, "before-resync-target handler returned %d, "
1456                              "dropping connection.\n", r);
1457                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1458                         return;
1459                 }
1460         }
1461
1462         drbd_state_lock(mdev);
1463
1464         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1465                 drbd_state_unlock(mdev);
1466                 return;
1467         }
1468
1469         if (side == C_SYNC_TARGET) {
1470                 mdev->bm_resync_fo = 0;
1471         } else /* side == C_SYNC_SOURCE */ {
1472                 u64 uuid;
1473
1474                 get_random_bytes(&uuid, sizeof(u64));
1475                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1476                 drbd_send_sync_uuid(mdev, uuid);
1477
1478                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1479         }
1480
1481         write_lock_irq(&global_state_lock);
1482         ns = mdev->state;
1483
1484         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1485
1486         ns.conn = side;
1487
1488         if (side == C_SYNC_TARGET)
1489                 ns.disk = D_INCONSISTENT;
1490         else /* side == C_SYNC_SOURCE */
1491                 ns.pdsk = D_INCONSISTENT;
1492
1493         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1494         ns = mdev->state;
1495
1496         if (ns.conn < C_CONNECTED)
1497                 r = SS_UNKNOWN_ERROR;
1498
1499         if (r == SS_SUCCESS) {
1500                 unsigned long tw = drbd_bm_total_weight(mdev);
1501                 unsigned long now = jiffies;
1502                 int i;
1503
1504                 mdev->rs_failed    = 0;
1505                 mdev->rs_paused    = 0;
1506                 mdev->rs_same_csum = 0;
1507                 mdev->rs_last_events = 0;
1508                 mdev->rs_last_sect_ev = 0;
1509                 mdev->rs_total     = tw;
1510                 mdev->rs_start     = now;
1511                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1512                         mdev->rs_mark_left[i] = tw;
1513                         mdev->rs_mark_time[i] = now;
1514                 }
1515                 _drbd_pause_after(mdev);
1516         }
1517         write_unlock_irq(&global_state_lock);
1518         put_ldev(mdev);
1519
1520         if (r == SS_SUCCESS) {
1521                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1522                      drbd_conn_str(ns.conn),
1523                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1524                      (unsigned long) mdev->rs_total);
1525
1526                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1527                         /* This still has a race (about when exactly the peers
1528                          * detect connection loss) that can lead to a full sync
1529                          * on next handshake. In 8.3.9 we fixed this with explicit
1530                          * resync-finished notifications, but the fix
1531                          * introduces a protocol change.  Sleeping for some
1532                          * time longer than the ping interval + timeout on the
1533                          * SyncSource, to give the SyncTarget the chance to
1534                          * detect connection loss, then waiting for a ping
1535                          * response (implicit in drbd_resync_finished) reduces
1536                          * the race considerably, but does not solve it. */
1537                         if (side == C_SYNC_SOURCE)
1538                                 schedule_timeout_interruptible(
1539                                         mdev->net_conf->ping_int * HZ +
1540                                         mdev->net_conf->ping_timeo*HZ/9);
1541                         drbd_resync_finished(mdev);
1542                 }
1543
1544                 atomic_set(&mdev->rs_sect_in, 0);
1545                 atomic_set(&mdev->rs_sect_ev, 0);
1546                 mdev->rs_in_flight = 0;
1547                 mdev->rs_planed = 0;
1548                 spin_lock(&mdev->peer_seq_lock);
1549                 fifo_set(&mdev->rs_plan_s, 0);
1550                 spin_unlock(&mdev->peer_seq_lock);
1551                 /* ns.conn may already be != mdev->state.conn,
1552                  * we may have been paused in between, or become paused until
1553                  * the timer triggers.
1554                  * No matter, that is handled in resync_timer_fn() */
1555                 if (ns.conn == C_SYNC_TARGET)
1556                         mod_timer(&mdev->resync_timer, jiffies);
1557
1558                 drbd_md_sync(mdev);
1559         }
1560         drbd_state_unlock(mdev);
1561 }
1562
1563 int drbd_worker(struct drbd_thread *thi)
1564 {
1565         struct drbd_conf *mdev = thi->mdev;
1566         struct drbd_work *w = NULL;
1567         LIST_HEAD(work_list);
1568         int intr = 0, i;
1569
1570         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1571
1572         while (get_t_state(thi) == Running) {
1573                 drbd_thread_current_set_cpu(mdev);
1574
1575                 if (down_trylock(&mdev->data.work.s)) {
1576                         mutex_lock(&mdev->data.mutex);
1577                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1578                                 drbd_tcp_uncork(mdev->data.socket);
1579                         mutex_unlock(&mdev->data.mutex);
1580
1581                         intr = down_interruptible(&mdev->data.work.s);
1582
1583                         mutex_lock(&mdev->data.mutex);
1584                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1585                                 drbd_tcp_cork(mdev->data.socket);
1586                         mutex_unlock(&mdev->data.mutex);
1587                 }
1588
1589                 if (intr) {
1590                         D_ASSERT(intr == -EINTR);
1591                         flush_signals(current);
1592                         ERR_IF (get_t_state(thi) == Running)
1593                                 continue;
1594                         break;
1595                 }
1596
1597                 if (get_t_state(thi) != Running)
1598                         break;
1599                 /* With this break, we have done a down() but not consumed
1600                    the entry from the list. The cleanup code takes care of
1601                    this...   */
1602
1603                 w = NULL;
1604                 spin_lock_irq(&mdev->data.work.q_lock);
1605                 ERR_IF(list_empty(&mdev->data.work.q)) {
1606                         /* something terribly wrong in our logic.
1607                          * we were able to down() the semaphore,
1608                          * but the list is empty... doh.
1609                          *
1610                          * what is the best thing to do now?
1611                          * try again from scratch, restarting the receiver,
1612                          * asender, whatnot? could break even more ugly,
1613                          * e.g. when we are primary, but no good local data.
1614                          *
1615                          * I'll try to get away just starting over this loop.
1616                          */
1617                         spin_unlock_irq(&mdev->data.work.q_lock);
1618                         continue;
1619                 }
1620                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1621                 list_del_init(&w->list);
1622                 spin_unlock_irq(&mdev->data.work.q_lock);
1623
1624                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1625                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1626                         if (mdev->state.conn >= C_CONNECTED)
1627                                 drbd_force_state(mdev,
1628                                                 NS(conn, C_NETWORK_FAILURE));
1629                 }
1630         }
1631         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1632         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1633
1634         spin_lock_irq(&mdev->data.work.q_lock);
1635         i = 0;
1636         while (!list_empty(&mdev->data.work.q)) {
1637                 list_splice_init(&mdev->data.work.q, &work_list);
1638                 spin_unlock_irq(&mdev->data.work.q_lock);
1639
1640                 while (!list_empty(&work_list)) {
1641                         w = list_entry(work_list.next, struct drbd_work, list);
1642                         list_del_init(&w->list);
1643                         w->cb(mdev, w, 1);
1644                         i++; /* dead debugging code */
1645                 }
1646
1647                 spin_lock_irq(&mdev->data.work.q_lock);
1648         }
1649         sema_init(&mdev->data.work.s, 0);
1650         /* DANGEROUS race: if someone did queue his work within the spinlock,
1651          * but up() ed outside the spinlock, we could get an up() on the
1652          * semaphore without corresponding list entry.
1653          * So don't do that.
1654          */
1655         spin_unlock_irq(&mdev->data.work.q_lock);
1656
1657         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1658         /* _drbd_set_state only uses stop_nowait.
1659          * wait here for the Exiting receiver. */
1660         drbd_thread_stop(&mdev->receiver);
1661         drbd_mdev_cleanup(mdev);
1662
1663         dev_info(DEV, "worker terminated\n");
1664
1665         clear_bit(DEVICE_DYING, &mdev->flags);
1666         clear_bit(CONFIG_PENDING, &mdev->flags);
1667         wake_up(&mdev->state_wait);
1668
1669         return 0;
1670 }