]> nv-tegra.nvidia Code Review - linux-2.6.git/blob - fs/dlm/lock.c
46ffd3eeaaf7350cf1b9b5969722abfe2f9a89b7
[linux-2.6.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         lkb->lkb_time_bast = ktime_get();
322
323         if (is_master_copy(lkb)) {
324                 lkb->lkb_bastmode = rqmode; /* printed by debugfs */
325                 send_bast(r, lkb, rqmode);
326         } else {
327                 dlm_add_ast(lkb, AST_BAST, rqmode);
328         }
329 }
330
331 /*
332  * Basic operations on rsb's and lkb's
333  */
334
335 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
336 {
337         struct dlm_rsb *r;
338
339         r = dlm_allocate_rsb(ls, len);
340         if (!r)
341                 return NULL;
342
343         r->res_ls = ls;
344         r->res_length = len;
345         memcpy(r->res_name, name, len);
346         mutex_init(&r->res_mutex);
347
348         INIT_LIST_HEAD(&r->res_lookup);
349         INIT_LIST_HEAD(&r->res_grantqueue);
350         INIT_LIST_HEAD(&r->res_convertqueue);
351         INIT_LIST_HEAD(&r->res_waitqueue);
352         INIT_LIST_HEAD(&r->res_root_list);
353         INIT_LIST_HEAD(&r->res_recover_list);
354
355         return r;
356 }
357
358 static int search_rsb_list(struct list_head *head, char *name, int len,
359                            unsigned int flags, struct dlm_rsb **r_ret)
360 {
361         struct dlm_rsb *r;
362         int error = 0;
363
364         list_for_each_entry(r, head, res_hashchain) {
365                 if (len == r->res_length && !memcmp(name, r->res_name, len))
366                         goto found;
367         }
368         *r_ret = NULL;
369         return -EBADR;
370
371  found:
372         if (r->res_nodeid && (flags & R_MASTER))
373                 error = -ENOTBLK;
374         *r_ret = r;
375         return error;
376 }
377
378 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
379                        unsigned int flags, struct dlm_rsb **r_ret)
380 {
381         struct dlm_rsb *r;
382         int error;
383
384         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
385         if (!error) {
386                 kref_get(&r->res_ref);
387                 goto out;
388         }
389         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
390         if (error)
391                 goto out;
392
393         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
394
395         if (dlm_no_directory(ls))
396                 goto out;
397
398         if (r->res_nodeid == -1) {
399                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
400                 r->res_first_lkid = 0;
401         } else if (r->res_nodeid > 0) {
402                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
403                 r->res_first_lkid = 0;
404         } else {
405                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
406                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
407         }
408  out:
409         *r_ret = r;
410         return error;
411 }
412
413 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
414                       unsigned int flags, struct dlm_rsb **r_ret)
415 {
416         int error;
417         spin_lock(&ls->ls_rsbtbl[b].lock);
418         error = _search_rsb(ls, name, len, b, flags, r_ret);
419         spin_unlock(&ls->ls_rsbtbl[b].lock);
420         return error;
421 }
422
423 /*
424  * Find rsb in rsbtbl and potentially create/add one
425  *
426  * Delaying the release of rsb's has a similar benefit to applications keeping
427  * NL locks on an rsb, but without the guarantee that the cached master value
428  * will still be valid when the rsb is reused.  Apps aren't always smart enough
429  * to keep NL locks on an rsb that they may lock again shortly; this can lead
430  * to excessive master lookups and removals if we don't delay the release.
431  *
432  * Searching for an rsb means looking through both the normal list and toss
433  * list.  When found on the toss list the rsb is moved to the normal list with
434  * ref count of 1; when found on normal list the ref count is incremented.
435  */
436
437 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
438                     unsigned int flags, struct dlm_rsb **r_ret)
439 {
440         struct dlm_rsb *r = NULL, *tmp;
441         uint32_t hash, bucket;
442         int error = -EINVAL;
443
444         if (namelen > DLM_RESNAME_MAXLEN)
445                 goto out;
446
447         if (dlm_no_directory(ls))
448                 flags |= R_CREATE;
449
450         error = 0;
451         hash = jhash(name, namelen, 0);
452         bucket = hash & (ls->ls_rsbtbl_size - 1);
453
454         error = search_rsb(ls, name, namelen, bucket, flags, &r);
455         if (!error)
456                 goto out;
457
458         if (error == -EBADR && !(flags & R_CREATE))
459                 goto out;
460
461         /* the rsb was found but wasn't a master copy */
462         if (error == -ENOTBLK)
463                 goto out;
464
465         error = -ENOMEM;
466         r = create_rsb(ls, name, namelen);
467         if (!r)
468                 goto out;
469
470         r->res_hash = hash;
471         r->res_bucket = bucket;
472         r->res_nodeid = -1;
473         kref_init(&r->res_ref);
474
475         /* With no directory, the master can be set immediately */
476         if (dlm_no_directory(ls)) {
477                 int nodeid = dlm_dir_nodeid(r);
478                 if (nodeid == dlm_our_nodeid())
479                         nodeid = 0;
480                 r->res_nodeid = nodeid;
481         }
482
483         spin_lock(&ls->ls_rsbtbl[bucket].lock);
484         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
485         if (!error) {
486                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
487                 dlm_free_rsb(r);
488                 r = tmp;
489                 goto out;
490         }
491         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
492         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
493         error = 0;
494  out:
495         *r_ret = r;
496         return error;
497 }
498
499 /* This is only called to add a reference when the code already holds
500    a valid reference to the rsb, so there's no need for locking. */
501
502 static inline void hold_rsb(struct dlm_rsb *r)
503 {
504         kref_get(&r->res_ref);
505 }
506
507 void dlm_hold_rsb(struct dlm_rsb *r)
508 {
509         hold_rsb(r);
510 }
511
512 static void toss_rsb(struct kref *kref)
513 {
514         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
515         struct dlm_ls *ls = r->res_ls;
516
517         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
518         kref_init(&r->res_ref);
519         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
520         r->res_toss_time = jiffies;
521         if (r->res_lvbptr) {
522                 dlm_free_lvb(r->res_lvbptr);
523                 r->res_lvbptr = NULL;
524         }
525 }
526
527 /* When all references to the rsb are gone it's transfered to
528    the tossed list for later disposal. */
529
530 static void put_rsb(struct dlm_rsb *r)
531 {
532         struct dlm_ls *ls = r->res_ls;
533         uint32_t bucket = r->res_bucket;
534
535         spin_lock(&ls->ls_rsbtbl[bucket].lock);
536         kref_put(&r->res_ref, toss_rsb);
537         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
538 }
539
540 void dlm_put_rsb(struct dlm_rsb *r)
541 {
542         put_rsb(r);
543 }
544
545 /* See comment for unhold_lkb */
546
547 static void unhold_rsb(struct dlm_rsb *r)
548 {
549         int rv;
550         rv = kref_put(&r->res_ref, toss_rsb);
551         DLM_ASSERT(!rv, dlm_dump_rsb(r););
552 }
553
554 static void kill_rsb(struct kref *kref)
555 {
556         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
557
558         /* All work is done after the return from kref_put() so we
559            can release the write_lock before the remove and free. */
560
561         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
565         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
566         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
567 }
568
569 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
570    The rsb must exist as long as any lkb's for it do. */
571
572 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
573 {
574         hold_rsb(r);
575         lkb->lkb_resource = r;
576 }
577
578 static void detach_lkb(struct dlm_lkb *lkb)
579 {
580         if (lkb->lkb_resource) {
581                 put_rsb(lkb->lkb_resource);
582                 lkb->lkb_resource = NULL;
583         }
584 }
585
586 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
587 {
588         struct dlm_lkb *lkb, *tmp;
589         uint32_t lkid = 0;
590         uint16_t bucket;
591
592         lkb = dlm_allocate_lkb(ls);
593         if (!lkb)
594                 return -ENOMEM;
595
596         lkb->lkb_nodeid = -1;
597         lkb->lkb_grmode = DLM_LOCK_IV;
598         kref_init(&lkb->lkb_ref);
599         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
600         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
601         INIT_LIST_HEAD(&lkb->lkb_time_list);
602
603         get_random_bytes(&bucket, sizeof(bucket));
604         bucket &= (ls->ls_lkbtbl_size - 1);
605
606         write_lock(&ls->ls_lkbtbl[bucket].lock);
607
608         /* counter can roll over so we must verify lkid is not in use */
609
610         while (lkid == 0) {
611                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
612
613                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
614                                     lkb_idtbl_list) {
615                         if (tmp->lkb_id != lkid)
616                                 continue;
617                         lkid = 0;
618                         break;
619                 }
620         }
621
622         lkb->lkb_id = lkid;
623         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
624         write_unlock(&ls->ls_lkbtbl[bucket].lock);
625
626         *lkb_ret = lkb;
627         return 0;
628 }
629
630 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
631 {
632         struct dlm_lkb *lkb;
633         uint16_t bucket = (lkid >> 16);
634
635         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
636                 if (lkb->lkb_id == lkid)
637                         return lkb;
638         }
639         return NULL;
640 }
641
642 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
643 {
644         struct dlm_lkb *lkb;
645         uint16_t bucket = (lkid >> 16);
646
647         if (bucket >= ls->ls_lkbtbl_size)
648                 return -EBADSLT;
649
650         read_lock(&ls->ls_lkbtbl[bucket].lock);
651         lkb = __find_lkb(ls, lkid);
652         if (lkb)
653                 kref_get(&lkb->lkb_ref);
654         read_unlock(&ls->ls_lkbtbl[bucket].lock);
655
656         *lkb_ret = lkb;
657         return lkb ? 0 : -ENOENT;
658 }
659
660 static void kill_lkb(struct kref *kref)
661 {
662         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
663
664         /* All work is done after the return from kref_put() so we
665            can release the write_lock before the detach_lkb */
666
667         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
668 }
669
670 /* __put_lkb() is used when an lkb may not have an rsb attached to
671    it so we need to provide the lockspace explicitly */
672
673 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
674 {
675         uint16_t bucket = (lkb->lkb_id >> 16);
676
677         write_lock(&ls->ls_lkbtbl[bucket].lock);
678         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
679                 list_del(&lkb->lkb_idtbl_list);
680                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
681
682                 detach_lkb(lkb);
683
684                 /* for local/process lkbs, lvbptr points to caller's lksb */
685                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
686                         dlm_free_lvb(lkb->lkb_lvbptr);
687                 dlm_free_lkb(lkb);
688                 return 1;
689         } else {
690                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
691                 return 0;
692         }
693 }
694
695 int dlm_put_lkb(struct dlm_lkb *lkb)
696 {
697         struct dlm_ls *ls;
698
699         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
700         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
701
702         ls = lkb->lkb_resource->res_ls;
703         return __put_lkb(ls, lkb);
704 }
705
706 /* This is only called to add a reference when the code already holds
707    a valid reference to the lkb, so there's no need for locking. */
708
709 static inline void hold_lkb(struct dlm_lkb *lkb)
710 {
711         kref_get(&lkb->lkb_ref);
712 }
713
714 /* This is called when we need to remove a reference and are certain
715    it's not the last ref.  e.g. del_lkb is always called between a
716    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
717    put_lkb would work fine, but would involve unnecessary locking */
718
719 static inline void unhold_lkb(struct dlm_lkb *lkb)
720 {
721         int rv;
722         rv = kref_put(&lkb->lkb_ref, kill_lkb);
723         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
724 }
725
726 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
727                             int mode)
728 {
729         struct dlm_lkb *lkb = NULL;
730
731         list_for_each_entry(lkb, head, lkb_statequeue)
732                 if (lkb->lkb_rqmode < mode)
733                         break;
734
735         if (!lkb)
736                 list_add_tail(new, head);
737         else
738                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
739 }
740
741 /* add/remove lkb to rsb's grant/convert/wait queue */
742
743 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
744 {
745         kref_get(&lkb->lkb_ref);
746
747         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
748
749         lkb->lkb_timestamp = ktime_get();
750
751         lkb->lkb_status = status;
752
753         switch (status) {
754         case DLM_LKSTS_WAITING:
755                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
756                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 else
758                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
759                 break;
760         case DLM_LKSTS_GRANTED:
761                 /* convention says granted locks kept in order of grmode */
762                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
763                                 lkb->lkb_grmode);
764                 break;
765         case DLM_LKSTS_CONVERT:
766                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
767                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
768                 else
769                         list_add_tail(&lkb->lkb_statequeue,
770                                       &r->res_convertqueue);
771                 break;
772         default:
773                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
774         }
775 }
776
777 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
778 {
779         lkb->lkb_status = 0;
780         list_del(&lkb->lkb_statequeue);
781         unhold_lkb(lkb);
782 }
783
784 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
785 {
786         hold_lkb(lkb);
787         del_lkb(r, lkb);
788         add_lkb(r, lkb, sts);
789         unhold_lkb(lkb);
790 }
791
792 static int msg_reply_type(int mstype)
793 {
794         switch (mstype) {
795         case DLM_MSG_REQUEST:
796                 return DLM_MSG_REQUEST_REPLY;
797         case DLM_MSG_CONVERT:
798                 return DLM_MSG_CONVERT_REPLY;
799         case DLM_MSG_UNLOCK:
800                 return DLM_MSG_UNLOCK_REPLY;
801         case DLM_MSG_CANCEL:
802                 return DLM_MSG_CANCEL_REPLY;
803         case DLM_MSG_LOOKUP:
804                 return DLM_MSG_LOOKUP_REPLY;
805         }
806         return -1;
807 }
808
809 /* add/remove lkb from global waiters list of lkb's waiting for
810    a reply from a remote node */
811
812 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
813 {
814         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
815         int error = 0;
816
817         mutex_lock(&ls->ls_waiters_mutex);
818
819         if (is_overlap_unlock(lkb) ||
820             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
821                 error = -EINVAL;
822                 goto out;
823         }
824
825         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
826                 switch (mstype) {
827                 case DLM_MSG_UNLOCK:
828                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
829                         break;
830                 case DLM_MSG_CANCEL:
831                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
832                         break;
833                 default:
834                         error = -EBUSY;
835                         goto out;
836                 }
837                 lkb->lkb_wait_count++;
838                 hold_lkb(lkb);
839
840                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
841                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
842                           lkb->lkb_wait_count, lkb->lkb_flags);
843                 goto out;
844         }
845
846         DLM_ASSERT(!lkb->lkb_wait_count,
847                    dlm_print_lkb(lkb);
848                    printk("wait_count %d\n", lkb->lkb_wait_count););
849
850         lkb->lkb_wait_count++;
851         lkb->lkb_wait_type = mstype;
852         hold_lkb(lkb);
853         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
854  out:
855         if (error)
856                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
857                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
858                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
859         mutex_unlock(&ls->ls_waiters_mutex);
860         return error;
861 }
862
863 /* We clear the RESEND flag because we might be taking an lkb off the waiters
864    list as part of process_requestqueue (e.g. a lookup that has an optimized
865    request reply on the requestqueue) between dlm_recover_waiters_pre() which
866    set RESEND and dlm_recover_waiters_post() */
867
868 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
869                                 struct dlm_message *ms)
870 {
871         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
872         int overlap_done = 0;
873
874         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
875                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
876                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
877                 overlap_done = 1;
878                 goto out_del;
879         }
880
881         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
882                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
883                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
884                 overlap_done = 1;
885                 goto out_del;
886         }
887
888         /* Cancel state was preemptively cleared by a successful convert,
889            see next comment, nothing to do. */
890
891         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
892             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
893                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
894                           lkb->lkb_id, lkb->lkb_wait_type);
895                 return -1;
896         }
897
898         /* Remove for the convert reply, and premptively remove for the
899            cancel reply.  A convert has been granted while there's still
900            an outstanding cancel on it (the cancel is moot and the result
901            in the cancel reply should be 0).  We preempt the cancel reply
902            because the app gets the convert result and then can follow up
903            with another op, like convert.  This subsequent op would see the
904            lingering state of the cancel and fail with -EBUSY. */
905
906         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
907             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
908             is_overlap_cancel(lkb) && ms && !ms->m_result) {
909                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
910                           lkb->lkb_id);
911                 lkb->lkb_wait_type = 0;
912                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
913                 lkb->lkb_wait_count--;
914                 goto out_del;
915         }
916
917         /* N.B. type of reply may not always correspond to type of original
918            msg due to lookup->request optimization, verify others? */
919
920         if (lkb->lkb_wait_type) {
921                 lkb->lkb_wait_type = 0;
922                 goto out_del;
923         }
924
925         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
926                   lkb->lkb_id, mstype, lkb->lkb_flags);
927         return -1;
928
929  out_del:
930         /* the force-unlock/cancel has completed and we haven't recvd a reply
931            to the op that was in progress prior to the unlock/cancel; we
932            give up on any reply to the earlier op.  FIXME: not sure when/how
933            this would happen */
934
935         if (overlap_done && lkb->lkb_wait_type) {
936                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
937                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
938                 lkb->lkb_wait_count--;
939                 lkb->lkb_wait_type = 0;
940         }
941
942         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
943
944         lkb->lkb_flags &= ~DLM_IFL_RESEND;
945         lkb->lkb_wait_count--;
946         if (!lkb->lkb_wait_count)
947                 list_del_init(&lkb->lkb_wait_reply);
948         unhold_lkb(lkb);
949         return 0;
950 }
951
952 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
953 {
954         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
955         int error;
956
957         mutex_lock(&ls->ls_waiters_mutex);
958         error = _remove_from_waiters(lkb, mstype, NULL);
959         mutex_unlock(&ls->ls_waiters_mutex);
960         return error;
961 }
962
963 /* Handles situations where we might be processing a "fake" or "stub" reply in
964    which we can't try to take waiters_mutex again. */
965
966 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
967 {
968         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
969         int error;
970
971         if (ms != &ls->ls_stub_ms)
972                 mutex_lock(&ls->ls_waiters_mutex);
973         error = _remove_from_waiters(lkb, ms->m_type, ms);
974         if (ms != &ls->ls_stub_ms)
975                 mutex_unlock(&ls->ls_waiters_mutex);
976         return error;
977 }
978
979 static void dir_remove(struct dlm_rsb *r)
980 {
981         int to_nodeid;
982
983         if (dlm_no_directory(r->res_ls))
984                 return;
985
986         to_nodeid = dlm_dir_nodeid(r);
987         if (to_nodeid != dlm_our_nodeid())
988                 send_remove(r);
989         else
990                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
991                                      r->res_name, r->res_length);
992 }
993
994 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
995    found since they are in order of newest to oldest? */
996
997 static int shrink_bucket(struct dlm_ls *ls, int b)
998 {
999         struct dlm_rsb *r;
1000         int count = 0, found;
1001
1002         for (;;) {
1003                 found = 0;
1004                 spin_lock(&ls->ls_rsbtbl[b].lock);
1005                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1006                                             res_hashchain) {
1007                         if (!time_after_eq(jiffies, r->res_toss_time +
1008                                            dlm_config.ci_toss_secs * HZ))
1009                                 continue;
1010                         found = 1;
1011                         break;
1012                 }
1013
1014                 if (!found) {
1015                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1016                         break;
1017                 }
1018
1019                 if (kref_put(&r->res_ref, kill_rsb)) {
1020                         list_del(&r->res_hashchain);
1021                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1022
1023                         if (is_master(r))
1024                                 dir_remove(r);
1025                         dlm_free_rsb(r);
1026                         count++;
1027                 } else {
1028                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1029                         log_error(ls, "tossed rsb in use %s", r->res_name);
1030                 }
1031         }
1032
1033         return count;
1034 }
1035
1036 void dlm_scan_rsbs(struct dlm_ls *ls)
1037 {
1038         int i;
1039
1040         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1041                 shrink_bucket(ls, i);
1042                 if (dlm_locking_stopped(ls))
1043                         break;
1044                 cond_resched();
1045         }
1046 }
1047
1048 static void add_timeout(struct dlm_lkb *lkb)
1049 {
1050         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1051
1052         if (is_master_copy(lkb))
1053                 return;
1054
1055         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1056             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1057                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1058                 goto add_it;
1059         }
1060         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1061                 goto add_it;
1062         return;
1063
1064  add_it:
1065         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1066         mutex_lock(&ls->ls_timeout_mutex);
1067         hold_lkb(lkb);
1068         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1069         mutex_unlock(&ls->ls_timeout_mutex);
1070 }
1071
1072 static void del_timeout(struct dlm_lkb *lkb)
1073 {
1074         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1075
1076         mutex_lock(&ls->ls_timeout_mutex);
1077         if (!list_empty(&lkb->lkb_time_list)) {
1078                 list_del_init(&lkb->lkb_time_list);
1079                 unhold_lkb(lkb);
1080         }
1081         mutex_unlock(&ls->ls_timeout_mutex);
1082 }
1083
1084 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1085    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1086    and then lock rsb because of lock ordering in add_timeout.  We may need
1087    to specify some special timeout-related bits in the lkb that are just to
1088    be accessed under the timeout_mutex. */
1089
1090 void dlm_scan_timeout(struct dlm_ls *ls)
1091 {
1092         struct dlm_rsb *r;
1093         struct dlm_lkb *lkb;
1094         int do_cancel, do_warn;
1095         s64 wait_us;
1096
1097         for (;;) {
1098                 if (dlm_locking_stopped(ls))
1099                         break;
1100
1101                 do_cancel = 0;
1102                 do_warn = 0;
1103                 mutex_lock(&ls->ls_timeout_mutex);
1104                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1105
1106                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1107                                                         lkb->lkb_timestamp));
1108
1109                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1110                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1111                                 do_cancel = 1;
1112
1113                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1114                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1115                                 do_warn = 1;
1116
1117                         if (!do_cancel && !do_warn)
1118                                 continue;
1119                         hold_lkb(lkb);
1120                         break;
1121                 }
1122                 mutex_unlock(&ls->ls_timeout_mutex);
1123
1124                 if (!do_cancel && !do_warn)
1125                         break;
1126
1127                 r = lkb->lkb_resource;
1128                 hold_rsb(r);
1129                 lock_rsb(r);
1130
1131                 if (do_warn) {
1132                         /* clear flag so we only warn once */
1133                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1134                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1135                                 del_timeout(lkb);
1136                         dlm_timeout_warn(lkb);
1137                 }
1138
1139                 if (do_cancel) {
1140                         log_debug(ls, "timeout cancel %x node %d %s",
1141                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1142                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1143                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1144                         del_timeout(lkb);
1145                         _cancel_lock(r, lkb);
1146                 }
1147
1148                 unlock_rsb(r);
1149                 unhold_rsb(r);
1150                 dlm_put_lkb(lkb);
1151         }
1152 }
1153
1154 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1155    dlm_recoverd before checking/setting ls_recover_begin. */
1156
1157 void dlm_adjust_timeouts(struct dlm_ls *ls)
1158 {
1159         struct dlm_lkb *lkb;
1160         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1161
1162         ls->ls_recover_begin = 0;
1163         mutex_lock(&ls->ls_timeout_mutex);
1164         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1165                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1166         mutex_unlock(&ls->ls_timeout_mutex);
1167 }
1168
1169 /* lkb is master or local copy */
1170
1171 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1172 {
1173         int b, len = r->res_ls->ls_lvblen;
1174
1175         /* b=1 lvb returned to caller
1176            b=0 lvb written to rsb or invalidated
1177            b=-1 do nothing */
1178
1179         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1180
1181         if (b == 1) {
1182                 if (!lkb->lkb_lvbptr)
1183                         return;
1184
1185                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1186                         return;
1187
1188                 if (!r->res_lvbptr)
1189                         return;
1190
1191                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1192                 lkb->lkb_lvbseq = r->res_lvbseq;
1193
1194         } else if (b == 0) {
1195                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1196                         rsb_set_flag(r, RSB_VALNOTVALID);
1197                         return;
1198                 }
1199
1200                 if (!lkb->lkb_lvbptr)
1201                         return;
1202
1203                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1204                         return;
1205
1206                 if (!r->res_lvbptr)
1207                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1208
1209                 if (!r->res_lvbptr)
1210                         return;
1211
1212                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1213                 r->res_lvbseq++;
1214                 lkb->lkb_lvbseq = r->res_lvbseq;
1215                 rsb_clear_flag(r, RSB_VALNOTVALID);
1216         }
1217
1218         if (rsb_flag(r, RSB_VALNOTVALID))
1219                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1220 }
1221
1222 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1223 {
1224         if (lkb->lkb_grmode < DLM_LOCK_PW)
1225                 return;
1226
1227         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1228                 rsb_set_flag(r, RSB_VALNOTVALID);
1229                 return;
1230         }
1231
1232         if (!lkb->lkb_lvbptr)
1233                 return;
1234
1235         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1236                 return;
1237
1238         if (!r->res_lvbptr)
1239                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1240
1241         if (!r->res_lvbptr)
1242                 return;
1243
1244         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1245         r->res_lvbseq++;
1246         rsb_clear_flag(r, RSB_VALNOTVALID);
1247 }
1248
1249 /* lkb is process copy (pc) */
1250
1251 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1252                             struct dlm_message *ms)
1253 {
1254         int b;
1255
1256         if (!lkb->lkb_lvbptr)
1257                 return;
1258
1259         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1260                 return;
1261
1262         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1263         if (b == 1) {
1264                 int len = receive_extralen(ms);
1265                 if (len > DLM_RESNAME_MAXLEN)
1266                         len = DLM_RESNAME_MAXLEN;
1267                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1268                 lkb->lkb_lvbseq = ms->m_lvbseq;
1269         }
1270 }
1271
1272 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1273    remove_lock -- used for unlock, removes lkb from granted
1274    revert_lock -- used for cancel, moves lkb from convert to granted
1275    grant_lock  -- used for request and convert, adds lkb to granted or
1276                   moves lkb from convert or waiting to granted
1277
1278    Each of these is used for master or local copy lkb's.  There is
1279    also a _pc() variation used to make the corresponding change on
1280    a process copy (pc) lkb. */
1281
1282 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1283 {
1284         del_lkb(r, lkb);
1285         lkb->lkb_grmode = DLM_LOCK_IV;
1286         /* this unhold undoes the original ref from create_lkb()
1287            so this leads to the lkb being freed */
1288         unhold_lkb(lkb);
1289 }
1290
1291 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1292 {
1293         set_lvb_unlock(r, lkb);
1294         _remove_lock(r, lkb);
1295 }
1296
1297 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1298 {
1299         _remove_lock(r, lkb);
1300 }
1301
1302 /* returns: 0 did nothing
1303             1 moved lock to granted
1304            -1 removed lock */
1305
1306 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1307 {
1308         int rv = 0;
1309
1310         lkb->lkb_rqmode = DLM_LOCK_IV;
1311
1312         switch (lkb->lkb_status) {
1313         case DLM_LKSTS_GRANTED:
1314                 break;
1315         case DLM_LKSTS_CONVERT:
1316                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1317                 rv = 1;
1318                 break;
1319         case DLM_LKSTS_WAITING:
1320                 del_lkb(r, lkb);
1321                 lkb->lkb_grmode = DLM_LOCK_IV;
1322                 /* this unhold undoes the original ref from create_lkb()
1323                    so this leads to the lkb being freed */
1324                 unhold_lkb(lkb);
1325                 rv = -1;
1326                 break;
1327         default:
1328                 log_print("invalid status for revert %d", lkb->lkb_status);
1329         }
1330         return rv;
1331 }
1332
1333 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1334 {
1335         return revert_lock(r, lkb);
1336 }
1337
1338 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1339 {
1340         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1341                 lkb->lkb_grmode = lkb->lkb_rqmode;
1342                 if (lkb->lkb_status)
1343                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344                 else
1345                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1346         }
1347
1348         lkb->lkb_rqmode = DLM_LOCK_IV;
1349 }
1350
1351 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1352 {
1353         set_lvb_lock(r, lkb);
1354         _grant_lock(r, lkb);
1355         lkb->lkb_highbast = 0;
1356 }
1357
1358 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1359                           struct dlm_message *ms)
1360 {
1361         set_lvb_lock_pc(r, lkb, ms);
1362         _grant_lock(r, lkb);
1363 }
1364
1365 /* called by grant_pending_locks() which means an async grant message must
1366    be sent to the requesting node in addition to granting the lock if the
1367    lkb belongs to a remote node. */
1368
1369 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1370 {
1371         grant_lock(r, lkb);
1372         if (is_master_copy(lkb))
1373                 send_grant(r, lkb);
1374         else
1375                 queue_cast(r, lkb, 0);
1376 }
1377
1378 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1379    change the granted/requested modes.  We're munging things accordingly in
1380    the process copy.
1381    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1382    conversion deadlock
1383    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1384    compatible with other granted locks */
1385
1386 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1387 {
1388         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1389                 log_print("munge_demoted %x invalid reply type %d",
1390                           lkb->lkb_id, ms->m_type);
1391                 return;
1392         }
1393
1394         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1395                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1396                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1397                 return;
1398         }
1399
1400         lkb->lkb_grmode = DLM_LOCK_NL;
1401 }
1402
1403 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1404 {
1405         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1406             ms->m_type != DLM_MSG_GRANT) {
1407                 log_print("munge_altmode %x invalid reply type %d",
1408                           lkb->lkb_id, ms->m_type);
1409                 return;
1410         }
1411
1412         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1413                 lkb->lkb_rqmode = DLM_LOCK_PR;
1414         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1415                 lkb->lkb_rqmode = DLM_LOCK_CW;
1416         else {
1417                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1418                 dlm_print_lkb(lkb);
1419         }
1420 }
1421
1422 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1423 {
1424         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1425                                            lkb_statequeue);
1426         if (lkb->lkb_id == first->lkb_id)
1427                 return 1;
1428
1429         return 0;
1430 }
1431
1432 /* Check if the given lkb conflicts with another lkb on the queue. */
1433
1434 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1435 {
1436         struct dlm_lkb *this;
1437
1438         list_for_each_entry(this, head, lkb_statequeue) {
1439                 if (this == lkb)
1440                         continue;
1441                 if (!modes_compat(this, lkb))
1442                         return 1;
1443         }
1444         return 0;
1445 }
1446
1447 /*
1448  * "A conversion deadlock arises with a pair of lock requests in the converting
1449  * queue for one resource.  The granted mode of each lock blocks the requested
1450  * mode of the other lock."
1451  *
1452  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1453  * convert queue from being granted, then deadlk/demote lkb.
1454  *
1455  * Example:
1456  * Granted Queue: empty
1457  * Convert Queue: NL->EX (first lock)
1458  *                PR->EX (second lock)
1459  *
1460  * The first lock can't be granted because of the granted mode of the second
1461  * lock and the second lock can't be granted because it's not first in the
1462  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1463  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1464  * flag set and return DEMOTED in the lksb flags.
1465  *
1466  * Originally, this function detected conv-deadlk in a more limited scope:
1467  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1468  * - if lkb1 was the first entry in the queue (not just earlier), and was
1469  *   blocked by the granted mode of lkb2, and there was nothing on the
1470  *   granted queue preventing lkb1 from being granted immediately, i.e.
1471  *   lkb2 was the only thing preventing lkb1 from being granted.
1472  *
1473  * That second condition meant we'd only say there was conv-deadlk if
1474  * resolving it (by demotion) would lead to the first lock on the convert
1475  * queue being granted right away.  It allowed conversion deadlocks to exist
1476  * between locks on the convert queue while they couldn't be granted anyway.
1477  *
1478  * Now, we detect and take action on conversion deadlocks immediately when
1479  * they're created, even if they may not be immediately consequential.  If
1480  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1481  * mode that would prevent lkb1's conversion from being granted, we do a
1482  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1483  * I think this means that the lkb_is_ahead condition below should always
1484  * be zero, i.e. there will never be conv-deadlk between two locks that are
1485  * both already on the convert queue.
1486  */
1487
1488 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1489 {
1490         struct dlm_lkb *lkb1;
1491         int lkb_is_ahead = 0;
1492
1493         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1494                 if (lkb1 == lkb2) {
1495                         lkb_is_ahead = 1;
1496                         continue;
1497                 }
1498
1499                 if (!lkb_is_ahead) {
1500                         if (!modes_compat(lkb2, lkb1))
1501                                 return 1;
1502                 } else {
1503                         if (!modes_compat(lkb2, lkb1) &&
1504                             !modes_compat(lkb1, lkb2))
1505                                 return 1;
1506                 }
1507         }
1508         return 0;
1509 }
1510
1511 /*
1512  * Return 1 if the lock can be granted, 0 otherwise.
1513  * Also detect and resolve conversion deadlocks.
1514  *
1515  * lkb is the lock to be granted
1516  *
1517  * now is 1 if the function is being called in the context of the
1518  * immediate request, it is 0 if called later, after the lock has been
1519  * queued.
1520  *
1521  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1522  */
1523
1524 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1525 {
1526         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1527
1528         /*
1529          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1530          * a new request for a NL mode lock being blocked.
1531          *
1532          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1533          * request, then it would be granted.  In essence, the use of this flag
1534          * tells the Lock Manager to expedite theis request by not considering
1535          * what may be in the CONVERTING or WAITING queues...  As of this
1536          * writing, the EXPEDITE flag can be used only with new requests for NL
1537          * mode locks.  This flag is not valid for conversion requests.
1538          *
1539          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1540          * conversion or used with a non-NL requested mode.  We also know an
1541          * EXPEDITE request is always granted immediately, so now must always
1542          * be 1.  The full condition to grant an expedite request: (now &&
1543          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1544          * therefore be shortened to just checking the flag.
1545          */
1546
1547         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1548                 return 1;
1549
1550         /*
1551          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1552          * added to the remaining conditions.
1553          */
1554
1555         if (queue_conflict(&r->res_grantqueue, lkb))
1556                 goto out;
1557
1558         /*
1559          * 6-3: By default, a conversion request is immediately granted if the
1560          * requested mode is compatible with the modes of all other granted
1561          * locks
1562          */
1563
1564         if (queue_conflict(&r->res_convertqueue, lkb))
1565                 goto out;
1566
1567         /*
1568          * 6-5: But the default algorithm for deciding whether to grant or
1569          * queue conversion requests does not by itself guarantee that such
1570          * requests are serviced on a "first come first serve" basis.  This, in
1571          * turn, can lead to a phenomenon known as "indefinate postponement".
1572          *
1573          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1574          * the system service employed to request a lock conversion.  This flag
1575          * forces certain conversion requests to be queued, even if they are
1576          * compatible with the granted modes of other locks on the same
1577          * resource.  Thus, the use of this flag results in conversion requests
1578          * being ordered on a "first come first servce" basis.
1579          *
1580          * DCT: This condition is all about new conversions being able to occur
1581          * "in place" while the lock remains on the granted queue (assuming
1582          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1583          * doesn't _have_ to go onto the convert queue where it's processed in
1584          * order.  The "now" variable is necessary to distinguish converts
1585          * being received and processed for the first time now, because once a
1586          * convert is moved to the conversion queue the condition below applies
1587          * requiring fifo granting.
1588          */
1589
1590         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1591                 return 1;
1592
1593         /*
1594          * The NOORDER flag is set to avoid the standard vms rules on grant
1595          * order.
1596          */
1597
1598         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1599                 return 1;
1600
1601         /*
1602          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1603          * granted until all other conversion requests ahead of it are granted
1604          * and/or canceled.
1605          */
1606
1607         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1608                 return 1;
1609
1610         /*
1611          * 6-4: By default, a new request is immediately granted only if all
1612          * three of the following conditions are satisfied when the request is
1613          * issued:
1614          * - The queue of ungranted conversion requests for the resource is
1615          *   empty.
1616          * - The queue of ungranted new requests for the resource is empty.
1617          * - The mode of the new request is compatible with the most
1618          *   restrictive mode of all granted locks on the resource.
1619          */
1620
1621         if (now && !conv && list_empty(&r->res_convertqueue) &&
1622             list_empty(&r->res_waitqueue))
1623                 return 1;
1624
1625         /*
1626          * 6-4: Once a lock request is in the queue of ungranted new requests,
1627          * it cannot be granted until the queue of ungranted conversion
1628          * requests is empty, all ungranted new requests ahead of it are
1629          * granted and/or canceled, and it is compatible with the granted mode
1630          * of the most restrictive lock granted on the resource.
1631          */
1632
1633         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1634             first_in_list(lkb, &r->res_waitqueue))
1635                 return 1;
1636  out:
1637         return 0;
1638 }
1639
1640 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1641                           int *err)
1642 {
1643         int rv;
1644         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1645         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1646
1647         if (err)
1648                 *err = 0;
1649
1650         rv = _can_be_granted(r, lkb, now);
1651         if (rv)
1652                 goto out;
1653
1654         /*
1655          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1656          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1657          * cancels one of the locks.
1658          */
1659
1660         if (is_convert && can_be_queued(lkb) &&
1661             conversion_deadlock_detect(r, lkb)) {
1662                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1663                         lkb->lkb_grmode = DLM_LOCK_NL;
1664                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1665                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1666                         if (err)
1667                                 *err = -EDEADLK;
1668                         else {
1669                                 log_print("can_be_granted deadlock %x now %d",
1670                                           lkb->lkb_id, now);
1671                                 dlm_dump_rsb(r);
1672                         }
1673                 }
1674                 goto out;
1675         }
1676
1677         /*
1678          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1679          * to grant a request in a mode other than the normal rqmode.  It's a
1680          * simple way to provide a big optimization to applications that can
1681          * use them.
1682          */
1683
1684         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1685                 alt = DLM_LOCK_PR;
1686         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1687                 alt = DLM_LOCK_CW;
1688
1689         if (alt) {
1690                 lkb->lkb_rqmode = alt;
1691                 rv = _can_be_granted(r, lkb, now);
1692                 if (rv)
1693                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1694                 else
1695                         lkb->lkb_rqmode = rqmode;
1696         }
1697  out:
1698         return rv;
1699 }
1700
1701 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1702    for locks pending on the convert list.  Once verified (watch for these
1703    log_prints), we should be able to just call _can_be_granted() and not
1704    bother with the demote/deadlk cases here (and there's no easy way to deal
1705    with a deadlk here, we'd have to generate something like grant_lock with
1706    the deadlk error.) */
1707
1708 /* Returns the highest requested mode of all blocked conversions; sets
1709    cw if there's a blocked conversion to DLM_LOCK_CW. */
1710
1711 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1712 {
1713         struct dlm_lkb *lkb, *s;
1714         int hi, demoted, quit, grant_restart, demote_restart;
1715         int deadlk;
1716
1717         quit = 0;
1718  restart:
1719         grant_restart = 0;
1720         demote_restart = 0;
1721         hi = DLM_LOCK_IV;
1722
1723         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1724                 demoted = is_demoted(lkb);
1725                 deadlk = 0;
1726
1727                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1728                         grant_lock_pending(r, lkb);
1729                         grant_restart = 1;
1730                         continue;
1731                 }
1732
1733                 if (!demoted && is_demoted(lkb)) {
1734                         log_print("WARN: pending demoted %x node %d %s",
1735                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1736                         demote_restart = 1;
1737                         continue;
1738                 }
1739
1740                 if (deadlk) {
1741                         log_print("WARN: pending deadlock %x node %d %s",
1742                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1743                         dlm_dump_rsb(r);
1744                         continue;
1745                 }
1746
1747                 hi = max_t(int, lkb->lkb_rqmode, hi);
1748
1749                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1750                         *cw = 1;
1751         }
1752
1753         if (grant_restart)
1754                 goto restart;
1755         if (demote_restart && !quit) {
1756                 quit = 1;
1757                 goto restart;
1758         }
1759
1760         return max_t(int, high, hi);
1761 }
1762
1763 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1764 {
1765         struct dlm_lkb *lkb, *s;
1766
1767         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1768                 if (can_be_granted(r, lkb, 0, NULL))
1769                         grant_lock_pending(r, lkb);
1770                 else {
1771                         high = max_t(int, lkb->lkb_rqmode, high);
1772                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1773                                 *cw = 1;
1774                 }
1775         }
1776
1777         return high;
1778 }
1779
1780 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1781    on either the convert or waiting queue.
1782    high is the largest rqmode of all locks blocked on the convert or
1783    waiting queue. */
1784
1785 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1786 {
1787         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1788                 if (gr->lkb_highbast < DLM_LOCK_EX)
1789                         return 1;
1790                 return 0;
1791         }
1792
1793         if (gr->lkb_highbast < high &&
1794             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1795                 return 1;
1796         return 0;
1797 }
1798
1799 static void grant_pending_locks(struct dlm_rsb *r)
1800 {
1801         struct dlm_lkb *lkb, *s;
1802         int high = DLM_LOCK_IV;
1803         int cw = 0;
1804
1805         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1806
1807         high = grant_pending_convert(r, high, &cw);
1808         high = grant_pending_wait(r, high, &cw);
1809
1810         if (high == DLM_LOCK_IV)
1811                 return;
1812
1813         /*
1814          * If there are locks left on the wait/convert queue then send blocking
1815          * ASTs to granted locks based on the largest requested mode (high)
1816          * found above.
1817          */
1818
1819         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1820                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1821                         if (cw && high == DLM_LOCK_PR &&
1822                             lkb->lkb_grmode == DLM_LOCK_PR)
1823                                 queue_bast(r, lkb, DLM_LOCK_CW);
1824                         else
1825                                 queue_bast(r, lkb, high);
1826                         lkb->lkb_highbast = high;
1827                 }
1828         }
1829 }
1830
1831 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1832 {
1833         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1834             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1835                 if (gr->lkb_highbast < DLM_LOCK_EX)
1836                         return 1;
1837                 return 0;
1838         }
1839
1840         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1841                 return 1;
1842         return 0;
1843 }
1844
1845 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1846                             struct dlm_lkb *lkb)
1847 {
1848         struct dlm_lkb *gr;
1849
1850         list_for_each_entry(gr, head, lkb_statequeue) {
1851                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1852                         queue_bast(r, gr, lkb->lkb_rqmode);
1853                         gr->lkb_highbast = lkb->lkb_rqmode;
1854                 }
1855         }
1856 }
1857
1858 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1859 {
1860         send_bast_queue(r, &r->res_grantqueue, lkb);
1861 }
1862
1863 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1864 {
1865         send_bast_queue(r, &r->res_grantqueue, lkb);
1866         send_bast_queue(r, &r->res_convertqueue, lkb);
1867 }
1868
1869 /* set_master(r, lkb) -- set the master nodeid of a resource
1870
1871    The purpose of this function is to set the nodeid field in the given
1872    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1873    known, it can just be copied to the lkb and the function will return
1874    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1875    before it can be copied to the lkb.
1876
1877    When the rsb nodeid is being looked up remotely, the initial lkb
1878    causing the lookup is kept on the ls_waiters list waiting for the
1879    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1880    on the rsb's res_lookup list until the master is verified.
1881
1882    Return values:
1883    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1884    1: the rsb master is not available and the lkb has been placed on
1885       a wait queue
1886 */
1887
1888 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1889 {
1890         struct dlm_ls *ls = r->res_ls;
1891         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1892
1893         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1894                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1895                 r->res_first_lkid = lkb->lkb_id;
1896                 lkb->lkb_nodeid = r->res_nodeid;
1897                 return 0;
1898         }
1899
1900         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1901                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1902                 return 1;
1903         }
1904
1905         if (r->res_nodeid == 0) {
1906                 lkb->lkb_nodeid = 0;
1907                 return 0;
1908         }
1909
1910         if (r->res_nodeid > 0) {
1911                 lkb->lkb_nodeid = r->res_nodeid;
1912                 return 0;
1913         }
1914
1915         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1916
1917         dir_nodeid = dlm_dir_nodeid(r);
1918
1919         if (dir_nodeid != our_nodeid) {
1920                 r->res_first_lkid = lkb->lkb_id;
1921                 send_lookup(r, lkb);
1922                 return 1;
1923         }
1924
1925         for (i = 0; i < 2; i++) {
1926                 /* It's possible for dlm_scand to remove an old rsb for
1927                    this same resource from the toss list, us to create
1928                    a new one, look up the master locally, and find it
1929                    already exists just before dlm_scand does the
1930                    dir_remove() on the previous rsb. */
1931
1932                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1933                                        r->res_length, &ret_nodeid);
1934                 if (!error)
1935                         break;
1936                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1937                 schedule();
1938         }
1939         if (error && error != -EEXIST)
1940                 return error;
1941
1942         if (ret_nodeid == our_nodeid) {
1943                 r->res_first_lkid = 0;
1944                 r->res_nodeid = 0;
1945                 lkb->lkb_nodeid = 0;
1946         } else {
1947                 r->res_first_lkid = lkb->lkb_id;
1948                 r->res_nodeid = ret_nodeid;
1949                 lkb->lkb_nodeid = ret_nodeid;
1950         }
1951         return 0;
1952 }
1953
1954 static void process_lookup_list(struct dlm_rsb *r)
1955 {
1956         struct dlm_lkb *lkb, *safe;
1957
1958         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1959                 list_del_init(&lkb->lkb_rsb_lookup);
1960                 _request_lock(r, lkb);
1961                 schedule();
1962         }
1963 }
1964
1965 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1966
1967 static void confirm_master(struct dlm_rsb *r, int error)
1968 {
1969         struct dlm_lkb *lkb;
1970
1971         if (!r->res_first_lkid)
1972                 return;
1973
1974         switch (error) {
1975         case 0:
1976         case -EINPROGRESS:
1977                 r->res_first_lkid = 0;
1978                 process_lookup_list(r);
1979                 break;
1980
1981         case -EAGAIN:
1982         case -EBADR:
1983         case -ENOTBLK:
1984                 /* the remote request failed and won't be retried (it was
1985                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1986                    lkb the first_lkid */
1987
1988                 r->res_first_lkid = 0;
1989
1990                 if (!list_empty(&r->res_lookup)) {
1991                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1992                                          lkb_rsb_lookup);
1993                         list_del_init(&lkb->lkb_rsb_lookup);
1994                         r->res_first_lkid = lkb->lkb_id;
1995                         _request_lock(r, lkb);
1996                 }
1997                 break;
1998
1999         default:
2000                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2001         }
2002 }
2003
2004 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2005                          int namelen, unsigned long timeout_cs,
2006                          void (*ast) (void *astparam),
2007                          void *astparam,
2008                          void (*bast) (void *astparam, int mode),
2009                          struct dlm_args *args)
2010 {
2011         int rv = -EINVAL;
2012
2013         /* check for invalid arg usage */
2014
2015         if (mode < 0 || mode > DLM_LOCK_EX)
2016                 goto out;
2017
2018         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2019                 goto out;
2020
2021         if (flags & DLM_LKF_CANCEL)
2022                 goto out;
2023
2024         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2025                 goto out;
2026
2027         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2028                 goto out;
2029
2030         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2031                 goto out;
2032
2033         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2034                 goto out;
2035
2036         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2037                 goto out;
2038
2039         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2040                 goto out;
2041
2042         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2043                 goto out;
2044
2045         if (!ast || !lksb)
2046                 goto out;
2047
2048         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2049                 goto out;
2050
2051         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2052                 goto out;
2053
2054         /* these args will be copied to the lkb in validate_lock_args,
2055            it cannot be done now because when converting locks, fields in
2056            an active lkb cannot be modified before locking the rsb */
2057
2058         args->flags = flags;
2059         args->astfn = ast;
2060         args->astparam = astparam;
2061         args->bastfn = bast;
2062         args->timeout = timeout_cs;
2063         args->mode = mode;
2064         args->lksb = lksb;
2065         rv = 0;
2066  out:
2067         return rv;
2068 }
2069
2070 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2071 {
2072         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2073                       DLM_LKF_FORCEUNLOCK))
2074                 return -EINVAL;
2075
2076         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2077                 return -EINVAL;
2078
2079         args->flags = flags;
2080         args->astparam = astarg;
2081         return 0;
2082 }
2083
2084 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2085                               struct dlm_args *args)
2086 {
2087         int rv = -EINVAL;
2088
2089         if (args->flags & DLM_LKF_CONVERT) {
2090                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2091                         goto out;
2092
2093                 if (args->flags & DLM_LKF_QUECVT &&
2094                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2095                         goto out;
2096
2097                 rv = -EBUSY;
2098                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2099                         goto out;
2100
2101                 if (lkb->lkb_wait_type)
2102                         goto out;
2103
2104                 if (is_overlap(lkb))
2105                         goto out;
2106         }
2107
2108         lkb->lkb_exflags = args->flags;
2109         lkb->lkb_sbflags = 0;
2110         lkb->lkb_astfn = args->astfn;
2111         lkb->lkb_astparam = args->astparam;
2112         lkb->lkb_bastfn = args->bastfn;
2113         lkb->lkb_rqmode = args->mode;
2114         lkb->lkb_lksb = args->lksb;
2115         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2116         lkb->lkb_ownpid = (int) current->pid;
2117         lkb->lkb_timeout_cs = args->timeout;
2118         rv = 0;
2119  out:
2120         if (rv)
2121                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2122                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2123                           lkb->lkb_status, lkb->lkb_wait_type,
2124                           lkb->lkb_resource->res_name);
2125         return rv;
2126 }
2127
2128 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2129    for success */
2130
2131 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2132    because there may be a lookup in progress and it's valid to do
2133    cancel/unlockf on it */
2134
2135 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2136 {
2137         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2138         int rv = -EINVAL;
2139
2140         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2141                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2142                 dlm_print_lkb(lkb);
2143                 goto out;
2144         }
2145
2146         /* an lkb may still exist even though the lock is EOL'ed due to a
2147            cancel, unlock or failed noqueue request; an app can't use these
2148            locks; return same error as if the lkid had not been found at all */
2149
2150         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2151                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2152                 rv = -ENOENT;
2153                 goto out;
2154         }
2155
2156         /* an lkb may be waiting for an rsb lookup to complete where the
2157            lookup was initiated by another lock */
2158
2159         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2160                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2161                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2162                         list_del_init(&lkb->lkb_rsb_lookup);
2163                         queue_cast(lkb->lkb_resource, lkb,
2164                                    args->flags & DLM_LKF_CANCEL ?
2165                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2166                         unhold_lkb(lkb); /* undoes create_lkb() */
2167                 }
2168                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2169                 rv = -EBUSY;
2170                 goto out;
2171         }
2172
2173         /* cancel not allowed with another cancel/unlock in progress */
2174
2175         if (args->flags & DLM_LKF_CANCEL) {
2176                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2177                         goto out;
2178
2179                 if (is_overlap(lkb))
2180                         goto out;
2181
2182                 /* don't let scand try to do a cancel */
2183                 del_timeout(lkb);
2184
2185                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2186                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2187                         rv = -EBUSY;
2188                         goto out;
2189                 }
2190
2191                 /* there's nothing to cancel */
2192                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2193                     !lkb->lkb_wait_type) {
2194                         rv = -EBUSY;
2195                         goto out;
2196                 }
2197
2198                 switch (lkb->lkb_wait_type) {
2199                 case DLM_MSG_LOOKUP:
2200                 case DLM_MSG_REQUEST:
2201                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2202                         rv = -EBUSY;
2203                         goto out;
2204                 case DLM_MSG_UNLOCK:
2205                 case DLM_MSG_CANCEL:
2206                         goto out;
2207                 }
2208                 /* add_to_waiters() will set OVERLAP_CANCEL */
2209                 goto out_ok;
2210         }
2211
2212         /* do we need to allow a force-unlock if there's a normal unlock
2213            already in progress?  in what conditions could the normal unlock
2214            fail such that we'd want to send a force-unlock to be sure? */
2215
2216         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2217                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2218                         goto out;
2219
2220                 if (is_overlap_unlock(lkb))
2221                         goto out;
2222
2223                 /* don't let scand try to do a cancel */
2224                 del_timeout(lkb);
2225
2226                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2227                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2228                         rv = -EBUSY;
2229                         goto out;
2230                 }
2231
2232                 switch (lkb->lkb_wait_type) {
2233                 case DLM_MSG_LOOKUP:
2234                 case DLM_MSG_REQUEST:
2235                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2236                         rv = -EBUSY;
2237                         goto out;
2238                 case DLM_MSG_UNLOCK:
2239                         goto out;
2240                 }
2241                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2242                 goto out_ok;
2243         }
2244
2245         /* normal unlock not allowed if there's any op in progress */
2246         rv = -EBUSY;
2247         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2248                 goto out;
2249
2250  out_ok:
2251         /* an overlapping op shouldn't blow away exflags from other op */
2252         lkb->lkb_exflags |= args->flags;
2253         lkb->lkb_sbflags = 0;
2254         lkb->lkb_astparam = args->astparam;
2255         rv = 0;
2256  out:
2257         if (rv)
2258                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2259                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2260                           args->flags, lkb->lkb_wait_type,
2261                           lkb->lkb_resource->res_name);
2262         return rv;
2263 }
2264
2265 /*
2266  * Four stage 4 varieties:
2267  * do_request(), do_convert(), do_unlock(), do_cancel()
2268  * These are called on the master node for the given lock and
2269  * from the central locking logic.
2270  */
2271
2272 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2273 {
2274         int error = 0;
2275
2276         if (can_be_granted(r, lkb, 1, NULL)) {
2277                 grant_lock(r, lkb);
2278                 queue_cast(r, lkb, 0);
2279                 goto out;
2280         }
2281
2282         if (can_be_queued(lkb)) {
2283                 error = -EINPROGRESS;
2284                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2285                 add_timeout(lkb);
2286                 goto out;
2287         }
2288
2289         error = -EAGAIN;
2290         queue_cast(r, lkb, -EAGAIN);
2291  out:
2292         return error;
2293 }
2294
2295 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2296                                int error)
2297 {
2298         switch (error) {
2299         case -EAGAIN:
2300                 if (force_blocking_asts(lkb))
2301                         send_blocking_asts_all(r, lkb);
2302                 break;
2303         case -EINPROGRESS:
2304                 send_blocking_asts(r, lkb);
2305                 break;
2306         }
2307 }
2308
2309 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2310 {
2311         int error = 0;
2312         int deadlk = 0;
2313
2314         /* changing an existing lock may allow others to be granted */
2315
2316         if (can_be_granted(r, lkb, 1, &deadlk)) {
2317                 grant_lock(r, lkb);
2318                 queue_cast(r, lkb, 0);
2319                 goto out;
2320         }
2321
2322         /* can_be_granted() detected that this lock would block in a conversion
2323            deadlock, so we leave it on the granted queue and return EDEADLK in
2324            the ast for the convert. */
2325
2326         if (deadlk) {
2327                 /* it's left on the granted queue */
2328                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2329                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2330                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2331                 revert_lock(r, lkb);
2332                 queue_cast(r, lkb, -EDEADLK);
2333                 error = -EDEADLK;
2334                 goto out;
2335         }
2336
2337         /* is_demoted() means the can_be_granted() above set the grmode
2338            to NL, and left us on the granted queue.  This auto-demotion
2339            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2340            now grantable.  We have to try to grant other converting locks
2341            before we try again to grant this one. */
2342
2343         if (is_demoted(lkb)) {
2344                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2345                 if (_can_be_granted(r, lkb, 1)) {
2346                         grant_lock(r, lkb);
2347                         queue_cast(r, lkb, 0);
2348                         goto out;
2349                 }
2350                 /* else fall through and move to convert queue */
2351         }
2352
2353         if (can_be_queued(lkb)) {
2354                 error = -EINPROGRESS;
2355                 del_lkb(r, lkb);
2356                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2357                 add_timeout(lkb);
2358                 goto out;
2359         }
2360
2361         error = -EAGAIN;
2362         queue_cast(r, lkb, -EAGAIN);
2363  out:
2364         return error;
2365 }
2366
2367 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2368                                int error)
2369 {
2370         switch (error) {
2371         case 0:
2372                 grant_pending_locks(r);
2373                 /* grant_pending_locks also sends basts */
2374                 break;
2375         case -EAGAIN:
2376                 if (force_blocking_asts(lkb))
2377                         send_blocking_asts_all(r, lkb);
2378                 break;
2379         case -EINPROGRESS:
2380                 send_blocking_asts(r, lkb);
2381                 break;
2382         }
2383 }
2384
2385 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2386 {
2387         remove_lock(r, lkb);
2388         queue_cast(r, lkb, -DLM_EUNLOCK);
2389         return -DLM_EUNLOCK;
2390 }
2391
2392 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2393                               int error)
2394 {
2395         grant_pending_locks(r);
2396 }
2397
2398 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2399  
2400 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2401 {
2402         int error;
2403
2404         error = revert_lock(r, lkb);
2405         if (error) {
2406                 queue_cast(r, lkb, -DLM_ECANCEL);
2407                 return -DLM_ECANCEL;
2408         }
2409         return 0;
2410 }
2411
2412 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2413                               int error)
2414 {
2415         if (error)
2416                 grant_pending_locks(r);
2417 }
2418
2419 /*
2420  * Four stage 3 varieties:
2421  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2422  */
2423
2424 /* add a new lkb to a possibly new rsb, called by requesting process */
2425
2426 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2427 {
2428         int error;
2429
2430         /* set_master: sets lkb nodeid from r */
2431
2432         error = set_master(r, lkb);
2433         if (error < 0)
2434                 goto out;
2435         if (error) {
2436                 error = 0;
2437                 goto out;
2438         }
2439
2440         if (is_remote(r)) {
2441                 /* receive_request() calls do_request() on remote node */
2442                 error = send_request(r, lkb);
2443         } else {
2444                 error = do_request(r, lkb);
2445                 /* for remote locks the request_reply is sent
2446                    between do_request and do_request_effects */
2447                 do_request_effects(r, lkb, error);
2448         }
2449  out:
2450         return error;
2451 }
2452
2453 /* change some property of an existing lkb, e.g. mode */
2454
2455 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2456 {
2457         int error;
2458
2459         if (is_remote(r)) {
2460                 /* receive_convert() calls do_convert() on remote node */
2461                 error = send_convert(r, lkb);
2462         } else {
2463                 error = do_convert(r, lkb);
2464                 /* for remote locks the convert_reply is sent
2465                    between do_convert and do_convert_effects */
2466                 do_convert_effects(r, lkb, error);
2467         }
2468
2469         return error;
2470 }
2471
2472 /* remove an existing lkb from the granted queue */
2473
2474 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2475 {
2476         int error;
2477
2478         if (is_remote(r)) {
2479                 /* receive_unlock() calls do_unlock() on remote node */
2480                 error = send_unlock(r, lkb);
2481         } else {
2482                 error = do_unlock(r, lkb);
2483                 /* for remote locks the unlock_reply is sent
2484                    between do_unlock and do_unlock_effects */
2485                 do_unlock_effects(r, lkb, error);
2486         }
2487
2488         return error;
2489 }
2490
2491 /* remove an existing lkb from the convert or wait queue */
2492
2493 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2494 {
2495         int error;
2496
2497         if (is_remote(r)) {
2498                 /* receive_cancel() calls do_cancel() on remote node */
2499                 error = send_cancel(r, lkb);
2500         } else {
2501                 error = do_cancel(r, lkb);
2502                 /* for remote locks the cancel_reply is sent
2503                    between do_cancel and do_cancel_effects */
2504                 do_cancel_effects(r, lkb, error);
2505         }
2506
2507         return error;
2508 }
2509
2510 /*
2511  * Four stage 2 varieties:
2512  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2513  */
2514
2515 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2516                         int len, struct dlm_args *args)
2517 {
2518         struct dlm_rsb *r;
2519         int error;
2520
2521         error = validate_lock_args(ls, lkb, args);
2522         if (error)
2523                 goto out;
2524
2525         error = find_rsb(ls, name, len, R_CREATE, &r);
2526         if (error)
2527                 goto out;
2528
2529         lock_rsb(r);
2530
2531         attach_lkb(r, lkb);
2532         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2533
2534         error = _request_lock(r, lkb);
2535
2536         unlock_rsb(r);
2537         put_rsb(r);
2538
2539  out:
2540         return error;
2541 }
2542
2543 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2544                         struct dlm_args *args)
2545 {
2546         struct dlm_rsb *r;
2547         int error;
2548
2549         r = lkb->lkb_resource;
2550
2551         hold_rsb(r);
2552         lock_rsb(r);
2553
2554         error = validate_lock_args(ls, lkb, args);
2555         if (error)
2556                 goto out;
2557
2558         error = _convert_lock(r, lkb);
2559  out:
2560         unlock_rsb(r);
2561         put_rsb(r);
2562         return error;
2563 }
2564
2565 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2566                        struct dlm_args *args)
2567 {
2568         struct dlm_rsb *r;
2569         int error;
2570
2571         r = lkb->lkb_resource;
2572
2573         hold_rsb(r);
2574         lock_rsb(r);
2575
2576         error = validate_unlock_args(lkb, args);
2577         if (error)
2578                 goto out;
2579
2580         error = _unlock_lock(r, lkb);
2581  out:
2582         unlock_rsb(r);
2583         put_rsb(r);
2584         return error;
2585 }
2586
2587 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2588                        struct dlm_args *args)
2589 {
2590         struct dlm_rsb *r;
2591         int error;
2592
2593         r = lkb->lkb_resource;
2594
2595         hold_rsb(r);
2596         lock_rsb(r);
2597
2598         error = validate_unlock_args(lkb, args);
2599         if (error)
2600                 goto out;
2601
2602         error = _cancel_lock(r, lkb);
2603  out:
2604         unlock_rsb(r);
2605         put_rsb(r);
2606         return error;
2607 }
2608
2609 /*
2610  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2611  */
2612
2613 int dlm_lock(dlm_lockspace_t *lockspace,
2614              int mode,
2615              struct dlm_lksb *lksb,
2616              uint32_t flags,
2617              void *name,
2618              unsigned int namelen,
2619              uint32_t parent_lkid,
2620              void (*ast) (void *astarg),
2621              void *astarg,
2622              void (*bast) (void *astarg, int mode))
2623 {
2624         struct dlm_ls *ls;
2625         struct dlm_lkb *lkb;
2626         struct dlm_args args;
2627         int error, convert = flags & DLM_LKF_CONVERT;
2628
2629         ls = dlm_find_lockspace_local(lockspace);
2630         if (!ls)
2631                 return -EINVAL;
2632
2633         dlm_lock_recovery(ls);
2634
2635         if (convert)
2636                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2637         else
2638                 error = create_lkb(ls, &lkb);
2639
2640         if (error)
2641                 goto out;
2642
2643         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2644                               astarg, bast, &args);
2645         if (error)
2646                 goto out_put;
2647
2648         if (convert)
2649                 error = convert_lock(ls, lkb, &args);
2650         else
2651                 error = request_lock(ls, lkb, name, namelen, &args);
2652
2653         if (error == -EINPROGRESS)
2654                 error = 0;
2655  out_put:
2656         if (convert || error)
2657                 __put_lkb(ls, lkb);
2658         if (error == -EAGAIN || error == -EDEADLK)
2659                 error = 0;
2660  out:
2661         dlm_unlock_recovery(ls);
2662         dlm_put_lockspace(ls);
2663         return error;
2664 }
2665
2666 int dlm_unlock(dlm_lockspace_t *lockspace,
2667                uint32_t lkid,
2668                uint32_t flags,
2669                struct dlm_lksb *lksb,
2670                void *astarg)
2671 {
2672         struct dlm_ls *ls;
2673         struct dlm_lkb *lkb;
2674         struct dlm_args args;
2675         int error;
2676
2677         ls = dlm_find_lockspace_local(lockspace);
2678         if (!ls)
2679                 return -EINVAL;
2680
2681         dlm_lock_recovery(ls);
2682
2683         error = find_lkb(ls, lkid, &lkb);
2684         if (error)
2685                 goto out;
2686
2687         error = set_unlock_args(flags, astarg, &args);
2688         if (error)
2689                 goto out_put;
2690
2691         if (flags & DLM_LKF_CANCEL)
2692                 error = cancel_lock(ls, lkb, &args);
2693         else
2694                 error = unlock_lock(ls, lkb, &args);
2695
2696         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2697                 error = 0;
2698         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2699                 error = 0;
2700  out_put:
2701         dlm_put_lkb(lkb);
2702  out:
2703         dlm_unlock_recovery(ls);
2704         dlm_put_lockspace(ls);
2705         return error;
2706 }
2707
2708 /*
2709  * send/receive routines for remote operations and replies
2710  *
2711  * send_args
2712  * send_common
2713  * send_request                 receive_request
2714  * send_convert                 receive_convert
2715  * send_unlock                  receive_unlock
2716  * send_cancel                  receive_cancel
2717  * send_grant                   receive_grant
2718  * send_bast                    receive_bast
2719  * send_lookup                  receive_lookup
2720  * send_remove                  receive_remove
2721  *
2722  *                              send_common_reply
2723  * receive_request_reply        send_request_reply
2724  * receive_convert_reply        send_convert_reply
2725  * receive_unlock_reply         send_unlock_reply
2726  * receive_cancel_reply         send_cancel_reply
2727  * receive_lookup_reply         send_lookup_reply
2728  */
2729
2730 static int _create_message(struct dlm_ls *ls, int mb_len,
2731                            int to_nodeid, int mstype,
2732                            struct dlm_message **ms_ret,
2733                            struct dlm_mhandle **mh_ret)
2734 {
2735         struct dlm_message *ms;
2736         struct dlm_mhandle *mh;
2737         char *mb;
2738
2739         /* get_buffer gives us a message handle (mh) that we need to
2740            pass into lowcomms_commit and a message buffer (mb) that we
2741            write our data into */
2742
2743         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2744         if (!mh)
2745                 return -ENOBUFS;
2746
2747         memset(mb, 0, mb_len);
2748
2749         ms = (struct dlm_message *) mb;
2750
2751         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2752         ms->m_header.h_lockspace = ls->ls_global_id;
2753         ms->m_header.h_nodeid = dlm_our_nodeid();
2754         ms->m_header.h_length = mb_len;
2755         ms->m_header.h_cmd = DLM_MSG;
2756
2757         ms->m_type = mstype;
2758
2759         *mh_ret = mh;
2760         *ms_ret = ms;
2761         return 0;
2762 }
2763
2764 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2765                           int to_nodeid, int mstype,
2766                           struct dlm_message **ms_ret,
2767                           struct dlm_mhandle **mh_ret)
2768 {
2769         int mb_len = sizeof(struct dlm_message);
2770
2771         switch (mstype) {
2772         case DLM_MSG_REQUEST:
2773         case DLM_MSG_LOOKUP:
2774         case DLM_MSG_REMOVE:
2775                 mb_len += r->res_length;
2776                 break;
2777         case DLM_MSG_CONVERT:
2778         case DLM_MSG_UNLOCK:
2779         case DLM_MSG_REQUEST_REPLY:
2780         case DLM_MSG_CONVERT_REPLY:
2781         case DLM_MSG_GRANT:
2782                 if (lkb && lkb->lkb_lvbptr)
2783                         mb_len += r->res_ls->ls_lvblen;
2784                 break;
2785         }
2786
2787         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2788                                ms_ret, mh_ret);
2789 }
2790
2791 /* further lowcomms enhancements or alternate implementations may make
2792    the return value from this function useful at some point */
2793
2794 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2795 {
2796         dlm_message_out(ms);
2797         dlm_lowcomms_commit_buffer(mh);
2798         return 0;
2799 }
2800
2801 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2802                       struct dlm_message *ms)
2803 {
2804         ms->m_nodeid   = lkb->lkb_nodeid;
2805         ms->m_pid      = lkb->lkb_ownpid;
2806         ms->m_lkid     = lkb->lkb_id;
2807         ms->m_remid    = lkb->lkb_remid;
2808         ms->m_exflags  = lkb->lkb_exflags;
2809         ms->m_sbflags  = lkb->lkb_sbflags;
2810         ms->m_flags    = lkb->lkb_flags;
2811         ms->m_lvbseq   = lkb->lkb_lvbseq;
2812         ms->m_status   = lkb->lkb_status;
2813         ms->m_grmode   = lkb->lkb_grmode;
2814         ms->m_rqmode   = lkb->lkb_rqmode;
2815         ms->m_hash     = r->res_hash;
2816
2817         /* m_result and m_bastmode are set from function args,
2818            not from lkb fields */
2819
2820         if (lkb->lkb_bastfn)
2821                 ms->m_asts |= AST_BAST;
2822         if (lkb->lkb_astfn)
2823                 ms->m_asts |= AST_COMP;
2824
2825         /* compare with switch in create_message; send_remove() doesn't
2826            use send_args() */
2827
2828         switch (ms->m_type) {
2829         case DLM_MSG_REQUEST:
2830         case DLM_MSG_LOOKUP:
2831                 memcpy(ms->m_extra, r->res_name, r->res_length);
2832                 break;
2833         case DLM_MSG_CONVERT:
2834         case DLM_MSG_UNLOCK:
2835         case DLM_MSG_REQUEST_REPLY:
2836         case DLM_MSG_CONVERT_REPLY:
2837         case DLM_MSG_GRANT:
2838                 if (!lkb->lkb_lvbptr)
2839                         break;
2840                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2841                 break;
2842         }
2843 }
2844
2845 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2846 {
2847         struct dlm_message *ms;
2848         struct dlm_mhandle *mh;
2849         int to_nodeid, error;
2850
2851         error = add_to_waiters(lkb, mstype);
2852         if (error)
2853                 return error;
2854
2855         to_nodeid = r->res_nodeid;
2856
2857         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2858         if (error)
2859                 goto fail;
2860
2861         send_args(r, lkb, ms);
2862
2863         error = send_message(mh, ms);
2864         if (error)
2865                 goto fail;
2866         return 0;
2867
2868  fail:
2869         remove_from_waiters(lkb, msg_reply_type(mstype));
2870         return error;
2871 }
2872
2873 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2874 {
2875         return send_common(r, lkb, DLM_MSG_REQUEST);
2876 }
2877
2878 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2879 {
2880         int error;
2881
2882         error = send_common(r, lkb, DLM_MSG_CONVERT);
2883
2884         /* down conversions go without a reply from the master */
2885         if (!error && down_conversion(lkb)) {
2886                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2887                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2888                 r->res_ls->ls_stub_ms.m_result = 0;
2889                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2890                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2891         }
2892
2893         return error;
2894 }
2895
2896 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2897    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2898    that the master is still correct. */
2899
2900 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2901 {
2902         return send_common(r, lkb, DLM_MSG_UNLOCK);
2903 }
2904
2905 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2906 {
2907         return send_common(r, lkb, DLM_MSG_CANCEL);
2908 }
2909
2910 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2911 {
2912         struct dlm_message *ms;
2913         struct dlm_mhandle *mh;
2914         int to_nodeid, error;
2915
2916         to_nodeid = lkb->lkb_nodeid;
2917
2918         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2919         if (error)
2920                 goto out;
2921
2922         send_args(r, lkb, ms);
2923
2924         ms->m_result = 0;
2925
2926         error = send_message(mh, ms);
2927  out:
2928         return error;
2929 }
2930
2931 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2932 {
2933         struct dlm_message *ms;
2934         struct dlm_mhandle *mh;
2935         int to_nodeid, error;
2936
2937         to_nodeid = lkb->lkb_nodeid;
2938
2939         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2940         if (error)
2941                 goto out;
2942
2943         send_args(r, lkb, ms);
2944
2945         ms->m_bastmode = mode;
2946
2947         error = send_message(mh, ms);
2948  out:
2949         return error;
2950 }
2951
2952 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2953 {
2954         struct dlm_message *ms;
2955         struct dlm_mhandle *mh;
2956         int to_nodeid, error;
2957
2958         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2959         if (error)
2960                 return error;
2961
2962         to_nodeid = dlm_dir_nodeid(r);
2963
2964         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2965         if (error)
2966                 goto fail;
2967
2968         send_args(r, lkb, ms);
2969
2970         error = send_message(mh, ms);
2971         if (error)
2972                 goto fail;
2973         return 0;
2974
2975  fail:
2976         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2977         return error;
2978 }
2979
2980 static int send_remove(struct dlm_rsb *r)
2981 {
2982         struct dlm_message *ms;
2983         struct dlm_mhandle *mh;
2984         int to_nodeid, error;
2985
2986         to_nodeid = dlm_dir_nodeid(r);
2987
2988         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2989         if (error)
2990                 goto out;
2991
2992         memcpy(ms->m_extra, r->res_name, r->res_length);
2993         ms->m_hash = r->res_hash;
2994
2995         error = send_message(mh, ms);
2996  out:
2997         return error;
2998 }
2999
3000 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3001                              int mstype, int rv)
3002 {
3003         struct dlm_message *ms;
3004         struct dlm_mhandle *mh;
3005         int to_nodeid, error;
3006
3007         to_nodeid = lkb->lkb_nodeid;
3008
3009         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3010         if (error)
3011                 goto out;
3012
3013         send_args(r, lkb, ms);
3014
3015         ms->m_result = rv;
3016
3017         error = send_message(mh, ms);
3018  out:
3019         return error;
3020 }
3021
3022 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3023 {
3024         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3025 }
3026
3027 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3028 {
3029         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3030 }
3031
3032 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3033 {
3034         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3035 }
3036
3037 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3038 {
3039         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3040 }
3041
3042 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3043                              int ret_nodeid, int rv)
3044 {
3045         struct dlm_rsb *r = &ls->ls_stub_rsb;
3046         struct dlm_message *ms;
3047         struct dlm_mhandle *mh;
3048         int error, nodeid = ms_in->m_header.h_nodeid;
3049
3050         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3051         if (error)
3052                 goto out;
3053
3054         ms->m_lkid = ms_in->m_lkid;
3055         ms->m_result = rv;
3056         ms->m_nodeid = ret_nodeid;
3057
3058         error = send_message(mh, ms);
3059  out:
3060         return error;
3061 }
3062
3063 /* which args we save from a received message depends heavily on the type
3064    of message, unlike the send side where we can safely send everything about
3065    the lkb for any type of message */
3066
3067 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3068 {
3069         lkb->lkb_exflags = ms->m_exflags;
3070         lkb->lkb_sbflags = ms->m_sbflags;
3071         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3072                          (ms->m_flags & 0x0000FFFF);
3073 }
3074
3075 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3076 {
3077         lkb->lkb_sbflags = ms->m_sbflags;
3078         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3079                          (ms->m_flags & 0x0000FFFF);
3080 }
3081
3082 static int receive_extralen(struct dlm_message *ms)
3083 {
3084         return (ms->m_header.h_length - sizeof(struct dlm_message));
3085 }
3086
3087 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3088                        struct dlm_message *ms)
3089 {
3090         int len;
3091
3092         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3093                 if (!lkb->lkb_lvbptr)
3094                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3095                 if (!lkb->lkb_lvbptr)
3096                         return -ENOMEM;
3097                 len = receive_extralen(ms);
3098                 if (len > DLM_RESNAME_MAXLEN)
3099                         len = DLM_RESNAME_MAXLEN;
3100                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3101         }
3102         return 0;
3103 }
3104
3105 static void fake_bastfn(void *astparam, int mode)
3106 {
3107         log_print("fake_bastfn should not be called");
3108 }
3109
3110 static void fake_astfn(void *astparam)
3111 {
3112         log_print("fake_astfn should not be called");
3113 }
3114
3115 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3116                                 struct dlm_message *ms)
3117 {
3118         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3119         lkb->lkb_ownpid = ms->m_pid;
3120         lkb->lkb_remid = ms->m_lkid;
3121         lkb->lkb_grmode = DLM_LOCK_IV;
3122         lkb->lkb_rqmode = ms->m_rqmode;
3123
3124         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3125         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3126
3127         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3128                 /* lkb was just created so there won't be an lvb yet */
3129                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3130                 if (!lkb->lkb_lvbptr)
3131                         return -ENOMEM;
3132         }
3133
3134         return 0;
3135 }
3136
3137 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3138                                 struct dlm_message *ms)
3139 {
3140         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3141                 return -EBUSY;
3142
3143         if (receive_lvb(ls, lkb, ms))
3144                 return -ENOMEM;
3145
3146         lkb->lkb_rqmode = ms->m_rqmode;
3147         lkb->lkb_lvbseq = ms->m_lvbseq;
3148
3149         return 0;
3150 }
3151
3152 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3153                                struct dlm_message *ms)
3154 {
3155         if (receive_lvb(ls, lkb, ms))
3156                 return -ENOMEM;
3157         return 0;
3158 }
3159
3160 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3161    uses to send a reply and that the remote end uses to process the reply. */
3162
3163 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3164 {
3165         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3166         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3167         lkb->lkb_remid = ms->m_lkid;
3168 }
3169
3170 /* This is called after the rsb is locked so that we can safely inspect
3171    fields in the lkb. */
3172
3173 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3174 {
3175         int from = ms->m_header.h_nodeid;
3176         int error = 0;
3177
3178         switch (ms->m_type) {
3179         case DLM_MSG_CONVERT:
3180         case DLM_MSG_UNLOCK:
3181         case DLM_MSG_CANCEL:
3182                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3183                         error = -EINVAL;
3184                 break;
3185
3186         case DLM_MSG_CONVERT_REPLY:
3187         case DLM_MSG_UNLOCK_REPLY:
3188         case DLM_MSG_CANCEL_REPLY:
3189         case DLM_MSG_GRANT:
3190         case DLM_MSG_BAST:
3191                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3192                         error = -EINVAL;
3193                 break;
3194
3195         case DLM_MSG_REQUEST_REPLY:
3196                 if (!is_process_copy(lkb))
3197                         error = -EINVAL;
3198                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3199                         error = -EINVAL;
3200                 break;
3201
3202         default:
3203                 error = -EINVAL;
3204         }
3205
3206         if (error)
3207                 log_error(lkb->lkb_resource->res_ls,
3208                           "ignore invalid message %d from %d %x %x %x %d",
3209                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3210                           lkb->lkb_flags, lkb->lkb_nodeid);
3211         return error;
3212 }
3213
3214 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3215 {
3216         struct dlm_lkb *lkb;
3217         struct dlm_rsb *r;
3218         int error, namelen;
3219
3220         error = create_lkb(ls, &lkb);
3221         if (error)
3222                 goto fail;
3223
3224         receive_flags(lkb, ms);
3225         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3226         error = receive_request_args(ls, lkb, ms);
3227         if (error) {
3228                 __put_lkb(ls, lkb);
3229                 goto fail;
3230         }
3231
3232         namelen = receive_extralen(ms);
3233
3234         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3235         if (error) {
3236                 __put_lkb(ls, lkb);
3237                 goto fail;
3238         }
3239
3240         lock_rsb(r);
3241
3242         attach_lkb(r, lkb);
3243         error = do_request(r, lkb);
3244         send_request_reply(r, lkb, error);
3245         do_request_effects(r, lkb, error);
3246
3247         unlock_rsb(r);
3248         put_rsb(r);
3249
3250         if (error == -EINPROGRESS)
3251                 error = 0;
3252         if (error)
3253                 dlm_put_lkb(lkb);
3254         return;
3255
3256  fail:
3257         setup_stub_lkb(ls, ms);
3258         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3259 }
3260
3261 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3262 {
3263         struct dlm_lkb *lkb;
3264         struct dlm_rsb *r;
3265         int error, reply = 1;
3266
3267         error = find_lkb(ls, ms->m_remid, &lkb);
3268         if (error)
3269                 goto fail;
3270
3271         r = lkb->lkb_resource;
3272
3273         hold_rsb(r);
3274         lock_rsb(r);
3275
3276         error = validate_message(lkb, ms);
3277         if (error)
3278                 goto out;
3279
3280         receive_flags(lkb, ms);
3281
3282         error = receive_convert_args(ls, lkb, ms);
3283         if (error) {
3284                 send_convert_reply(r, lkb, error);
3285                 goto out;
3286         }
3287
3288         reply = !down_conversion(lkb);
3289
3290         error = do_convert(r, lkb);
3291         if (reply)
3292                 send_convert_reply(r, lkb, error);
3293         do_convert_effects(r, lkb, error);
3294  out:
3295         unlock_rsb(r);
3296         put_rsb(r);
3297         dlm_put_lkb(lkb);
3298         return;
3299
3300  fail:
3301         setup_stub_lkb(ls, ms);
3302         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3303 }
3304
3305 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3306 {
3307         struct dlm_lkb *lkb;
3308         struct dlm_rsb *r;
3309         int error;
3310
3311         error = find_lkb(ls, ms->m_remid, &lkb);
3312         if (error)
3313                 goto fail;
3314
3315         r = lkb->lkb_resource;
3316
3317         hold_rsb(r);
3318         lock_rsb(r);
3319
3320         error = validate_message(lkb, ms);
3321         if (error)
3322                 goto out;
3323
3324         receive_flags(lkb, ms);
3325
3326         error = receive_unlock_args(ls, lkb, ms);
3327         if (error) {
3328                 send_unlock_reply(r, lkb, error);
3329                 goto out;
3330         }
3331
3332         error = do_unlock(r, lkb);
3333         send_unlock_reply(r, lkb, error);
3334         do_unlock_effects(r, lkb, error);
3335  out:
3336         unlock_rsb(r);
3337         put_rsb(r);
3338         dlm_put_lkb(lkb);
3339         return;
3340
3341  fail:
3342         setup_stub_lkb(ls, ms);
3343         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3344 }
3345
3346 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3347 {
3348         struct dlm_lkb *lkb;
3349         struct dlm_rsb *r;
3350         int error;
3351
3352         error = find_lkb(ls, ms->m_remid, &lkb);
3353         if (error)
3354                 goto fail;
3355
3356         receive_flags(lkb, ms);
3357
3358         r = lkb->lkb_resource;
3359
3360         hold_rsb(r);
3361         lock_rsb(r);
3362
3363         error = validate_message(lkb, ms);
3364         if (error)
3365                 goto out;
3366
3367         error = do_cancel(r, lkb);
3368         send_cancel_reply(r, lkb, error);
3369         do_cancel_effects(r, lkb, error);
3370  out:
3371         unlock_rsb(r);
3372         put_rsb(r);
3373         dlm_put_lkb(lkb);
3374         return;
3375
3376  fail:
3377         setup_stub_lkb(ls, ms);
3378         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3379 }
3380
3381 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3382 {
3383         struct dlm_lkb *lkb;
3384         struct dlm_rsb *r;
3385         int error;
3386
3387         error = find_lkb(ls, ms->m_remid, &lkb);
3388         if (error) {
3389                 log_debug(ls, "receive_grant from %d no lkb %x",
3390                           ms->m_header.h_nodeid, ms->m_remid);
3391                 return;
3392         }
3393
3394         r = lkb->lkb_resource;
3395
3396         hold_rsb(r);
3397         lock_rsb(r);
3398
3399         error = validate_message(lkb, ms);
3400         if (error)
3401                 goto out;
3402
3403         receive_flags_reply(lkb, ms);
3404         if (is_altmode(lkb))
3405                 munge_altmode(lkb, ms);
3406         grant_lock_pc(r, lkb, ms);
3407         queue_cast(r, lkb, 0);
3408  out:
3409         unlock_rsb(r);
3410         put_rsb(r);
3411         dlm_put_lkb(lkb);
3412 }
3413
3414 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3415 {
3416         struct dlm_lkb *lkb;
3417         struct dlm_rsb *r;
3418         int error;
3419
3420         error = find_lkb(ls, ms->m_remid, &lkb);
3421         if (error) {
3422                 log_debug(ls, "receive_bast from %d no lkb %x",
3423                           ms->m_header.h_nodeid, ms->m_remid);
3424                 return;
3425         }
3426
3427         r = lkb->lkb_resource;
3428
3429         hold_rsb(r);
3430         lock_rsb(r);
3431
3432         error = validate_message(lkb, ms);
3433         if (error)
3434                 goto out;
3435
3436         queue_bast(r, lkb, ms->m_bastmode);
3437  out:
3438         unlock_rsb(r);
3439         put_rsb(r);
3440         dlm_put_lkb(lkb);
3441 }
3442
3443 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3444 {
3445         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3446
3447         from_nodeid = ms->m_header.h_nodeid;
3448         our_nodeid = dlm_our_nodeid();
3449
3450         len = receive_extralen(ms);
3451
3452         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3453         if (dir_nodeid != our_nodeid) {
3454                 log_error(ls, "lookup dir_nodeid %d from %d",
3455                           dir_nodeid, from_nodeid);
3456                 error = -EINVAL;
3457                 ret_nodeid = -1;
3458                 goto out;
3459         }
3460
3461         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3462
3463         /* Optimization: we're master so treat lookup as a request */
3464         if (!error && ret_nodeid == our_nodeid) {
3465                 receive_request(ls, ms);
3466                 return;
3467         }
3468  out:
3469         send_lookup_reply(ls, ms, ret_nodeid, error);
3470 }
3471
3472 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3473 {
3474         int len, dir_nodeid, from_nodeid;
3475
3476         from_nodeid = ms->m_header.h_nodeid;
3477
3478         len = receive_extralen(ms);
3479
3480         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3481         if (dir_nodeid != dlm_our_nodeid()) {
3482                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3483                           dir_nodeid, from_nodeid);
3484                 return;
3485         }
3486
3487         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3488 }
3489
3490 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3491 {
3492         do_purge(ls, ms->m_nodeid, ms->m_pid);
3493 }
3494
3495 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3496 {
3497         struct dlm_lkb *lkb;
3498         struct dlm_rsb *r;
3499         int error, mstype, result;
3500
3501         error = find_lkb(ls, ms->m_remid, &lkb);
3502         if (error) {
3503                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3504                           ms->m_header.h_nodeid, ms->m_remid);
3505                 return;
3506         }
3507
3508         r = lkb->lkb_resource;
3509         hold_rsb(r);
3510         lock_rsb(r);
3511
3512         error = validate_message(lkb, ms);
3513         if (error)
3514                 goto out;
3515
3516         mstype = lkb->lkb_wait_type;
3517         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3518         if (error)
3519                 goto out;
3520
3521         /* Optimization: the dir node was also the master, so it took our
3522            lookup as a request and sent request reply instead of lookup reply */
3523         if (mstype == DLM_MSG_LOOKUP) {
3524                 r->res_nodeid = ms->m_header.h_nodeid;
3525                 lkb->lkb_nodeid = r->res_nodeid;
3526         }
3527
3528         /* this is the value returned from do_request() on the master */
3529         result = ms->m_result;
3530
3531         switch (result) {
3532         case -EAGAIN:
3533                 /* request would block (be queued) on remote master */
3534                 queue_cast(r, lkb, -EAGAIN);
3535                 confirm_master(r, -EAGAIN);
3536                 unhold_lkb(lkb); /* undoes create_lkb() */
3537                 break;
3538
3539         case -EINPROGRESS:
3540         case 0:
3541                 /* request was queued or granted on remote master */
3542                 receive_flags_reply(lkb, ms);
3543                 lkb->lkb_remid = ms->m_lkid;
3544                 if (is_altmode(lkb))
3545                         munge_altmode(lkb, ms);
3546                 if (result) {
3547                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3548                         add_timeout(lkb);
3549                 } else {
3550                         grant_lock_pc(r, lkb, ms);
3551                         queue_cast(r, lkb, 0);
3552                 }
3553                 confirm_master(r, result);
3554                 break;
3555
3556         case -EBADR:
3557         case -ENOTBLK:
3558                 /* find_rsb failed to find rsb or rsb wasn't master */
3559                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3560                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3561                 r->res_nodeid = -1;
3562                 lkb->lkb_nodeid = -1;
3563
3564                 if (is_overlap(lkb)) {
3565                         /* we'll ignore error in cancel/unlock reply */
3566                         queue_cast_overlap(r, lkb);
3567                         confirm_master(r, result);
3568                         unhold_lkb(lkb); /* undoes create_lkb() */
3569                 } else
3570                         _request_lock(r, lkb);
3571                 break;
3572
3573         default:
3574                 log_error(ls, "receive_request_reply %x error %d",
3575                           lkb->lkb_id, result);
3576         }
3577