25c7a73c50621734cf22fda0654e93825e08e0ca
[linux-2.6.git] / drivers / block / drbd / drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 struct after_state_chg_work {
60         struct drbd_work w;
61         union drbd_state os;
62         union drbd_state ns;
63         enum chg_state_flags flags;
64         struct completion *done;
65 };
66
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77                            union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82
83 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
84               "Lars Ellenberg <lars@linbit.com>");
85 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
86 MODULE_VERSION(REL_VERSION);
87 MODULE_LICENSE("GPL");
88 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
89 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
90
91 #include <linux/moduleparam.h>
92 /* allow_open_on_secondary */
93 MODULE_PARM_DESC(allow_oos, "DONT USE!");
94 /* thanks to these macros, if compiled into the kernel (not-module),
95  * this becomes the boot parameter drbd.minor_count */
96 module_param(minor_count, uint, 0444);
97 module_param(disable_sendpage, bool, 0644);
98 module_param(allow_oos, bool, 0);
99 module_param(cn_idx, uint, 0444);
100 module_param(proc_details, int, 0644);
101
#ifdef CONFIG_DRBD_FAULT_INJECTION
/* bitmap of enabled faults */
int enable_faults;
module_param(enable_faults, int, 0664);

/* fault rate % value - applies to all enabled faults */
int fault_rate;
module_param(fault_rate, int, 0664);

/* count of faults inserted */
static int fault_count;
module_param(fault_count, int, 0664);

/* bitmap of devices to insert faults on */
int fault_devs;
module_param(fault_devs, int, 0644);
#endif
116
117 /* module parameter, defined */
118 unsigned int minor_count = 32;
119 int disable_sendpage;
120 int allow_oos;
121 unsigned int cn_idx = CN_IDX_DRBD;
122 int proc_details;       /* Detail level in proc drbd*/
123
124 /* Module parameter for setting the user mode helper program
125  * to run. Default is /sbin/drbdadm */
126 char usermode_helper[80] = "/sbin/drbdadm";
127
128 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
129
130 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
131  * as member "struct gendisk *vdisk;"
132  */
133 struct drbd_conf **minor_table;
134
135 struct kmem_cache *drbd_request_cache;
136 struct kmem_cache *drbd_ee_cache;       /* epoch entries */
137 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
138 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
139 mempool_t *drbd_request_mempool;
140 mempool_t *drbd_ee_mempool;
141
142 /* I do not use a standard mempool, because:
143    1) I want to hand out the pre-allocated objects first.
144    2) I want to be able to interrupt sleeping allocation with a signal.
145    Note: This is a single linked list, the next pointer is the private
146          member of struct page.
147  */
148 struct page *drbd_pp_pool;
149 spinlock_t   drbd_pp_lock;
150 int          drbd_pp_vacant;
151 wait_queue_head_t drbd_pp_wait;
152
153 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
154
155 static const struct block_device_operations drbd_ops = {
156         .owner =   THIS_MODULE,
157         .open =    drbd_open,
158         .release = drbd_release,
159 };
160
/*
 * Number of elements of the array A. A must be a real array, not a pointer.
 * The argument is parenthesized before it is indexed, so that any
 * expression of array type can be passed safely.
 */
#define ARRY_SIZE(A) (sizeof(A) / sizeof((A)[0]))
162
163 #ifdef __CHECKER__
/* When checking with sparse, and this is an inline function, sparse will
   give tons of false positives. When this is a real function, sparse works.
 */
167 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
168 {
169         int io_allowed;
170
171         atomic_inc(&mdev->local_cnt);
172         io_allowed = (mdev->state.disk >= mins);
173         if (!io_allowed) {
174                 if (atomic_dec_and_test(&mdev->local_cnt))
175                         wake_up(&mdev->misc_wait);
176         }
177         return io_allowed;
178 }
179
180 #endif
181
/**
 * DOC: The transfer log
 *
 * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
 * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
 * of the list. There is always at least one &struct drbd_tl_epoch object.
 *
 * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
 * attached.
 */
192 static int tl_init(struct drbd_conf *mdev)
193 {
194         struct drbd_tl_epoch *b;
195
196         /* during device minor initialization, we may well use GFP_KERNEL */
197         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
198         if (!b)
199                 return 0;
200         INIT_LIST_HEAD(&b->requests);
201         INIT_LIST_HEAD(&b->w.list);
202         b->next = NULL;
203         b->br_number = 4711;
204         b->n_writes = 0;
205         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
206
207         mdev->oldest_tle = b;
208         mdev->newest_tle = b;
209         INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
210
211         mdev->tl_hash = NULL;
212         mdev->tl_hash_s = 0;
213
214         return 1;
215 }
216
217 static void tl_cleanup(struct drbd_conf *mdev)
218 {
219         D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
220         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
221         kfree(mdev->oldest_tle);
222         mdev->oldest_tle = NULL;
223         kfree(mdev->unused_spare_tle);
224         mdev->unused_spare_tle = NULL;
225         kfree(mdev->tl_hash);
226         mdev->tl_hash = NULL;
227         mdev->tl_hash_s = 0;
228 }
229
230 /**
231  * _tl_add_barrier() - Adds a barrier to the transfer log
232  * @mdev:       DRBD device.
233  * @new:        Barrier to be added before the current head of the TL.
234  *
235  * The caller must hold the req_lock.
236  */
237 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
238 {
239         struct drbd_tl_epoch *newest_before;
240
241         INIT_LIST_HEAD(&new->requests);
242         INIT_LIST_HEAD(&new->w.list);
243         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
244         new->next = NULL;
245         new->n_writes = 0;
246
247         newest_before = mdev->newest_tle;
248         /* never send a barrier number == 0, because that is special-cased
249          * when using TCQ for our write ordering code */
250         new->br_number = (newest_before->br_number+1) ?: 1;
251         if (mdev->newest_tle != new) {
252                 mdev->newest_tle->next = new;
253                 mdev->newest_tle = new;
254         }
255 }
256
257 /**
258  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
259  * @mdev:       DRBD device.
260  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
261  * @set_size:   Expected number of requests before that barrier.
262  *
263  * In case the passed barrier_nr or set_size does not match the oldest
264  * &struct drbd_tl_epoch objects this function will cause a termination
265  * of the connection.
266  */
267 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
268                        unsigned int set_size)
269 {
270         struct drbd_tl_epoch *b, *nob; /* next old barrier */
271         struct list_head *le, *tle;
272         struct drbd_request *r;
273
274         spin_lock_irq(&mdev->req_lock);
275
276         b = mdev->oldest_tle;
277
278         /* first some paranoia code */
279         if (b == NULL) {
280                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
281                         barrier_nr);
282                 goto bail;
283         }
284         if (b->br_number != barrier_nr) {
285                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
286                         barrier_nr, b->br_number);
287                 goto bail;
288         }
289         if (b->n_writes != set_size) {
290                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
291                         barrier_nr, set_size, b->n_writes);
292                 goto bail;
293         }
294
295         /* Clean up list of requests processed during current epoch */
296         list_for_each_safe(le, tle, &b->requests) {
297                 r = list_entry(le, struct drbd_request, tl_requests);
298                 _req_mod(r, barrier_acked);
299         }
300         /* There could be requests on the list waiting for completion
301            of the write to the local disk. To avoid corruptions of
302            slab's data structures we have to remove the lists head.
303
304            Also there could have been a barrier ack out of sequence, overtaking
305            the write acks - which would be a bug and violating write ordering.
306            To not deadlock in case we lose connection while such requests are
307            still pending, we need some way to find them for the
308            _req_mode(connection_lost_while_pending).
309
310            These have been list_move'd to the out_of_sequence_requests list in
311            _req_mod(, barrier_acked) above.
312            */
313         list_del_init(&b->requests);
314
315         nob = b->next;
316         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
317                 _tl_add_barrier(mdev, b);
318                 if (nob)
319                         mdev->oldest_tle = nob;
320                 /* if nob == NULL b was the only barrier, and becomes the new
321                    barrier. Therefore mdev->oldest_tle points already to b */
322         } else {
323                 D_ASSERT(nob != NULL);
324                 mdev->oldest_tle = nob;
325                 kfree(b);
326         }
327
328         spin_unlock_irq(&mdev->req_lock);
329         dec_ap_pending(mdev);
330
331         return;
332
333 bail:
334         spin_unlock_irq(&mdev->req_lock);
335         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
336 }
337
338 /**
339  * _tl_restart() - Walks the transfer log, and applies an action to all requests
340  * @mdev:       DRBD device.
341  * @what:       The action/event to perform with all request objects
342  *
343  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
344  * restart_frozen_disk_io.
345  */
346 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
347 {
348         struct drbd_tl_epoch *b, *tmp, **pn;
349         struct list_head *le, *tle, carry_reads;
350         struct drbd_request *req;
351         int rv, n_writes, n_reads;
352
353         b = mdev->oldest_tle;
354         pn = &mdev->oldest_tle;
355         while (b) {
356                 n_writes = 0;
357                 n_reads = 0;
358                 INIT_LIST_HEAD(&carry_reads);
359                 list_for_each_safe(le, tle, &b->requests) {
360                         req = list_entry(le, struct drbd_request, tl_requests);
361                         rv = _req_mod(req, what);
362
363                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
364                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
365                 }
366                 tmp = b->next;
367
368                 if (n_writes) {
369                         if (what == resend) {
370                                 b->n_writes = n_writes;
371                                 if (b->w.cb == NULL) {
372                                         b->w.cb = w_send_barrier;
373                                         inc_ap_pending(mdev);
374                                         set_bit(CREATE_BARRIER, &mdev->flags);
375                                 }
376
377                                 drbd_queue_work(&mdev->data.work, &b->w);
378                         }
379                         pn = &b->next;
380                 } else {
381                         if (n_reads)
382                                 list_add(&carry_reads, &b->requests);
383                         /* there could still be requests on that ring list,
384                          * in case local io is still pending */
385                         list_del(&b->requests);
386
387                         /* dec_ap_pending corresponding to queue_barrier.
388                          * the newest barrier may not have been queued yet,
389                          * in which case w.cb is still NULL. */
390                         if (b->w.cb != NULL)
391                                 dec_ap_pending(mdev);
392
393                         if (b == mdev->newest_tle) {
394                                 /* recycle, but reinit! */
395                                 D_ASSERT(tmp == NULL);
396                                 INIT_LIST_HEAD(&b->requests);
397                                 list_splice(&carry_reads, &b->requests);
398                                 INIT_LIST_HEAD(&b->w.list);
399                                 b->w.cb = NULL;
400                                 b->br_number = net_random();
401                                 b->n_writes = 0;
402
403                                 *pn = b;
404                                 break;
405                         }
406                         *pn = tmp;
407                         kfree(b);
408                 }
409                 b = tmp;
410                 list_splice(&carry_reads, &b->requests);
411         }
412 }
413
414
415 /**
416  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
417  * @mdev:       DRBD device.
418  *
419  * This is called after the connection to the peer was lost. The storage covered
420  * by the requests on the transfer gets marked as our of sync. Called from the
421  * receiver thread and the worker thread.
422  */
423 void tl_clear(struct drbd_conf *mdev)
424 {
425         struct list_head *le, *tle;
426         struct drbd_request *r;
427
428         spin_lock_irq(&mdev->req_lock);
429
430         _tl_restart(mdev, connection_lost_while_pending);
431
432         /* we expect this list to be empty. */
433         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
434
435         /* but just in case, clean it up anyways! */
436         list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
437                 r = list_entry(le, struct drbd_request, tl_requests);
438                 /* It would be nice to complete outside of spinlock.
439                  * But this is easier for now. */
440                 _req_mod(r, connection_lost_while_pending);
441         }
442
443         /* ensure bit indicating barrier is required is clear */
444         clear_bit(CREATE_BARRIER, &mdev->flags);
445
446         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
447
448         spin_unlock_irq(&mdev->req_lock);
449 }
450
451 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
452 {
453         spin_lock_irq(&mdev->req_lock);
454         _tl_restart(mdev, what);
455         spin_unlock_irq(&mdev->req_lock);
456 }
457
458 /**
459  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
460  * @mdev:       DRBD device.
461  * @os:         old (current) state.
462  * @ns:         new (wanted) state.
463  */
464 static int cl_wide_st_chg(struct drbd_conf *mdev,
465                           union drbd_state os, union drbd_state ns)
466 {
467         return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
468                  ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
469                   (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
470                   (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
471                   (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
472                 (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
473                 (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
474 }
475
476 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
477                       union drbd_state mask, union drbd_state val)
478 {
479         unsigned long flags;
480         union drbd_state os, ns;
481         int rv;
482
483         spin_lock_irqsave(&mdev->req_lock, flags);
484         os = mdev->state;
485         ns.i = (os.i & ~mask.i) | val.i;
486         rv = _drbd_set_state(mdev, ns, f, NULL);
487         ns = mdev->state;
488         spin_unlock_irqrestore(&mdev->req_lock, flags);
489
490         return rv;
491 }
492
493 /**
494  * drbd_force_state() - Impose a change which happens outside our control on our state
495  * @mdev:       DRBD device.
496  * @mask:       mask of state bits to change.
497  * @val:        value of new state bits.
498  */
499 void drbd_force_state(struct drbd_conf *mdev,
500         union drbd_state mask, union drbd_state val)
501 {
502         drbd_change_state(mdev, CS_HARD, mask, val);
503 }
504
/* state checking helpers, defined further down in this file */
static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
static int is_valid_state_transition(struct drbd_conf *,
				     union drbd_state, union drbd_state);
static union drbd_state sanitize_state(struct drbd_conf *mdev,
				       union drbd_state os,
				       union drbd_state ns,
				       const char **warn_sync_abort);

int drbd_send_state_req(struct drbd_conf *,
			union drbd_state, union drbd_state);
512
513 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
514                                     union drbd_state mask, union drbd_state val)
515 {
516         union drbd_state os, ns;
517         unsigned long flags;
518         int rv;
519
520         if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
521                 return SS_CW_SUCCESS;
522
523         if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
524                 return SS_CW_FAILED_BY_PEER;
525
526         rv = 0;
527         spin_lock_irqsave(&mdev->req_lock, flags);
528         os = mdev->state;
529         ns.i = (os.i & ~mask.i) | val.i;
530         ns = sanitize_state(mdev, os, ns, NULL);
531
532         if (!cl_wide_st_chg(mdev, os, ns))
533                 rv = SS_CW_NO_NEED;
534         if (!rv) {
535                 rv = is_valid_state(mdev, ns);
536                 if (rv == SS_SUCCESS) {
537                         rv = is_valid_state_transition(mdev, ns, os);
538                         if (rv == SS_SUCCESS)
539                                 rv = 0; /* cont waiting, otherwise fail. */
540                 }
541         }
542         spin_unlock_irqrestore(&mdev->req_lock, flags);
543
544         return rv;
545 }
546
547 /**
548  * drbd_req_state() - Perform an eventually cluster wide state change
549  * @mdev:       DRBD device.
550  * @mask:       mask of state bits to change.
551  * @val:        value of new state bits.
552  * @f:          flags
553  *
554  * Should not be called directly, use drbd_request_state() or
555  * _drbd_request_state().
556  */
557 static int drbd_req_state(struct drbd_conf *mdev,
558                           union drbd_state mask, union drbd_state val,
559                           enum chg_state_flags f)
560 {
561         struct completion done;
562         unsigned long flags;
563         union drbd_state os, ns;
564         int rv;
565
566         init_completion(&done);
567
568         if (f & CS_SERIALIZE)
569                 mutex_lock(&mdev->state_mutex);
570
571         spin_lock_irqsave(&mdev->req_lock, flags);
572         os = mdev->state;
573         ns.i = (os.i & ~mask.i) | val.i;
574         ns = sanitize_state(mdev, os, ns, NULL);
575
576         if (cl_wide_st_chg(mdev, os, ns)) {
577                 rv = is_valid_state(mdev, ns);
578                 if (rv == SS_SUCCESS)
579                         rv = is_valid_state_transition(mdev, ns, os);
580                 spin_unlock_irqrestore(&mdev->req_lock, flags);
581
582                 if (rv < SS_SUCCESS) {
583                         if (f & CS_VERBOSE)
584                                 print_st_err(mdev, os, ns, rv);
585                         goto abort;
586                 }
587
588                 drbd_state_lock(mdev);
589                 if (!drbd_send_state_req(mdev, mask, val)) {
590                         drbd_state_unlock(mdev);
591                         rv = SS_CW_FAILED_BY_PEER;
592                         if (f & CS_VERBOSE)
593                                 print_st_err(mdev, os, ns, rv);
594                         goto abort;
595                 }
596
597                 wait_event(mdev->state_wait,
598                         (rv = _req_st_cond(mdev, mask, val)));
599
600                 if (rv < SS_SUCCESS) {
601                         drbd_state_unlock(mdev);
602                         if (f & CS_VERBOSE)
603                                 print_st_err(mdev, os, ns, rv);
604                         goto abort;
605                 }
606                 spin_lock_irqsave(&mdev->req_lock, flags);
607                 os = mdev->state;
608                 ns.i = (os.i & ~mask.i) | val.i;
609                 rv = _drbd_set_state(mdev, ns, f, &done);
610                 drbd_state_unlock(mdev);
611         } else {
612                 rv = _drbd_set_state(mdev, ns, f, &done);
613         }
614
615         spin_unlock_irqrestore(&mdev->req_lock, flags);
616
617         if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
618                 D_ASSERT(current != mdev->worker.task);
619                 wait_for_completion(&done);
620         }
621
622 abort:
623         if (f & CS_SERIALIZE)
624                 mutex_unlock(&mdev->state_mutex);
625
626         return rv;
627 }
628
629 /**
630  * _drbd_request_state() - Request a state change (with flags)
631  * @mdev:       DRBD device.
632  * @mask:       mask of state bits to change.
633  * @val:        value of new state bits.
634  * @f:          flags
635  *
636  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
637  * flag, or when logging of failed state change requests is not desired.
638  */
639 int _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
640                         union drbd_state val,   enum chg_state_flags f)
641 {
642         int rv;
643
644         wait_event(mdev->state_wait,
645                    (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
646
647         return rv;
648 }
649
650 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
651 {
652         dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
653             name,
654             drbd_conn_str(ns.conn),
655             drbd_role_str(ns.role),
656             drbd_role_str(ns.peer),
657             drbd_disk_str(ns.disk),
658             drbd_disk_str(ns.pdsk),
659             is_susp(ns) ? 's' : 'r',
660             ns.aftr_isp ? 'a' : '-',
661             ns.peer_isp ? 'p' : '-',
662             ns.user_isp ? 'u' : '-'
663             );
664 }
665
666 void print_st_err(struct drbd_conf *mdev,
667         union drbd_state os, union drbd_state ns, int err)
668 {
669         if (err == SS_IN_TRANSIENT_STATE)
670                 return;
671         dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
672         print_st(mdev, " state", os);
673         print_st(mdev, "wanted", ns);
674 }
675
676
/*
 * drbd_<field>_str() helpers for those state fields that have no string
 * function of their own, so that PSC() below can build the name of the
 * helper from the name of the field.
 */
#define drbd_peer_str drbd_role_str
#define drbd_pdsk_str drbd_disk_str

/* the one bit flags print as "1" or "0" */
#define drbd_flag_str(A)	((A) ? "1" : "0")
#define drbd_susp_str		drbd_flag_str
#define drbd_aftr_isp_str	drbd_flag_str
#define drbd_peer_isp_str	drbd_flag_str
#define drbd_user_isp_str	drbd_flag_str

/*
 * PSC(field): if the field differs between os and ns, append
 * "field( old -> new ) " at pbp and advance pbp. Expects os, ns and pbp
 * to be in scope where the macro is used.
 */
#define PSC(A) \
	({ \
		if (ns.A != os.A) { \
			pbp += sprintf(pbp, #A "( %s -> %s ) ", \
				       drbd_##A##_str(os.A), \
				       drbd_##A##_str(ns.A)); \
		} \
	})
691
692 /**
693  * is_valid_state() - Returns an SS_ error code if ns is not valid
694  * @mdev:       DRBD device.
695  * @ns:         State to consider.
696  */
697 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
698 {
699         /* See drbd_state_sw_errors in drbd_strings.c */
700
701         enum drbd_fencing_p fp;
702         int rv = SS_SUCCESS;
703
704         fp = FP_DONT_CARE;
705         if (get_ldev(mdev)) {
706                 fp = mdev->ldev->dc.fencing;
707                 put_ldev(mdev);
708         }
709
710         if (get_net_conf(mdev)) {
711                 if (!mdev->net_conf->two_primaries &&
712                     ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
713                         rv = SS_TWO_PRIMARIES;
714                 put_net_conf(mdev);
715         }
716
717         if (rv <= 0)
718                 /* already found a reason to abort */;
719         else if (ns.role == R_SECONDARY && mdev->open_cnt)
720                 rv = SS_DEVICE_IN_USE;
721
722         else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
723                 rv = SS_NO_UP_TO_DATE_DISK;
724
725         else if (fp >= FP_RESOURCE &&
726                  ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
727                 rv = SS_PRIMARY_NOP;
728
729         else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
730                 rv = SS_NO_UP_TO_DATE_DISK;
731
732         else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
733                 rv = SS_NO_LOCAL_DISK;
734
735         else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
736                 rv = SS_NO_REMOTE_DISK;
737
738         else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
739                 rv = SS_NO_UP_TO_DATE_DISK;
740
741         else if ((ns.conn == C_CONNECTED ||
742                   ns.conn == C_WF_BITMAP_S ||
743                   ns.conn == C_SYNC_SOURCE ||
744                   ns.conn == C_PAUSED_SYNC_S) &&
745                   ns.disk == D_OUTDATED)
746                 rv = SS_CONNECTED_OUTDATES;
747
748         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
749                  (mdev->sync_conf.verify_alg[0] == 0))
750                 rv = SS_NO_VERIFY_ALG;
751
752         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
753                   mdev->agreed_pro_version < 88)
754                 rv = SS_NOT_SUPPORTED;
755
756         return rv;
757 }
758
759 /**
760  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
761  * @mdev:       DRBD device.
762  * @ns:         new state.
763  * @os:         old state.
764  */
765 static int is_valid_state_transition(struct drbd_conf *mdev,
766                                      union drbd_state ns, union drbd_state os)
767 {
768         int rv = SS_SUCCESS;
769
770         if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
771             os.conn > C_CONNECTED)
772                 rv = SS_RESYNC_RUNNING;
773
774         if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
775                 rv = SS_ALREADY_STANDALONE;
776
777         if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
778                 rv = SS_IS_DISKLESS;
779
780         if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
781                 rv = SS_NO_NET_CONFIG;
782
783         if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
784                 rv = SS_LOWER_THAN_OUTDATED;
785
786         if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
787                 rv = SS_IN_TRANSIENT_STATE;
788
789         if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
790                 rv = SS_IN_TRANSIENT_STATE;
791
792         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
793                 rv = SS_NEED_CONNECTION;
794
795         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
796             ns.conn != os.conn && os.conn > C_CONNECTED)
797                 rv = SS_RESYNC_RUNNING;
798
799         if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
800             os.conn < C_CONNECTED)
801                 rv = SS_NEED_CONNECTION;
802
803         return rv;
804 }
805
/**
 * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
 * @mdev:	DRBD device.
 * @os:		old state.
 * @ns:		new state.
 * @warn_sync_abort:	if not NULL, set to "Online-verify" or "Resync" when a
 *			running verify or resync gets aborted because a disk
 *			failed or was detached.
 *
 * When we lose the connection, we have to set the state of the peer's disk
 * (pdsk) to D_UNKNOWN. This rule and many more along those lines are in this
 * function.
 */
