dlm: remove shared message stub for recovery
[linux-3.10.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/slab.h>
60 #include "dlm_internal.h"
61 #include <linux/dlm_device.h>
62 #include "memory.h"
63 #include "lowcomms.h"
64 #include "requestqueue.h"
65 #include "util.h"
66 #include "dir.h"
67 #include "member.h"
68 #include "lockspace.h"
69 #include "ast.h"
70 #include "lock.h"
71 #include "rcom.h"
72 #include "recover.h"
73 #include "lvb_table.h"
74 #include "user.h"
75 #include "config.h"
76
77 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
83 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84 static int send_remove(struct dlm_rsb *r);
85 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
88                                     struct dlm_message *ms);
89 static int receive_extralen(struct dlm_message *ms);
90 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
91 static void del_timeout(struct dlm_lkb *lkb);
92
93 /*
94  * Lock compatibilty matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD*/
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
133
134 #define modes_compat(gr, rq) \
135         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
141
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
159
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163                "     status %d rqmode %d grmode %d wait_type %d\n",
164                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166                lkb->lkb_grmode, lkb->lkb_wait_type);
167 }
168
169 static void dlm_print_rsb(struct dlm_rsb *r)
170 {
171         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172                r->res_nodeid, r->res_flags, r->res_first_lkid,
173                r->res_recover_locks_count, r->res_name);
174 }
175
176 void dlm_dump_rsb(struct dlm_rsb *r)
177 {
178         struct dlm_lkb *lkb;
179
180         dlm_print_rsb(r);
181
182         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184         printk(KERN_ERR "rsb lookup list\n");
185         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb grant queue:\n");
188         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb convert queue:\n");
191         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193         printk(KERN_ERR "rsb wait queue:\n");
194         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195                 dlm_print_lkb(lkb);
196 }
197
198 /* Threads cannot use the lockspace while it's being recovered */
199
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 {
202         down_read(&ls->ls_in_recovery);
203 }
204
205 void dlm_unlock_recovery(struct dlm_ls *ls)
206 {
207         up_read(&ls->ls_in_recovery);
208 }
209
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 {
212         return down_read_trylock(&ls->ls_in_recovery);
213 }
214
215 static inline int can_be_queued(struct dlm_lkb *lkb)
216 {
217         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218 }
219
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 {
222         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223 }
224
225 static inline int is_demoted(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228 }
229
230 static inline int is_altmode(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233 }
234
235 static inline int is_granted(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238 }
239
240 static inline int is_remote(struct dlm_rsb *r)
241 {
242         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243         return !!r->res_nodeid;
244 }
245
246 static inline int is_process_copy(struct dlm_lkb *lkb)
247 {
248         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249 }
250
251 static inline int is_master_copy(struct dlm_lkb *lkb)
252 {
253         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262                 return 1;
263         return 0;
264 }
265
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
270
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284                                   DLM_IFL_OVERLAP_CANCEL));
285 }
286
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289         if (is_master_copy(lkb))
290                 return;
291
292         del_timeout(lkb);
293
294         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295
296         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297            timeout caused the cancel then return -ETIMEDOUT */
298         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300                 rv = -ETIMEDOUT;
301         }
302
303         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305                 rv = -EDEADLK;
306         }
307
308         dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309 }
310
311 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312 {
313         queue_cast(r, lkb,
314                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315 }
316
317 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
318 {
319         if (is_master_copy(lkb)) {
320                 send_bast(r, lkb, rqmode);
321         } else {
322                 dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
323         }
324 }
325
326 /*
327  * Basic operations on rsb's and lkb's
328  */
329
330 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
331 {
332         struct dlm_rsb *r;
333
334         r = dlm_allocate_rsb(ls, len);
335         if (!r)
336                 return NULL;
337
338         r->res_ls = ls;
339         r->res_length = len;
340         memcpy(r->res_name, name, len);
341         mutex_init(&r->res_mutex);
342
343         INIT_LIST_HEAD(&r->res_lookup);
344         INIT_LIST_HEAD(&r->res_grantqueue);
345         INIT_LIST_HEAD(&r->res_convertqueue);
346         INIT_LIST_HEAD(&r->res_waitqueue);
347         INIT_LIST_HEAD(&r->res_root_list);
348         INIT_LIST_HEAD(&r->res_recover_list);
349
350         return r;
351 }
352
353 static int search_rsb_list(struct list_head *head, char *name, int len,
354                            unsigned int flags, struct dlm_rsb **r_ret)
355 {
356         struct dlm_rsb *r;
357         int error = 0;
358
359         list_for_each_entry(r, head, res_hashchain) {
360                 if (len == r->res_length && !memcmp(name, r->res_name, len))
361                         goto found;
362         }
363         *r_ret = NULL;
364         return -EBADR;
365
366  found:
367         if (r->res_nodeid && (flags & R_MASTER))
368                 error = -ENOTBLK;
369         *r_ret = r;
370         return error;
371 }
372
373 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
374                        unsigned int flags, struct dlm_rsb **r_ret)
375 {
376         struct dlm_rsb *r;
377         int error;
378
379         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
380         if (!error) {
381                 kref_get(&r->res_ref);
382                 goto out;
383         }
384         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
385         if (error)
386                 goto out;
387
388         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
389
390         if (dlm_no_directory(ls))
391                 goto out;
392
393         if (r->res_nodeid == -1) {
394                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
395                 r->res_first_lkid = 0;
396         } else if (r->res_nodeid > 0) {
397                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else {
400                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
401                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
402         }
403  out:
404         *r_ret = r;
405         return error;
406 }
407
408 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
409                       unsigned int flags, struct dlm_rsb **r_ret)
410 {
411         int error;
412         spin_lock(&ls->ls_rsbtbl[b].lock);
413         error = _search_rsb(ls, name, len, b, flags, r_ret);
414         spin_unlock(&ls->ls_rsbtbl[b].lock);
415         return error;
416 }
417
418 /*
419  * Find rsb in rsbtbl and potentially create/add one
420  *
421  * Delaying the release of rsb's has a similar benefit to applications keeping
422  * NL locks on an rsb, but without the guarantee that the cached master value
423  * will still be valid when the rsb is reused.  Apps aren't always smart enough
424  * to keep NL locks on an rsb that they may lock again shortly; this can lead
425  * to excessive master lookups and removals if we don't delay the release.
426  *
427  * Searching for an rsb means looking through both the normal list and toss
428  * list.  When found on the toss list the rsb is moved to the normal list with
429  * ref count of 1; when found on normal list the ref count is incremented.
430  */
431
432 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
433                     unsigned int flags, struct dlm_rsb **r_ret)
434 {
435         struct dlm_rsb *r = NULL, *tmp;
436         uint32_t hash, bucket;
437         int error = -EINVAL;
438
439         if (namelen > DLM_RESNAME_MAXLEN)
440                 goto out;
441
442         if (dlm_no_directory(ls))
443                 flags |= R_CREATE;
444
445         error = 0;
446         hash = jhash(name, namelen, 0);
447         bucket = hash & (ls->ls_rsbtbl_size - 1);
448
449         error = search_rsb(ls, name, namelen, bucket, flags, &r);
450         if (!error)
451                 goto out;
452
453         if (error == -EBADR && !(flags & R_CREATE))
454                 goto out;
455
456         /* the rsb was found but wasn't a master copy */
457         if (error == -ENOTBLK)
458                 goto out;
459
460         error = -ENOMEM;
461         r = create_rsb(ls, name, namelen);
462         if (!r)
463                 goto out;
464
465         r->res_hash = hash;
466         r->res_bucket = bucket;
467         r->res_nodeid = -1;
468         kref_init(&r->res_ref);
469
470         /* With no directory, the master can be set immediately */
471         if (dlm_no_directory(ls)) {
472                 int nodeid = dlm_dir_nodeid(r);
473                 if (nodeid == dlm_our_nodeid())
474                         nodeid = 0;
475                 r->res_nodeid = nodeid;
476         }
477
478         spin_lock(&ls->ls_rsbtbl[bucket].lock);
479         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
480         if (!error) {
481                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
482                 dlm_free_rsb(r);
483                 r = tmp;
484                 goto out;
485         }
486         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
487         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
488         error = 0;
489  out:
490         *r_ret = r;
491         return error;
492 }
493
494 /* This is only called to add a reference when the code already holds
495    a valid reference to the rsb, so there's no need for locking. */
496
497 static inline void hold_rsb(struct dlm_rsb *r)
498 {
499         kref_get(&r->res_ref);
500 }
501
502 void dlm_hold_rsb(struct dlm_rsb *r)
503 {
504         hold_rsb(r);
505 }
506
507 static void toss_rsb(struct kref *kref)
508 {
509         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
510         struct dlm_ls *ls = r->res_ls;
511
512         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
513         kref_init(&r->res_ref);
514         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
515         r->res_toss_time = jiffies;
516         if (r->res_lvbptr) {
517                 dlm_free_lvb(r->res_lvbptr);
518                 r->res_lvbptr = NULL;
519         }
520 }
521
522 /* When all references to the rsb are gone it's transfered to
523    the tossed list for later disposal. */
524
525 static void put_rsb(struct dlm_rsb *r)
526 {
527         struct dlm_ls *ls = r->res_ls;
528         uint32_t bucket = r->res_bucket;
529
530         spin_lock(&ls->ls_rsbtbl[bucket].lock);
531         kref_put(&r->res_ref, toss_rsb);
532         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
533 }
534
535 void dlm_put_rsb(struct dlm_rsb *r)
536 {
537         put_rsb(r);
538 }
539
540 /* See comment for unhold_lkb */
541
542 static void unhold_rsb(struct dlm_rsb *r)
543 {
544         int rv;
545         rv = kref_put(&r->res_ref, toss_rsb);
546         DLM_ASSERT(!rv, dlm_dump_rsb(r););
547 }
548
549 static void kill_rsb(struct kref *kref)
550 {
551         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
552
553         /* All work is done after the return from kref_put() so we
554            can release the write_lock before the remove and free. */
555
556         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
557         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
558         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
559         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
562 }
563
564 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
565    The rsb must exist as long as any lkb's for it do. */
566
567 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
568 {
569         hold_rsb(r);
570         lkb->lkb_resource = r;
571 }
572
573 static void detach_lkb(struct dlm_lkb *lkb)
574 {
575         if (lkb->lkb_resource) {
576                 put_rsb(lkb->lkb_resource);
577                 lkb->lkb_resource = NULL;
578         }
579 }
580
581 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
582 {
583         struct dlm_lkb *lkb, *tmp;
584         uint32_t lkid = 0;
585         uint16_t bucket;
586
587         lkb = dlm_allocate_lkb(ls);
588         if (!lkb)
589                 return -ENOMEM;
590
591         lkb->lkb_nodeid = -1;
592         lkb->lkb_grmode = DLM_LOCK_IV;
593         kref_init(&lkb->lkb_ref);
594         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
595         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
596         INIT_LIST_HEAD(&lkb->lkb_time_list);
597         INIT_LIST_HEAD(&lkb->lkb_astqueue);
598
599         get_random_bytes(&bucket, sizeof(bucket));
600         bucket &= (ls->ls_lkbtbl_size - 1);
601
602         write_lock(&ls->ls_lkbtbl[bucket].lock);
603
604         /* counter can roll over so we must verify lkid is not in use */
605
606         while (lkid == 0) {
607                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
608
609                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
610                                     lkb_idtbl_list) {
611                         if (tmp->lkb_id != lkid)
612                                 continue;
613                         lkid = 0;
614                         break;
615                 }
616         }
617
618         lkb->lkb_id = lkid;
619         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
620         write_unlock(&ls->ls_lkbtbl[bucket].lock);
621
622         *lkb_ret = lkb;
623         return 0;
624 }
625
626 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
627 {
628         struct dlm_lkb *lkb;
629         uint16_t bucket = (lkid >> 16);
630
631         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
632                 if (lkb->lkb_id == lkid)
633                         return lkb;
634         }
635         return NULL;
636 }
637
638 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
639 {
640         struct dlm_lkb *lkb;
641         uint16_t bucket = (lkid >> 16);
642
643         if (bucket >= ls->ls_lkbtbl_size)
644                 return -EBADSLT;
645
646         read_lock(&ls->ls_lkbtbl[bucket].lock);
647         lkb = __find_lkb(ls, lkid);
648         if (lkb)
649                 kref_get(&lkb->lkb_ref);
650         read_unlock(&ls->ls_lkbtbl[bucket].lock);
651
652         *lkb_ret = lkb;
653         return lkb ? 0 : -ENOENT;
654 }
655
656 static void kill_lkb(struct kref *kref)
657 {
658         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
659
660         /* All work is done after the return from kref_put() so we
661            can release the write_lock before the detach_lkb */
662
663         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
664 }
665
666 /* __put_lkb() is used when an lkb may not have an rsb attached to
667    it so we need to provide the lockspace explicitly */
668
669 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
670 {
671         uint16_t bucket = (lkb->lkb_id >> 16);
672
673         write_lock(&ls->ls_lkbtbl[bucket].lock);
674         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
675                 list_del(&lkb->lkb_idtbl_list);
676                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
677
678                 detach_lkb(lkb);
679
680                 /* for local/process lkbs, lvbptr points to caller's lksb */
681                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
682                         dlm_free_lvb(lkb->lkb_lvbptr);
683                 dlm_free_lkb(lkb);
684                 return 1;
685         } else {
686                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
687                 return 0;
688         }
689 }
690
691 int dlm_put_lkb(struct dlm_lkb *lkb)
692 {
693         struct dlm_ls *ls;
694
695         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
696         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
697
698         ls = lkb->lkb_resource->res_ls;
699         return __put_lkb(ls, lkb);
700 }
701
702 /* This is only called to add a reference when the code already holds
703    a valid reference to the lkb, so there's no need for locking. */
704
705 static inline void hold_lkb(struct dlm_lkb *lkb)
706 {
707         kref_get(&lkb->lkb_ref);
708 }
709
710 /* This is called when we need to remove a reference and are certain
711    it's not the last ref.  e.g. del_lkb is always called between a
712    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
713    put_lkb would work fine, but would involve unnecessary locking */
714
715 static inline void unhold_lkb(struct dlm_lkb *lkb)
716 {
717         int rv;
718         rv = kref_put(&lkb->lkb_ref, kill_lkb);
719         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
720 }
721
722 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
723                             int mode)
724 {
725         struct dlm_lkb *lkb = NULL;
726
727         list_for_each_entry(lkb, head, lkb_statequeue)
728                 if (lkb->lkb_rqmode < mode)
729                         break;
730
731         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
732 }
733
734 /* add/remove lkb to rsb's grant/convert/wait queue */
735
736 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
737 {
738         kref_get(&lkb->lkb_ref);
739
740         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
741
742         lkb->lkb_timestamp = ktime_get();
743
744         lkb->lkb_status = status;
745
746         switch (status) {
747         case DLM_LKSTS_WAITING:
748                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
749                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
750                 else
751                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
752                 break;
753         case DLM_LKSTS_GRANTED:
754                 /* convention says granted locks kept in order of grmode */
755                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
756                                 lkb->lkb_grmode);
757                 break;
758         case DLM_LKSTS_CONVERT:
759                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
760                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
761                 else
762                         list_add_tail(&lkb->lkb_statequeue,
763                                       &r->res_convertqueue);
764                 break;
765         default:
766                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
767         }
768 }
769
770 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
771 {
772         lkb->lkb_status = 0;
773         list_del(&lkb->lkb_statequeue);
774         unhold_lkb(lkb);
775 }
776
777 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
778 {
779         hold_lkb(lkb);
780         del_lkb(r, lkb);
781         add_lkb(r, lkb, sts);
782         unhold_lkb(lkb);
783 }
784
785 static int msg_reply_type(int mstype)
786 {
787         switch (mstype) {
788         case DLM_MSG_REQUEST:
789                 return DLM_MSG_REQUEST_REPLY;
790         case DLM_MSG_CONVERT:
791                 return DLM_MSG_CONVERT_REPLY;
792         case DLM_MSG_UNLOCK:
793                 return DLM_MSG_UNLOCK_REPLY;
794         case DLM_MSG_CANCEL:
795                 return DLM_MSG_CANCEL_REPLY;
796         case DLM_MSG_LOOKUP:
797                 return DLM_MSG_LOOKUP_REPLY;
798         }
799         return -1;
800 }
801
802 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
803 {
804         int i;
805
806         for (i = 0; i < num_nodes; i++) {
807                 if (!warned[i]) {
808                         warned[i] = nodeid;
809                         return 0;
810                 }
811                 if (warned[i] == nodeid)
812                         return 1;
813         }
814         return 0;
815 }
816
817 void dlm_scan_waiters(struct dlm_ls *ls)
818 {
819         struct dlm_lkb *lkb;
820         ktime_t zero = ktime_set(0, 0);
821         s64 us;
822         s64 debug_maxus = 0;
823         u32 debug_scanned = 0;
824         u32 debug_expired = 0;
825         int num_nodes = 0;
826         int *warned = NULL;
827
828         if (!dlm_config.ci_waitwarn_us)
829                 return;
830
831         mutex_lock(&ls->ls_waiters_mutex);
832
833         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
834                 if (ktime_equal(lkb->lkb_wait_time, zero))
835                         continue;
836
837                 debug_scanned++;
838
839                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
840
841                 if (us < dlm_config.ci_waitwarn_us)
842                         continue;
843
844                 lkb->lkb_wait_time = zero;
845
846                 debug_expired++;
847                 if (us > debug_maxus)
848                         debug_maxus = us;
849
850                 if (!num_nodes) {
851                         num_nodes = ls->ls_num_nodes;
852                         warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
853                         if (warned)
854                                 memset(warned, 0, num_nodes * sizeof(int));
855                 }
856                 if (!warned)
857                         continue;
858                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
859                         continue;
860
861                 log_error(ls, "waitwarn %x %lld %d us check connection to "
862                           "node %d", lkb->lkb_id, (long long)us,
863                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
864         }
865         mutex_unlock(&ls->ls_waiters_mutex);
866
867         if (warned)
868                 kfree(warned);
869
870         if (debug_expired)
871                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
872                           debug_scanned, debug_expired,
873                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
874 }
875
876 /* add/remove lkb from global waiters list of lkb's waiting for
877    a reply from a remote node */
878
879 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
880 {
881         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
882         int error = 0;
883
884         mutex_lock(&ls->ls_waiters_mutex);
885
886         if (is_overlap_unlock(lkb) ||
887             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
888                 error = -EINVAL;
889                 goto out;
890         }
891
892         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
893                 switch (mstype) {
894                 case DLM_MSG_UNLOCK:
895                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
896                         break;
897                 case DLM_MSG_CANCEL:
898                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
899                         break;
900                 default:
901                         error = -EBUSY;
902                         goto out;
903                 }
904                 lkb->lkb_wait_count++;
905                 hold_lkb(lkb);
906
907                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
908                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
909                           lkb->lkb_wait_count, lkb->lkb_flags);
910                 goto out;
911         }
912
913         DLM_ASSERT(!lkb->lkb_wait_count,
914                    dlm_print_lkb(lkb);
915                    printk("wait_count %d\n", lkb->lkb_wait_count););
916
917         lkb->lkb_wait_count++;
918         lkb->lkb_wait_type = mstype;
919         lkb->lkb_wait_time = ktime_get();
920         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
921         hold_lkb(lkb);
922         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
923  out:
924         if (error)
925                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
926                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
927                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
928         mutex_unlock(&ls->ls_waiters_mutex);
929         return error;
930 }
931
932 /* We clear the RESEND flag because we might be taking an lkb off the waiters
933    list as part of process_requestqueue (e.g. a lookup that has an optimized
934    request reply on the requestqueue) between dlm_recover_waiters_pre() which
935    set RESEND and dlm_recover_waiters_post() */
936
937 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
938                                 struct dlm_message *ms)
939 {
940         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
941         int overlap_done = 0;
942
943         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
944                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
945                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
946                 overlap_done = 1;
947                 goto out_del;
948         }
949
950         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
951                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
952                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
953                 overlap_done = 1;
954                 goto out_del;
955         }
956
957         /* Cancel state was preemptively cleared by a successful convert,
958            see next comment, nothing to do. */
959
960         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
961             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
962                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
963                           lkb->lkb_id, lkb->lkb_wait_type);
964                 return -1;
965         }
966
967         /* Remove for the convert reply, and premptively remove for the
968            cancel reply.  A convert has been granted while there's still
969            an outstanding cancel on it (the cancel is moot and the result
970            in the cancel reply should be 0).  We preempt the cancel reply
971            because the app gets the convert result and then can follow up
972            with another op, like convert.  This subsequent op would see the
973            lingering state of the cancel and fail with -EBUSY. */
974
975         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
976             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
977             is_overlap_cancel(lkb) && ms && !ms->m_result) {
978                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
979                           lkb->lkb_id);
980                 lkb->lkb_wait_type = 0;
981                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
982                 lkb->lkb_wait_count--;
983                 goto out_del;
984         }
985
986         /* N.B. type of reply may not always correspond to type of original
987            msg due to lookup->request optimization, verify others? */
988
989         if (lkb->lkb_wait_type) {
990                 lkb->lkb_wait_type = 0;
991                 goto out_del;
992         }
993
994         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
995                   lkb->lkb_id, mstype, lkb->lkb_flags);
996         return -1;
997
998  out_del:
999         /* the force-unlock/cancel has completed and we haven't recvd a reply
1000            to the op that was in progress prior to the unlock/cancel; we
1001            give up on any reply to the earlier op.  FIXME: not sure when/how
1002            this would happen */
1003
1004         if (overlap_done && lkb->lkb_wait_type) {
1005                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1006                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1007                 lkb->lkb_wait_count--;
1008                 lkb->lkb_wait_type = 0;
1009         }
1010
1011         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1012
1013         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1014         lkb->lkb_wait_count--;
1015         if (!lkb->lkb_wait_count)
1016                 list_del_init(&lkb->lkb_wait_reply);
1017         unhold_lkb(lkb);
1018         return 0;
1019 }
1020
1021 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1022 {
1023         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1024         int error;
1025
1026         mutex_lock(&ls->ls_waiters_mutex);
1027         error = _remove_from_waiters(lkb, mstype, NULL);
1028         mutex_unlock(&ls->ls_waiters_mutex);
1029         return error;
1030 }
1031
1032 /* Handles situations where we might be processing a "fake" or "stub" reply in
1033    which we can't try to take waiters_mutex again. */
1034
1035 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1036 {
1037         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1038         int error;
1039
1040         if (ms->m_flags != DLM_IFL_STUB_MS)
1041                 mutex_lock(&ls->ls_waiters_mutex);
1042         error = _remove_from_waiters(lkb, ms->m_type, ms);
1043         if (ms->m_flags != DLM_IFL_STUB_MS)
1044                 mutex_unlock(&ls->ls_waiters_mutex);
1045         return error;
1046 }
1047
1048 static void dir_remove(struct dlm_rsb *r)
1049 {
1050         int to_nodeid;
1051
1052         if (dlm_no_directory(r->res_ls))
1053                 return;
1054
1055         to_nodeid = dlm_dir_nodeid(r);
1056         if (to_nodeid != dlm_our_nodeid())
1057                 send_remove(r);
1058         else
1059                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1060                                      r->res_name, r->res_length);
1061 }
1062
1063 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
1064    found since they are in order of newest to oldest? */
1065
1066 static int shrink_bucket(struct dlm_ls *ls, int b)
1067 {
1068         struct dlm_rsb *r;
1069         int count = 0, found;
1070
1071         for (;;) {
1072                 found = 0;
1073                 spin_lock(&ls->ls_rsbtbl[b].lock);
1074                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1075                                             res_hashchain) {
1076                         if (!time_after_eq(jiffies, r->res_toss_time +
1077                                            dlm_config.ci_toss_secs * HZ))
1078                                 continue;
1079                         found = 1;
1080                         break;
1081                 }
1082
1083                 if (!found) {
1084                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1085                         break;
1086                 }
1087
1088                 if (kref_put(&r->res_ref, kill_rsb)) {
1089                         list_del(&r->res_hashchain);
1090                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1091
1092                         if (is_master(r))
1093                                 dir_remove(r);
1094                         dlm_free_rsb(r);
1095                         count++;
1096                 } else {
1097                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1098                         log_error(ls, "tossed rsb in use %s", r->res_name);
1099                 }
1100         }
1101
1102         return count;
1103 }
1104
1105 void dlm_scan_rsbs(struct dlm_ls *ls)
1106 {
1107         int i;
1108
1109         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1110                 shrink_bucket(ls, i);
1111                 if (dlm_locking_stopped(ls))
1112                         break;
1113                 cond_resched();
1114         }
1115 }
1116
1117 static void add_timeout(struct dlm_lkb *lkb)
1118 {
1119         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1120
1121         if (is_master_copy(lkb))
1122                 return;
1123
1124         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1125             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1126                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1127                 goto add_it;
1128         }
1129         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1130                 goto add_it;
1131         return;
1132
1133  add_it:
1134         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1135         mutex_lock(&ls->ls_timeout_mutex);
1136         hold_lkb(lkb);
1137         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1138         mutex_unlock(&ls->ls_timeout_mutex);
1139 }
1140
1141 static void del_timeout(struct dlm_lkb *lkb)
1142 {
1143         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1144
1145         mutex_lock(&ls->ls_timeout_mutex);
1146         if (!list_empty(&lkb->lkb_time_list)) {
1147                 list_del_init(&lkb->lkb_time_list);
1148                 unhold_lkb(lkb);
1149         }
1150         mutex_unlock(&ls->ls_timeout_mutex);
1151 }
1152
1153 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1154    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1155    and then lock rsb because of lock ordering in add_timeout.  We may need
1156    to specify some special timeout-related bits in the lkb that are just to
1157    be accessed under the timeout_mutex. */
1158
1159 void dlm_scan_timeout(struct dlm_ls *ls)
1160 {
1161         struct dlm_rsb *r;
1162         struct dlm_lkb *lkb;
1163         int do_cancel, do_warn;
1164         s64 wait_us;
1165
1166         for (;;) {
1167                 if (dlm_locking_stopped(ls))
1168                         break;
1169
1170                 do_cancel = 0;
1171                 do_warn = 0;
1172                 mutex_lock(&ls->ls_timeout_mutex);
1173                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1174
1175                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1176                                                         lkb->lkb_timestamp));
1177
1178                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1179                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1180                                 do_cancel = 1;
1181
1182                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1183                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1184                                 do_warn = 1;
1185
1186                         if (!do_cancel && !do_warn)
1187                                 continue;
1188                         hold_lkb(lkb);
1189                         break;
1190                 }
1191                 mutex_unlock(&ls->ls_timeout_mutex);
1192
1193                 if (!do_cancel && !do_warn)
1194                         break;
1195
1196                 r = lkb->lkb_resource;
1197                 hold_rsb(r);
1198                 lock_rsb(r);
1199
1200                 if (do_warn) {
1201                         /* clear flag so we only warn once */
1202                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1203                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1204                                 del_timeout(lkb);
1205                         dlm_timeout_warn(lkb);
1206                 }
1207
1208                 if (do_cancel) {
1209                         log_debug(ls, "timeout cancel %x node %d %s",
1210                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1211                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1212                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1213                         del_timeout(lkb);
1214                         _cancel_lock(r, lkb);
1215                 }
1216
1217                 unlock_rsb(r);
1218                 unhold_rsb(r);
1219                 dlm_put_lkb(lkb);
1220         }
1221 }
1222
1223 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1224    dlm_recoverd before checking/setting ls_recover_begin. */
1225
1226 void dlm_adjust_timeouts(struct dlm_ls *ls)
1227 {
1228         struct dlm_lkb *lkb;
1229         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1230
1231         ls->ls_recover_begin = 0;
1232         mutex_lock(&ls->ls_timeout_mutex);
1233         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1234                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1235         mutex_unlock(&ls->ls_timeout_mutex);
1236
1237         if (!dlm_config.ci_waitwarn_us)
1238                 return;
1239
1240         mutex_lock(&ls->ls_waiters_mutex);
1241         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1242                 if (ktime_to_us(lkb->lkb_wait_time))
1243                         lkb->lkb_wait_time = ktime_get();
1244         }
1245         mutex_unlock(&ls->ls_waiters_mutex);
1246 }
1247
1248 /* lkb is master or local copy */
1249
1250 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1251 {
1252         int b, len = r->res_ls->ls_lvblen;
1253
1254         /* b=1 lvb returned to caller
1255            b=0 lvb written to rsb or invalidated
1256            b=-1 do nothing */
1257
1258         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1259
1260         if (b == 1) {
1261                 if (!lkb->lkb_lvbptr)
1262                         return;
1263
1264                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1265                         return;
1266
1267                 if (!r->res_lvbptr)
1268                         return;
1269
1270                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1271                 lkb->lkb_lvbseq = r->res_lvbseq;
1272
1273         } else if (b == 0) {
1274                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1275                         rsb_set_flag(r, RSB_VALNOTVALID);
1276                         return;
1277                 }
1278
1279                 if (!lkb->lkb_lvbptr)
1280                         return;
1281
1282                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1283                         return;
1284
1285                 if (!r->res_lvbptr)
1286                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1287
1288                 if (!r->res_lvbptr)
1289                         return;
1290
1291                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1292                 r->res_lvbseq++;
1293                 lkb->lkb_lvbseq = r->res_lvbseq;
1294                 rsb_clear_flag(r, RSB_VALNOTVALID);
1295         }
1296
1297         if (rsb_flag(r, RSB_VALNOTVALID))
1298                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1299 }
1300
1301 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1302 {
1303         if (lkb->lkb_grmode < DLM_LOCK_PW)
1304                 return;
1305
1306         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1307                 rsb_set_flag(r, RSB_VALNOTVALID);
1308                 return;
1309         }
1310
1311         if (!lkb->lkb_lvbptr)
1312                 return;
1313
1314         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1315                 return;
1316
1317         if (!r->res_lvbptr)
1318                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1319
1320         if (!r->res_lvbptr)
1321                 return;
1322
1323         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1324         r->res_lvbseq++;
1325         rsb_clear_flag(r, RSB_VALNOTVALID);
1326 }
1327
1328 /* lkb is process copy (pc) */
1329
1330 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1331                             struct dlm_message *ms)
1332 {
1333         int b;
1334
1335         if (!lkb->lkb_lvbptr)
1336                 return;
1337
1338         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1339                 return;
1340
1341         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1342         if (b == 1) {
1343                 int len = receive_extralen(ms);
1344                 if (len > DLM_RESNAME_MAXLEN)
1345                         len = DLM_RESNAME_MAXLEN;
1346                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1347                 lkb->lkb_lvbseq = ms->m_lvbseq;
1348         }
1349 }
1350
1351 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1352    remove_lock -- used for unlock, removes lkb from granted
1353    revert_lock -- used for cancel, moves lkb from convert to granted
1354    grant_lock  -- used for request and convert, adds lkb to granted or
1355                   moves lkb from convert or waiting to granted
1356
1357    Each of these is used for master or local copy lkb's.  There is
1358    also a _pc() variation used to make the corresponding change on
1359    a process copy (pc) lkb. */
1360
1361 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1362 {
1363         del_lkb(r, lkb);
1364         lkb->lkb_grmode = DLM_LOCK_IV;
1365         /* this unhold undoes the original ref from create_lkb()
1366            so this leads to the lkb being freed */
1367         unhold_lkb(lkb);
1368 }
1369
1370 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1371 {
1372         set_lvb_unlock(r, lkb);
1373         _remove_lock(r, lkb);
1374 }
1375
1376 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1377 {
1378         _remove_lock(r, lkb);
1379 }
1380
1381 /* returns: 0 did nothing
1382             1 moved lock to granted
1383            -1 removed lock */
1384
1385 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1386 {
1387         int rv = 0;
1388
1389         lkb->lkb_rqmode = DLM_LOCK_IV;
1390
1391         switch (lkb->lkb_status) {
1392         case DLM_LKSTS_GRANTED:
1393                 break;
1394         case DLM_LKSTS_CONVERT:
1395                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1396                 rv = 1;
1397                 break;
1398         case DLM_LKSTS_WAITING:
1399                 del_lkb(r, lkb);
1400                 lkb->lkb_grmode = DLM_LOCK_IV;
1401                 /* this unhold undoes the original ref from create_lkb()
1402                    so this leads to the lkb being freed */
1403                 unhold_lkb(lkb);
1404                 rv = -1;
1405                 break;
1406         default:
1407                 log_print("invalid status for revert %d", lkb->lkb_status);
1408         }
1409         return rv;
1410 }
1411
1412 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1413 {
1414         return revert_lock(r, lkb);
1415 }
1416
1417 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1418 {
1419         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1420                 lkb->lkb_grmode = lkb->lkb_rqmode;
1421                 if (lkb->lkb_status)
1422                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1423                 else
1424                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1425         }
1426
1427         lkb->lkb_rqmode = DLM_LOCK_IV;
1428 }
1429
1430 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1431 {
1432         set_lvb_lock(r, lkb);
1433         _grant_lock(r, lkb);
1434         lkb->lkb_highbast = 0;
1435 }
1436
1437 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1438                           struct dlm_message *ms)
1439 {
1440         set_lvb_lock_pc(r, lkb, ms);
1441         _grant_lock(r, lkb);
1442 }
1443
1444 /* called by grant_pending_locks() which means an async grant message must
1445    be sent to the requesting node in addition to granting the lock if the
1446    lkb belongs to a remote node. */
1447
1448 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1449 {
1450         grant_lock(r, lkb);
1451         if (is_master_copy(lkb))
1452                 send_grant(r, lkb);
1453         else
1454                 queue_cast(r, lkb, 0);
1455 }
1456
1457 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1458    change the granted/requested modes.  We're munging things accordingly in
1459    the process copy.
1460    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1461    conversion deadlock
1462    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1463    compatible with other granted locks */
1464
1465 static void munge_demoted(struct dlm_lkb *lkb)
1466 {
1467         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1468                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1469                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1470                 return;
1471         }
1472
1473         lkb->lkb_grmode = DLM_LOCK_NL;
1474 }
1475
1476 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1477 {
1478         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1479             ms->m_type != DLM_MSG_GRANT) {
1480                 log_print("munge_altmode %x invalid reply type %d",
1481                           lkb->lkb_id, ms->m_type);
1482                 return;
1483         }
1484
1485         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1486                 lkb->lkb_rqmode = DLM_LOCK_PR;
1487         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1488                 lkb->lkb_rqmode = DLM_LOCK_CW;
1489         else {
1490                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1491                 dlm_print_lkb(lkb);
1492         }
1493 }
1494
1495 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1496 {
1497         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1498                                            lkb_statequeue);
1499         if (lkb->lkb_id == first->lkb_id)
1500                 return 1;
1501
1502         return 0;
1503 }
1504
1505 /* Check if the given lkb conflicts with another lkb on the queue. */
1506
1507 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1508 {
1509         struct dlm_lkb *this;
1510
1511         list_for_each_entry(this, head, lkb_statequeue) {
1512                 if (this == lkb)
1513                         continue;
1514                 if (!modes_compat(this, lkb))
1515                         return 1;
1516         }
1517         return 0;
1518 }
1519
1520 /*
1521  * "A conversion deadlock arises with a pair of lock requests in the converting
1522  * queue for one resource.  The granted mode of each lock blocks the requested
1523  * mode of the other lock."
1524  *
1525  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1526  * convert queue from being granted, then deadlk/demote lkb.
1527  *
1528  * Example:
1529  * Granted Queue: empty
1530  * Convert Queue: NL->EX (first lock)
1531  *                PR->EX (second lock)
1532  *
1533  * The first lock can't be granted because of the granted mode of the second
1534  * lock and the second lock can't be granted because it's not first in the
1535  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1536  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1537  * flag set and return DEMOTED in the lksb flags.
1538  *
1539  * Originally, this function detected conv-deadlk in a more limited scope:
1540  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1541  * - if lkb1 was the first entry in the queue (not just earlier), and was
1542  *   blocked by the granted mode of lkb2, and there was nothing on the
1543  *   granted queue preventing lkb1 from being granted immediately, i.e.
1544  *   lkb2 was the only thing preventing lkb1 from being granted.
1545  *
1546  * That second condition meant we'd only say there was conv-deadlk if
1547  * resolving it (by demotion) would lead to the first lock on the convert
1548  * queue being granted right away.  It allowed conversion deadlocks to exist
1549  * between locks on the convert queue while they couldn't be granted anyway.
1550  *
1551  * Now, we detect and take action on conversion deadlocks immediately when
1552  * they're created, even if they may not be immediately consequential.  If
1553  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1554  * mode that would prevent lkb1's conversion from being granted, we do a
1555  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1556  * I think this means that the lkb_is_ahead condition below should always
1557  * be zero, i.e. there will never be conv-deadlk between two locks that are
1558  * both already on the convert queue.
1559  */
1560
1561 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1562 {
1563         struct dlm_lkb *lkb1;
1564         int lkb_is_ahead = 0;
1565
1566         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1567                 if (lkb1 == lkb2) {
1568                         lkb_is_ahead = 1;
1569                         continue;
1570                 }
1571
1572                 if (!lkb_is_ahead) {
1573                         if (!modes_compat(lkb2, lkb1))
1574                                 return 1;
1575                 } else {
1576                         if (!modes_compat(lkb2, lkb1) &&
1577                             !modes_compat(lkb1, lkb2))
1578                                 return 1;
1579                 }
1580         }
1581         return 0;
1582 }
1583
1584 /*
1585  * Return 1 if the lock can be granted, 0 otherwise.
1586  * Also detect and resolve conversion deadlocks.
1587  *
1588  * lkb is the lock to be granted
1589  *
1590  * now is 1 if the function is being called in the context of the
1591  * immediate request, it is 0 if called later, after the lock has been
1592  * queued.
1593  *
1594  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1595  */
1596
1597 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1598 {
1599         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1600
1601         /*
1602          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1603          * a new request for a NL mode lock being blocked.
1604          *
1605          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1606          * request, then it would be granted.  In essence, the use of this flag
1607          * tells the Lock Manager to expedite theis request by not considering
1608          * what may be in the CONVERTING or WAITING queues...  As of this
1609          * writing, the EXPEDITE flag can be used only with new requests for NL
1610          * mode locks.  This flag is not valid for conversion requests.
1611          *
1612          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1613          * conversion or used with a non-NL requested mode.  We also know an
1614          * EXPEDITE request is always granted immediately, so now must always
1615          * be 1.  The full condition to grant an expedite request: (now &&
1616          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1617          * therefore be shortened to just checking the flag.
1618          */
1619
1620         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1621                 return 1;
1622
1623         /*
1624          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1625          * added to the remaining conditions.
1626          */
1627
1628         if (queue_conflict(&r->res_grantqueue, lkb))
1629                 goto out;
1630
1631         /*
1632          * 6-3: By default, a conversion request is immediately granted if the
1633          * requested mode is compatible with the modes of all other granted
1634          * locks
1635          */
1636
1637         if (queue_conflict(&r->res_convertqueue, lkb))
1638                 goto out;
1639
1640         /*
1641          * 6-5: But the default algorithm for deciding whether to grant or
1642          * queue conversion requests does not by itself guarantee that such
1643          * requests are serviced on a "first come first serve" basis.  This, in
1644          * turn, can lead to a phenomenon known as "indefinate postponement".
1645          *
1646          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1647          * the system service employed to request a lock conversion.  This flag
1648          * forces certain conversion requests to be queued, even if they are
1649          * compatible with the granted modes of other locks on the same
1650          * resource.  Thus, the use of this flag results in conversion requests
1651          * being ordered on a "first come first servce" basis.
1652          *
1653          * DCT: This condition is all about new conversions being able to occur
1654          * "in place" while the lock remains on the granted queue (assuming
1655          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1656          * doesn't _have_ to go onto the convert queue where it's processed in
1657          * order.  The "now" variable is necessary to distinguish converts
1658          * being received and processed for the first time now, because once a
1659          * convert is moved to the conversion queue the condition below applies
1660          * requiring fifo granting.
1661          */
1662
1663         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1664                 return 1;
1665
1666         /*
1667          * The NOORDER flag is set to avoid the standard vms rules on grant
1668          * order.
1669          */
1670
1671         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1672                 return 1;
1673
1674         /*
1675          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1676          * granted until all other conversion requests ahead of it are granted
1677          * and/or canceled.
1678          */
1679
1680         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1681                 return 1;
1682
1683         /*
1684          * 6-4: By default, a new request is immediately granted only if all
1685          * three of the following conditions are satisfied when the request is
1686          * issued:
1687          * - The queue of ungranted conversion requests for the resource is
1688          *   empty.
1689          * - The queue of ungranted new requests for the resource is empty.
1690          * - The mode of the new request is compatible with the most
1691          *   restrictive mode of all granted locks on the resource.
1692          */
1693
1694         if (now && !conv && list_empty(&r->res_convertqueue) &&
1695             list_empty(&r->res_waitqueue))
1696                 return 1;
1697
1698         /*
1699          * 6-4: Once a lock request is in the queue of ungranted new requests,
1700          * it cannot be granted until the queue of ungranted conversion
1701          * requests is empty, all ungranted new requests ahead of it are
1702          * granted and/or canceled, and it is compatible with the granted mode
1703          * of the most restrictive lock granted on the resource.
1704          */
1705
1706         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1707             first_in_list(lkb, &r->res_waitqueue))
1708                 return 1;
1709  out:
1710         return 0;
1711 }
1712
1713 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1714                           int *err)
1715 {
1716         int rv;
1717         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1718         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1719
1720         if (err)
1721                 *err = 0;
1722
1723         rv = _can_be_granted(r, lkb, now);
1724         if (rv)
1725                 goto out;
1726
1727         /*
1728          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1729          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1730          * cancels one of the locks.
1731          */
1732
1733         if (is_convert && can_be_queued(lkb) &&
1734             conversion_deadlock_detect(r, lkb)) {
1735                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1736                         lkb->lkb_grmode = DLM_LOCK_NL;
1737                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1738                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1739                         if (err)
1740                                 *err = -EDEADLK;
1741                         else {
1742                                 log_print("can_be_granted deadlock %x now %d",
1743                                           lkb->lkb_id, now);
1744                                 dlm_dump_rsb(r);
1745                         }
1746                 }
1747                 goto out;
1748         }
1749
1750         /*
1751          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1752          * to grant a request in a mode other than the normal rqmode.  It's a
1753          * simple way to provide a big optimization to applications that can
1754          * use them.
1755          */
1756
1757         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1758                 alt = DLM_LOCK_PR;
1759         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1760                 alt = DLM_LOCK_CW;
1761
1762         if (alt) {
1763                 lkb->lkb_rqmode = alt;
1764                 rv = _can_be_granted(r, lkb, now);
1765                 if (rv)
1766                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1767                 else
1768                         lkb->lkb_rqmode = rqmode;
1769         }
1770  out:
1771         return rv;
1772 }
1773
1774 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1775    for locks pending on the convert list.  Once verified (watch for these
1776    log_prints), we should be able to just call _can_be_granted() and not
1777    bother with the demote/deadlk cases here (and there's no easy way to deal
1778    with a deadlk here, we'd have to generate something like grant_lock with
1779    the deadlk error.) */
1780
1781 /* Returns the highest requested mode of all blocked conversions; sets
1782    cw if there's a blocked conversion to DLM_LOCK_CW. */
1783
1784 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1785 {
1786         struct dlm_lkb *lkb, *s;
1787         int hi, demoted, quit, grant_restart, demote_restart;
1788         int deadlk;
1789
1790         quit = 0;
1791  restart:
1792         grant_restart = 0;
1793         demote_restart = 0;
1794         hi = DLM_LOCK_IV;
1795
1796         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1797                 demoted = is_demoted(lkb);
1798                 deadlk = 0;
1799
1800                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1801                         grant_lock_pending(r, lkb);
1802                         grant_restart = 1;
1803                         continue;
1804                 }
1805
1806                 if (!demoted && is_demoted(lkb)) {
1807                         log_print("WARN: pending demoted %x node %d %s",
1808                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1809                         demote_restart = 1;
1810                         continue;
1811                 }
1812
1813                 if (deadlk) {
1814                         log_print("WARN: pending deadlock %x node %d %s",
1815                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1816                         dlm_dump_rsb(r);
1817                         continue;
1818                 }
1819
1820                 hi = max_t(int, lkb->lkb_rqmode, hi);
1821
1822                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1823                         *cw = 1;
1824         }
1825
1826         if (grant_restart)
1827                 goto restart;
1828         if (demote_restart && !quit) {
1829                 quit = 1;
1830                 goto restart;
1831         }
1832
1833         return max_t(int, high, hi);
1834 }
1835
1836 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1837 {
1838         struct dlm_lkb *lkb, *s;
1839
1840         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1841                 if (can_be_granted(r, lkb, 0, NULL))
1842                         grant_lock_pending(r, lkb);
1843                 else {
1844                         high = max_t(int, lkb->lkb_rqmode, high);
1845                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1846                                 *cw = 1;
1847                 }
1848         }
1849
1850         return high;
1851 }
1852
1853 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1854    on either the convert or waiting queue.
1855    high is the largest rqmode of all locks blocked on the convert or
1856    waiting queue. */
1857
1858 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1859 {
1860         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1861                 if (gr->lkb_highbast < DLM_LOCK_EX)
1862                         return 1;
1863                 return 0;
1864         }
1865
1866         if (gr->lkb_highbast < high &&
1867             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1868                 return 1;
1869         return 0;
1870 }
1871
1872 static void grant_pending_locks(struct dlm_rsb *r)
1873 {
1874         struct dlm_lkb *lkb, *s;
1875         int high = DLM_LOCK_IV;
1876         int cw = 0;
1877
1878         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1879
1880         high = grant_pending_convert(r, high, &cw);
1881         high = grant_pending_wait(r, high, &cw);
1882
1883         if (high == DLM_LOCK_IV)
1884                 return;
1885
1886         /*
1887          * If there are locks left on the wait/convert queue then send blocking
1888          * ASTs to granted locks based on the largest requested mode (high)
1889          * found above.
1890          */
1891
1892         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1893                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1894                         if (cw && high == DLM_LOCK_PR &&
1895                             lkb->lkb_grmode == DLM_LOCK_PR)
1896                                 queue_bast(r, lkb, DLM_LOCK_CW);
1897                         else
1898                                 queue_bast(r, lkb, high);
1899                         lkb->lkb_highbast = high;
1900                 }
1901         }
1902 }
1903
1904 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1905 {
1906         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1907             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1908                 if (gr->lkb_highbast < DLM_LOCK_EX)
1909                         return 1;
1910                 return 0;
1911         }
1912
1913         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1914                 return 1;
1915         return 0;
1916 }
1917
1918 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1919                             struct dlm_lkb *lkb)
1920 {
1921         struct dlm_lkb *gr;
1922
1923         list_for_each_entry(gr, head, lkb_statequeue) {
1924                 /* skip self when sending basts to convertqueue */
1925                 if (gr == lkb)
1926                         continue;
1927                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1928                         queue_bast(r, gr, lkb->lkb_rqmode);
1929                         gr->lkb_highbast = lkb->lkb_rqmode;
1930                 }
1931         }
1932 }
1933
1934 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1935 {
1936         send_bast_queue(r, &r->res_grantqueue, lkb);
1937 }
1938
1939 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1940 {
1941         send_bast_queue(r, &r->res_grantqueue, lkb);
1942         send_bast_queue(r, &r->res_convertqueue, lkb);
1943 }
1944
1945 /* set_master(r, lkb) -- set the master nodeid of a resource
1946
1947    The purpose of this function is to set the nodeid field in the given
1948    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1949    known, it can just be copied to the lkb and the function will return
1950    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1951    before it can be copied to the lkb.
1952
1953    When the rsb nodeid is being looked up remotely, the initial lkb
1954    causing the lookup is kept on the ls_waiters list waiting for the
1955    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1956    on the rsb's res_lookup list until the master is verified.
1957
1958    Return values:
1959    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1960    1: the rsb master is not available and the lkb has been placed on
1961       a wait queue
1962 */
1963
1964 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1965 {
1966         struct dlm_ls *ls = r->res_ls;
1967         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1968
1969         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1970                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1971                 r->res_first_lkid = lkb->lkb_id;
1972                 lkb->lkb_nodeid = r->res_nodeid;
1973                 return 0;
1974         }
1975
1976         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1977                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1978                 return 1;
1979         }
1980
1981         if (r->res_nodeid == 0) {
1982                 lkb->lkb_nodeid = 0;
1983                 return 0;
1984         }
1985
1986         if (r->res_nodeid > 0) {
1987                 lkb->lkb_nodeid = r->res_nodeid;
1988                 return 0;
1989         }
1990
1991         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1992
1993         dir_nodeid = dlm_dir_nodeid(r);
1994
1995         if (dir_nodeid != our_nodeid) {
1996                 r->res_first_lkid = lkb->lkb_id;
1997                 send_lookup(r, lkb);
1998                 return 1;
1999         }
2000
2001         for (i = 0; i < 2; i++) {
2002                 /* It's possible for dlm_scand to remove an old rsb for
2003                    this same resource from the toss list, us to create
2004                    a new one, look up the master locally, and find it
2005                    already exists just before dlm_scand does the
2006                    dir_remove() on the previous rsb. */
2007
2008                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2009                                        r->res_length, &ret_nodeid);
2010                 if (!error)
2011                         break;
2012                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2013                 schedule();
2014         }
2015         if (error && error != -EEXIST)
2016                 return error;
2017
2018         if (ret_nodeid == our_nodeid) {
2019                 r->res_first_lkid = 0;
2020                 r->res_nodeid = 0;
2021                 lkb->lkb_nodeid = 0;
2022         } else {
2023                 r->res_first_lkid = lkb->lkb_id;
2024                 r->res_nodeid = ret_nodeid;
2025                 lkb->lkb_nodeid = ret_nodeid;
2026         }
2027         return 0;
2028 }
2029
2030 static void process_lookup_list(struct dlm_rsb *r)
2031 {
2032         struct dlm_lkb *lkb, *safe;
2033
2034         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2035                 list_del_init(&lkb->lkb_rsb_lookup);
2036                 _request_lock(r, lkb);
2037                 schedule();
2038         }
2039 }
2040
2041 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2042
2043 static void confirm_master(struct dlm_rsb *r, int error)
2044 {
2045         struct dlm_lkb *lkb;
2046
2047         if (!r->res_first_lkid)
2048                 return;
2049
2050         switch (error) {
2051         case 0:
2052         case -EINPROGRESS:
2053                 r->res_first_lkid = 0;
2054                 process_lookup_list(r);
2055                 break;
2056
2057         case -EAGAIN:
2058         case -EBADR:
2059         case -ENOTBLK:
2060                 /* the remote request failed and won't be retried (it was
2061                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2062                    lkb the first_lkid */
2063
2064                 r->res_first_lkid = 0;
2065
2066                 if (!list_empty(&r->res_lookup)) {
2067                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2068                                          lkb_rsb_lookup);
2069                         list_del_init(&lkb->lkb_rsb_lookup);
2070                         r->res_first_lkid = lkb->lkb_id;
2071                         _request_lock(r, lkb);
2072                 }
2073                 break;
2074
2075         default:
2076                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2077         }
2078 }
2079
2080 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2081                          int namelen, unsigned long timeout_cs,
2082                          void (*ast) (void *astparam),
2083                          void *astparam,
2084                          void (*bast) (void *astparam, int mode),
2085                          struct dlm_args *args)
2086 {
2087         int rv = -EINVAL;
2088
2089         /* check for invalid arg usage */
2090
2091         if (mode < 0 || mode > DLM_LOCK_EX)
2092                 goto out;
2093
2094         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2095                 goto out;
2096
2097         if (flags & DLM_LKF_CANCEL)
2098                 goto out;
2099
2100         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2101                 goto out;
2102
2103         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2104                 goto out;
2105
2106         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2107                 goto out;
2108
2109         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2110                 goto out;
2111
2112         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2113                 goto out;
2114
2115         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2116                 goto out;
2117
2118         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2119                 goto out;
2120
2121         if (!ast || !lksb)
2122                 goto out;
2123
2124         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2125                 goto out;
2126
2127         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2128                 goto out;
2129
2130         /* these args will be copied to the lkb in validate_lock_args,
2131            it cannot be done now because when converting locks, fields in
2132            an active lkb cannot be modified before locking the rsb */
2133
2134         args->flags = flags;
2135         args->astfn = ast;
2136         args->astparam = astparam;
2137         args->bastfn = bast;
2138         args->timeout = timeout_cs;
2139         args->mode = mode;
2140         args->lksb = lksb;
2141         rv = 0;
2142  out:
2143         return rv;
2144 }
2145
2146 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2147 {
2148         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2149                       DLM_LKF_FORCEUNLOCK))
2150                 return -EINVAL;
2151
2152         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2153                 return -EINVAL;
2154
2155         args->flags = flags;
2156         args->astparam = astarg;
2157         return 0;
2158 }
2159
2160 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2161                               struct dlm_args *args)
2162 {
2163         int rv = -EINVAL;
2164
2165         if (args->flags & DLM_LKF_CONVERT) {
2166                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2167                         goto out;
2168
2169                 if (args->flags & DLM_LKF_QUECVT &&
2170                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2171                         goto out;
2172
2173                 rv = -EBUSY;
2174                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2175                         goto out;
2176
2177                 if (lkb->lkb_wait_type)
2178                         goto out;
2179
2180                 if (is_overlap(lkb))
2181                         goto out;
2182         }
2183
2184         lkb->lkb_exflags = args->flags;
2185         lkb->lkb_sbflags = 0;
2186         lkb->lkb_astfn = args->astfn;
2187         lkb->lkb_astparam = args->astparam;
2188         lkb->lkb_bastfn = args->bastfn;
2189         lkb->lkb_rqmode = args->mode;
2190         lkb->lkb_lksb = args->lksb;
2191         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2192         lkb->lkb_ownpid = (int) current->pid;
2193         lkb->lkb_timeout_cs = args->timeout;
2194         rv = 0;
2195  out:
2196         if (rv)
2197                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2198                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2199                           lkb->lkb_status, lkb->lkb_wait_type,
2200                           lkb->lkb_resource->res_name);
2201         return rv;
2202 }
2203
2204 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2205    for success */
2206
2207 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2208    because there may be a lookup in progress and it's valid to do
2209    cancel/unlockf on it */
2210
2211 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2212 {
2213         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2214         int rv = -EINVAL;
2215
2216         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2217                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2218                 dlm_print_lkb(lkb);
2219                 goto out;
2220         }
2221
2222         /* an lkb may still exist even though the lock is EOL'ed due to a
2223            cancel, unlock or failed noqueue request; an app can't use these
2224            locks; return same error as if the lkid had not been found at all */
2225
2226         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2227                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2228                 rv = -ENOENT;
2229                 goto out;
2230         }
2231
2232         /* an lkb may be waiting for an rsb lookup to complete where the
2233            lookup was initiated by another lock */
2234
2235         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2236                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2237                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2238                         list_del_init(&lkb->lkb_rsb_lookup);
2239                         queue_cast(lkb->lkb_resource, lkb,
2240                                    args->flags & DLM_LKF_CANCEL ?
2241                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2242                         unhold_lkb(lkb); /* undoes create_lkb() */
2243                 }
2244                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2245                 rv = -EBUSY;
2246                 goto out;
2247         }
2248
2249         /* cancel not allowed with another cancel/unlock in progress */
2250
2251         if (args->flags & DLM_LKF_CANCEL) {
2252                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2253                         goto out;
2254
2255                 if (is_overlap(lkb))
2256                         goto out;
2257
2258                 /* don't let scand try to do a cancel */
2259                 del_timeout(lkb);
2260
2261                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2262                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2263                         rv = -EBUSY;
2264                         goto out;
2265                 }
2266
2267                 /* there's nothing to cancel */
2268                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2269                     !lkb->lkb_wait_type) {
2270                         rv = -EBUSY;
2271                         goto out;
2272                 }
2273
2274                 switch (lkb->lkb_wait_type) {
2275                 case DLM_MSG_LOOKUP:
2276                 case DLM_MSG_REQUEST:
2277                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2278                         rv = -EBUSY;
2279                         goto out;
2280                 case DLM_MSG_UNLOCK:
2281                 case DLM_MSG_CANCEL:
2282                         goto out;
2283                 }
2284                 /* add_to_waiters() will set OVERLAP_CANCEL */
2285                 goto out_ok;
2286         }
2287
2288         /* do we need to allow a force-unlock if there's a normal unlock
2289            already in progress?  in what conditions could the normal unlock
2290            fail such that we'd want to send a force-unlock to be sure? */
2291
2292         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2293                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2294                         goto out;
2295
2296                 if (is_overlap_unlock(lkb))
2297                         goto out;
2298
2299                 /* don't let scand try to do a cancel */
2300                 del_timeout(lkb);
2301
2302                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2303                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2304                         rv = -EBUSY;
2305                         goto out;
2306                 }
2307
2308                 switch (lkb->lkb_wait_type) {
2309                 case DLM_MSG_LOOKUP:
2310                 case DLM_MSG_REQUEST:
2311                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2312                         rv = -EBUSY;
2313                         goto out;
2314                 case DLM_MSG_UNLOCK:
2315                         goto out;
2316                 }
2317                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2318                 goto out_ok;
2319         }
2320
2321         /* normal unlock not allowed if there's any op in progress */
2322         rv = -EBUSY;
2323         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2324                 goto out;
2325
2326  out_ok:
2327         /* an overlapping op shouldn't blow away exflags from other op */
2328         lkb->lkb_exflags |= args->flags;
2329         lkb->lkb_sbflags = 0;
2330         lkb->lkb_astparam = args->astparam;
2331         rv = 0;
2332  out:
2333         if (rv)
2334                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2335                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2336                           args->flags, lkb->lkb_wait_type,
2337                           lkb->lkb_resource->res_name);
2338         return rv;
2339 }
2340
2341 /*
2342  * Four stage 4 varieties:
2343  * do_request(), do_convert(), do_unlock(), do_cancel()
2344  * These are called on the master node for the given lock and
2345  * from the central locking logic.
2346  */
2347
2348 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2349 {
2350         int error = 0;
2351
2352         if (can_be_granted(r, lkb, 1, NULL)) {
2353                 grant_lock(r, lkb);
2354                 queue_cast(r, lkb, 0);
2355                 goto out;
2356         }
2357
2358         if (can_be_queued(lkb)) {
2359                 error = -EINPROGRESS;
2360                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2361                 add_timeout(lkb);
2362                 goto out;
2363         }
2364
2365         error = -EAGAIN;
2366         queue_cast(r, lkb, -EAGAIN);
2367  out:
2368         return error;
2369 }
2370
2371 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2372                                int error)
2373 {
2374         switch (error) {
2375         case -EAGAIN:
2376                 if (force_blocking_asts(lkb))
2377                         send_blocking_asts_all(r, lkb);
2378                 break;
2379         case -EINPROGRESS:
2380                 send_blocking_asts(r, lkb);
2381                 break;
2382         }
2383 }
2384
2385 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2386 {
2387         int error = 0;
2388         int deadlk = 0;
2389
2390         /* changing an existing lock may allow others to be granted */
2391
2392         if (can_be_granted(r, lkb, 1, &deadlk)) {
2393                 grant_lock(r, lkb);
2394                 queue_cast(r, lkb, 0);
2395                 goto out;
2396         }
2397
2398         /* can_be_granted() detected that this lock would block in a conversion
2399            deadlock, so we leave it on the granted queue and return EDEADLK in
2400            the ast for the convert. */
2401
2402         if (deadlk) {
2403                 /* it's left on the granted queue */
2404                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2405                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2406                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2407                 revert_lock(r, lkb);
2408                 queue_cast(r, lkb, -EDEADLK);
2409                 error = -EDEADLK;
2410                 goto out;
2411         }
2412
2413         /* is_demoted() means the can_be_granted() above set the grmode
2414            to NL, and left us on the granted queue.  This auto-demotion
2415            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2416            now grantable.  We have to try to grant other converting locks
2417            before we try again to grant this one. */
2418
2419         if (is_demoted(lkb)) {
2420                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2421                 if (_can_be_granted(r, lkb, 1)) {
2422                         grant_lock(r, lkb);
2423                         queue_cast(r, lkb, 0);
2424                         goto out;
2425                 }
2426                 /* else fall through and move to convert queue */
2427         }
2428
2429         if (can_be_queued(lkb)) {
2430                 error = -EINPROGRESS;
2431                 del_lkb(r, lkb);
2432                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2433                 add_timeout(lkb);
2434                 goto out;
2435         }
2436
2437         error = -EAGAIN;
2438         queue_cast(r, lkb, -EAGAIN);
2439  out:
2440         return error;
2441 }
2442
2443 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2444                                int error)
2445 {
2446         switch (error) {
2447         case 0:
2448                 grant_pending_locks(r);
2449                 /* grant_pending_locks also sends basts */
2450                 break;
2451         case -EAGAIN:
2452                 if (force_blocking_asts(lkb))
2453                         send_blocking_asts_all(r, lkb);
2454                 break;
2455         case -EINPROGRESS:
2456                 send_blocking_asts(r, lkb);
2457                 break;
2458         }
2459 }
2460
2461 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2462 {
2463         remove_lock(r, lkb);
2464         queue_cast(r, lkb, -DLM_EUNLOCK);
2465         return -DLM_EUNLOCK;
2466 }
2467
2468 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2469                               int error)
2470 {
2471         grant_pending_locks(r);
2472 }
2473
2474 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2475  
2476 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2477 {
2478         int error;
2479
2480         error = revert_lock(r, lkb);
2481         if (error) {
2482                 queue_cast(r, lkb, -DLM_ECANCEL);
2483                 return -DLM_ECANCEL;
2484         }
2485         return 0;
2486 }
2487
2488 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2489                               int error)
2490 {
2491         if (error)
2492                 grant_pending_locks(r);
2493 }
2494
2495 /*
2496  * Four stage 3 varieties:
2497  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2498  */
2499
2500 /* add a new lkb to a possibly new rsb, called by requesting process */
2501
2502 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2503 {
2504         int error;
2505
2506         /* set_master: sets lkb nodeid from r */
2507
2508         error = set_master(r, lkb);
2509         if (error < 0)
2510                 goto out;
2511         if (error) {
2512                 error = 0;
2513                 goto out;
2514         }
2515
2516         if (is_remote(r)) {
2517                 /* receive_request() calls do_request() on remote node */
2518                 error = send_request(r, lkb);
2519         } else {
2520                 error = do_request(r, lkb);
2521                 /* for remote locks the request_reply is sent
2522                    between do_request and do_request_effects */
2523                 do_request_effects(r, lkb, error);
2524         }
2525  out:
2526         return error;
2527 }
2528
2529 /* change some property of an existing lkb, e.g. mode */
2530
2531 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2532 {
2533         int error;
2534
2535         if (is_remote(r)) {
2536                 /* receive_convert() calls do_convert() on remote node */
2537                 error = send_convert(r, lkb);
2538         } else {
2539                 error = do_convert(r, lkb);
2540                 /* for remote locks the convert_reply is sent
2541                    between do_convert and do_convert_effects */
2542                 do_convert_effects(r, lkb, error);
2543         }
2544
2545         return error;
2546 }
2547
2548 /* remove an existing lkb from the granted queue */
2549
2550 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2551 {
2552         int error;
2553
2554         if (is_remote(r)) {
2555                 /* receive_unlock() calls do_unlock() on remote node */
2556                 error = send_unlock(r, lkb);
2557         } else {
2558                 error = do_unlock(r, lkb);
2559                 /* for remote locks the unlock_reply is sent
2560                    between do_unlock and do_unlock_effects */
2561                 do_unlock_effects(r, lkb, error);
2562         }
2563
2564         return error;
2565 }
2566
2567 /* remove an existing lkb from the convert or wait queue */
2568
2569 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2570 {
2571         int error;
2572
2573         if (is_remote(r)) {
2574                 /* receive_cancel() calls do_cancel() on remote node */
2575                 error = send_cancel(r, lkb);
2576         } else {
2577                 error = do_cancel(r, lkb);
2578                 /* for remote locks the cancel_reply is sent
2579                    between do_cancel and do_cancel_effects */
2580                 do_cancel_effects(r, lkb, error);
2581         }
2582
2583         return error;
2584 }
2585
2586 /*
2587  * Four stage 2 varieties:
2588  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2589  */
2590
2591 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2592                         int len, struct dlm_args *args)
2593 {
2594         struct dlm_rsb *r;
2595         int error;
2596
2597         error = validate_lock_args(ls, lkb, args);
2598         if (error)
2599                 goto out;
2600
2601         error = find_rsb(ls, name, len, R_CREATE, &r);
2602         if (error)
2603                 goto out;
2604
2605         lock_rsb(r);
2606
2607         attach_lkb(r, lkb);
2608         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2609
2610         error = _request_lock(r, lkb);
2611
2612         unlock_rsb(r);
2613         put_rsb(r);
2614
2615  out:
2616         return error;
2617 }
2618
2619 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2620                         struct dlm_args *args)
2621 {
2622         struct dlm_rsb *r;
2623         int error;
2624
2625         r = lkb->lkb_resource;
2626
2627         hold_rsb(r);
2628         lock_rsb(r);
2629
2630         error = validate_lock_args(ls, lkb, args);
2631         if (error)
2632                 goto out;
2633
2634         error = _convert_lock(r, lkb);
2635  out:
2636         unlock_rsb(r);
2637         put_rsb(r);
2638         return error;
2639 }
2640
2641 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2642                        struct dlm_args *args)
2643 {
2644         struct dlm_rsb *r;
2645         int error;
2646
2647         r = lkb->lkb_resource;
2648
2649         hold_rsb(r);
2650         lock_rsb(r);
2651
2652         error = validate_unlock_args(lkb, args);
2653         if (error)
2654                 goto out;
2655
2656         error = _unlock_lock(r, lkb);
2657  out:
2658         unlock_rsb(r);
2659         put_rsb(r);
2660         return error;
2661 }
2662
2663 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2664                        struct dlm_args *args)
2665 {
2666         struct dlm_rsb *r;
2667         int error;
2668
2669         r = lkb->lkb_resource;
2670
2671         hold_rsb(r);
2672         lock_rsb(r);
2673
2674         error = validate_unlock_args(lkb, args);
2675         if (error)
2676                 goto out;
2677
2678         error = _cancel_lock(r, lkb);
2679  out:
2680         unlock_rsb(r);
2681         put_rsb(r);
2682         return error;
2683 }
2684
2685 /*
2686  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2687  */
2688
2689 int dlm_lock(dlm_lockspace_t *lockspace,
2690              int mode,
2691              struct dlm_lksb *lksb,
2692              uint32_t flags,
2693              void *name,
2694              unsigned int namelen,
2695              uint32_t parent_lkid,
2696              void (*ast) (void *astarg),
2697              void *astarg,
2698              void (*bast) (void *astarg, int mode))
2699 {
2700         struct dlm_ls *ls;
2701         struct dlm_lkb *lkb;
2702         struct dlm_args args;
2703         int error, convert = flags & DLM_LKF_CONVERT;
2704
2705         ls = dlm_find_lockspace_local(lockspace);
2706         if (!ls)
2707                 return -EINVAL;
2708
2709         dlm_lock_recovery(ls);
2710
2711         if (convert)
2712                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2713         else
2714                 error = create_lkb(ls, &lkb);
2715
2716         if (error)
2717                 goto out;
2718
2719         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2720                               astarg, bast, &args);
2721         if (error)
2722                 goto out_put;
2723
2724         if (convert)
2725                 error = convert_lock(ls, lkb, &args);
2726         else
2727                 error = request_lock(ls, lkb, name, namelen, &args);
2728
2729         if (error == -EINPROGRESS)
2730                 error = 0;
2731  out_put:
2732         if (convert || error)
2733                 __put_lkb(ls, lkb);
2734         if (error == -EAGAIN || error == -EDEADLK)
2735                 error = 0;
2736  out:
2737         dlm_unlock_recovery(ls);
2738         dlm_put_lockspace(ls);
2739         return error;
2740 }
2741
2742 int dlm_unlock(dlm_lockspace_t *lockspace,
2743                uint32_t lkid,
2744                uint32_t flags,
2745                struct dlm_lksb *lksb,
2746                void *astarg)
2747 {
2748         struct dlm_ls *ls;
2749         struct dlm_lkb *lkb;
2750         struct dlm_args args;
2751         int error;
2752
2753         ls = dlm_find_lockspace_local(lockspace);
2754         if (!ls)
2755                 return -EINVAL;
2756
2757         dlm_lock_recovery(ls);
2758
2759         error = find_lkb(ls, lkid, &lkb);
2760         if (error)
2761                 goto out;
2762
2763         error = set_unlock_args(flags, astarg, &args);
2764         if (error)
2765                 goto out_put;
2766
2767         if (flags & DLM_LKF_CANCEL)
2768                 error = cancel_lock(ls, lkb, &args);
2769         else
2770                 error = unlock_lock(ls, lkb, &args);
2771
2772         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2773                 error = 0;
2774         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2775                 error = 0;
2776  out_put:
2777         dlm_put_lkb(lkb);
2778  out:
2779         dlm_unlock_recovery(ls);
2780         dlm_put_lockspace(ls);
2781         return error;
2782 }
2783
2784 /*
2785  * send/receive routines for remote operations and replies
2786  *
2787  * send_args
2788  * send_common
2789  * send_request                 receive_request
2790  * send_convert                 receive_convert
2791  * send_unlock                  receive_unlock
2792  * send_cancel                  receive_cancel
2793  * send_grant                   receive_grant
2794  * send_bast                    receive_bast
2795  * send_lookup                  receive_lookup
2796  * send_remove                  receive_remove
2797  *
2798  *                              send_common_reply
2799  * receive_request_reply        send_request_reply
2800  * receive_convert_reply        send_convert_reply
2801  * receive_unlock_reply         send_unlock_reply
2802  * receive_cancel_reply         send_cancel_reply
2803  * receive_lookup_reply         send_lookup_reply
2804  */
2805
2806 static int _create_message(struct dlm_ls *ls, int mb_len,
2807                            int to_nodeid, int mstype,
2808                            struct dlm_message **ms_ret,
2809                            struct dlm_mhandle **mh_ret)
2810 {
2811         struct dlm_message *ms;
2812         struct dlm_mhandle *mh;
2813         char *mb;
2814
2815         /* get_buffer gives us a message handle (mh) that we need to
2816            pass into lowcomms_commit and a message buffer (mb) that we
2817            write our data into */
2818
2819         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2820         if (!mh)
2821                 return -ENOBUFS;
2822
2823         memset(mb, 0, mb_len);
2824
2825         ms = (struct dlm_message *) mb;
2826
2827         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2828         ms->m_header.h_lockspace = ls->ls_global_id;
2829         ms->m_header.h_nodeid = dlm_our_nodeid();
2830         ms->m_header.h_length = mb_len;
2831         ms->m_header.h_cmd = DLM_MSG;
2832
2833         ms->m_type = mstype;
2834
2835         *mh_ret = mh;
2836         *ms_ret = ms;
2837         return 0;
2838 }
2839
2840 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2841                           int to_nodeid, int mstype,
2842                           struct dlm_message **ms_ret,
2843                           struct dlm_mhandle **mh_ret)
2844 {
2845         int mb_len = sizeof(struct dlm_message);
2846
2847         switch (mstype) {
2848         case DLM_MSG_REQUEST:
2849         case DLM_MSG_LOOKUP:
2850         case DLM_MSG_REMOVE:
2851                 mb_len += r->res_length;
2852                 break;
2853         case DLM_MSG_CONVERT:
2854         case DLM_MSG_UNLOCK:
2855         case DLM_MSG_REQUEST_REPLY:
2856         case DLM_MSG_CONVERT_REPLY:
2857         case DLM_MSG_GRANT:
2858                 if (lkb && lkb->lkb_lvbptr)
2859                         mb_len += r->res_ls->ls_lvblen;
2860                 break;
2861         }
2862
2863         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2864                                ms_ret, mh_ret);
2865 }
2866
2867 /* further lowcomms enhancements or alternate implementations may make
2868    the return value from this function useful at some point */
2869
2870 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2871 {
2872         dlm_message_out(ms);
2873         dlm_lowcomms_commit_buffer(mh);
2874         return 0;
2875 }
2876
2877 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2878                       struct dlm_message *ms)
2879 {
2880         ms->m_nodeid   = lkb->lkb_nodeid;
2881         ms->m_pid      = lkb->lkb_ownpid;
2882         ms->m_lkid     = lkb->lkb_id;
2883         ms->m_remid    = lkb->lkb_remid;
2884         ms->m_exflags  = lkb->lkb_exflags;
2885         ms->m_sbflags  = lkb->lkb_sbflags;
2886         ms->m_flags    = lkb->lkb_flags;
2887         ms->m_lvbseq   = lkb->lkb_lvbseq;
2888         ms->m_status   = lkb->lkb_status;
2889         ms->m_grmode   = lkb->lkb_grmode;
2890         ms->m_rqmode   = lkb->lkb_rqmode;
2891         ms->m_hash     = r->res_hash;
2892
2893         /* m_result and m_bastmode are set from function args,
2894            not from lkb fields */
2895
2896         if (lkb->lkb_bastfn)
2897                 ms->m_asts |= DLM_CB_BAST;
2898         if (lkb->lkb_astfn)
2899                 ms->m_asts |= DLM_CB_CAST;
2900
2901         /* compare with switch in create_message; send_remove() doesn't
2902            use send_args() */
2903
2904         switch (ms->m_type) {
2905         case DLM_MSG_REQUEST:
2906         case DLM_MSG_LOOKUP:
2907                 memcpy(ms->m_extra, r->res_name, r->res_length);
2908                 break;
2909         case DLM_MSG_CONVERT:
2910         case DLM_MSG_UNLOCK:
2911         case DLM_MSG_REQUEST_REPLY:
2912         case DLM_MSG_CONVERT_REPLY:
2913         case DLM_MSG_GRANT:
2914                 if (!lkb->lkb_lvbptr)
2915                         break;
2916                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2917                 break;
2918         }
2919 }
2920
2921 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2922 {
2923         struct dlm_message *ms;
2924         struct dlm_mhandle *mh;
2925         int to_nodeid, error;
2926
2927         to_nodeid = r->res_nodeid;
2928
2929         error = add_to_waiters(lkb, mstype, to_nodeid);
2930         if (error)
2931                 return error;
2932
2933         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2934         if (error)
2935                 goto fail;
2936
2937         send_args(r, lkb, ms);
2938
2939         error = send_message(mh, ms);
2940         if (error)
2941                 goto fail;
2942         return 0;
2943
2944  fail:
2945         remove_from_waiters(lkb, msg_reply_type(mstype));
2946         return error;
2947 }
2948
2949 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2950 {
2951         return send_common(r, lkb, DLM_MSG_REQUEST);
2952 }
2953
2954 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2955 {
2956         int error;
2957
2958         error = send_common(r, lkb, DLM_MSG_CONVERT);
2959
2960         /* down conversions go without a reply from the master */
2961         if (!error && down_conversion(lkb)) {
2962                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2963                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2964                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2965                 r->res_ls->ls_stub_ms.m_result = 0;
2966                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2967         }
2968
2969         return error;
2970 }
2971
2972 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2973    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2974    that the master is still correct. */
2975
2976 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2977 {
2978         return send_common(r, lkb, DLM_MSG_UNLOCK);
2979 }
2980
2981 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2982 {
2983         return send_common(r, lkb, DLM_MSG_CANCEL);
2984 }
2985
2986 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2987 {
2988         struct dlm_message *ms;
2989         struct dlm_mhandle *mh;
2990         int to_nodeid, error;
2991
2992         to_nodeid = lkb->lkb_nodeid;
2993
2994         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2995         if (error)
2996                 goto out;
2997
2998         send_args(r, lkb, ms);
2999
3000         ms->m_result = 0;
3001
3002         error = send_message(mh, ms);
3003  out:
3004         return error;
3005 }
3006
3007 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3008 {
3009         struct dlm_message *ms;
3010         struct dlm_mhandle *mh;
3011         int to_nodeid, error;
3012
3013         to_nodeid = lkb->lkb_nodeid;
3014
3015         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3016         if (error)
3017                 goto out;
3018
3019         send_args(r, lkb, ms);
3020
3021         ms->m_bastmode = mode;
3022
3023         error = send_message(mh, ms);
3024  out:
3025         return error;
3026 }
3027
3028 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3029 {
3030         struct dlm_message *ms;
3031         struct dlm_mhandle *mh;
3032         int to_nodeid, error;
3033
3034         to_nodeid = dlm_dir_nodeid(r);
3035
3036         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3037         if (error)
3038                 return error;
3039
3040         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3041         if (error)
3042                 goto fail;
3043
3044         send_args(r, lkb, ms);
3045
3046         error = send_message(mh, ms);
3047         if (error)
3048                 goto fail;
3049         return 0;
3050
3051  fail:
3052         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3053         return error;
3054 }
3055
3056 static int send_remove(struct dlm_rsb *r)
3057 {
3058         struct dlm_message *ms;
3059         struct dlm_mhandle *mh;
3060         int to_nodeid, error;
3061
3062         to_nodeid = dlm_dir_nodeid(r);
3063
3064         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3065         if (error)
3066                 goto out;
3067
3068         memcpy(ms->m_extra, r->res_name, r->res_length);
3069         ms->m_hash = r->res_hash;
3070
3071         error = send_message(mh, ms);
3072  out:
3073         return error;
3074 }
3075
3076 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3077                              int mstype, int rv)
3078 {
3079         struct dlm_message *ms;
3080         struct dlm_mhandle *mh;
3081         int to_nodeid, error;
3082
3083         to_nodeid = lkb->lkb_nodeid;
3084
3085         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3086         if (error)
3087                 goto out;
3088
3089         send_args(r, lkb, ms);
3090
3091         ms->m_result = rv;
3092
3093         error = send_message(mh, ms);
3094  out:
3095         return error;
3096 }
3097
3098 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3099 {
3100         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3101 }
3102
3103 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3104 {
3105         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3106 }
3107
3108 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3109 {
3110         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3111 }
3112
3113 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3114 {
3115         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3116 }
3117
3118 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3119                              int ret_nodeid, int rv)
3120 {
3121         struct dlm_rsb *r = &ls->ls_stub_rsb;
3122         struct dlm_message *ms;
3123         struct dlm_mhandle *mh;
3124         int error, nodeid = ms_in->m_header.h_nodeid;
3125
3126         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3127         if (error)
3128                 goto out;
3129
3130         ms->m_lkid = ms_in->m_lkid;
3131         ms->m_result = rv;
3132         ms->m_nodeid = ret_nodeid;
3133
3134         error = send_message(mh, ms);
3135  out:
3136         return error;
3137 }
3138
3139 /* which args we save from a received message depends heavily on the type
3140    of message, unlike the send side where we can safely send everything about
3141    the lkb for any type of message */
3142
3143 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3144 {
3145         lkb->lkb_exflags = ms->m_exflags;
3146         lkb->lkb_sbflags = ms->m_sbflags;
3147         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3148                          (ms->m_flags & 0x0000FFFF);
3149 }
3150
3151 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3152 {
3153         if (ms->m_flags == DLM_IFL_STUB_MS)
3154                 return;
3155
3156         lkb->lkb_sbflags = ms->m_sbflags;
3157         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3158                          (ms->m_flags & 0x0000FFFF);
3159 }
3160
3161 static int receive_extralen(struct dlm_message *ms)
3162 {
3163         return (ms->m_header.h_length - sizeof(struct dlm_message));
3164 }
3165
3166 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3167                        struct dlm_message *ms)
3168 {
3169         int len;
3170
3171         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3172                 if (!lkb->lkb_lvbptr)
3173                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3174                 if (!lkb->lkb_lvbptr)
3175                         return -ENOMEM;
3176                 len = receive_extralen(ms);
3177                 if (len > DLM_RESNAME_MAXLEN)
3178                         len = DLM_RESNAME_MAXLEN;
3179                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3180         }
3181         return 0;
3182 }
3183
3184 static void fake_bastfn(void *astparam, int mode)
3185 {
3186         log_print("fake_bastfn should not be called");
3187 }
3188
3189 static void fake_astfn(void *astparam)
3190 {
3191         log_print("fake_astfn should not be called");
3192 }
3193
3194 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3195                                 struct dlm_message *ms)
3196 {
3197         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3198         lkb->lkb_ownpid = ms->m_pid;
3199         lkb->lkb_remid = ms->m_lkid;
3200         lkb->lkb_grmode = DLM_LOCK_IV;
3201         lkb->lkb_rqmode = ms->m_rqmode;
3202
3203         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3204         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3205
3206         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3207                 /* lkb was just created so there won't be an lvb yet */
3208                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3209                 if (!lkb->lkb_lvbptr)
3210                         return -ENOMEM;
3211         }
3212
3213         return 0;
3214 }
3215
3216 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3217                                 struct dlm_message *ms)
3218 {
3219         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3220                 return -EBUSY;
3221
3222         if (receive_lvb(ls, lkb, ms))
3223                 return -ENOMEM;
3224
3225         lkb->lkb_rqmode = ms->m_rqmode;
3226         lkb->lkb_lvbseq = ms->m_lvbseq;
3227
3228         return 0;
3229 }
3230
3231 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3232                                struct dlm_message *ms)
3233 {
3234         if (receive_lvb(ls, lkb, ms))
3235                 return -ENOMEM;
3236         return 0;
3237 }
3238
3239 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3240    uses to send a reply and that the remote end uses to process the reply. */
3241
3242 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3243 {
3244         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3245         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3246         lkb->lkb_remid = ms->m_lkid;
3247 }
3248
3249 /* This is called after the rsb is locked so that we can safely inspect
3250    fields in the lkb. */
3251
3252 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3253 {
3254         int from = ms->m_header.h_nodeid;
3255         int error = 0;
3256
3257         switch (ms->m_type) {
3258         case DLM_MSG_CONVERT:
3259         case DLM_MSG_UNLOCK:
3260         case DLM_MSG_CANCEL:
3261                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3262                         error = -EINVAL;
3263                 break;
3264
3265         case DLM_MSG_CONVERT_REPLY:
3266         case DLM_MSG_UNLOCK_REPLY:
3267         case DLM_MSG_CANCEL_REPLY:
3268         case DLM_MSG_GRANT:
3269         case DLM_MSG_BAST:
3270                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3271                         error = -EINVAL;
3272                 break;
3273
3274         case DLM_MSG_REQUEST_REPLY:
3275                 if (!is_process_copy(lkb))
3276                         error = -EINVAL;
3277                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3278                         error = -EINVAL;
3279                 break;
3280
3281         default:
3282                 error = -EINVAL;
3283         }
3284
3285         if (error)
3286                 log_error(lkb->lkb_resource->res_ls,
3287                           "ignore invalid message %d from %d %x %x %x %d",
3288                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3289                           lkb->lkb_flags, lkb->lkb_nodeid);
3290         return error;
3291 }
3292
3293 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3294 {
3295         struct dlm_lkb *lkb;
3296         struct dlm_rsb *r;
3297         int error, namelen;
3298
3299         error = create_lkb(ls, &lkb);
3300         if (error)
3301                 goto fail;
3302
3303         receive_flags(lkb, ms);
3304         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3305         error = receive_request_args(ls, lkb, ms);
3306         if (error) {
3307                 __put_lkb(ls, lkb);
3308                 goto fail;
3309         }
3310
3311         namelen = receive_extralen(ms);
3312
3313         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3314         if (error) {
3315                 __put_lkb(ls, lkb);
3316                 goto fail;
3317         }
3318
3319         lock_rsb(r);
3320
3321         attach_lkb(r, lkb);
3322         error = do_request(r, lkb);
3323         send_request_reply(r, lkb, error);
3324         do_request_effects(r, lkb, error);
3325
3326         unlock_rsb(r);
3327         put_rsb(r);
3328
3329         if (error == -EINPROGRESS)
3330                 error = 0;
3331         if (error)
3332                 dlm_put_lkb(lkb);
3333         return;
3334
3335  fail:
3336         setup_stub_lkb(ls, ms);
3337         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3338 }
3339
3340 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3341 {
3342         struct dlm_lkb *lkb;
3343         struct dlm_rsb *r;
3344         int error, reply = 1;
3345
3346         error = find_lkb(ls, ms->m_remid, &lkb);
3347         if (error)
3348                 goto fail;
3349
3350         r = lkb->lkb_resource;
3351
3352         hold_rsb(r);
3353         lock_rsb(r);
3354
3355         error = validate_message(lkb, ms);
3356         if (error)
3357                 goto out;
3358
3359         receive_flags(lkb, ms);
3360
3361         error = receive_convert_args(ls, lkb, ms);
3362         if (error) {
3363                 send_convert_reply(r, lkb, error);
3364                 goto out;
3365         }
3366
3367         reply = !down_conversion(lkb);
3368
3369         error = do_convert(r, lkb);
3370         if (reply)
3371                 send_convert_reply(r, lkb, error);
3372         do_convert_effects(r, lkb, error);
3373  out:
3374         unlock_rsb(r);
3375         put_rsb(r);
3376         dlm_put_lkb(lkb);
3377         return;
3378
3379  fail:
3380         setup_stub_lkb(ls, ms);
3381         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3382 }
3383
3384 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3385 {
3386         struct dlm_lkb *lkb;
3387         struct dlm_rsb *r;
3388         int error;
3389
3390         error = find_lkb(ls, ms->m_remid, &lkb);
3391         if (error)
3392                 goto fail;
3393
3394         r = lkb->lkb_resource;
3395
3396         hold_rsb(r);
3397         lock_rsb(r);
3398
3399         error = validate_message(lkb, ms);
3400         if (error)
3401                 goto out;
3402
3403         receive_flags(lkb, ms);
3404
3405         error = receive_unlock_args(ls, lkb, ms);
3406         if (error) {
3407                 send_unlock_reply(r, lkb, error);
3408                 goto out;
3409         }
3410
3411         error = do_unlock(r, lkb);
3412         send_unlock_reply(r, lkb, error);
3413         do_unlock_effects(r, lkb, error);
3414  out:
3415         unlock_rsb(r);
3416         put_rsb(r);
3417         dlm_put_lkb(lkb);
3418         return;
3419
3420  fail:
3421         setup_stub_lkb(ls, ms);
3422         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3423 }
3424
3425 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3426 {
3427         struct dlm_lkb *lkb;
3428         struct dlm_rsb *r;
3429         int error;
3430
3431         error = find_lkb(ls, ms->m_remid, &lkb);
3432         if (error)
3433                 goto fail;
3434
3435         receive_flags(lkb, ms);
3436
3437         r = lkb->lkb_resource;
3438
3439         hold_rsb(r);
3440         lock_rsb(r);
3441
3442         error = validate_message(lkb, ms);
3443         if (error)
3444                 goto out;
3445
3446         error = do_cancel(r, lkb);
3447         send_cancel_reply(r, lkb, error);
3448         do_cancel_effects(r, lkb, error);
3449  out:
3450         unlock_rsb(r);
3451         put_rsb(r);
3452         dlm_put_lkb(lkb);
3453         return;
3454
3455  fail:
3456         setup_stub_lkb(ls, ms);
3457         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3458 }
3459
3460 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3461 {
3462         struct dlm_lkb *lkb;
3463         struct dlm_rsb *r;
3464         int error;
3465
3466         error = find_lkb(ls, ms->m_remid, &lkb);
3467         if (error) {
3468                 log_debug(ls, "receive_grant from %d no lkb %x",
3469                           ms->m_header.h_nodeid, ms->m_remid);
3470                 return;
3471         }
3472
3473         r = lkb->lkb_resource;
3474
3475         hold_rsb(r);
3476         lock_rsb(r);
3477
3478         error = validate_message(lkb, ms);
3479         if (error)
3480                 goto out;
3481
3482         receive_flags_reply(lkb, ms);
3483         if (is_altmode(lkb))
3484                 munge_altmode(lkb, ms);
3485         grant_lock_pc(r, lkb, ms);
3486         queue_cast(r, lkb, 0);
3487  out:
3488         unlock_rsb(r);
3489         put_rsb(r);
3490         dlm_put_lkb(lkb);
3491 }
3492
3493 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3494 {
3495         struct dlm_lkb *lkb;
3496         struct dlm_rsb *r;
3497         int error;
3498
3499         error = find_lkb(ls, ms->m_remid, &lkb);
3500         if (error) {
3501                 log_debug(ls, "receive_bast from %d no lkb %x",
3502                           ms->m_header.h_nodeid, ms->m_remid);
3503                 return;
3504         }
3505
3506         r = lkb->lkb_resource;
3507
3508         hold_rsb(r);
3509         lock_rsb(r);
3510
3511         error = validate_message(lkb, ms);
3512         if (error)
3513                 goto out;
3514
3515         queue_bast(r, lkb, ms->m_bastmode);
3516  out:
3517         unlock_rsb(r);
3518         put_rsb(r);
3519         dlm_put_lkb(lkb);
3520 }
3521
3522 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3523 {
3524         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3525
3526         from_nodeid = ms->m_header.h_nodeid;
3527         our_nodeid = dlm_our_nodeid();
3528
3529         len = receive_extralen(ms);
3530
3531         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3532         if (dir_nodeid != our_nodeid) {
3533                 log_error(ls, "lookup dir_nodeid %d from %d",
3534                           dir_nodeid, from_nodeid);
3535                 error = -EINVAL;
3536                 ret_nodeid = -1;
3537                 goto out;
3538         }
3539
3540         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3541
3542         /* Optimization: we're master so treat lookup as a request */
3543         if (!error && ret_nodeid == our_nodeid) {
3544                 receive_request(ls, ms);
3545                 return;
3546         }
3547  out:
3548         send_lookup_reply(ls, ms, ret_nodeid, error);
3549 }
3550
3551 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3552 {
3553         int len, dir_nodeid, from_nodeid;
3554
3555         from_nodeid = ms->m_header.h_nodeid;
3556
3557         len = receive_extralen(ms);
3558
3559         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3560         if (dir_nodeid != dlm_our_nodeid()) {
3561                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3562                           dir_nodeid, from_nodeid);
3563                 return;
3564         }
3565
3566         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3567 }
3568
3569 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3570 {
3571         do_purge(ls, ms->m_nodeid, ms->m_pid);
3572 }
3573
3574 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3575 {
3576         struct dlm_lkb *lkb;
3577         struct dlm_rsb *r;
3578         int error, mstype, result;
3579
3580         error = find_lkb(ls, ms->m_remid, &lkb);
3581         if (error) {
3582                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3583                           ms->m_header.h_nodeid, ms->m_remid);
3584                 return;
3585         }
3586
3587         r = lkb->lkb_resource;
3588         hold_rsb(r);
3589         lock_rsb(r);
3590
3591         error = validate_message(lkb, ms);
3592         if (error)
3593                 goto out;
3594
3595         mstype = lkb->lkb_wait_type;
3596         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3597         if (error)
3598                 goto out;
3599
3600         /* Optimization: the dir node was also the master, so it took our
3601            lookup as a request and sent request reply instead of lookup reply */
3602         if (mstype == DLM_MSG_LOOKUP) {
3603                 r->res_nodeid = ms->m_header.h_nodeid;
3604                 lkb->lkb_nodeid = r->res_nodeid;
3605         }
3606
3607         /* this is the value returned from do_request() on the master */
3608         result = ms->m_result;
3609
3610         switch (result) {
3611         case -EAGAIN:
3612                 /* request would block (be queued) on remote master */
3613                 queue_cast(r, lkb, -EAGAIN);
3614                 confirm_master(r, -EAGAIN);
3615                 unhold_lkb(lkb); /* undoes create_lkb() */
3616                 break;
3617
3618         case -EINPROGRESS:
3619         case 0:
3620                 /* request was queued or granted on remote master */
3621                 receive_flags_reply(lkb, ms);
3622                 lkb->lkb_remid = ms->m_lkid;
3623                 if (is_altmode(lkb))
3624                         munge_altmode(lkb, ms);
3625                 if (result) {
3626                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3627                         add_timeout(lkb);
3628                 } else {
3629                         grant_lock_pc(r, lkb, ms);
3630                         queue_cast(r, lkb, 0);
3631                 }
3632                 confirm_master(r, result);
3633                 break;
3634
3635         case -EBADR:
3636         case -ENOTBLK:
3637                 /* find_rsb failed to find rsb or rsb wasn't master */
3638                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3639                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3640                 r->res_nodeid = -1;
3641                 lkb->lkb_nodeid = -1;
3642
3643                 if (is_overlap(lkb)) {
3644                         /* we'll ignore error in cancel/unlock reply */
3645                         queue_cast_overlap(r, lkb);
3646                         confirm_master(r, result);
3647                         unhold_lkb(lkb); /* undoes create_lkb() */
3648                 } else
3649                         _request_lock(r, lkb);
3650                 break;
3651
3652         default:
3653                 log_error(ls, "receive_request_reply %x error %d",
3654                           lkb->lkb_id, result);
3655         }
3656
3657         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3658                 log_debug(ls, "receive_request_reply %x result %d unlock",
3659                           lkb->lkb_id, result);
3660                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3661                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3662                 send_unlock(r, lkb);
3663         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3664                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3665                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3666                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3667                 send_cancel(r, lkb);
3668         } else {
3669                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3670                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3671         }
3672  out:
3673         unlock_rsb(r);
3674         put_rsb(r);
3675         dlm_put_lkb(lkb);
3676 }
3677
3678 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3679                                     struct dlm_message *ms)
3680 {
3681         /* this is the value returned from do_convert() on the master */
3682         switch (ms->m_result) {
3683         case -EAGAIN:
3684                 /* convert would block (be queued) on remote master */
3685                 queue_cast(r, lkb, -EAGAIN);
3686                 break;
3687
3688         case -EDEADLK:
3689                 receive_flags_reply(lkb, ms);
3690                 revert_lock_pc(r, lkb);
3691                 queue_cast(r, lkb, -EDEADLK);
3692                 break;
3693
3694         case -EINPROGRESS:
3695                 /* convert was queued on remote master */
3696                 receive_flags_reply(lkb, ms);
3697                 if (is_demoted(lkb))
3698                         munge_demoted(lkb);
3699                 del_lkb(r, lkb);
3700                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3701                 add_timeout(lkb);
3702                 break;
3703
3704         case 0:
3705                 /* convert was granted on remote master */
3706                 receive_flags_reply(lkb, ms);
3707                 if (is_demoted(lkb))
3708                         munge_demoted(lkb);
3709                 grant_lock_pc(r, lkb, ms);
3710                 queue_cast(r, lkb, 0);
3711                 break;
3712
3713         default:
3714                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3715                           lkb->lkb_id, ms->m_result);
3716         }
3717 }
3718
3719 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3720 {
3721         struct dlm_rsb *r = lkb->lkb_resource;
3722         int error;
3723
3724         hold_rsb(r);
3725         lock_rsb(r);
3726
3727         error = validate_message(lkb, ms);
3728         if (error)
3729                 goto out;
3730
3731         /* stub reply can happen with waiters_mutex held */
3732         error = remove_from_waiters_ms(lkb, ms);
3733         if (error)
3734                 goto out;
3735
3736         __receive_convert_reply(r, lkb, ms);
3737  out:
3738         unlock_rsb(r);
3739         put_rsb(r);
3740 }
3741
3742 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3743 {
3744         struct dlm_lkb *lkb;
3745         int error;
3746
3747         error = find_lkb(ls, ms->m_remid, &lkb);
3748         if (error) {
3749                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3750                           ms->m_header.h_nodeid, ms->m_remid);
3751                 return;
3752         }
3753
3754         _receive_convert_reply(lkb, ms);
3755         dlm_put_lkb(lkb);
3756 }
3757
3758 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3759 {
3760         struct dlm_rsb *r = lkb->lkb_resource;
3761         int error;
3762
3763         hold_rsb(r);
3764         lock_rsb(r);
3765
3766         error = validate_message(lkb, ms);
3767         if (error)
3768                 goto out;
3769
3770         /* stub reply can happen with waiters_mutex held */
3771         error = remove_from_waiters_ms(lkb, ms);
3772         if (error)
3773                 goto out;
3774
3775         /* this is the value returned from do_unlock() on the master */
3776
3777         switch (ms->m_result) {
3778         case -DLM_EUNLOCK:
3779                 receive_flags_reply(lkb, ms);
3780                 remove_lock_pc(r, lkb);
3781                 queue_cast(r, lkb, -DLM_EUNLOCK);
3782                 break;
3783         case -ENOENT:
3784                 break;
3785         default:
3786                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3787                           lkb->lkb_id, ms->m_result);
3788         }
3789  out:
3790         unlock_rsb(r);
3791         put_rsb(r);
3792 }
3793
3794 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3795 {
3796         struct dlm_lkb *lkb;
3797         int error;
3798
3799         error = find_lkb(ls, ms->m_remid, &lkb);
3800         if (error) {
3801                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3802                           ms->m_header.h_nodeid, ms->m_remid);
3803                 return;
3804         }
3805
3806         _receive_unlock_reply(lkb, ms);
3807         dlm_put_lkb(lkb);
3808 }
3809
3810 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3811 {
3812         struct dlm_rsb *r = lkb->lkb_resource;
3813         int error;
3814
3815         hold_rsb(r);
3816         lock_rsb(r);
3817
3818         error = validate_message(lkb, ms);
3819         if (error)
3820                 goto out;
3821
3822         /* stub reply can happen with waiters_mutex held */
3823         error = remove_from_waiters_ms(lkb, ms);
3824         if (error)
3825                 goto out;
3826
3827         /* this is the value returned from do_cancel() on the master */
3828
3829         switch (ms->m_result) {
3830         case -DLM_ECANCEL:
3831                 receive_flags_reply(lkb, ms);
3832                 revert_lock_pc(r, lkb);
3833                 queue_cast(r, lkb, -DLM_ECANCEL);
3834                 break;
3835         case 0:
3836                 break;
3837         default:
3838                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3839                           lkb->lkb_id, ms->m_result);
3840         }
3841  out:
3842         unlock_rsb(r);
3843         put_rsb(r);
3844 }
3845
3846 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3847 {
3848         struct dlm_lkb *lkb;
3849         int error;
3850
3851         error = find_lkb(ls, ms->m_remid, &lkb);
3852         if (error) {
3853                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3854                           ms->m_header.h_nodeid, ms->m_remid);
3855                 return;
3856         }
3857
3858         _receive_cancel_reply(lkb, ms);
3859         dlm_put_lkb(lkb);
3860 }
3861
3862 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3863 {
3864         struct dlm_lkb *lkb;
3865         struct dlm_rsb *r;
3866         int error, ret_nodeid;
3867
3868         error = find_lkb(ls, ms->m_lkid, &lkb);
3869         if (error) {
3870                 log_error(ls, "receive_lookup_reply no lkb");
3871                 return;
3872         }
3873
3874         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3875            FIXME: will a non-zero error ever be returned? */
3876
3877         r = lkb->lkb_resource;
3878         hold_rsb(r);
3879         lock_rsb(r);
3880
3881         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3882         if (error)
3883                 goto out;
3884
3885         ret_nodeid = ms->m_nodeid;
3886         if (ret_nodeid == dlm_our_nodeid()) {
3887                 r->res_nodeid = 0;
3888                 ret_nodeid = 0;
3889                 r->res_first_lkid = 0;
3890         } else {
3891                 /* set_master() will copy res_nodeid to lkb_nodeid */
3892                 r->res_nodeid = ret_nodeid;
3893         }
3894
3895         if (is_overlap(lkb)) {
3896                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3897                           lkb->lkb_id, lkb->lkb_flags);
3898                 queue_cast_overlap(r, lkb);
3899                 unhold_lkb(lkb); /* undoes create_lkb() */
3900                 goto out_list;
3901         }
3902
3903         _request_lock(r, lkb);
3904
3905  out_list:
3906         if (!ret_nodeid)
3907                 process_lookup_list(r);
3908  out:
3909         unlock_rsb(r);
3910         put_rsb(r);
3911         dlm_put_lkb(lkb);
3912 }
3913
3914 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3915 {
3916         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3917                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3918                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3919                           ms->m_remid, ms->m_result);
3920                 return;
3921         }
3922
3923         switch (ms->m_type) {
3924
3925         /* messages sent to a master node */
3926
3927         case DLM_MSG_REQUEST:
3928                 receive_request(ls, ms);
3929                 break;
3930
3931         case DLM_MSG_CONVERT:
3932                 receive_convert(ls, ms);
3933                 break;
3934
3935         case DLM_MSG_UNLOCK:
3936                 receive_unlock(ls, ms);
3937                 break;
3938
3939         case DLM_MSG_CANCEL:
3940                 receive_cancel(ls, ms);
3941                 break;
3942
3943         /* messages sent from a master node (replies to above) */
3944
3945         case DLM_MSG_REQUEST_REPLY:
3946                 receive_request_reply(ls, ms);
3947                 break;
3948
3949         case DLM_MSG_CONVERT_REPLY:
3950                 receive_convert_reply(ls, ms);
3951                 break;
3952
3953         case DLM_MSG_UNLOCK_REPLY:
3954                 receive_unlock_reply(ls, ms);
3955                 break;
3956
3957         case DLM_MSG_CANCEL_REPLY:
3958                 receive_cancel_reply(ls, ms);
3959                 break;
3960
3961         /* messages sent from a master node (only two types of async msg) */
3962
3963         case DLM_MSG_GRANT:
3964                 receive_grant(ls, ms);
3965                 break;
3966
3967         case DLM_MSG_BAST:
3968                 receive_bast(ls, ms);
3969                 break;
3970
3971         /* messages sent to a dir node */
3972
3973         case DLM_MSG_LOOKUP:
3974                 receive_lookup(ls, ms);
3975                 break;
3976
3977         case DLM_MSG_REMOVE:
3978                 receive_remove(ls, ms);
3979                 break;
3980
3981         /* messages sent from a dir node (remove has no reply) */
3982
3983         case DLM_MSG_LOOKUP_REPLY:
3984                 receive_lookup_reply(ls, ms);
3985                 break;
3986
3987         /* other messages */
3988
3989         case DLM_MSG_PURGE:
3990                 receive_purge(ls, ms);
3991                 break;
3992
3993         default:
3994                 log_error(ls, "unknown message type %d", ms->m_type);
3995         }
3996
3997         dlm_astd_wake();
3998 }
3999
4000 /* If the lockspace is in recovery mode (locking stopped), then normal
4001    messages are saved on the requestqueue for processing after recovery is
4002    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4003    messages off the requestqueue before we process new ones. This occurs right
4004    after recovery completes when we transition from saving all messages on
4005    requestqueue, to processing all the saved messages, to processing new
4006    messages as they arrive. */
4007
4008 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4009                                 int nodeid)
4010 {
4011         if (dlm_locking_stopped(ls)) {
4012                 dlm_add_requestqueue(ls, nodeid, ms);
4013         } else {
4014                 dlm_wait_requestqueue(ls);
4015                 _receive_message(ls, ms);
4016         }
4017 }
4018
4019 /* This is called by dlm_recoverd to process messages that were saved on
4020    the requestqueue. */
4021
4022 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4023 {
4024         _receive_message(ls, ms);
4025 }
4026
4027 /* This is called by the midcomms layer when something is received for
4028    the lockspace.  It could be either a MSG (normal message sent as part of
4029    standard locking activity) or an RCOM (recovery message sent as part of
4030    lockspace recovery). */
4031
4032 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4033 {
4034         struct dlm_header *hd = &p->header;
4035         struct dlm_ls *ls;
4036         int type = 0;
4037
4038         switch (hd->h_cmd) {
4039         case DLM_MSG:
4040                 dlm_message_in(&p->message);
4041                 type = p->message.m_type;
4042                 break;
4043         case DLM_RCOM:
4044                 dlm_rcom_in(&p->rcom);
4045                 type = p->rcom.rc_type;
4046                 break;
4047         default:
4048                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4049                 return;
4050         }
4051
4052         if (hd->h_nodeid != nodeid) {
4053                 log_print("invalid h_nodeid %d from %d lockspace %x",
4054                           hd->h_nodeid, nodeid, hd->h_lockspace);
4055                 return;
4056         }
4057
4058         ls = dlm_find_lockspace_global(hd->h_lockspace);
4059         if (!ls) {
4060                 if (dlm_config.ci_log_debug)
4061                         log_print("invalid lockspace %x from %d cmd %d type %d",
4062                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
4063
4064                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4065                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4066                 return;
4067         }
4068
4069         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4070            be inactive (in this ls) before transitioning to recovery mode */
4071
4072         down_read(&ls->ls_recv_active);
4073         if (hd->h_cmd == DLM_MSG)
4074                 dlm_receive_message(ls, &p->message, nodeid);
4075         else
4076                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4077         up_read(&ls->ls_recv_active);
4078
4079         dlm_put_lockspace(ls);
4080 }
4081
4082 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4083                                    struct dlm_message *ms_stub)
4084 {
4085         if (middle_conversion(lkb)) {
4086                 hold_lkb(lkb);
4087                 memset(ms_stub, 0, sizeof(struct dlm_message));
4088                 ms_stub->m_flags = DLM_IFL_STUB_MS;
4089                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4090                 ms_stub->m_result = -EINPROGRESS;
4091                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4092                 _receive_convert_reply(lkb, ms_stub);
4093
4094                 /* Same special case as in receive_rcom_lock_args() */
4095                 lkb->lkb_grmode = DLM_LOCK_IV;
4096                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4097                 unhold_lkb(lkb);
4098
4099         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4100                 lkb->lkb_flags |= DLM_IFL_RESEND;
4101         }
4102
4103         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4104            conversions are async; there's no reply from the remote master */
4105 }
4106
4107 /* A waiting lkb needs recovery if the master node has failed, or
4108    the master node is changing (only when no directory is used) */
4109
4110 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4111 {
4112         if (dlm_is_removed(ls, lkb->lkb_nodeid))
4113                 return 1;
4114
4115         if (!dlm_no_directory(ls))
4116                 return 0;
4117
4118         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4119                 return 1;
4120
4121         return 0;
4122 }
4123
4124 /* Recovery for locks that are waiting for replies from nodes that are now
4125    gone.  We can just complete unlocks and cancels by faking a reply from the
4126    dead node.  Requests and up-conversions we flag to be resent after
4127    recovery.  Down-conversions can just be completed with a fake reply like
4128    unlocks.  Conversions between PR and CW need special attention. */
4129
4130 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4131 {
4132         struct dlm_lkb *lkb, *safe;
4133         struct dlm_message *ms_stub;
4134         int wait_type, stub_unlock_result, stub_cancel_result;
4135
4136         ms_stub = kmalloc(GFP_KERNEL, sizeof(struct dlm_message));
4137         if (!ms_stub) {
4138                 log_error(ls, "dlm_recover_waiters_pre no mem");
4139                 return;
4140         }
4141
4142         mutex_lock(&ls->ls_waiters_mutex);
4143
4144         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4145
4146                 /* exclude debug messages about unlocks because there can be so
4147                    many and they aren't very interesting */
4148
4149                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4150                         log_debug(ls, "recover_waiter %x nodeid %d "
4151                                   "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4152                                   lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4153                 }
4154
4155                 /* all outstanding lookups, regardless of destination  will be
4156                    resent after recovery is done */
4157
4158                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4159                         lkb->lkb_flags |= DLM_IFL_RESEND;
4160                         continue;
4161                 }
4162
4163                 if (!waiter_needs_recovery(ls, lkb))
4164                         continue;
4165
4166                 wait_type = lkb->lkb_wait_type;
4167                 stub_unlock_result = -DLM_EUNLOCK;
4168                 stub_cancel_result = -DLM_ECANCEL;
4169
4170                 /* Main reply may have been received leaving a zero wait_type,
4171                    but a reply for the overlapping op may not have been
4172                    received.  In that case we need to fake the appropriate
4173                    reply for the overlap op. */
4174
4175                 if (!wait_type) {
4176                         if (is_overlap_cancel(lkb)) {
4177                                 wait_type = DLM_MSG_CANCEL;
4178                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4179                                         stub_cancel_result = 0;
4180                         }
4181                         if (is_overlap_unlock(lkb)) {
4182                                 wait_type = DLM_MSG_UNLOCK;
4183                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4184                                         stub_unlock_result = -ENOENT;
4185                         }
4186
4187                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4188                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4189                                   stub_cancel_result, stub_unlock_result);
4190                 }
4191
4192                 switch (wait_type) {
4193
4194                 case DLM_MSG_REQUEST:
4195                         lkb->lkb_flags |= DLM_IFL_RESEND;
4196                         break;
4197
4198                 case DLM_MSG_CONVERT:
4199                         recover_convert_waiter(ls, lkb, ms_stub);
4200                         break;
4201
4202                 case DLM_MSG_UNLOCK:
4203                         hold_lkb(lkb);
4204                         memset(ms_stub, 0, sizeof(struct dlm_message));
4205                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4206                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4207                         ms_stub->m_result = stub_unlock_result;
4208                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4209                         _receive_unlock_reply(lkb, ms_stub);
4210                         dlm_put_lkb(lkb);
4211                         break;
4212
4213                 case DLM_MSG_CANCEL:
4214                         hold_lkb(lkb);
4215                         memset(ms_stub, 0, sizeof(struct dlm_message));
4216                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4217                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4218                         ms_stub->m_result = stub_cancel_result;
4219                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4220                         _receive_cancel_reply(lkb, ms_stub);
4221                         dlm_put_lkb(lkb);
4222                         break;
4223
4224                 default:
4225                         log_error(ls, "invalid lkb wait_type %d %d",
4226                                   lkb->lkb_wait_type, wait_type);
4227                 }
4228                 schedule();
4229         }
4230         mutex_unlock(&ls->ls_waiters_mutex);
4231         kfree(ms_stub);
4232 }
4233
4234 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4235 {
4236         struct dlm_lkb *lkb;
4237         int found = 0;
4238
4239         mutex_lock(&ls->ls_waiters_mutex);
4240         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4241                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4242                         hold_lkb(lkb);
4243                         found = 1;
4244                         break;
4245                 }
4246         }
4247         mutex_unlock(&ls->ls_waiters_mutex);
4248
4249         if (!found)
4250                 lkb = NULL;
4251         return lkb;
4252 }
4253
4254 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4255    master or dir-node for r.  Processing the lkb may result in it being placed
4256    back on waiters. */
4257
4258 /* We do this after normal locking has been enabled and any saved messages
4259    (in requestqueue) have been processed.  We should be confident that at
4260    this point we won't get or process a reply to any of these waiting
4261    operations.  But, new ops may be coming in on the rsbs/locks here from
4262    userspace or remotely. */
4263
4264 /* there may have been an overlap unlock/cancel prior to recovery or after
4265    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4266    overlap flag would just have been set and nothing new sent.  we can be
4267    confident here than any replies to either the initial op or overlap ops
4268    prior to recovery have been received. */
4269
4270 int dlm_recover_waiters_post(struct dlm_ls *ls)
4271 {
4272         struct dlm_lkb *lkb;
4273         struct dlm_rsb *r;
4274         int error = 0, mstype, err, oc, ou;
4275
4276         while (1) {
4277                 if (dlm_locking_stopped(ls)) {
4278                         log_debug(ls, "recover_waiters_post aborted");
4279                         error = -EINTR;
4280                         break;
4281                 }
4282
4283                 lkb = find_resend_waiter(ls);
4284                 if (!lkb)
4285                         break;
4286
4287                 r = lkb->lkb_resource;
4288                 hold_rsb(r);
4289                 lock_rsb(r);
4290
4291                 mstype = lkb->lkb_wait_type;
4292                 oc = is_overlap_cancel(lkb);
4293                 ou = is_overlap_unlock(lkb);
4294                 err = 0;
4295
4296                 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4297                           lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4298
4299                 /* At this point we assume that we won't get a reply to any
4300                    previous op or overlap op on this lock.  First, do a big
4301                    remove_from_waiters() for all previous ops. */
4302
4303                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4304                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4305                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4306                 lkb->lkb_wait_type = 0;
4307                 lkb->lkb_wait_count = 0;
4308                 mutex_lock(&ls->ls_waiters_mutex);
4309                 list_del_init(&lkb->lkb_wait_reply);
4310                 mutex_unlock(&ls->ls_waiters_mutex);
4311                 unhold_lkb(lkb); /* for waiters list */
4312
4313                 if (oc || ou) {
4314                         /* do an unlock or cancel instead of resending */
4315                         switch (mstype) {
4316                         case DLM_MSG_LOOKUP:
4317                         case DLM_MSG_REQUEST:
4318                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4319                                                         -DLM_ECANCEL);
4320                                 unhold_lkb(lkb); /* undoes create_lkb() */
4321                                 break;
4322                         case DLM_MSG_CONVERT:
4323                                 if (oc) {
4324                                         queue_cast(r, lkb, -DLM_ECANCEL);
4325                                 } else {
4326                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4327                                         _unlock_lock(r, lkb);
4328                                 }
4329                                 break;
4330                         default:
4331                                 err = 1;
4332                         }
4333                 } else {
4334                         switch (mstype) {
4335                         case DLM_MSG_LOOKUP:
4336                         case DLM_MSG_REQUEST:
4337                                 _request_lock(r, lkb);
4338                                 if (is_master(r))
4339                                         confirm_master(r, 0);
4340                                 break;
4341                         case DLM_MSG_CONVERT:
4342                                 _convert_lock(r, lkb);
4343                                 break;
4344                         default:
4345                                 err = 1;
4346                         }
4347                 }
4348
4349                 if (err)
4350                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4351                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4352                 unlock_rsb(r);
4353                 put_rsb(r);
4354                 dlm_put_lkb(lkb);
4355         }
4356
4357         return error;
4358 }
4359
4360 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4361                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4362 {
4363         struct dlm_ls *ls = r->res_ls;
4364         struct dlm_lkb *lkb, *safe;
4365
4366         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4367                 if (test(ls, lkb)) {
4368                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4369                         del_lkb(r, lkb);
4370                         /* this put should free the lkb */
4371                         if (!dlm_put_lkb(lkb))
4372                                 log_error(ls, "purged lkb not released");
4373                 }
4374         }
4375 }
4376
4377 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4378 {
4379         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4380 }
4381
4382 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4383 {
4384         return is_master_copy(lkb);
4385 }
4386
4387 static void purge_dead_locks(struct dlm_rsb *r)
4388 {
4389         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4390         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4391         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4392 }
4393
4394 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4395 {
4396         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4397         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4398         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4399 }
4400
4401 /* Get rid of locks held by nodes that are gone. */
4402
4403 int dlm_purge_locks(struct dlm_ls *ls)
4404 {
4405         struct dlm_rsb *r;
4406
4407         log_debug(ls, "dlm_purge_locks");
4408
4409         down_write(&ls->ls_root_sem);
4410         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4411                 hold_rsb(r);
4412                 lock_rsb(r);
4413                 if (is_master(r))
4414                         purge_dead_locks(r);
4415                 unlock_rsb(r);
4416                 unhold_rsb(r);
4417
4418                 schedule();
4419         }
4420         up_write(&ls->ls_root_sem);
4421
4422         return 0;
4423 }
4424
4425 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4426 {
4427         struct dlm_rsb *r, *r_ret = NULL;
4428
4429         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4430         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4431                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4432                         continue;
4433                 hold_rsb(r);
4434                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4435                 r_ret = r;
4436                 break;
4437         }
4438         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4439         return r_ret;
4440 }
4441
4442 void dlm_grant_after_purge(struct dlm_ls *ls)
4443 {
4444         struct dlm_rsb *r;
4445         int bucket = 0;
4446
4447         while (1) {
4448                 r = find_purged_rsb(ls, bucket);
4449                 if (!r) {
4450                         if (bucket == ls->ls_rsbtbl_size - 1)
4451                                 break;
4452                         bucket++;
4453                         continue;
4454                 }
4455                 lock_rsb(r);
4456                 if (is_master(r)) {
4457                         grant_pending_locks(r);
4458                         confirm_master(r, 0);
4459                 }
4460                 unlock_rsb(r);
4461                 put_rsb(r);
4462                 schedule();
4463         }
4464 }
4465
4466 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4467                                          uint32_t remid)
4468 {
4469         struct dlm_lkb *lkb;
4470
4471         list_for_each_entry(lkb, head, lkb_statequeue) {
4472                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4473                         return lkb;
4474         }
4475         return NULL;
4476 }
4477
4478 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4479                                     uint32_t remid)
4480 {
4481         struct dlm_lkb *lkb;
4482
4483         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4484         if (lkb)
4485                 return lkb;
4486         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4487         if (lkb)
4488                 return lkb;
4489         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4490         if (lkb)
4491                 return lkb;
4492         return NULL;
4493 }
4494
4495 /* needs at least dlm_rcom + rcom_lock */
4496 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4497                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4498 {
4499         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4500
4501         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4502         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4503         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4504         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4505         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4506         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4507         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4508         lkb->lkb_rqmode = rl->rl_rqmode;
4509         lkb->lkb_grmode = rl->rl_grmode;
4510         /* don't set lkb_status because add_lkb wants to itself */
4511
4512         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4513         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4514
4515         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4516                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4517                          sizeof(struct rcom_lock);
4518                 if (lvblen > ls->ls_lvblen)
4519                         return -EINVAL;
4520                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4521                 if (!lkb->lkb_lvbptr)
4522                         return -ENOMEM;
4523                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4524         }
4525
4526         /* Conversions between PR and CW (middle modes) need special handling.
4527            The real granted mode of these converting locks cannot be determined
4528            until all locks have been rebuilt on the rsb (recover_conversion) */
4529
4530         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4531             middle_conversion(lkb)) {
4532                 rl->rl_status = DLM_LKSTS_CONVERT;
4533                 lkb->lkb_grmode = DLM_LOCK_IV;
4534                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4535         }
4536
4537         return 0;
4538 }
4539
4540 /* This lkb may have been recovered in a previous aborted recovery so we need
4541    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4542    If so we just send back a standard reply.  If not, we create a new lkb with
4543    the given values and send back our lkid.  We send back our lkid by sending
4544    back the rcom_lock struct we got but with the remid field filled in. */
4545
4546 /* needs at least dlm_rcom + rcom_lock */
4547 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4548 {
4549         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4550         struct dlm_rsb *r;
4551         struct dlm_lkb *lkb;
4552         int error;
4553
4554         if (rl->rl_parent_lkid) {
4555                 error = -EOPNOTSUPP;
4556                 goto out;
4557         }
4558
4559         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4560                          R_MASTER, &r);
4561         if (error)
4562                 goto out;
4563
4564         lock_rsb(r);
4565
4566         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4567         if (lkb) {
4568                 error = -EEXIST;
4569                 goto out_remid;
4570         }
4571
4572         error = create_lkb(ls, &lkb);
4573         if (error)
4574                 goto out_unlock;
4575
4576         error = receive_rcom_lock_args(ls, lkb, r, rc);
4577         if (error) {
4578                 __put_lkb(ls, lkb);
4579                 goto out_unlock;
4580         }
4581
4582         attach_lkb(r, lkb);
4583         add_lkb(r, lkb, rl->rl_status);
4584         error = 0;
4585
4586  out_remid:
4587         /* this is the new value returned to the lock holder for
4588            saving in its process-copy lkb */
4589         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4590
4591  out_unlock:
4592         unlock_rsb(r);
4593         put_rsb(r);
4594  out:
4595         if (error)
4596                 log_debug(ls, "recover_master_copy %d %x", error,
4597                           le32_to_cpu(rl->rl_lkid));
4598         rl->rl_result = cpu_to_le32(error);
4599         return error;
4600 }
4601
4602 /* needs at least dlm_rcom + rcom_lock */
4603 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4604 {
4605         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4606         struct dlm_rsb *r;
4607         struct dlm_lkb *lkb;
4608         int error;
4609
4610         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4611         if (error) {
4612                 log_error(ls, "recover_process_copy no lkid %x",
4613                                 le32_to_cpu(rl->rl_lkid));
4614                 return error;
4615         }
4616
4617         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4618
4619         error = le32_to_cpu(rl->rl_result);
4620
4621         r = lkb->lkb_resource;
4622         hold_rsb(r);
4623         lock_rsb(r);
4624
4625         switch (error) {
4626         case -EBADR:
4627                 /* There's a chance the new master received our lock before
4628                    dlm_recover_master_reply(), this wouldn't happen if we did
4629                    a barrier between recover_masters and recover_locks. */
4630                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4631                           (unsigned long)r, r->res_name);
4632                 dlm_send_rcom_lock(r, lkb);
4633                 goto out;
4634         case -EEXIST:
4635                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4636                 /* fall through */
4637         case 0:
4638                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4639                 break;
4640         default:
4641                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4642                           error, lkb->lkb_id);
4643         }
4644
4645         /* an ack for dlm_recover_locks() which waits for replies from
4646            all the locks it sends to new masters */
4647         dlm_recovered_lock(r);
4648  out:
4649         unlock_rsb(r);
4650         put_rsb(r);
4651         dlm_put_lkb(lkb);
4652
4653         return 0;
4654 }
4655
4656 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4657                      int mode, uint32_t flags, void *name, unsigned int namelen,
4658                      unsigned long timeout_cs)
4659 {
4660         struct dlm_lkb *lkb;
4661         struct dlm_args args;
4662         int error;
4663
4664         dlm_lock_recovery(ls);
4665
4666         error = create_lkb(ls, &lkb);
4667         if (error) {
4668                 kfree(ua);
4669                 goto out;
4670         }
4671
4672         if (flags & DLM_LKF_VALBLK) {
4673                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4674                 if (!ua->lksb.sb_lvbptr) {
4675                         kfree(ua);
4676                         __put_lkb(ls, lkb);
4677                         error = -ENOMEM;
4678                         goto out;
4679                 }
4680         }
4681
4682         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4683            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4684            lock and that lkb_astparam is the dlm_user_args structure. */
4685
4686         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4687                               fake_astfn, ua, fake_bastfn, &args);
4688         lkb->lkb_flags |= DLM_IFL_USER;
4689
4690         if (error) {
4691                 __put_lkb(ls, lkb);
4692                 goto out;
4693         }
4694
4695         error = request_lock(ls, lkb, name, namelen, &args);
4696
4697         switch (error) {
4698         case 0:
4699                 break;
4700         case -EINPROGRESS:
4701                 error = 0;
4702                 break;
4703         case -EAGAIN:
4704                 error = 0;
4705                 /* fall through */
4706         default:
4707                 __put_lkb(ls, lkb);
4708                 goto out;
4709         }
4710
4711         /* add this new lkb to the per-process list of locks */
4712         spin_lock(&ua->proc->locks_spin);
4713         hold_lkb(lkb);
4714         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4715         spin_unlock(&ua->proc->locks_spin);
4716  out:
4717         dlm_unlock_recovery(ls);
4718         return error;
4719 }
4720
4721 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4722                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4723                      unsigned long timeout_cs)
4724 {
4725         struct dlm_lkb *lkb;
4726         struct dlm_args args;
4727         struct dlm_user_args *ua;
4728         int error;
4729
4730         dlm_lock_recovery(ls);
4731
4732         error = find_lkb(ls, lkid, &lkb);
4733         if (error)
4734                 goto out;
4735
4736         /* user can change the params on its lock when it converts it, or
4737            add an lvb that didn't exist before */
4738
4739         ua = lkb->lkb_ua;
4740
4741         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4742                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4743                 if (!ua->lksb.sb_lvbptr) {
4744                         error = -ENOMEM;
4745                         goto out_put;
4746                 }
4747         }
4748         if (lvb_in && ua->lksb.sb_lvbptr)
4749                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4750
4751         ua->xid = ua_tmp->xid;
4752         ua->castparam = ua_tmp->castparam;
4753         ua->castaddr = ua_tmp->castaddr;
4754         ua->bastparam = ua_tmp->bastparam;
4755         ua->bastaddr = ua_tmp->bastaddr;
4756         ua->user_lksb = ua_tmp->user_lksb;
4757
4758         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4759                               fake_astfn, ua, fake_bastfn, &args);
4760         if (error)
4761                 goto out_put;
4762
4763         error = convert_lock(ls, lkb, &args);
4764
4765         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4766                 error = 0;
4767  out_put:
4768         dlm_put_lkb(lkb);
4769  out:
4770         dlm_unlock_recovery(ls);
4771         kfree(ua_tmp);
4772         return error;
4773 }
4774
4775 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4776                     uint32_t flags, uint32_t lkid, char *lvb_in)
4777 {
4778         struct dlm_lkb *lkb;
4779         struct dlm_args args;
4780         struct dlm_user_args *ua;
4781         int error;
4782
4783         dlm_lock_recovery(ls);
4784
4785         error = find_lkb(ls, lkid, &lkb);
4786         if (error)
4787                 goto out;
4788
4789         ua = lkb->lkb_ua;
4790
4791         if (lvb_in && ua->lksb.sb_lvbptr)
4792                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4793         if (ua_tmp->castparam)
4794                 ua->castparam = ua_tmp->castparam;
4795         ua->user_lksb = ua_tmp->user_lksb;
4796
4797         error = set_unlock_args(flags, ua, &args);
4798         if (error)
4799                 goto out_put;
4800
4801         error = unlock_lock(ls, lkb, &args);
4802
4803         if (error == -DLM_EUNLOCK)
4804                 error = 0;
4805         /* from validate_unlock_args() */
4806         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4807                 error = 0;
4808         if (error)
4809                 goto out_put;
4810
4811         spin_lock(&ua->proc->locks_spin);
4812         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4813         if (!list_empty(&lkb->lkb_ownqueue))
4814                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4815         spin_unlock(&ua->proc->locks_spin);
4816  out_put:
4817         dlm_put_lkb(lkb);
4818  out:
4819         dlm_unlock_recovery(ls);
4820         kfree(ua_tmp);
4821         return error;
4822 }
4823
4824 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4825                     uint32_t flags, uint32_t lkid)
4826 {
4827         struct dlm_lkb *lkb;
4828         struct dlm_args args;
4829         struct dlm_user_args *ua;
4830         int error;
4831
4832         dlm_lock_recovery(ls);
4833
4834         error = find_lkb(ls, lkid, &lkb);
4835         if (error)
4836                 goto out;
4837
4838         ua = lkb->lkb_ua;
4839         if (ua_tmp->castparam)
4840                 ua->castparam = ua_tmp->castparam;
4841         ua->user_lksb = ua_tmp->user_lksb;
4842
4843         error = set_unlock_args(flags, ua, &args);
4844         if (error)
4845                 goto out_put;
4846
4847         error = cancel_lock(ls, lkb, &args);
4848
4849         if (error == -DLM_ECANCEL)
4850                 error = 0;
4851         /* from validate_unlock_args() */
4852         if (error == -EBUSY)
4853                 error = 0;
4854  out_put:
4855         dlm_put_lkb(lkb);
4856  out:
4857         dlm_unlock_recovery(ls);
4858         kfree(ua_tmp);
4859         return error;
4860 }
4861
4862 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4863 {
4864         struct dlm_lkb *lkb;
4865         struct dlm_args args;
4866         struct dlm_user_args *ua;
4867         struct dlm_rsb *r;
4868         int error;
4869
4870         dlm_lock_recovery(ls);
4871
4872         error = find_lkb(ls, lkid, &lkb);
4873         if (error)
4874                 goto out;
4875
4876         ua = lkb->lkb_ua;
4877
4878         error = set_unlock_args(flags, ua, &args);
4879         if (error)
4880                 goto out_put;
4881
4882         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4883
4884         r = lkb->lkb_resource;
4885         hold_rsb(r);
4886         lock_rsb(r);
4887
4888         error = validate_unlock_args(lkb, &args);
4889         if (error)
4890                 goto out_r;
4891         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4892
4893         error = _cancel_lock(r, lkb);
4894  out_r:
4895         unlock_rsb(r);
4896         put_rsb(r);
4897
4898         if (error == -DLM_ECANCEL)
4899                 error = 0;
4900         /* from validate_unlock_args() */
4901         if (error == -EBUSY)
4902                 error = 0;
4903  out_put:
4904         dlm_put_lkb(lkb);
4905  out:
4906         dlm_unlock_recovery(ls);
4907         return error;
4908 }
4909
4910 /* lkb's that are removed from the waiters list by revert are just left on the
4911    orphans list with the granted orphan locks, to be freed by purge */
4912
4913 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4914 {
4915         struct dlm_args args;
4916         int error;
4917
4918         hold_lkb(lkb);
4919         mutex_lock(&ls->ls_orphans_mutex);
4920         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4921         mutex_unlock(&ls->ls_orphans_mutex);
4922
4923         set_unlock_args(0, lkb->lkb_ua, &args);
4924
4925         error = cancel_lock(ls, lkb, &args);
4926         if (error == -DLM_ECANCEL)
4927                 error = 0;
4928         return error;
4929 }
4930
4931 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4932    Regardless of what rsb queue the lock is on, it's removed and freed. */
4933
4934 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4935 {
4936         struct dlm_args args;
4937         int error;
4938
4939         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4940
4941         error = unlock_lock(ls, lkb, &args);
4942         if (error == -DLM_EUNLOCK)
4943                 error = 0;
4944         return error;
4945 }
4946
4947 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4948    (which does lock_rsb) due to deadlock with receiving a message that does
4949    lock_rsb followed by dlm_user_add_ast() */
4950
4951 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4952                                      struct dlm_user_proc *proc)
4953 {
4954         struct dlm_lkb *lkb = NULL;
4955
4956         mutex_lock(&ls->ls_clear_proc_locks);
4957         if (list_empty(&proc->locks))
4958                 goto out;
4959
4960         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4961         list_del_init(&lkb->lkb_ownqueue);
4962
4963         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4964                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4965         else
4966                 lkb->lkb_flags |= DLM_IFL_DEAD;
4967  out:
4968         mutex_unlock(&ls->ls_clear_proc_locks);
4969         return lkb;
4970 }
4971
4972 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4973    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4974    which we clear here. */
4975
4976 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4977    list, and no more device_writes should add lkb's to proc->locks list; so we
4978    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4979    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4980    them ourself. */
4981
4982 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4983 {
4984         struct dlm_lkb *lkb, *safe;
4985
4986         dlm_lock_recovery(ls);
4987
4988         while (1) {
4989                 lkb = del_proc_lock(ls, proc);
4990                 if (!lkb)
4991                         break;
4992                 del_timeout(lkb);
4993                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4994                         orphan_proc_lock(ls, lkb);
4995                 else
4996                         unlock_proc_lock(ls, lkb);
4997
4998                 /* this removes the reference for the proc->locks list
4999                    added by dlm_user_request, it may result in the lkb
5000                    being freed */
5001
5002                 dlm_put_lkb(lkb);
5003         }
5004
5005         mutex_lock(&ls->ls_clear_proc_locks);
5006
5007         /* in-progress unlocks */
5008         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5009                 list_del_init(&lkb->lkb_ownqueue);
5010                 lkb->lkb_flags |= DLM_IFL_DEAD;
5011                 dlm_put_lkb(lkb);
5012         }
5013
5014         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
5015                 memset(&lkb->lkb_callbacks, 0,
5016                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5017                 list_del_init(&lkb->lkb_astqueue);
5018                 dlm_put_lkb(lkb);
5019         }
5020
5021         mutex_unlock(&ls->ls_clear_proc_locks);
5022         dlm_unlock_recovery(ls);
5023 }
5024
5025 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5026 {
5027         struct dlm_lkb *lkb, *safe;
5028
5029         while (1) {
5030                 lkb = NULL;
5031                 spin_lock(&proc->locks_spin);
5032                 if (!list_empty(&proc->locks)) {
5033                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
5034                                          lkb_ownqueue);
5035                         list_del_init(&lkb->lkb_ownqueue);
5036                 }
5037                 spin_unlock(&proc->locks_spin);
5038
5039                 if (!lkb)
5040                         break;
5041
5042                 lkb->lkb_flags |= DLM_IFL_DEAD;
5043                 unlock_proc_lock(ls, lkb);
5044                 dlm_put_lkb(lkb); /* ref from proc->locks list */
5045         }
5046
5047         spin_lock(&proc->locks_spin);
5048         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5049                 list_del_init(&lkb->lkb_ownqueue);
5050                 lkb->lkb_flags |= DLM_IFL_DEAD;
5051                 dlm_put_lkb(lkb);
5052         }
5053         spin_unlock(&proc->locks_spin);
5054
5055         spin_lock(&proc->asts_spin);
5056         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
5057                 memset(&lkb->lkb_callbacks, 0,
5058                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5059                 list_del_init(&lkb->lkb_astqueue);
5060                 dlm_put_lkb(lkb);
5061         }
5062         spin_unlock(&proc->asts_spin);
5063 }
5064
5065 /* pid of 0 means purge all orphans */
5066
5067 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5068 {
5069         struct dlm_lkb *lkb, *safe;
5070
5071         mutex_lock(&ls->ls_orphans_mutex);
5072         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5073                 if (pid && lkb->lkb_ownpid != pid)
5074                         continue;
5075                 unlock_proc_lock(ls, lkb);
5076                 list_del_init(&lkb->lkb_ownqueue);
5077                 dlm_put_lkb(lkb);
5078         }
5079         mutex_unlock(&ls->ls_orphans_mutex);
5080 }
5081
5082 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5083 {
5084         struct dlm_message *ms;
5085         struct dlm_mhandle *mh;
5086         int error;
5087
5088         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5089                                 DLM_MSG_PURGE, &ms, &mh);
5090         if (error)
5091                 return error;
5092         ms->m_nodeid = nodeid;
5093         ms->m_pid = pid;
5094
5095         return send_message(mh, ms);
5096 }
5097
5098 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5099                    int nodeid, int pid)
5100 {
5101         int error = 0;
5102
5103         if (nodeid != dlm_our_nodeid()) {
5104                 error = send_purge(ls, nodeid, pid);
5105         } else {
5106                 dlm_lock_recovery(ls);
5107                 if (pid == current->pid)
5108                         purge_proc_locks(ls, proc);
5109                 else
5110                         do_purge(ls, nodeid, pid);
5111                 dlm_unlock_recovery(ls);
5112         }
5113         return error;
5114 }
5115