816 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
817                                        union drbd_state ns, const char **warn_sync_abort)
818 {
819         enum drbd_fencing_p fp;
820
821         fp = FP_DONT_CARE;
822         if (get_ldev(mdev)) {
823                 fp = mdev->ldev->dc.fencing;
824                 put_ldev(mdev);
825         }
826
827         /* Disallow Network errors to configure a device's network part */
828         if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
829             os.conn <= C_DISCONNECTING)
830                 ns.conn = os.conn;
831
832         /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
833          * If you try to go into some Sync* state, that shall fail (elsewhere). */
834         if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
835             ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
836                 ns.conn = os.conn;
837
838         /* After C_DISCONNECTING only C_STANDALONE may follow */
839         if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
840                 ns.conn = os.conn;
841
842         if (ns.conn < C_CONNECTED) {
843                 ns.peer_isp = 0;
844                 ns.peer = R_UNKNOWN;
845                 if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
846                         ns.pdsk = D_UNKNOWN;
847         }
848
849         /* Clear the aftr_isp when becoming unconfigured */
850         if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
851                 ns.aftr_isp = 0;
852
853         /* Abort resync if a disk fails/detaches */
854         if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
855             (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
856                 if (warn_sync_abort)
857                         *warn_sync_abort =
858                                 os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
859                                 "Online-verify" : "Resync";
860                 ns.conn = C_CONNECTED;
861         }
862
863         if (ns.conn >= C_CONNECTED &&
864             ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
865              (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
866                 switch (ns.conn) {
867                 case C_WF_BITMAP_T:
868                 case C_PAUSED_SYNC_T:
869                         ns.disk = D_OUTDATED;
870                         break;
871                 case C_CONNECTED:
872                 case C_WF_BITMAP_S:
873                 case C_SYNC_SOURCE:
874                 case C_PAUSED_SYNC_S:
875                         ns.disk = D_UP_TO_DATE;
876                         break;
877                 case C_SYNC_TARGET:
878                         ns.disk = D_INCONSISTENT;
879                         dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
880                         break;
881                 }
882                 if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
883                         dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
884         }
885
886         if (ns.conn >= C_CONNECTED &&
887             (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
888                 switch (ns.conn) {
889                 case C_CONNECTED:
890                 case C_WF_BITMAP_T:
891                 case C_PAUSED_SYNC_T:
892                 case C_SYNC_TARGET:
893                         ns.pdsk = D_UP_TO_DATE;
894                         break;
895                 case C_WF_BITMAP_S:
896                 case C_PAUSED_SYNC_S:
897                         /* remap any consistent state to D_OUTDATED,
898                          * but disallow "upgrade" of not even consistent states.
899                          */
900                         ns.pdsk =
901                                 (D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
902                                 ? os.pdsk : D_OUTDATED;
903                         break;
904                 case C_SYNC_SOURCE:
905                         ns.pdsk = D_INCONSISTENT;
906                         dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
907                         break;
908                 }
909                 if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
910                         dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
911         }
912
913         /* Connection breaks down before we finished "Negotiating" */
914         if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
915             get_ldev_if_state(mdev, D_NEGOTIATING)) {
916                 if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
917                         ns.disk = mdev->new_state_tmp.disk;
918                         ns.pdsk = mdev->new_state_tmp.pdsk;
919                 } else {
920                         dev_alert(DEV, "Connection lost while negotiating, no data!\n");
921                         ns.disk = D_DISKLESS;
922                         ns.pdsk = D_UNKNOWN;
923                 }
924                 put_ldev(mdev);
925         }
926
927         if (fp == FP_STONITH &&
928             (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
929             !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
930                 ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
931
932         if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
933             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
934             !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
935                 ns.susp_nod = 1; /* Suspend IO while no data available (no accessible data available) */
936
937         if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
938                 if (ns.conn == C_SYNC_SOURCE)
939                         ns.conn = C_PAUSED_SYNC_S;
940                 if (ns.conn == C_SYNC_TARGET)
941                         ns.conn = C_PAUSED_SYNC_T;
942         } else {
943                 if (ns.conn == C_PAUSED_SYNC_S)
944                         ns.conn = C_SYNC_SOURCE;
945                 if (ns.conn == C_PAUSED_SYNC_T)
946                         ns.conn = C_SYNC_TARGET;
947         }
948
949         return ns;
950 }
951
952 /* helper for __drbd_set_state */
953 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
954 {
955         if (cs == C_VERIFY_T) {
956                 /* starting online verify from an arbitrary position
957                  * does not fit well into the existing protocol.
958                  * on C_VERIFY_T, we initialize ov_left and friends
959                  * implicitly in receive_DataRequest once the
960                  * first P_OV_REQUEST is received */
961                 mdev->ov_start_sector = ~(sector_t)0;
962         } else {
963                 unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
964                 if (bit >= mdev->rs_total)
965                         mdev->ov_start_sector =
966                                 BM_BIT_TO_SECT(mdev->rs_total - 1);
967                 mdev->ov_position = mdev->ov_start_sector;
968         }
969 }
970
971 static void drbd_resume_al(struct drbd_conf *mdev)
972 {
973         if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
974                 dev_info(DEV, "Resumed AL updates\n");
975 }
976
977 /**
978  * __drbd_set_state() - Set a new DRBD state
979  * @mdev:       DRBD device.
980  * @ns:         new state.
981  * @flags:      Flags. CS_HARD skips the validity checks, CS_VERBOSE logs a
     *              rejected change; the flags are also handed on to the queued
     *              after-state-change work.
982  * @done:       Optional completion, that will get completed after the after_state_ch() finished
     *              (w_after_state_ch() completes it only if CS_WAIT_COMPLETE is
     *              set in @flags).
983  *
984  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
     *
     * Returns SS_NOTHING_TO_DO if the sanitized new state equals the current
     * one, a value below SS_SUCCESS if the checks reject the change (mdev->state
     * is left unchanged in both cases), and otherwise the result of the checks
     * (SS_SUCCESS when CS_HARD skipped them).
985  */
986 int __drbd_set_state(struct drbd_conf *mdev,
987                     union drbd_state ns, enum chg_state_flags flags,
988                     struct completion *done)
989 {
990         union drbd_state os;
991         int rv = SS_SUCCESS;
992         const char *warn_sync_abort = NULL;
993         struct after_state_chg_work *ascw;
994
995         os = mdev->state;
996
            /* Let sanitize_state() add the implied changes before anything
             * is compared or validated. */
997         ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
998
999         if (ns.i == os.i)
1000                 return SS_NOTHING_TO_DO;
1001
1002         if (!(flags & CS_HARD)) {
1003                 /*  pre-state-change checks ; only look at ns  */
1004                 /* See drbd_state_sw_errors in drbd_strings.c */
1005
1006                 rv = is_valid_state(mdev, ns);
1007                 if (rv < SS_SUCCESS) {
1008                         /* If the old state was illegal as well, then let
1009                            this happen...*/
1010
                             /* ...i.e. when os fails with the very same code,
                              * only the transition check decides. */
1011                         if (is_valid_state(mdev, os) == rv)
1012                                 rv = is_valid_state_transition(mdev, ns, os);
1013                 } else
1014                         rv = is_valid_state_transition(mdev, ns, os);
1015         }
1016
1017         if (rv < SS_SUCCESS) {
1018                 if (flags & CS_VERBOSE)
1019                         print_st_err(mdev, os, ns, rv);
1020                 return rv;
1021         }
1022
1023         if (warn_sync_abort)
1024                 dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
1025
             /* Log a one-line summary of the change. PSC() is a macro defined
              * elsewhere; presumably it appends "field( old -> new ) " to pb
              * via pbp when os and ns differ in that field -- TODO confirm. */
1026         {
1027                 char *pbp, pb[300];
1028                 pbp = pb;
1029                 *pbp = 0;
1030                 PSC(role);
1031                 PSC(peer);
1032                 PSC(conn);
1033                 PSC(disk);
1034                 PSC(pdsk);
1035                 if (is_susp(ns) != is_susp(os))
1036                         pbp += sprintf(pbp, "susp( %s -> %s ) ",
1037                                        drbd_susp_str(is_susp(os)),
1038                                        drbd_susp_str(is_susp(ns)));
1039                 PSC(aftr_isp);
1040                 PSC(peer_isp);
1041                 PSC(user_isp);
1042                 dev_info(DEV, "%s\n", pb);
1043         }
1044
1045         /* solve the race between becoming unconfigured,
1046          * worker doing the cleanup, and
1047          * admin reconfiguring us:
1048          * on (re)configure, first set CONFIG_PENDING,
1049          * then wait for a potentially exiting worker,
1050          * start the worker, and schedule one no_op.
1051          * then proceed with configuration.
1052          */
1053         if (ns.disk == D_DISKLESS &&
1054             ns.conn == C_STANDALONE &&
1055             ns.role == R_SECONDARY &&
1056             !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1057                 set_bit(DEVICE_DYING, &mdev->flags);
1058
             /* Commit the new state, then wake waiters on misc_wait and
              * state_wait. Everything below reacts to the committed state. */
1059         mdev->state.i = ns.i;
1060         wake_up(&mdev->misc_wait);
1061         wake_up(&mdev->state_wait);
1062
1063         /* aborted verify run. log the last position */
1064         if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1065             ns.conn < C_CONNECTED) {
1066                 mdev->ov_start_sector =
1067                         BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1068                 dev_info(DEV, "Online Verify reached sector %llu\n",
1069                         (unsigned long long)mdev->ov_start_sector);
1070         }
1071
             /* Paused sync -> running sync: add the time since the last mark
              * to rs_paused; as sync target also fire the resync timer now. */
1072         if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1073             (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1074                 dev_info(DEV, "Syncer continues.\n");
1075                 mdev->rs_paused += (long)jiffies
1076                                   -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1077                 if (ns.conn == C_SYNC_TARGET)
1078                         mod_timer(&mdev->resync_timer, jiffies);
1079         }
1080
             /* Running sync -> paused sync: stamp the last mark with "now". */
1081         if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1082             (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1083                 dev_info(DEV, "Resync suspended\n");
1084                 mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1085         }
1086
             /* Connected -> online verify: reset the verify/resync bookkeeping. */
1087         if (os.conn == C_CONNECTED &&
1088             (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1089                 unsigned long now = jiffies;
1090                 int i;
1091
1092                 mdev->ov_position = 0;
1093                 mdev->rs_total = drbd_bm_bits(mdev);
                     /* A configurable start position is only used from
                      * agreed protocol version 90 on. */
1094                 if (mdev->agreed_pro_version >= 90)
1095                         set_ov_position(mdev, ns.conn);
1096                 else
1097                         mdev->ov_start_sector = 0;
1098                 mdev->ov_left = mdev->rs_total
1099                               - BM_SECT_TO_BIT(mdev->ov_position);
1100                 mdev->rs_start = now;
1101                 mdev->rs_last_events = 0;
1102                 mdev->rs_last_sect_ev = 0;
1103                 mdev->ov_last_oos_size = 0;
1104                 mdev->ov_last_oos_start = 0;
1105
1106                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1107                         mdev->rs_mark_left[i] = mdev->rs_total;
1108                         mdev->rs_mark_time[i] = now;
1109                 }
1110
1111                 if (ns.conn == C_VERIFY_S) {
1112                         dev_info(DEV, "Starting Online Verify from sector %llu\n",
1113                                         (unsigned long long)mdev->ov_position);
1114                         mod_timer(&mdev->resync_timer, jiffies);
1115                 }
1116         }
1117
             /* Recompute the meta data flags from the state just committed;
              * the meta data is marked dirty only if the flags changed. */
1118         if (get_ldev(mdev)) {
1119                 u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1120                                                  MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1121                                                  MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1122
1123                 if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1124                         mdf |= MDF_CRASHED_PRIMARY;
1125                 if (mdev->state.role == R_PRIMARY ||
1126                     (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1127                         mdf |= MDF_PRIMARY_IND;
1128                 if (mdev->state.conn > C_WF_REPORT_PARAMS)
1129                         mdf |= MDF_CONNECTED_IND;
1130                 if (mdev->state.disk > D_INCONSISTENT)
1131                         mdf |= MDF_CONSISTENT;
1132                 if (mdev->state.disk > D_OUTDATED)
1133                         mdf |= MDF_WAS_UP_TO_DATE;
1134                 if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1135                         mdf |= MDF_PEER_OUT_DATED;
1136                 if (mdf != mdev->ldev->md.flags) {
1137                         mdev->ldev->md.flags = mdf;
1138                         drbd_md_mark_dirty(mdev);
1139                 }
1140                 if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1141                         drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1142                 put_ldev(mdev);
1143         }
1144
1145         /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1146         if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1147             os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1148                 set_bit(CONSIDER_RESYNC, &mdev->flags);
1149
1150         /* Receiver should clean up itself */
1151         if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1152                 drbd_thread_stop_nowait(&mdev->receiver);
1153
1154         /* Now the receiver finished cleaning up itself, it should die */
1155         if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1156                 drbd_thread_stop_nowait(&mdev->receiver);
1157
1158         /* Upon network failure, we need to restart the receiver. */
1159         if (os.conn > C_TEAR_DOWN &&
1160             ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1161                 drbd_thread_restart_nowait(&mdev->receiver);
1162
1163         /* Resume AL writing if we get a connection */
1164         if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1165                 drbd_resume_al(mdev);
1166
             /* Hand the transition to the work queue on mdev->data.work;
              * w_after_state_ch() then runs after_state_ch(), which may sleep.
              * NOTE(review): if this allocation fails, the after-state-change
              * actions are skipped and @done is never completed; a caller
              * waiting on @done would presumably block -- confirm. */
1167         ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1168         if (ascw) {
1169                 ascw->os = os;
1170                 ascw->ns = ns;
1171                 ascw->flags = flags;
1172                 ascw->w.cb = w_after_state_ch;
1173                 ascw->done = done;
1174                 drbd_queue_work(&mdev->data.work, &ascw->w);
1175         } else {
1176                 dev_warn(DEV, "Could not kmalloc an ascw\n");
1177         }
1178
1179         return rv;
1180 }
1181
1182 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1183 {
1184         struct after_state_chg_work *ascw =
1185                 container_of(w, struct after_state_chg_work, w);
1186         after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1187         if (ascw->flags & CS_WAIT_COMPLETE) {
1188                 D_ASSERT(ascw->done != NULL);
1189                 complete(ascw->done);
1190         }
1191         kfree(ascw);
1192
1193         return 1;
1194 }
1195
1196 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1197 {
1198         if (rv) {
1199                 dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1200                 _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1201                 return;
1202         }
1203
1204         switch (mdev->state.conn) {
1205         case C_STARTING_SYNC_T:
1206                 _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1207                 break;
1208         case C_STARTING_SYNC_S:
1209                 drbd_start_resync(mdev, C_SYNC_SOURCE);
1210                 break;
1211         }
1212 }
1213
1214 /**
1215  * after_state_ch() - Perform after state change actions that may sleep
1216  * @mdev:       DRBD device.
1217  * @os:         old state.
1218  * @ns:         new state.
1219  * @flags:      Flags
      *
      * Called from w_after_state_ch(), i.e. from the work item queued by
      * __drbd_set_state(), not from __drbd_set_state() itself. @os and @ns are
      * the snapshot taken at the time of the change; where the current state
      * matters, mdev->state is read instead.
1220  */
1221 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1222                            union drbd_state ns, enum chg_state_flags flags)
1223 {
1224         enum drbd_fencing_p fp;
1225         enum drbd_req_event what = nothing;
             /* nsm starts as all-ones; susp bits cleared in it below are
              * later ANDed into the current state. */
1226         union drbd_state nsm = (union drbd_state){ .i = -1 };
1227
1228         if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1229                 clear_bit(CRASHED_PRIMARY, &mdev->flags);
1230                 if (mdev->p_uuid)
1231                         mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1232         }
1233
             /* NOTE(review): fp is assigned here but not read again in
              * this function. */
1234         fp = FP_DONT_CARE;
1235         if (get_ldev(mdev)) {
1236                 fp = mdev->ldev->dc.fencing;
1237                 put_ldev(mdev);
1238         }
1239
1240         /* Inform userspace about the change... */
1241         drbd_bcast_state(mdev, ns);
1242
             /* Run the helper only on the transition INTO "Primary with
              * neither disk UpToDate". */
1243         if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1244             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1245                 drbd_khelper(mdev, "pri-on-incon-degr");
1246
1247         /* Here we have the actions that are performed after a
1248            state change. This function might sleep */
1249
1250         nsm.i = -1;
1251         if (ns.susp_nod) {
1252                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1253                         if (ns.conn == C_CONNECTED)
1254                                 what = resend, nsm.susp_nod = 0;
1255                         else /* ns.conn > C_CONNECTED */
                                     /* NOTE(review): "Resynd" in the message below
                                      * looks like a typo for "Resync"; the log text
                                      * is left as it is. */
1256                                 dev_err(DEV, "Unexpected Resynd going on!\n");
1257                 }
1258
1259                 if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
1260                         what = restart_frozen_disk_io, nsm.susp_nod = 0;
1261
1262         }
1263
1264         if (ns.susp_fen) {
1265                 /* case1: The outdate peer handler is successful: */
1266                 if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1267                         tl_clear(mdev);
1268                         if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1269                                 drbd_uuid_new_current(mdev);
1270                                 clear_bit(NEW_CUR_UUID, &mdev->flags);
1271                                 drbd_md_sync(mdev);
1272                         }
1273                         spin_lock_irq(&mdev->req_lock);
1274                         _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1275                         spin_unlock_irq(&mdev->req_lock);
1276                 }
1277                 /* case2: The connection was established again: */
1278                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1279                         clear_bit(NEW_CUR_UUID, &mdev->flags);
1280                         what = resend;
1281                         nsm.susp_fen = 0;
1282                 }
1283         }
1284
             /* Restart the transfer log with the chosen event, then set
              * the current state minus the susp bits cleared in nsm above
              * (nsm is otherwise all-ones, so the AND keeps every other
              * field of mdev->state). */
1285         if (what != nothing) {
1286                 spin_lock_irq(&mdev->req_lock);
1287                 _tl_restart(mdev, what);
1288                 nsm.i &= mdev->state.i;
1289                 _drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1290                 spin_unlock_irq(&mdev->req_lock);
1291         }
1292
1293         /* Do not change the order of the if above and the two below... */
1294         if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1295                 drbd_send_uuids(mdev);
1296                 drbd_send_state(mdev);
1297         }
1298         if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1299                 drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1300
1301         /* Lost contact to peer's copy of the data */
1302         if ((os.pdsk >= D_INCONSISTENT &&
1303              os.pdsk != D_UNKNOWN &&
1304              os.pdsk != D_OUTDATED)
1305         &&  (ns.pdsk < D_INCONSISTENT ||
1306              ns.pdsk == D_UNKNOWN ||
1307              ns.pdsk == D_OUTDATED)) {
1308                 if (get_ldev(mdev)) {
1309                         if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1310                             mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
                                     /* While IO is suspended only remember that a
                                      * new current UUID is due (NEW_CUR_UUID). */
1311                                 if (is_susp(mdev->state)) {
1312                                         set_bit(NEW_CUR_UUID, &mdev->flags);
1313                                 } else {
1314                                         drbd_uuid_new_current(mdev);
1315                                         drbd_send_uuids(mdev);
1316                                 }
1317                         }
1318                         put_ldev(mdev);
1319                 }
1320         }
1321
1322         if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1323                 if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1324                         drbd_uuid_new_current(mdev);
1325                         drbd_send_uuids(mdev);
1326                 }
1327
1328                 /* D_DISKLESS Peer becomes secondary */
1329                 if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1330                         drbd_al_to_on_disk_bm(mdev);
1331                 put_ldev(mdev);
1332         }
1333
1334         /* Last part of the attaching process ... */
1335         if (ns.conn >= C_CONNECTED &&
1336             os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1337                 drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1338                 drbd_send_uuids(mdev);
1339                 drbd_send_state(mdev);
1340         }
1341
1342         /* We want to pause/continue resync, tell peer. */
1343         if (ns.conn >= C_CONNECTED &&
1344              ((os.aftr_isp != ns.aftr_isp) ||
1345               (os.user_isp != ns.user_isp)))
1346                 drbd_send_state(mdev);
1347
1348         /* In case one of the isp bits got set, suspend other devices. */
1349         if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1350             (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1351                 suspend_other_sg(mdev);
1352
1353         /* Make sure the peer gets informed about eventual state
1354            changes (ISP bits) while we were in WFReportParams. */
1355         if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1356                 drbd_send_state(mdev);
1357
1358         /* We are in the progress to start a full sync... */
1359         if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1360             (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1361                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1362
1363         /* We are invalidating our self... */
1364         if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1365             os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1366                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1367
1368         /* first half of local IO error */
1369         if (os.disk > D_FAILED && ns.disk == D_FAILED) {
                     /* Used when the on_io_error setting cannot be read
                      * because get_ldev_if_state() fails below. */
1370                 enum drbd_io_error_p eh = EP_PASS_ON;
1371
1372                 if (drbd_send_state(mdev))
1373                         dev_warn(DEV, "Notified peer that my disk is broken.\n");
1374                 else
1375                         dev_err(DEV, "Sending state for drbd_io_error() failed\n");
1376
1377                 drbd_rs_cancel_all(mdev);
1378
1379                 if (get_ldev_if_state(mdev, D_FAILED)) {
1380                         eh = mdev->ldev->dc.on_io_error;
1381                         put_ldev(mdev);
1382                 }
1383                 if (eh == EP_CALL_HELPER)
1384                         drbd_khelper(mdev, "local-io-error");
1385         }
1386
1387
1388         /* second half of local IO error handling,
1389          * after local_cnt references have reached zero: */
1390         if (os.disk == D_FAILED && ns.disk == D_DISKLESS) {
1391                 mdev->rs_total = 0;
1392                 mdev->rs_failed = 0;
1393                 atomic_set(&mdev->rs_pending_cnt, 0);
1394         }
1395
             /* Became diskless: tell the peer, then release the resync LRU,
              * the activity log, the backing device config and the temporary
              * meta data IO page. */
1396         if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1397                 /* We must still be diskless,
1398                  * re-attach has to be serialized with this! */
1399                 if (mdev->state.disk != D_DISKLESS)
1400                         dev_err(DEV,
1401                                 "ASSERT FAILED: disk is %s while going diskless\n",
1402                                 drbd_disk_str(mdev->state.disk));
1403
1404                 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state
1405                  * will inc/dec it frequently. Since we became D_DISKLESS, no
1406                  * one has touched the protected members anymore, though, so we
1407                  * are safe to free them here. */
1408                 if (drbd_send_state(mdev))
1409                         dev_warn(DEV, "Notified peer that I detached my disk.\n");
1410                 else
1411                         dev_err(DEV, "Sending state for detach failed\n");
1412
1413                 lc_destroy(mdev->resync);
1414                 mdev->resync = NULL;
1415                 lc_destroy(mdev->act_log);
1416                 mdev->act_log = NULL;
1417                 __no_warn(local,
1418                         drbd_free_bc(mdev->ldev);
1419                         mdev->ldev = NULL;);
1420
1421                 if (mdev->md_io_tmpp) {
1422                         __free_page(mdev->md_io_tmpp);
1423                         mdev->md_io_tmpp = NULL;
1424                 }
1425         }
1426
1427         /* Disks got bigger while they were detached */
             /* The RESYNC_AFTER_NEG flag is consumed here even when the
              * connection state is not C_CONNECTED. */
1428         if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1429             test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1430                 if (ns.conn == C_CONNECTED)
1431                         resync_after_online_grow(mdev);
1432         }
1433
1434         /* A resync finished or aborted, wake paused devices... */
1435         if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1436             (os.peer_isp && !ns.peer_isp) ||
1437             (os.user_isp && !ns.user_isp))
1438                 resume_next_sg(mdev);
1439
1440         /* sync target done with resync.  Explicitly notify peer, even though
1441          * it should (at least for non-empty resyncs) already know itself. */
1442         if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1443                 drbd_send_state(mdev);
1444
1445         /* free tl_hash if we Got thawed and are C_STANDALONE */
1446         if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1447                 drbd_free_tl_hash(mdev);
1448
1449         /* Upon network connection, we need to start the receiver */
1450         if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1451                 drbd_thread_start(&mdev->receiver);
1452
1453         /* Terminate worker thread if we are unconfigured - it will be
1454            restarted as needed... */
1455         if (ns.disk == D_DISKLESS &&
1456             ns.conn == C_STANDALONE &&
1457             ns.role == R_SECONDARY) {
1458                 if (os.aftr_isp != ns.aftr_isp)
1459                         resume_next_sg(mdev);
1460                 /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1461                 if (test_bit(DEVICE_DYING, &mdev->flags))
1462                         drbd_thread_stop_nowait(&mdev->worker);
1463         }
1464
             /* Finally call drbd_md_sync(); presumably this writes out meta
              * data marked dirty by the state change -- TODO confirm. */
1465         drbd_md_sync(mdev);
1466 }
1467
1468
1469 static int drbd_thread_setup(void *arg)
1470 {
1471         struct drbd_thread *thi = (struct drbd_thread *) arg;
1472         struct drbd_conf *mdev = thi->mdev;
1473         unsigned long flags;
1474         int retval;
1475
1476 restart:
1477         retval = thi->function(thi);
1478
1479         spin_lock_irqsave(&thi->t_lock, flags);
1480
1481         /* if the receiver has been "Exiting", the last thing it did
1482          * was set the conn state to "StandAlone",
1483          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1484          * and receiver thread will be "started".
1485          * drbd_thread_start needs to set "Restarting" in that case.
1486          * t_state check and assignment needs to be within the same spinlock,
1487          * so either thread_start sees Exiting, and can remap to Restarting,
1488          * or thread_start see None, and can proceed as normal.
1489          */
1490
1491         if (thi->t_state == Restarting) {
1492                 dev_info(DEV, "Restarting %s\n", current->comm);
1493                 thi->t_state = Running;
1494                 spin_unlock_irqrestore(&thi->t_lock, flags);
1495                 goto restart;
1496         }
1497
1498         thi->task = NULL;
1499         thi->t_state = None;
1500         smp_mb();
1501         complete(&thi->stop);
1502         spin_unlock_irqrestore(&thi->t_lock, flags);
1503
1504         dev_info(DEV, "Terminating %s\n", current->comm);
1505
1506         /* Release mod reference taken when thread was started */
1507         module_put(THIS_MODULE);
1508         return retval;
1509 }
1510
1511 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1512                       int (*func) (struct drbd_thread *))
1513 {
1514         spin_lock_init(&thi->t_lock);
1515         thi->task    = NULL;
1516         thi->t_state = None;
1517         thi->function = func;
1518         thi->mdev = mdev;
1519 }
1520
1521 int drbd_thread_start(struct drbd_thread *thi)
1522 {
1523         struct drbd_conf *mdev = thi->mdev;
1524         struct task_struct *nt;
1525         unsigned long flags;
1526
1527         const char *me =
1528                 thi == &mdev->receiver ? "receiver" :
1529                 thi == &mdev->asender  ? "asender"  :
1530                 thi == &mdev->worker   ? "worker"   : "NONSENSE";
1531
1532         /* is used from state engine doing drbd_thread_stop_nowait,
1533          * while holding the req lock irqsave */
1534         spin_lock_irqsave(&thi->t_lock, flags);
1535
1536         switch (thi->t_state) {
1537         case None:
1538                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1539                                 me, current->comm, current->pid);
1540
1541                 /* Get ref on module for thread - this is released when thread exits */
1542                 if (!try_module_get(THIS_MODULE)) {
1543                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1544                         spin_unlock_irqrestore(&thi->t_lock, flags);
1545                         return FALSE;
1546                 }
1547
1548                 init_completion(&thi->stop);
1549                 D_ASSERT(thi->task == NULL);
1550                 thi->reset_cpu_mask = 1;
1551                 thi->t_state = Running;
1552                 spin_unlock_irqrestore(&thi->t_lock, flags);
1553                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1554
1555                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1556                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1557
1558                 if (IS_ERR(nt)) {
1559                         dev_err(DEV, "Couldn't start thread\n");
1560
1561                         module_put(THIS_MODULE);
1562                         return FALSE;
1563                 }
1564                 spin_lock_irqsave(&thi->t_lock, flags);
1565                 thi->task = nt;
1566                 thi->t_state = Running;
1567                 spin_unlock_irqrestore(&thi->t_lock, flags);
1568                 wake_up_process(nt);
1569                 break;
1570         case Exiting:
1571                 thi->t_state = Restarting;
1572                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1573                                 me, current->comm, current->pid);
1574                 /* fall through */
1575         case Running:
1576         case Restarting:
1577         default:
1578                 spin_unlock_irqrestore(&thi->t_lock, flags);
1579                 break;
1580         }
1581
1582         return TRUE;
1583 }
1584
1585
1586 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1587 {
1588         unsigned long flags;
1589
1590         enum drbd_thread_state ns = restart ? Restarting : Exiting;
1591
1592         /* may be called from state engine, holding the req lock irqsave */
1593         spin_lock_irqsave(&thi->t_lock, flags);
1594
1595         if (thi->t_state == None) {
1596                 spin_unlock_irqrestore(&thi->t_lock, flags);
1597                 if (restart)
1598                         drbd_thread_start(thi);
1599                 return;
1600         }
1601
1602         if (thi->t_state != ns) {
1603                 if (thi->task == NULL) {
1604                         spin_unlock_irqrestore(&thi->t_lock, flags);
1605                         return;
1606                 }
1607
1608                 thi->t_state = ns;
1609                 smp_mb();
1610                 init_completion(&thi->stop);
1611                 if (thi->task != current)
1612                         force_sig(DRBD_SIGKILL, thi->task);
1613
1614         }
1615
1616         spin_unlock_irqrestore(&thi->t_lock, flags);
1617
1618         if (wait)
1619                 wait_for_completion(&thi->stop);
1620 }
1621
1622 #ifdef CONFIG_SMP
1623 /**
1624  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1625  * @mdev:       DRBD device.
1626  *
1627  * Forces all threads of a device onto the same CPU. This is beneficial for
1628  * DRBD's performance. May be overwritten by user's configuration.
1629  */
1630 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1631 {
1632         int ord, cpu;
1633
1634         /* user override. */
1635         if (cpumask_weight(mdev->cpu_mask))
1636                 return;
1637
1638         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1639         for_each_online_cpu(cpu) {
1640                 if (ord-- == 0) {
1641                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1642                         return;
1643                 }
1644         }
1645         /* should not be reached */
1646         cpumask_setall(mdev->cpu_mask);
1647 }
1648
1649 /**
1650  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1651  * @mdev:       DRBD device.
1652  *
1653  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1654  * prematurely.
1655  */
1656 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1657 {
1658         struct task_struct *p = current;
1659         struct drbd_thread *thi =
1660                 p == mdev->asender.task  ? &mdev->asender  :
1661                 p == mdev->receiver.task ? &mdev->receiver :
1662                 p == mdev->worker.task   ? &mdev->worker   :
1663                 NULL;
1664         ERR_IF(thi == NULL)
1665                 return;
1666         if (!thi->reset_cpu_mask)
1667                 return;
1668         thi->reset_cpu_mask = 0;
1669         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1670 }
1671 #endif
1672
1673 /* the appropriate socket mutex must be held already */
1674 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1675                           enum drbd_packets cmd, struct p_header80 *h,
1676                           size_t size, unsigned msg_flags)
1677 {
1678         int sent, ok;
1679
1680         ERR_IF(!h) return FALSE;
1681         ERR_IF(!size) return FALSE;
1682
1683         h->magic   = BE_DRBD_MAGIC;
1684         h->command = cpu_to_be16(cmd);
1685         h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1686
1687         sent = drbd_send(mdev, sock, h, size, msg_flags);
1688
1689         ok = (sent == size);
1690         if (!ok)
1691                 dev_err(DEV, "short sent %s size=%d sent=%d\n",
1692                     cmdname(cmd), (int)size, sent);
1693         return ok;
1694 }
1695
1696 /* don't pass the socket. we may only look at it
1697  * when we hold the appropriate socket mutex.
1698  */
1699 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1700                   enum drbd_packets cmd, struct p_header80 *h, size_t size)
1701 {
1702         int ok = 0;
1703         struct socket *sock;
1704
1705         if (use_data_socket) {
1706                 mutex_lock(&mdev->data.mutex);
1707                 sock = mdev->data.socket;
1708         } else {
1709                 mutex_lock(&mdev->meta.mutex);
1710                 sock = mdev->meta.socket;
1711         }
1712
1713         /* drbd_disconnect() could have called drbd_free_sock()
1714          * while we were waiting in down()... */
1715         if (likely(sock != NULL))
1716                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1717
1718         if (use_data_socket)
1719                 mutex_unlock(&mdev->data.mutex);
1720         else
1721                 mutex_unlock(&mdev->meta.mutex);
1722         return ok;
1723 }
1724
1725 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1726                    size_t size)
1727 {
1728         struct p_header80 h;
1729         int ok;
1730
1731         h.magic   = BE_DRBD_MAGIC;
1732         h.command = cpu_to_be16(cmd);
1733         h.length  = cpu_to_be16(size);
1734
1735         if (!drbd_get_data_sock(mdev))
1736                 return 0;
1737
1738         ok = (sizeof(h) ==
1739                 drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1740         ok = ok && (size ==
1741                 drbd_send(mdev, mdev->data.socket, data, size, 0));
1742
1743         drbd_put_data_sock(mdev);
1744
1745         return ok;
1746 }
1747
1748 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1749 {
1750         struct p_rs_param_95 *p;
1751         struct socket *sock;
1752         int size, rv;
1753         const int apv = mdev->agreed_pro_version;
1754
1755         size = apv <= 87 ? sizeof(struct p_rs_param)
1756                 : apv == 88 ? sizeof(struct p_rs_param)
1757                         + strlen(mdev->sync_conf.verify_alg) + 1
1758                 : apv <= 94 ? sizeof(struct p_rs_param_89)
1759                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
1760
1761         /* used from admin command context and receiver/worker context.
1762          * to avoid kmalloc, grab the socket right here,
1763          * then use the pre-allocated sbuf there */
1764         mutex_lock(&mdev->data.mutex);
1765         sock = mdev->data.socket;
1766
1767         if (likely(sock != NULL)) {
1768                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1769
1770                 p = &mdev->data.sbuf.rs_param_95;
1771
1772                 /* initialize verify_alg and csums_alg */
1773                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1774
1775                 p->rate = cpu_to_be32(sc->rate);
1776                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
1777                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
1778                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
1779                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
1780
1781                 if (apv >= 88)
1782                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1783                 if (apv >= 89)
1784                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1785
1786                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1787         } else
1788                 rv = 0; /* not ok */
1789
1790         mutex_unlock(&mdev->data.mutex);
1791
1792         return rv;
1793 }
1794
1795 int drbd_send_protocol(struct drbd_conf *mdev)
1796 {
1797         struct p_protocol *p;
1798         int size, cf, rv;
1799
1800         size = sizeof(struct p_protocol);
1801
1802         if (mdev->agreed_pro_version >= 87)
1803                 size += strlen(mdev->net_conf->integrity_alg) + 1;
1804
1805         /* we must not recurse into our own queue,
1806          * as that is blocked during handshake */
1807         p = kmalloc(size, GFP_NOIO);
1808         if (p == NULL)
1809                 return 0;
1810
1811         p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1812         p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1813         p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1814         p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1815         p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1816
1817         cf = 0;
1818         if (mdev->net_conf->want_lose)
1819                 cf |= CF_WANT_LOSE;
1820         if (mdev->net_conf->dry_run) {
1821                 if (mdev->agreed_pro_version >= 92)
1822                         cf |= CF_DRY_RUN;
1823                 else {
1824                         dev_err(DEV, "--dry-run is not supported by peer");
1825                         kfree(p);
1826                         return 0;
1827                 }
1828         }
1829         p->conn_flags    = cpu_to_be32(cf);
1830
1831         if (mdev->agreed_pro_version >= 87)
1832                 strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1833
1834         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1835                            (struct p_header80 *)p, size);
1836         kfree(p);
1837         return rv;
1838 }
1839
1840 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1841 {
1842         struct p_uuids p;
1843         int i;
1844
1845         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1846                 return 1;
1847
1848         for (i = UI_CURRENT; i < UI_SIZE; i++)
1849                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1850
1851         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1852         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1853         uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1854         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1855         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1856         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1857
1858         put_ldev(mdev);
1859
1860         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1861                              (struct p_header80 *)&p, sizeof(p));
1862 }
1863
1864 int drbd_send_uuids(struct drbd_conf *mdev)
1865 {
1866         return _drbd_send_uuids(mdev, 0);
1867 }
1868
1869 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1870 {
1871         return _drbd_send_uuids(mdev, 8);
1872 }
1873
1874
1875 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1876 {
1877         struct p_rs_uuid p;
1878
1879         p.uuid = cpu_to_be64(val);
1880
1881         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1882                              (struct p_header80 *)&p, sizeof(p));
1883 }
1884
1885 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1886 {
1887         struct p_sizes p;
1888         sector_t d_size, u_size;
1889         int q_order_type;
1890         int ok;
1891
1892         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1893                 D_ASSERT(mdev->ldev->backing_bdev);
1894                 d_size = drbd_get_max_capacity(mdev->ldev);
1895                 u_size = mdev->ldev->dc.disk_size;
1896                 q_order_type = drbd_queue_order_type(mdev);
1897                 put_ldev(mdev);
1898         } else {
1899                 d_size = 0;
1900                 u_size = 0;
1901                 q_order_type = QUEUE_ORDERED_NONE;
1902         }
1903
1904         p.d_size = cpu_to_be64(d_size);
1905         p.u_size = cpu_to_be64(u_size);
1906         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1907         p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1908         p.queue_order_type = cpu_to_be16(q_order_type);
1909         p.dds_flags = cpu_to_be16(flags);
1910
1911         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1912                            (struct p_header80 *)&p, sizeof(p));
1913         return ok;
1914 }
1915
1916 /**
1917  * drbd_send_state() - Sends the drbd state to the peer
1918  * @mdev:       DRBD device.
1919  */
1920 int drbd_send_state(struct drbd_conf *mdev)
1921 {
1922         struct socket *sock;
1923         struct p_state p;
1924         int ok = 0;
1925
1926         /* Grab state lock so we wont send state if we're in the middle
1927          * of a cluster wide state change on another thread */
1928         drbd_state_lock(mdev);
1929
1930         mutex_lock(&mdev->data.mutex);
1931
1932         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1933         sock = mdev->data.socket;
1934
1935         if (likely(sock != NULL)) {
1936                 ok = _drbd_send_cmd(mdev, sock, P_STATE,
1937                                     (struct p_header80 *)&p, sizeof(p), 0);
1938         }
1939
1940         mutex_unlock(&mdev->data.mutex);
1941
1942         drbd_state_unlock(mdev);
1943         return ok;
1944 }
1945
1946 int drbd_send_state_req(struct drbd_conf *mdev,
1947         union drbd_state mask, union drbd_state val)
1948 {
1949         struct p_req_state p;
1950
1951         p.mask    = cpu_to_be32(mask.i);
1952         p.val     = cpu_to_be32(val.i);
1953
1954         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1955                              (struct p_header80 *)&p, sizeof(p));
1956 }
1957
1958 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1959 {
1960         struct p_req_state_reply p;
1961
1962         p.retcode    = cpu_to_be32(retcode);
1963
1964         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1965                              (struct p_header80 *)&p, sizeof(p));
1966 }
1967
1968 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1969         struct p_compressed_bm *p,
1970         struct bm_xfer_ctx *c)
1971 {
1972         struct bitstream bs;
1973         unsigned long plain_bits;
1974         unsigned long tmp;
1975         unsigned long rl;
1976         unsigned len;
1977         unsigned toggle;
1978         int bits;
1979
1980         /* may we use this feature? */
1981         if ((mdev->sync_conf.use_rle == 0) ||
1982                 (mdev->agreed_pro_version < 90))
1983                         return 0;
1984
1985         if (c->bit_offset >= c->bm_bits)
1986                 return 0; /* nothing to do. */
1987
1988         /* use at most thus many bytes */
1989         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1990         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1991         /* plain bits covered in this code string */
1992         plain_bits = 0;
1993
1994         /* p->encoding & 0x80 stores whether the first run length is set.
1995          * bit offset is implicit.
1996          * start with toggle == 2 to be able to tell the first iteration */
1997         toggle = 2;
1998
1999         /* see how much plain bits we can stuff into one packet
2000          * using RLE and VLI. */
2001         do {
2002                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2003                                     : _drbd_bm_find_next(mdev, c->bit_offset);
2004                 if (tmp == -1UL)
2005                         tmp = c->bm_bits;
2006                 rl = tmp - c->bit_offset;
2007
2008                 if (toggle == 2) { /* first iteration */
2009                         if (rl == 0) {
2010                                 /* the first checked bit was set,
2011                                  * store start value, */
2012                                 DCBP_set_start(p, 1);
2013                                 /* but skip encoding of zero run length */
2014                                 toggle = !toggle;
2015                                 continue;
2016                         }
2017                         DCBP_set_start(p, 0);
2018                 }
2019
2020                 /* paranoia: catch zero runlength.
2021                  * can only happen if bitmap is modified while we scan it. */
2022                 if (rl == 0) {
2023                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2024                             "t:%u bo:%lu\n", toggle, c->bit_offset);
2025                         return -1;
2026                 }
2027
2028                 bits = vli_encode_bits(&bs, rl);
2029                 if (bits == -ENOBUFS) /* buffer full */
2030                         break;
2031                 if (bits <= 0) {
2032                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2033                         return 0;
2034                 }
2035
2036                 toggle = !toggle;
2037                 plain_bits += rl;
2038                 c->bit_offset = tmp;
2039         } while (c->bit_offset < c->bm_bits);
2040
2041         len = bs.cur.b - p->code + !!bs.cur.bit;
2042
2043         if (plain_bits < (len << 3)) {
2044                 /* incompressible with this method.
2045                  * we need to rewind both word and bit position. */
2046                 c->bit_offset -= plain_bits;
2047                 bm_xfer_ctx_bit_to_word_offset(c);
2048                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2049                 return 0;
2050         }
2051
2052         /* RLE + VLI was able to compress it just fine.
2053          * update c->word_offset. */
2054         bm_xfer_ctx_bit_to_word_offset(c);
2055
2056         /* store pad_bits */
2057         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2058
2059         return len;
2060 }
2061
2062 enum { OK, FAILED, DONE }
2063 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2064         struct p_header80 *h, struct bm_xfer_ctx *c)
2065 {
2066         struct p_compressed_bm *p = (void*)h;
2067         unsigned long num_words;
2068         int len;
2069         int ok;
2070
2071         len = fill_bitmap_rle_bits(mdev, p, c);
2072
2073         if (len < 0)
2074                 return FAILED;
2075
2076         if (len) {
2077                 DCBP_set_code(p, RLE_VLI_Bits);
2078                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2079                         sizeof(*p) + len, 0);
2080
2081                 c->packets[0]++;
2082                 c->bytes[0] += sizeof(*p) + len;
2083
2084                 if (c->bit_offset >= c->bm_bits)
2085                         len = 0; /* DONE */
2086         } else {
2087                 /* was not compressible.
2088                  * send a buffer full of plain text bits instead. */
2089                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2090                 len = num_words * sizeof(long);
2091                 if (len)
2092                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2093                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2094                                    h, sizeof(struct p_header80) + len, 0);
2095                 c->word_offset += num_words;
2096                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2097
2098                 c->packets[1]++;
2099                 c->bytes[1] += sizeof(struct p_header80) + len;
2100
2101                 if (c->bit_offset > c->bm_bits)
2102                         c->bit_offset = c->bm_bits;
2103         }
2104         ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
2105
2106         if (ok == DONE)
2107                 INFO_bm_xfer_stats(mdev, "send", c);
2108         return ok;
2109 }
2110
2111 /* See the comment at receive_bitmap() */
2112 int _drbd_send_bitmap(struct drbd_conf *mdev)
2113 {
2114         struct bm_xfer_ctx c;
2115         struct p_header80 *p;
2116         int ret;
2117
2118         ERR_IF(!mdev->bitmap) return FALSE;
2119
2120         /* maybe we should use some per thread scratch page,
2121          * and allocate that during initial device creation? */
2122         p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2123         if (!p) {
2124                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2125                 return FALSE;
2126         }
2127
2128         if (get_ldev(mdev)) {
2129                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2130                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2131                         drbd_bm_set_all(mdev);
2132                         if (drbd_bm_write(mdev)) {
2133                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
2134                                  * but otherwise process as per normal - need to tell other
2135                                  * side that a full resync is required! */
2136                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2137                         } else {
2138                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2139                                 drbd_md_sync(mdev);
2140                         }
2141                 }
2142                 put_ldev(mdev);
2143         }
2144
2145         c = (struct bm_xfer_ctx) {
2146                 .bm_bits = drbd_bm_bits(mdev),
2147                 .bm_words = drbd_bm_words(mdev),
2148         };
2149
2150         do {
2151                 ret = send_bitmap_rle_or_plain(mdev, p, &c);
2152         } while (ret == OK);
2153
2154         free_page((unsigned long) p);
2155         return (ret == DONE);
2156 }
2157
/*
 * Send the bitmap while holding the data socket via drbd_get_data_sock().
 *
 * Note the inverted sense compared to _drbd_send_bitmap(): returns 0 on
 * success, 1 if sending failed, and -1 if the data socket is not available.
 */
int drbd_send_bitmap(struct drbd_conf *mdev)
{
        int failed;

        if (!drbd_get_data_sock(mdev))
                return -1;

        failed = !_drbd_send_bitmap(mdev);
        drbd_put_data_sock(mdev);

        return failed;
}
2168
2169 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2170 {
2171         int ok;
2172         struct p_barrier_ack p;
2173
2174         p.barrier  = barrier_nr;
2175         p.set_size = cpu_to_be32(set_size);
2176
2177         if (mdev->state.conn < C_CONNECTED)
2178                 return FALSE;
2179         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2180                         (struct p_header80 *)&p, sizeof(p));
2181         return ok;
2182 }
2183
2184 /**
2185  * _drbd_send_ack() - Sends an ack packet
2186  * @mdev:       DRBD device.
2187  * @cmd:        Packet command code.
2188  * @sector:     sector, needs to be in big endian byte order
2189  * @blksize:    size in byte, needs to be in big endian byte order
2190  * @block_id:   Id, big endian byte order
2191  */
2192 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2193                           u64 sector,
2194                           u32 blksize,
2195                           u64 block_id)
2196 {
2197         int ok;
2198         struct p_block_ack p;
2199
2200         p.sector   = sector;
2201         p.block_id = block_id;
2202         p.blksize  = blksize;
2203         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2204
2205         if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2206                 return FALSE;
2207         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2208                                 (struct p_header80 *)&p, sizeof(p));
2209         return ok;
2210 }
2211
2212 /* dp->sector and dp->block_id already/still in network byte order,
2213  * data_size is payload size according to dp->head,
2214  * and may need to be corrected for digest size. */
2215 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2216                      struct p_data *dp, int data_size)
2217 {
2218         data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2219                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2220         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2221                               dp->block_id);
2222 }
2223
2224 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2225                      struct p_block_req *rp)
2226 {
2227         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2228 }
2229
2230 /**
2231  * drbd_send_ack() - Sends an ack packet
2232  * @mdev:       DRBD device.
2233  * @cmd:        Packet command code.
2234  * @e:          Epoch entry.
2235  */
2236 int drbd_send_ack(struct drbd_conf *mdev,
2237         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2238 {
2239         return _drbd_send_ack(mdev, cmd,
2240                               cpu_to_be64(e->sector),
2241                               cpu_to_be32(e->size),
2242                               e->block_id);
2243 }
2244
2245 /* This function misuses the block_id field to signal if the blocks
2246  * are is sync or not. */
2247 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2248                      sector_t sector, int blksize, u64 block_id)
2249 {
2250         return _drbd_send_ack(mdev, cmd,
2251                               cpu_to_be64(sector),
2252                               cpu_to_be32(blksize),
2253                               cpu_to_be64(block_id));
2254 }
2255
2256 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2257                        sector_t sector, int size, u64 block_id)
2258 {
2259         int ok;
2260         struct p_block_req p;
2261
2262         p.sector   = cpu_to_be64(sector);
2263         p.block_id = block_id;
2264         p.blksize  = cpu_to_be32(size);
2265
2266         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2267                                 (struct p_header80 *)&p, sizeof(p));
2268         return ok;
2269 }
2270
2271 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2272                             sector_t sector, int size,
2273                             void *digest, int digest_size,
2274                             enum drbd_packets cmd)
2275 {
2276         int ok;
2277         struct p_block_req p;
2278
2279         p.sector   = cpu_to_be64(sector);
2280         p.block_id = BE_DRBD_MAGIC + 0xbeef;
2281         p.blksize  = cpu_to_be32(size);
2282
2283         p.head.magic   = BE_DRBD_MAGIC;
2284         p.head.command = cpu_to_be16(cmd);
2285         p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2286
2287         mutex_lock(&mdev->data.mutex);
2288
2289         ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2290         ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2291
2292         mutex_unlock(&mdev->data.mutex);
2293
2294         return ok;
2295 }
2296
2297 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2298 {
2299         int ok;
2300         struct p_block_req p;
2301
2302         p.sector   = cpu_to_be64(sector);
2303         p.block_id = BE_DRBD_MAGIC + 0xbabe;
2304         p.blksize  = cpu_to_be32(size);
2305
2306         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2307                            (struct p_header80 *)&p, sizeof(p));
2308         return ok;
2309 }
2310
2311 /* called on sndtimeo
2312  * returns FALSE if we should retry,
2313  * TRUE if we think connection is dead
2314  */
2315 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2316 {
2317         int drop_it;
2318         /* long elapsed = (long)(jiffies - mdev->last_received); */
2319
2320         drop_it =   mdev->meta.socket == sock
2321                 || !mdev->asender.task
2322                 || get_t_state(&mdev->asender) != Running
2323                 || mdev->state.conn < C_CONNECTED;
2324
2325         if (drop_it)
2326                 return TRUE;
2327
2328         drop_it = !--mdev->ko_count;
2329         if (!drop_it) {
2330                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2331                        current->comm, current->pid, mdev->ko_count);
2332                 request_ping(mdev);
2333         }
2334
2335         return drop_it; /* && (mdev->state == R_PRIMARY) */;
2336 }
2337
2338 /* The idea of sendpage seems to be to put some kind of reference
2339  * to the page into the skb, and to hand it over to the NIC. In
2340  * this process get_page() gets called.
2341  *
2342  * As soon as the page was really sent over the network put_page()
2343  * gets called by some part of the network layer. [ NIC driver? ]
2344  *
2345  * [ get_page() / put_page() increment/decrement the count. If count
2346  *   reaches 0 the page will be freed. ]
2347  *
2348  * This works nicely with pages from FSs.
2349  * But this means that in protocol A we might signal IO completion too early!
2350  *
 * In order not to corrupt data during a resync we must make sure
 * that we do not reuse our own buffer pages (EEs) too early, therefore
 * we have the net_ee list.
2354  *
2355  * XFS seems to have problems, still, it submits pages with page_count == 0!
2356  * As a workaround, we disable sendpage on pages
2357  * with page_count == 0 or PageSlab.
2358  */
2359 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2360                    int offset, size_t size, unsigned msg_flags)
2361 {
2362         int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2363         kunmap(page);
2364         if (sent == size)
2365                 mdev->send_cnt += size>>9;
2366         return sent == size;
2367 }
2368
2369 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2370                     int offset, size_t size, unsigned msg_flags)
2371 {
2372         mm_segment_t oldfs = get_fs();
2373         int sent, ok;
2374         int len = size;
2375
2376         /* e.g. XFS meta- & log-data is in slab pages, which have a
2377          * page_count of 0 and/or have PageSlab() set.
2378          * we cannot use send_page for those, as that does get_page();
2379          * put_page(); and would cause either a VM_BUG directly, or
2380          * __page_cache_release a page that would actually still be referenced
2381          * by someone, leading to some obscure delayed Oops somewhere else. */
2382         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2383                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2384
2385         msg_flags |= MSG_NOSIGNAL;
2386         drbd_update_congested(mdev);
2387         set_fs(KERNEL_DS);
2388         do {
2389                 sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2390                                                         offset, len,
2391                                                         msg_flags);
2392                 if (sent == -EAGAIN) {
2393                         if (we_should_drop_the_connection(mdev,
2394                                                           mdev->data.socket))
2395                                 break;
2396                         else
2397                                 continue;
2398                 }
2399                 if (sent <= 0) {
2400                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2401                              __func__, (int)size, len, sent);
2402                         break;
2403                 }
2404                 len    -= sent;
2405                 offset += sent;
2406         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2407         set_fs(oldfs);
2408         clear_bit(NET_CONGESTED, &mdev->flags);
2409
2410         ok = (len == 0);
2411         if (likely(ok))
2412                 mdev->send_cnt += size>>9;
2413         return ok;
2414 }
2415
2416 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2417 {
2418         struct bio_vec *bvec;
2419         int i;
2420         /* hint all but last page with MSG_MORE */
2421         __bio_for_each_segment(bvec, bio, i, 0) {
2422                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2423                                      bvec->bv_offset, bvec->bv_len,
2424                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2425                         return 0;
2426         }
2427         return 1;
2428 }
2429
2430 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2431 {
2432         struct bio_vec *bvec;
2433         int i;
2434         /* hint all but last page with MSG_MORE */
2435         __bio_for_each_segment(bvec, bio, i, 0) {
2436                 if (!_drbd_send_page(mdev, bvec->bv_page,
2437                                      bvec->bv_offset, bvec->bv_len,
2438                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2439                         return 0;
2440         }
2441         return 1;
2442 }
2443
2444 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2445 {
2446         struct page *page = e->pages;
2447         unsigned len = e->size;
2448         /* hint all but last page with MSG_MORE */
2449         page_chain_for_each(page) {
2450                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2451                 if (!_drbd_send_page(mdev, page, 0, l,
2452                                 page_chain_next(page) ? MSG_MORE : 0))
2453                         return 0;
2454                 len -= l;
2455         }
2456         return 1;
2457 }
2458
2459 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2460 {
2461         if (mdev->agreed_pro_version >= 95)
2462                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2463                         (bi_rw & REQ_UNPLUG ? DP_UNPLUG : 0) |
2464                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
2465                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2466                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2467         else
2468                 return bi_rw & (REQ_SYNC | REQ_UNPLUG) ? DP_RW_SYNC : 0;
2469 }
2470
2471 /* Used to send write requests
2472  * R_PRIMARY -> Peer    (P_DATA)
2473  */
2474 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2475 {
2476         int ok = 1;
2477         struct p_data p;
2478         unsigned int dp_flags = 0;
2479         void *dgb;
2480         int dgs;
2481
2482         if (!drbd_get_data_sock(mdev))
2483                 return 0;
2484
2485         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2486                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2487
2488         if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2489                 p.head.h80.magic   = BE_DRBD_MAGIC;
2490                 p.head.h80.command = cpu_to_be16(P_DATA);
2491                 p.head.h80.length  =
2492                         cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2493         } else {
2494                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2495                 p.head.h95.command = cpu_to_be16(P_DATA);
2496                 p.head.h95.length  =
2497                         cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2498         }
2499
2500         p.sector   = cpu_to_be64(req->sector);
2501         p.block_id = (unsigned long)req;
2502         p.seq_num  = cpu_to_be32(req->seq_num =
2503                                  atomic_add_return(1, &mdev->packet_seq));
2504
2505         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2506
2507         if (mdev->state.conn >= C_SYNC_SOURCE &&
2508             mdev->state.conn <= C_PAUSED_SYNC_T)
2509                 dp_flags |= DP_MAY_SET_IN_SYNC;
2510
2511         p.dp_flags = cpu_to_be32(dp_flags);
2512         set_bit(UNPLUG_REMOTE, &mdev->flags);
2513         ok = (sizeof(p) ==
2514                 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2515         if (ok && dgs) {
2516                 dgb = mdev->int_dig_out;
2517                 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2518                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2519         }
2520         if (ok) {
2521                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2522                         ok = _drbd_send_bio(mdev, req->master_bio);
2523                 else
2524                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2525         }
2526
2527         drbd_put_data_sock(mdev);
2528
2529         return ok;
2530 }
2531
2532 /* answer packet, used to send data back for read requests:
2533  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2534  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2535  */
2536 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2537                     struct drbd_epoch_entry *e)
2538 {
2539         int ok;
2540         struct p_data p;
2541         void *dgb;
2542         int dgs;
2543
2544         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2545                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2546
2547         if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2548                 p.head.h80.magic   = BE_DRBD_MAGIC;
2549                 p.head.h80.command = cpu_to_be16(cmd);
2550                 p.head.h80.length  =
2551                         cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2552         } else {
2553                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2554                 p.head.h95.command = cpu_to_be16(cmd);
2555                 p.head.h95.length  =
2556                         cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2557         }
2558
2559         p.sector   = cpu_to_be64(e->sector);
2560         p.block_id = e->block_id;
2561         /* p.seq_num  = 0;    No sequence numbers here.. */
2562
2563         /* Only called by our kernel thread.
2564          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2565          * in response to admin command or module unload.
2566          */
2567         if (!drbd_get_data_sock(mdev))
2568                 return 0;
2569
2570         ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2571         if (ok && dgs) {
2572                 dgb = mdev->int_dig_out;
2573                 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2574                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2575         }
2576         if (ok)
2577                 ok = _drbd_send_zc_ee(mdev, e);
2578
2579         drbd_put_data_sock(mdev);
2580
2581         return ok;
2582 }
2583
2584 /*
2585   drbd_send distinguishes two cases:
2586
2587   Packets sent via the data socket "sock"
2588   and packets sent via the meta data socket "msock"
2589
2590                     sock                      msock
2591   -----------------+-------------------------+------------------------------
2592   timeout           conf.timeout / 2          conf.timeout / 2
2593   timeout action    send a ping via msock     Abort communication
2594                                               and close all sockets
2595 */
2596
2597 /*
2598  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2599  */
2600 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2601               void *buf, size_t size, unsigned msg_flags)
2602 {
2603         struct kvec iov;
2604         struct msghdr msg;
2605         int rv, sent = 0;
2606
2607         if (!sock)
2608                 return -1000;
2609
2610         /* THINK  if (signal_pending) return ... ? */
2611
2612         iov.iov_base = buf;
2613         iov.iov_len  = size;
2614
2615         msg.msg_name       = NULL;
2616         msg.msg_namelen    = 0;
2617         msg.msg_control    = NULL;
2618         msg.msg_controllen = 0;
2619         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2620
2621         if (sock == mdev->data.socket) {
2622                 mdev->ko_count = mdev->net_conf->ko_count;
2623                 drbd_update_congested(mdev);
2624         }
2625         do {
2626                 /* STRANGE
2627                  * tcp_sendmsg does _not_ use its size parameter at all ?
2628                  *
2629                  * -EAGAIN on timeout, -EINTR on signal.
2630                  */
2631 /* THINK
2632  * do we need to block DRBD_SIG if sock == &meta.socket ??
2633  * otherwise wake_asender() might interrupt some send_*Ack !
2634  */
2635                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2636                 if (rv == -EAGAIN) {
2637                         if (we_should_drop_the_connection(mdev, sock))
2638                                 break;
2639                         else
2640                                 continue;
2641                 }
2642                 D_ASSERT(rv != 0);
2643                 if (rv == -EINTR) {
2644                         flush_signals(current);
2645                         rv = 0;
2646                 }
2647                 if (rv < 0)
2648                         break;
2649                 sent += rv;
2650                 iov.iov_base += rv;
2651                 iov.iov_len  -= rv;
2652         } while (sent < size);
2653
2654         if (sock == mdev->data.socket)
2655                 clear_bit(NET_CONGESTED, &mdev->flags);
2656
2657         if (rv <= 0) {
2658                 if (rv != -EAGAIN) {
2659                         dev_err(DEV, "%s_sendmsg returned %d\n",
2660                             sock == mdev->meta.socket ? "msock" : "sock",
2661                             rv);
2662                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2663                 } else
2664                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2665         }
2666
2667         return sent;
2668 }
2669
2670 static int drbd_open(struct block_device *bdev, fmode_t mode)
2671 {
2672         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2673         unsigned long flags;
2674         int rv = 0;
2675
2676         mutex_lock(&drbd_main_mutex);
2677         spin_lock_irqsave(&mdev->req_lock, flags);
2678         /* to have a stable mdev->state.role
2679          * and no race with updating open_cnt */
2680
2681         if (mdev->state.role != R_PRIMARY) {
2682                 if (mode & FMODE_WRITE)
2683                         rv = -EROFS;
2684                 else if (!allow_oos)
2685                         rv = -EMEDIUMTYPE;
2686         }
2687
2688         if (!rv)
2689                 mdev->open_cnt++;
2690         spin_unlock_irqrestore(&mdev->req_lock, flags);
2691         mutex_unlock(&drbd_main_mutex);
2692
2693         return rv;
2694 }
2695
2696 static int drbd_release(struct gendisk *gd, fmode_t mode)
2697 {
2698         struct drbd_conf *mdev = gd->private_data;
2699         mutex_lock(&drbd_main_mutex);
2700         mdev->open_cnt--;
2701         mutex_unlock(&drbd_main_mutex);
2702         return 0;
2703 }
2704
2705 static void drbd_unplug_fn(struct request_queue *q)
2706 {
2707         struct drbd_conf *mdev = q->queuedata;
2708
2709         /* unplug FIRST */
2710         spin_lock_irq(q->queue_lock);
2711         blk_remove_plug(q);
2712         spin_unlock_irq(q->queue_lock);
2713
2714         /* only if connected */
2715         spin_lock_irq(&mdev->req_lock);
2716         if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2717                 D_ASSERT(mdev->state.role == R_PRIMARY);
2718                 if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2719                         /* add to the data.work queue,
2720                          * unless already queued.
2721                          * XXX this might be a good addition to drbd_queue_work
2722                          * anyways, to detect "double queuing" ... */
2723                         if (list_empty(&mdev->unplug_work.list))
2724                                 drbd_queue_work(&mdev->data.work,
2725                                                 &mdev->unplug_work);
2726                 }
2727         }
2728         spin_unlock_irq(&mdev->req_lock);
2729
2730         if (mdev->state.disk >= D_INCONSISTENT)
2731                 drbd_kick_lo(mdev);
2732 }
2733
2734 static void drbd_set_defaults(struct drbd_conf *mdev)
2735 {
2736         /* This way we get a compile error when sync_conf grows,
2737            and we forgot to initialize it here */
2738         mdev->sync_conf = (struct syncer_conf) {
2739                 /* .rate = */           DRBD_RATE_DEF,
2740                 /* .after = */          DRBD_AFTER_DEF,
2741                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
2742                 /* .verify_alg = */     {}, 0,
2743                 /* .cpu_mask = */       {}, 0,
2744                 /* .csums_alg = */      {}, 0,
2745                 /* .use_rle = */        0,
2746                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
2747                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
2748                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
2749                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
2750                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
2751                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
2752         };
2753
2754         /* Have to use that way, because the layout differs between
2755            big endian and little endian */
2756         mdev->state = (union drbd_state) {
2757                 { .role = R_SECONDARY,
2758                   .peer = R_UNKNOWN,
2759                   .conn = C_STANDALONE,
2760                   .disk = D_DISKLESS,
2761                   .pdsk = D_UNKNOWN,
2762                   .susp = 0,
2763                   .susp_nod = 0,
2764                   .susp_fen = 0
2765                 } };
2766 }
2767
2768 void drbd_init_set_defaults(struct drbd_conf *mdev)
2769 {
2770         /* the memset(,0,) did most of this.
2771          * note: only assignments, no allocation in here */
2772
2773         drbd_set_defaults(mdev);
2774
2775         /* for now, we do NOT yet support it,
2776          * even though we start some framework
2777          * to eventually support barriers */
2778         set_bit(NO_BARRIER_SUPP, &mdev->flags);
2779
2780         atomic_set(&mdev->ap_bio_cnt, 0);
2781         atomic_set(&mdev->ap_pending_cnt, 0);
2782         atomic_set(&mdev->rs_pending_cnt, 0);
2783         atomic_set(&mdev->unacked_cnt, 0);
2784         atomic_set(&mdev->local_cnt, 0);
2785         atomic_set(&mdev->net_cnt, 0);
2786         atomic_set(&mdev->packet_seq, 0);
2787         atomic_set(&mdev->pp_in_use, 0);
2788         atomic_set(&mdev->pp_in_use_by_net, 0);
2789         atomic_set(&mdev->rs_sect_in, 0);
2790         atomic_set(&mdev->rs_sect_ev, 0);
2791
2792         mutex_init(&mdev->md_io_mutex);
2793         mutex_init(&mdev->data.mutex);
2794         mutex_init(&mdev->meta.mutex);
2795         sema_init(&mdev->data.work.s, 0);
2796         sema_init(&mdev->meta.work.s, 0);
2797         mutex_init(&mdev->state_mutex);
2798
2799         spin_lock_init(&mdev->data.work.q_lock);
2800         spin_lock_init(&mdev->meta.work.q_lock);
2801
2802         spin_lock_init(&mdev->al_lock);
2803         spin_lock_init(&mdev->req_lock);
2804         spin_lock_init(&mdev->peer_seq_lock);
2805         spin_lock_init(&mdev->epoch_lock);
2806
2807         INIT_LIST_HEAD(&mdev->active_ee);
2808         INIT_LIST_HEAD(&mdev->sync_ee);
2809         INIT_LIST_HEAD(&mdev->done_ee);
2810         INIT_LIST_HEAD(&mdev->read_ee);
2811         INIT_LIST_HEAD(&mdev->net_ee);
2812         INIT_LIST_HEAD(&mdev->resync_reads);
2813         INIT_LIST_HEAD(&mdev->data.work.q);
2814         INIT_LIST_HEAD(&mdev->meta.work.q);
2815         INIT_LIST_HEAD(&mdev->resync_work.list);
2816         INIT_LIST_HEAD(&mdev->unplug_work.list);
2817         INIT_LIST_HEAD(&mdev->go_diskless.list);
2818         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2819         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2820
2821         mdev->resync_work.cb  = w_resync_inactive;
2822         mdev->unplug_work.cb  = w_send_write_hint;
2823         mdev->go_diskless.cb  = w_go_diskless;
2824         mdev->md_sync_work.cb = w_md_sync;
2825         mdev->bm_io_work.w.cb = w_bitmap_io;
2826         init_timer(&mdev->resync_timer);
2827         init_timer(&mdev->md_sync_timer);
2828         mdev->resync_timer.function = resync_timer_fn;
2829         mdev->resync_timer.data = (unsigned long) mdev;
2830         mdev->md_sync_timer.function = md_sync_timer_fn;
2831         mdev->md_sync_timer.data = (unsigned long) mdev;
2832
2833         init_waitqueue_head(&mdev->misc_wait);
2834         init_waitqueue_head(&mdev->state_wait);
2835         init_waitqueue_head(&mdev->net_cnt_wait);
2836         init_waitqueue_head(&mdev->ee_wait);
2837         init_waitqueue_head(&mdev->al_wait);
2838         init_waitqueue_head(&mdev->seq_wait);
2839
2840         drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2841         drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2842         drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2843
2844         mdev->agreed_pro_version = PRO_VERSION_MAX;
2845         mdev->write_ordering = WO_bio_barrier;
2846         mdev->resync_wenr = LC_FREE;
2847 }
2848
2849 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2850 {
2851         int i;
2852         if (mdev->receiver.t_state != None)
2853                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2854                                 mdev->receiver.t_state);
2855
2856         /* no need to lock it, I'm the only thread alive */
2857         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2858                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2859         mdev->al_writ_cnt  =
2860         mdev->bm_writ_cnt  =
2861         mdev->read_cnt     =
2862         mdev->recv_cnt     =
2863         mdev->send_cnt     =
2864         mdev->writ_cnt     =
2865         mdev->p_size       =
2866         mdev->rs_start     =
2867         mdev->rs_total     =
2868         mdev->rs_failed    = 0;
2869         mdev->rs_last_events = 0;
2870         mdev->rs_last_sect_ev = 0;
2871         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2872                 mdev->rs_mark_left[i] = 0;
2873                 mdev->rs_mark_time[i] = 0;
2874         }
2875         D_ASSERT(mdev->net_conf == NULL);
2876
2877         drbd_set_my_capacity(mdev, 0);
2878         if (mdev->bitmap) {
2879                 /* maybe never allocated. */
2880                 drbd_bm_resize(mdev, 0, 1);
2881                 drbd_bm_cleanup(mdev);
2882         }
2883
2884         drbd_free_resources(mdev);
2885         clear_bit(AL_SUSPENDED, &mdev->flags);
2886
2887         /*
2888          * currently we drbd_init_ee only on module load, so
2889          * we may do drbd_release_ee only on module unload!
2890          */
2891         D_ASSERT(list_empty(&mdev->active_ee));
2892         D_ASSERT(list_empty(&mdev->sync_ee));
2893         D_ASSERT(list_empty(&mdev->done_ee));
2894         D_ASSERT(list_empty(&mdev->read_ee));
2895         D_ASSERT(list_empty(&mdev->net_ee));
2896         D_ASSERT(list_empty(&mdev->resync_reads));
2897         D_ASSERT(list_empty(&mdev->data.work.q));
2898         D_ASSERT(list_empty(&mdev->meta.work.q));
2899         D_ASSERT(list_empty(&mdev->resync_work.list));
2900         D_ASSERT(list_empty(&mdev->unplug_work.list));
2901         D_ASSERT(list_empty(&mdev->go_diskless.list));
2902
2903 }
2904
2905
2906 static void drbd_destroy_mempools(void)
2907 {
2908         struct page *page;
2909
2910         while (drbd_pp_pool) {
2911                 page = drbd_pp_pool;
2912                 drbd_pp_pool = (struct page *)page_private(page);
2913                 __free_page(page);
2914                 drbd_pp_vacant--;
2915         }
2916
2917         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2918
2919         if (drbd_ee_mempool)
2920                 mempool_destroy(drbd_ee_mempool);
2921         if (drbd_request_mempool)
2922                 mempool_destroy(drbd_request_mempool);
2923         if (drbd_ee_cache)
2924                 kmem_cache_destroy(drbd_ee_cache);
2925         if (drbd_request_cache)
2926                 kmem_cache_destroy(drbd_request_cache);
2927         if (drbd_bm_ext_cache)
2928                 kmem_cache_destroy(drbd_bm_ext_cache);
2929         if (drbd_al_ext_cache)
2930                 kmem_cache_destroy(drbd_al_ext_cache);
2931
2932         drbd_ee_mempool      = NULL;
2933         drbd_request_mempool = NULL;
2934         drbd_ee_cache        = NULL;
2935         drbd_request_cache   = NULL;
2936         drbd_bm_ext_cache    = NULL;
2937         drbd_al_ext_cache    = NULL;
2938
2939         return;
2940 }
2941
2942 static int drbd_create_mempools(void)
2943 {
2944         struct page *page;
2945         const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2946         int i;
2947
2948         /* prepare our caches and mempools */
2949         drbd_request_mempool = NULL;
2950         drbd_ee_cache        = NULL;
2951         drbd_request_cache   = NULL;
2952         drbd_bm_ext_cache    = NULL;
2953         drbd_al_ext_cache    = NULL;
2954         drbd_pp_pool         = NULL;
2955
2956         /* caches */
2957         drbd_request_cache = kmem_cache_create(
2958                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2959         if (drbd_request_cache == NULL)
2960                 goto Enomem;
2961
2962         drbd_ee_cache = kmem_cache_create(
2963                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2964         if (drbd_ee_cache == NULL)
2965                 goto Enomem;
2966
2967         drbd_bm_ext_cache = kmem_cache_create(
2968                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2969         if (drbd_bm_ext_cache == NULL)
2970                 goto Enomem;
2971
2972         drbd_al_ext_cache = kmem_cache_create(
2973                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2974         if (drbd_al_ext_cache == NULL)
2975                 goto Enomem;
2976
2977         /* mempools */
2978         drbd_request_mempool = mempool_create(number,
2979                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2980         if (drbd_request_mempool == NULL)
2981                 goto Enomem;
2982
2983         drbd_ee_mempool = mempool_create(number,
2984                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2985         if (drbd_ee_mempool == NULL)
2986                 goto Enomem;
2987
2988         /* drbd's page pool */
2989         spin_lock_init(&drbd_pp_lock);
2990
2991         for (i = 0; i < number; i++) {
2992                 page = alloc_page(GFP_HIGHUSER);
2993                 if (!page)
2994                         goto Enomem;
2995                 set_page_private(page, (unsigned long)drbd_pp_pool);
2996                 drbd_pp_pool = page;
2997         }
2998         drbd_pp_vacant = number;
2999
3000         return 0;
3001
3002 Enomem:
3003         drbd_destroy_mempools(); /* in case we allocated some */
3004         return -ENOMEM;
3005 }
3006
3007 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3008         void *unused)
3009 {
3010         /* just so we have it.  you never know what interesting things we
3011          * might want to do here some day...
3012          */
3013
3014         return NOTIFY_DONE;
3015 }
3016
3017 static struct notifier_block drbd_notifier = {
3018         .notifier_call = drbd_notify_sys,
3019 };
3020
3021 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3022 {
3023         int rr;
3024
3025         rr = drbd_release_ee(mdev, &mdev->active_ee);
3026         if (rr)
3027                 dev_err(DEV, "%d EEs in active list found!\n", rr);
3028
3029         rr = drbd_release_ee(mdev, &mdev->sync_ee);
3030         if (rr)
3031                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
3032
3033         rr = drbd_release_ee(mdev, &mdev->read_ee);
3034         if (rr)
3035                 dev_err(DEV, "%d EEs in read list found!\n", rr);
3036
3037         rr = drbd_release_ee(mdev, &mdev->done_ee);
3038         if (rr)
3039                 dev_err(DEV, "%d EEs in done list found!\n", rr);
3040
3041         rr = drbd_release_ee(mdev, &mdev->net_ee);
3042         if (rr)
3043                 dev_err(DEV, "%d EEs in net list found!\n", rr);
3044 }
3045
3046 /* caution. no locking.
3047  * currently only used from module cleanup code. */
3048 static void drbd_delete_device(unsigned int minor)
3049 {
3050         struct drbd_conf *mdev = minor_to_mdev(minor);
3051
3052         if (!mdev)
3053                 return;
3054
3055         /* paranoia asserts */
3056         if (mdev->open_cnt != 0)
3057                 dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3058                                 __FILE__ , __LINE__);
3059
3060         ERR_IF (!list_empty(&mdev->data.work.q)) {
3061                 struct list_head *lp;
3062                 list_for_each(lp, &mdev->data.work.q) {
3063                         dev_err(DEV, "lp = %p\n", lp);
3064                 }
3065         };
3066         /* end paranoia asserts */
3067
3068         del_gendisk(mdev->vdisk);
3069
3070         /* cleanup stuff that may have been allocated during
3071          * device (re-)configuration or state changes */
3072
3073         if (mdev->this_bdev)
3074                 bdput(mdev->this_bdev);
3075
3076         drbd_free_resources(mdev);
3077
3078         drbd_release_ee_lists(mdev);
3079
3080         /* should be free'd on disconnect? */
3081         kfree(mdev->ee_hash);
3082         /*
3083         mdev->ee_hash_s = 0;
3084         mdev->ee_hash = NULL;
3085         */
3086
3087         lc_destroy(mdev->act_log);
3088         lc_destroy(mdev->resync);
3089
3090         kfree(mdev->p_uuid);
3091         /* mdev->p_uuid = NULL; */
3092
3093         kfree(mdev->int_dig_out);
3094         kfree(mdev->int_dig_in);
3095         kfree(mdev->int_dig_vv);
3096
3097         /* cleanup the rest that has been
3098          * allocated from drbd_new_device
3099          * and actually free the mdev itself */
3100         drbd_free_mdev(mdev);
3101 }
3102
3103 static void drbd_cleanup(void)
3104 {
3105         unsigned int i;
3106
3107         unregister_reboot_notifier(&drbd_notifier);
3108
3109         drbd_nl_cleanup();
3110
3111         if (minor_table) {
3112                 if (drbd_proc)
3113                         remove_proc_entry("drbd", NULL);
3114                 i = minor_count;
3115                 while (i--)
3116                         drbd_delete_device(i);
3117                 drbd_destroy_mempools();
3118         }
3119
3120         kfree(minor_table);
3121
3122         unregister_blkdev(DRBD_MAJOR, "drbd");
3123
3124         printk(KERN_INFO "drbd: module cleanup done.\n");
3125 }
3126
3127 /**
3128  * drbd_congested() - Callback for pdflush
3129  * @congested_data:     User data
3130  * @bdi_bits:           Bits pdflush is currently interested in
3131  *
3132  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3133  */
3134 static int drbd_congested(void *congested_data, int bdi_bits)
3135 {
3136         struct drbd_conf *mdev = congested_data;
3137         struct request_queue *q;
3138         char reason = '-';
3139         int r = 0;
3140
3141         if (!__inc_ap_bio_cond(mdev)) {
3142                 /* DRBD has frozen IO */
3143                 r = bdi_bits;
3144                 reason = 'd';
3145                 goto out;
3146         }
3147
3148         if (get_ldev(mdev)) {
3149                 q = bdev_get_queue(mdev->ldev->backing_bdev);
3150                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
3151                 put_ldev(mdev);
3152                 if (r)
3153                         reason = 'b';
3154         }
3155
3156         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3157                 r |= (1 << BDI_async_congested);
3158                 reason = reason == 'b' ? 'a' : 'n';
3159         }
3160
3161 out:
3162         mdev->congestion_reason = reason;
3163         return r;
3164 }
3165
3166 struct drbd_conf *drbd_new_device(unsigned int minor)
3167 {
3168         struct drbd_conf *mdev;
3169         struct gendisk *disk;
3170         struct request_queue *q;
3171
3172         /* GFP_KERNEL, we are outside of all write-out paths */
3173         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3174         if (!mdev)
3175                 return NULL;
3176         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3177                 goto out_no_cpumask;
3178
3179         mdev->minor = minor;
3180
3181         drbd_init_set_defaults(mdev);
3182
3183         q = blk_alloc_queue(GFP_KERNEL);
3184         if (!q)
3185                 goto out_no_q;
3186         mdev->rq_queue = q;
3187         q->queuedata   = mdev;
3188
3189         disk = alloc_disk(1);
3190         if (!disk)
3191                 goto out_no_disk;
3192         mdev->vdisk = disk;
3193
3194         set_disk_ro(disk, TRUE);
3195
3196         disk->queue = q;
3197         disk->major = DRBD_MAJOR;
3198         disk->first_minor = minor;
3199         disk->fops = &drbd_ops;
3200         sprintf(disk->disk_name, "drbd%d", minor);
3201         disk->private_data = mdev;
3202
3203         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3204         /* we have no partitions. we contain only ourselves. */
3205         mdev->this_bdev->bd_contains = mdev->this_bdev;
3206
3207         q->backing_dev_info.congested_fn = drbd_congested;
3208         q->backing_dev_info.congested_data = mdev;
3209
3210         blk_queue_make_request(q, drbd_make_request_26);
3211         blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3212         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3213         blk_queue_merge_bvec(q, drbd_merge_bvec);
3214         q->queue_lock = &mdev->req_lock; /* needed since we use */
3215                 /* plugging on a queue, that actually has no requests! */
3216         q->unplug_fn = drbd_unplug_fn;
3217
3218         mdev->md_io_page = alloc_page(GFP_KERNEL);
3219         if (!mdev->md_io_page)
3220                 goto out_no_io_page;
3221
3222         if (drbd_bm_init(mdev))
3223                 goto out_no_bitmap;
3224         /* no need to lock access, we are still initializing this minor device. */
3225         if (!tl_init(mdev))
3226                 goto out_no_tl;
3227
3228         mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3229         if (!mdev->app_reads_hash)
3230                 goto out_no_app_reads;
3231
3232         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3233         if (!mdev->current_epoch)
3234                 goto out_no_epoch;
3235
3236         INIT_LIST_HEAD(&mdev->current_epoch->list);
3237         mdev->epochs = 1;
3238
3239         return mdev;
3240
3241 /* out_whatever_else:
3242         kfree(mdev->current_epoch); */
3243 out_no_epoch:
3244         kfree(mdev->app_reads_hash);
3245 out_no_app_reads:
3246         tl_cleanup(mdev);
3247 out_no_tl:
3248         drbd_bm_cleanup(mdev);
3249 out_no_bitmap:
3250         __free_page(mdev->md_io_page);
3251 out_no_io_page:
3252         put_disk(disk);
3253 out_no_disk:
3254         blk_cleanup_queue(q);
3255 out_no_q:
3256         free_cpumask_var(mdev->cpu_mask);
3257 out_no_cpumask:
3258         kfree(mdev);
3259         return NULL;
3260 }
3261
3262 /* counterpart of drbd_new_device.
3263  * last part of drbd_delete_device. */
3264 void drbd_free_mdev(struct drbd_conf *mdev)
3265 {
3266         kfree(mdev->current_epoch);
3267         kfree(mdev->app_reads_hash);
3268         tl_cleanup(mdev);
3269         if (mdev->bitmap) /* should no longer be there. */
3270                 drbd_bm_cleanup(mdev);
3271         __free_page(mdev->md_io_page);
3272         put_disk(mdev->vdisk);
3273         blk_cleanup_queue(mdev->rq_queue);
3274         free_cpumask_var(mdev->cpu_mask);
3275         kfree(mdev);
3276 }
3277
3278
3279 int __init drbd_init(void)
3280 {
3281         int err;
3282
3283         if (sizeof(struct p_handshake) != 80) {
3284                 printk(KERN_ERR
3285                        "drbd: never change the size or layout "
3286                        "of the HandShake packet.\n");
3287                 return -EINVAL;
3288         }
3289
3290         if (1 > minor_count || minor_count > 255) {
3291                 printk(KERN_ERR
3292                         "drbd: invalid minor_count (%d)\n", minor_count);
3293 #ifdef MODULE
3294                 return -EINVAL;
3295 #else
3296                 minor_count = 8;
3297 #endif
3298         }
3299
3300         err = drbd_nl_init();
3301         if (err)
3302                 return err;
3303
3304         err = register_blkdev(DRBD_MAJOR, "drbd");
3305         if (err) {
3306                 printk(KERN_ERR
3307                        "drbd: unable to register block device major %d\n",
3308                        DRBD_MAJOR);
3309                 return err;
3310         }
3311
3312         register_reboot_notifier(&drbd_notifier);
3313
3314         /*
3315          * allocate all necessary structs
3316          */
3317         err = -ENOMEM;
3318
3319         init_waitqueue_head(&drbd_pp_wait);
3320
3321         drbd_proc = NULL; /* play safe for drbd_cleanup */
3322         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3323                                 GFP_KERNEL);
3324         if (!minor_table)
3325                 goto Enomem;
3326
3327         err = drbd_create_mempools();
3328         if (err)
3329                 goto Enomem;
3330
3331         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3332         if (!drbd_proc) {
3333                 printk(KERN_ERR "drbd: unable to register proc file\n");
3334                 goto Enomem;
3335         }
3336
3337         rwlock_init(&global_state_lock);
3338
3339         printk(KERN_INFO "drbd: initialized. "
3340                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3341                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3342         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3343         printk(KERN_INFO "drbd: registered as block device major %d\n",
3344                 DRBD_MAJOR);
3345         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3346
3347         return 0; /* Success! */
3348
3349 Enomem:
3350         drbd_cleanup();
3351         if (err == -ENOMEM)
3352                 /* currently always the case */
3353                 printk(KERN_ERR "drbd: ran out of memory\n");
3354         else
3355                 printk(KERN_ERR "drbd: initialization failure\n");
3356         return err;
3357 }
3358
3359 void drbd_free_bc(struct drbd_backing_dev *ldev)
3360 {
3361         if (ldev == NULL)
3362                 return;
3363
3364         bd_release(ldev->backing_bdev);
3365         bd_release(ldev->md_bdev);
3366
3367         fput(ldev->lo_file);
3368         fput(ldev->md_file);
3369
3370         kfree(ldev);
3371 }
3372
3373 void drbd_free_sock(struct drbd_conf *mdev)
3374 {
3375         if (mdev->data.socket) {
3376                 mutex_lock(&mdev->data.mutex);
3377                 kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3378                 sock_release(mdev->data.socket);
3379                 mdev->data.socket = NULL;
3380                 mutex_unlock(&mdev->data.mutex);
3381         }
3382         if (mdev->meta.socket) {
3383                 mutex_lock(&mdev->meta.mutex);
3384                 kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3385                 sock_release(mdev->meta.socket);
3386                 mdev->meta.socket = NULL;
3387                 mutex_unlock(&mdev->meta.mutex);
3388         }
3389 }
3390
3391
3392 void drbd_free_resources(struct drbd_conf *mdev)
3393 {
3394         crypto_free_hash(mdev->csums_tfm);
3395         mdev->csums_tfm = NULL;
3396         crypto_free_hash(mdev->verify_tfm);
3397         mdev->verify_tfm = NULL;
3398         crypto_free_hash(mdev->cram_hmac_tfm);
3399         mdev->cram_hmac_tfm = NULL;
3400         crypto_free_hash(mdev->integrity_w_tfm);
3401         mdev->integrity_w_tfm = NULL;
3402         crypto_free_hash(mdev->integrity_r_tfm);
3403         mdev->integrity_r_tfm = NULL;
3404
3405         drbd_free_sock(mdev);
3406
3407         __no_warn(local,
3408                   drbd_free_bc(mdev->ldev);
3409                   mdev->ldev = NULL;);
3410 }
3411
3412 /* meta data management */
3413
3414 struct meta_data_on_disk {
3415         u64 la_size;           /* last agreed size. */
3416         u64 uuid[UI_SIZE];   /* UUIDs. */
3417         u64 device_uuid;
3418         u64 reserved_u64_1;
3419         u32 flags;             /* MDF */
3420         u32 magic;
3421         u32 md_size_sect;
3422         u32 al_offset;         /* offset to this block */
3423         u32 al_nr_extents;     /* important for restoring the AL */
3424               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3425         u32 bm_offset;         /* offset to the bitmap, from here */
3426         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3427         u32 reserved_u32[4];
3428
3429 } __packed;
3430
3431 /**
3432  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3433  * @mdev:       DRBD device.
3434  */
3435 void drbd_md_sync(struct drbd_conf *mdev)
3436 {
3437         struct meta_data_on_disk *buffer;
3438         sector_t sector;
3439         int i;
3440
3441         del_timer(&mdev->md_sync_timer);
3442         /* timer may be rearmed by drbd_md_mark_dirty() now. */
3443         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3444                 return;
3445
3446         /* We use here D_FAILED and not D_ATTACHING because we try to write
3447          * metadata even if we detach due to a disk failure! */
3448         if (!get_ldev_if_state(mdev, D_FAILED))
3449                 return;
3450
3451         mutex_lock(&mdev->md_io_mutex);
3452         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3453         memset(buffer, 0, 512);
3454
3455         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3456         for (i = UI_CURRENT; i < UI_SIZE; i++)
3457                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3458         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3459         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3460
3461         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3462         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3463         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3464         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3465         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3466
3467         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3468
3469         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3470         sector = mdev->ldev->md.md_offset;
3471
3472         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3473                 /* this was a try anyways ... */
3474                 dev_err(DEV, "meta data update failed!\n");
3475                 drbd_chk_io_error(mdev, 1, TRUE);
3476         }
3477
3478         /* Update mdev->ldev->md.la_size_sect,
3479          * since we updated it on metadata. */
3480         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3481
3482         mutex_unlock(&mdev->md_io_mutex);
3483         put_ldev(mdev);
3484 }
3485
3486 /**
3487  * drbd_md_read() - Reads in the meta data super block
3488  * @mdev:       DRBD device.
3489  * @bdev:       Device from which the meta data should be read in.
3490  *
3491  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3492  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3493  */
3494 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3495 {
3496         struct meta_data_on_disk *buffer;
3497         int i, rv = NO_ERROR;
3498
3499         if (!get_ldev_if_state(mdev, D_ATTACHING))
3500                 return ERR_IO_MD_DISK;
3501
3502         mutex_lock(&mdev->md_io_mutex);
3503         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3504
3505         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3506                 /* NOTE: cant do normal error processing here as this is
3507                    called BEFORE disk is attached */
3508                 dev_err(DEV, "Error while reading metadata.\n");
3509                 rv = ERR_IO_MD_DISK;
3510                 goto err;
3511         }
3512
3513         if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3514                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3515                 rv = ERR_MD_INVALID;
3516                 goto err;
3517         }
3518         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3519                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3520                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3521                 rv = ERR_MD_INVALID;
3522                 goto err;
3523         }
3524         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3525                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3526                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3527                 rv = ERR_MD_INVALID;
3528                 goto err;
3529         }
3530         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3531                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3532                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3533                 rv = ERR_MD_INVALID;
3534                 goto err;
3535         }
3536
3537         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3538                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3539                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3540                 rv = ERR_MD_INVALID;
3541                 goto err;
3542         }
3543
3544         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3545         for (i = UI_CURRENT; i < UI_SIZE; i++)
3546                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3547         bdev->md.flags = be32_to_cpu(buffer->flags);
3548         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3549         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3550
3551         if (mdev->sync_conf.al_extents < 7)
3552                 mdev->sync_conf.al_extents = 127;
3553
3554  err:
3555         mutex_unlock(&mdev->md_io_mutex);
3556         put_ldev(mdev);
3557
3558         return rv;
3559 }
3560
3561 static void debug_drbd_uuid(struct drbd_conf *mdev, enum drbd_uuid_index index)
3562 {
3563         static char *uuid_str[UI_EXTENDED_SIZE] = {
3564                 [UI_CURRENT] = "CURRENT",
3565                 [UI_BITMAP] = "BITMAP",
3566                 [UI_HISTORY_START] = "HISTORY_START",
3567                 [UI_HISTORY_END] = "HISTORY_END",
3568                 [UI_SIZE] = "SIZE",
3569                 [UI_FLAGS] = "FLAGS",
3570         };
3571
3572         if (index >= UI_EXTENDED_SIZE) {
3573                 dev_warn(DEV, " uuid_index >= EXTENDED_SIZE\n");
3574                 return;
3575         }
3576
3577         dynamic_dev_dbg(DEV, " uuid[%s] now %016llX\n",
3578                  uuid_str[index],
3579                  (unsigned long long)mdev->ldev->md.uuid[index]);
3580 }
3581
3582
3583 /**
3584  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3585  * @mdev:       DRBD device.
3586  *
3587  * Call this function if you change anything that should be written to
3588  * the meta-data super block. This function sets MD_DIRTY, and starts a
3589  * timer that ensures that within five seconds you have to call drbd_md_sync().
3590  */
3591 #ifdef DEBUG
3592 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3593 {
3594         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3595                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3596                 mdev->last_md_mark_dirty.line = line;
3597                 mdev->last_md_mark_dirty.func = func;
3598         }
3599 }
3600 #else
3601 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3602 {
3603         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3604                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3605 }
3606 #endif
3607
3608 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3609 {
3610         int i;
3611
3612         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++) {
3613                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3614                 debug_drbd_uuid(mdev, i+1);
3615         }
3616 }
3617
3618 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3619 {
3620         if (idx == UI_CURRENT) {
3621                 if (mdev->state.role == R_PRIMARY)
3622                         val |= 1;
3623                 else
3624                         val &= ~((u64)1);
3625
3626                 drbd_set_ed_uuid(mdev, val);
3627         }
3628
3629         mdev->ldev->md.uuid[idx] = val;
3630         debug_drbd_uuid(mdev, idx);
3631         drbd_md_mark_dirty(mdev);
3632 }
3633
3634
3635 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3636 {
3637         if (mdev->ldev->md.uuid[idx]) {
3638                 drbd_uuid_move_history(mdev);
3639                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3640                 debug_drbd_uuid(mdev, UI_HISTORY_START);
3641         }
3642         _drbd_uuid_set(mdev, idx, val);
3643 }
3644
3645 /**
3646  * drbd_uuid_new_current() - Creates a new current UUID
3647  * @mdev:       DRBD device.
3648  *
3649  * Creates a new current UUID, and rotates the old current UUID into
3650  * the bitmap slot. Causes an incremental resync upon next connect.
3651  */
3652 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3653 {
3654         u64 val;
3655
3656         dev_info(DEV, "Creating new current UUID\n");
3657         D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3658         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3659         debug_drbd_uuid(mdev, UI_BITMAP);
3660
3661         get_random_bytes(&val, sizeof(u64));
3662         _drbd_uuid_set(mdev, UI_CURRENT, val);
3663 }
3664
3665 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3666 {
3667         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3668                 return;
3669
3670         if (val == 0) {
3671                 drbd_uuid_move_history(mdev);
3672                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3673                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3674                 debug_drbd_uuid(mdev, UI_HISTORY_START);
3675                 debug_drbd_uuid(mdev, UI_BITMAP);
3676         } else {
3677                 if (mdev->ldev->md.uuid[UI_BITMAP])
3678                         dev_warn(DEV, "bm UUID already set");
3679
3680                 mdev->ldev->md.uuid[UI_BITMAP] = val;
3681                 mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3682
3683                 debug_drbd_uuid(mdev, UI_BITMAP);
3684         }
3685         drbd_md_mark_dirty(mdev);
3686 }
3687
3688 /**
3689  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3690  * @mdev:       DRBD device.
3691  *
3692  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3693  */
3694 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3695 {
3696         int rv = -EIO;
3697
3698         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3699                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3700                 drbd_md_sync(mdev);
3701                 drbd_bm_set_all(mdev);
3702
3703                 rv = drbd_bm_write(mdev);
3704
3705                 if (!rv) {
3706                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3707                         drbd_md_sync(mdev);
3708                 }
3709
3710                 put_ldev(mdev);
3711         }
3712
3713         return rv;
3714 }
3715
3716 /**
3717  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3718  * @mdev:       DRBD device.
3719  *
3720  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3721  */
3722 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3723 {
3724         int rv = -EIO;
3725
3726         drbd_resume_al(mdev);
3727         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3728                 drbd_bm_clear_all(mdev);
3729                 rv = drbd_bm_write(mdev);
3730                 put_ldev(mdev);
3731         }
3732
3733         return rv;
3734 }
3735
3736 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3737 {
3738         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3739         int rv;
3740
3741         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3742
3743         drbd_bm_lock(mdev, work->why);
3744         rv = work->io_fn(mdev);
3745         drbd_bm_unlock(mdev);
3746
3747         clear_bit(BITMAP_IO, &mdev->flags);
3748         wake_up(&mdev->misc_wait);
3749
3750         if (work->done)
3751                 work->done(mdev, rv);
3752
3753         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3754         work->why = NULL;
3755
3756         return 1;
3757 }
3758
3759 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3760 {
3761         D_ASSERT(mdev->state.disk == D_FAILED);
3762         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3763          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3764          * the protected members anymore, though, so in the after_state_ch work
3765          * it will be safe to free them. */
3766         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3767         /* We need to wait for return of references checked out while we still
3768          * have been D_FAILED, though (drbd_md_sync, bitmap io). */
3769         wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
3770
3771         clear_bit(GO_DISKLESS, &mdev->flags);
3772         return 1;
3773 }
3774
3775 void drbd_go_diskless(struct drbd_conf *mdev)
3776 {
3777         D_ASSERT(mdev->state.disk == D_FAILED);
3778         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3779                 drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
3780                 /* don't drbd_queue_work_front,
3781                  * we need to serialize with the after_state_ch work
3782                  * of the -> D_FAILED transition. */
3783 }
3784
3785 /**
3786  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3787  * @mdev:       DRBD device.
3788  * @io_fn:      IO callback to be called when bitmap IO is possible
3789  * @done:       callback to be called after the bitmap IO was performed
3790  * @why:        Descriptive text of the reason for doing the IO
3791  *
3792  * While IO on the bitmap happens we freeze application IO thus we ensure
3793  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3794  * called from worker context. It MUST NOT be used while a previous such
3795  * work is still pending!
3796  */
3797 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3798                           int (*io_fn)(struct drbd_conf *),
3799                           void (*done)(struct drbd_conf *, int),
3800                           char *why)
3801 {
3802         D_ASSERT(current == mdev->worker.task);
3803
3804         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3805         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3806         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3807         if (mdev->bm_io_work.why)
3808                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3809                         why, mdev->bm_io_work.why);
3810
3811         mdev->bm_io_work.io_fn = io_fn;
3812         mdev->bm_io_work.done = done;
3813         mdev->bm_io_work.why = why;
3814
3815         set_bit(BITMAP_IO, &mdev->flags);
3816         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3817                 if (list_empty(&mdev->bm_io_work.w.list)) {
3818                         set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3819                         drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3820                 } else
3821                         dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3822         }
3823 }
3824
3825 /**
3826  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3827  * @mdev:       DRBD device.
3828  * @io_fn:      IO callback to be called when bitmap IO is possible
3829  * @why:        Descriptive text of the reason for doing the IO
3830  *
3831  * freezes application IO while that the actual IO operations runs. This
3832  * functions MAY NOT be called from worker context.
3833  */
3834 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3835 {
3836         int rv;
3837
3838         D_ASSERT(current != mdev->worker.task);
3839
3840         drbd_suspend_io(mdev);
3841
3842         drbd_bm_lock(mdev, why);
3843         rv = io_fn(mdev);
3844         drbd_bm_unlock(mdev);
3845
3846         drbd_resume_io(mdev);
3847
3848         return rv;
3849 }
3850
3851 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3852 {
3853         if ((mdev->ldev->md.flags & flag) != flag) {
3854                 drbd_md_mark_dirty(mdev);
3855                 mdev->ldev->md.flags |= flag;
3856         }
3857 }
3858
3859 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3860 {
3861         if ((mdev->ldev->md.flags & flag) != 0) {
3862                 drbd_md_mark_dirty(mdev);
3863                 mdev->ldev->md.flags &= ~flag;
3864         }
3865 }
3866 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3867 {
3868         return (bdev->md.flags & flag) != 0;
3869 }
3870
3871 static void md_sync_timer_fn(unsigned long data)
3872 {
3873         struct drbd_conf *mdev = (struct drbd_conf *) data;
3874
3875         drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3876 }
3877
3878 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3879 {
3880         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3881 #ifdef DEBUG
3882         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3883                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3884 #endif
3885         drbd_md_sync(mdev);
3886         return 1;
3887 }
3888
3889 #ifdef CONFIG_DRBD_FAULT_INJECTION
/*
 * Fault insertion support. The random number generator is shamelessly
 * stolen from kernel/rcutorture.c.
 */
struct fault_random_state {
	unsigned long state;	/* current value of the generator */
	unsigned long count;	/* calls left until state gets refreshed */
};
3896
3897 #define FAULT_RANDOM_MULT 39916801  /* prime */
3898 #define FAULT_RANDOM_ADD        479001701 /* prime */
3899 #define FAULT_RANDOM_REFRESH 10000
3900
3901 /*
3902  * Crude but fast random-number generator.  Uses a linear congruential
3903  * generator, with occasional help from get_random_bytes().
3904  */
3905 static unsigned long
3906 _drbd_fault_random(struct fault_random_state *rsp)
3907 {
3908         long refresh;
3909
3910         if (!rsp->count--) {
3911                 get_random_bytes(&refresh, sizeof(refresh));
3912                 rsp->state += refresh;
3913                 rsp->count = FAULT_RANDOM_REFRESH;
3914         }
3915         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3916         return swahw32(rsp->state);
3917 }
3918
3919 static char *
3920 _drbd_fault_str(unsigned int type) {
3921         static char *_faults[] = {
3922                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3923                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3924                 [DRBD_FAULT_RS_WR] = "Resync write",
3925                 [DRBD_FAULT_RS_RD] = "Resync read",
3926                 [DRBD_FAULT_DT_WR] = "Data write",
3927                 [DRBD_FAULT_DT_RD] = "Data read",
3928                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3929                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3930                 [DRBD_FAULT_AL_EE] = "EE allocation",
3931                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3932         };
3933
3934         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3935 }
3936
3937 unsigned int
3938 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3939 {
3940         static struct fault_random_state rrs = {0, 0};
3941
3942         unsigned int ret = (
3943                 (fault_devs == 0 ||
3944                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3945                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3946
3947         if (ret) {
3948                 fault_count++;
3949
3950                 if (__ratelimit(&drbd_ratelimit_state))
3951                         dev_warn(DEV, "***Simulating %s failure\n",
3952                                 _drbd_fault_str(type));
3953         }
3954
3955         return ret;
3956 }
3957 #endif
3958
/*
 * drbd_buildtag() - Returns a string identifying this build
 *
 * The string is assembled on the first call and cached in a static buffer:
 * "srcversion: ..." if CONFIG_MODULES is set and THIS_MODULE is not NULL,
 * "built-in" otherwise.
 */
const char *drbd_buildtag(void)
{
	/* DRBD built from external sources has here a reference to the
	   git hash of the source code. */

	/* the leading NUL marks the tag as not yet assembled */
	static char buildtag[38] = "\0uilt-in";

	if (buildtag[0] != 0)
		return buildtag;

#ifdef CONFIG_MODULES
	if (THIS_MODULE != NULL) {
		sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
		return buildtag;
	}
#endif
	buildtag[0] = 'b';

	return buildtag;
}
3977
/* Module entry and exit points. Terminated with a semicolon, so this file
 * does not depend on the macro expansions ending in one. */
module_init(drbd_init);
module_exit(drbd_cleanup);

/* String conversion helpers made available to other modules. */
EXPORT_SYMBOL(drbd_conn_str);
EXPORT_SYMBOL(drbd_role_str);
EXPORT_SYMBOL(drbd_disk_str);
EXPORT_SYMBOL(drbd_set_st_err_str);