]> nv-tegra.nvidia Code Review - linux-2.6.git/blob - fs/dlm/lock.c
d0e43a3da887b8d9f291810c4b69820bb76b63b7
[linux-2.6.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         lkb->lkb_time_bast = ktime_get();
322
323         if (is_master_copy(lkb))
324                 send_bast(r, lkb, rqmode);
325         else
326                 dlm_add_ast(lkb, AST_BAST, rqmode);
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335         struct dlm_rsb *r;
336
337         r = dlm_allocate_rsb(ls, len);
338         if (!r)
339                 return NULL;
340
341         r->res_ls = ls;
342         r->res_length = len;
343         memcpy(r->res_name, name, len);
344         mutex_init(&r->res_mutex);
345
346         INIT_LIST_HEAD(&r->res_lookup);
347         INIT_LIST_HEAD(&r->res_grantqueue);
348         INIT_LIST_HEAD(&r->res_convertqueue);
349         INIT_LIST_HEAD(&r->res_waitqueue);
350         INIT_LIST_HEAD(&r->res_root_list);
351         INIT_LIST_HEAD(&r->res_recover_list);
352
353         return r;
354 }
355
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357                            unsigned int flags, struct dlm_rsb **r_ret)
358 {
359         struct dlm_rsb *r;
360         int error = 0;
361
362         list_for_each_entry(r, head, res_hashchain) {
363                 if (len == r->res_length && !memcmp(name, r->res_name, len))
364                         goto found;
365         }
366         *r_ret = NULL;
367         return -EBADR;
368
369  found:
370         if (r->res_nodeid && (flags & R_MASTER))
371                 error = -ENOTBLK;
372         *r_ret = r;
373         return error;
374 }
375
376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377                        unsigned int flags, struct dlm_rsb **r_ret)
378 {
379         struct dlm_rsb *r;
380         int error;
381
382         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383         if (!error) {
384                 kref_get(&r->res_ref);
385                 goto out;
386         }
387         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388         if (error)
389                 goto out;
390
391         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392
393         if (dlm_no_directory(ls))
394                 goto out;
395
396         if (r->res_nodeid == -1) {
397                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else if (r->res_nodeid > 0) {
400                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401                 r->res_first_lkid = 0;
402         } else {
403                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405         }
406  out:
407         *r_ret = r;
408         return error;
409 }
410
411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412                       unsigned int flags, struct dlm_rsb **r_ret)
413 {
414         int error;
415         spin_lock(&ls->ls_rsbtbl[b].lock);
416         error = _search_rsb(ls, name, len, b, flags, r_ret);
417         spin_unlock(&ls->ls_rsbtbl[b].lock);
418         return error;
419 }
420
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434
435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436                     unsigned int flags, struct dlm_rsb **r_ret)
437 {
438         struct dlm_rsb *r = NULL, *tmp;
439         uint32_t hash, bucket;
440         int error = -EINVAL;
441
442         if (namelen > DLM_RESNAME_MAXLEN)
443                 goto out;
444
445         if (dlm_no_directory(ls))
446                 flags |= R_CREATE;
447
448         error = 0;
449         hash = jhash(name, namelen, 0);
450         bucket = hash & (ls->ls_rsbtbl_size - 1);
451
452         error = search_rsb(ls, name, namelen, bucket, flags, &r);
453         if (!error)
454                 goto out;
455
456         if (error == -EBADR && !(flags & R_CREATE))
457                 goto out;
458
459         /* the rsb was found but wasn't a master copy */
460         if (error == -ENOTBLK)
461                 goto out;
462
463         error = -ENOMEM;
464         r = create_rsb(ls, name, namelen);
465         if (!r)
466                 goto out;
467
468         r->res_hash = hash;
469         r->res_bucket = bucket;
470         r->res_nodeid = -1;
471         kref_init(&r->res_ref);
472
473         /* With no directory, the master can be set immediately */
474         if (dlm_no_directory(ls)) {
475                 int nodeid = dlm_dir_nodeid(r);
476                 if (nodeid == dlm_our_nodeid())
477                         nodeid = 0;
478                 r->res_nodeid = nodeid;
479         }
480
481         spin_lock(&ls->ls_rsbtbl[bucket].lock);
482         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
483         if (!error) {
484                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
485                 dlm_free_rsb(r);
486                 r = tmp;
487                 goto out;
488         }
489         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
490         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
491         error = 0;
492  out:
493         *r_ret = r;
494         return error;
495 }
496
497 /* This is only called to add a reference when the code already holds
498    a valid reference to the rsb, so there's no need for locking. */
499
500 static inline void hold_rsb(struct dlm_rsb *r)
501 {
502         kref_get(&r->res_ref);
503 }
504
505 void dlm_hold_rsb(struct dlm_rsb *r)
506 {
507         hold_rsb(r);
508 }
509
510 static void toss_rsb(struct kref *kref)
511 {
512         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
513         struct dlm_ls *ls = r->res_ls;
514
515         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
516         kref_init(&r->res_ref);
517         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
518         r->res_toss_time = jiffies;
519         if (r->res_lvbptr) {
520                 dlm_free_lvb(r->res_lvbptr);
521                 r->res_lvbptr = NULL;
522         }
523 }
524
525 /* When all references to the rsb are gone it's transfered to
526    the tossed list for later disposal. */
527
528 static void put_rsb(struct dlm_rsb *r)
529 {
530         struct dlm_ls *ls = r->res_ls;
531         uint32_t bucket = r->res_bucket;
532
533         spin_lock(&ls->ls_rsbtbl[bucket].lock);
534         kref_put(&r->res_ref, toss_rsb);
535         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
536 }
537
538 void dlm_put_rsb(struct dlm_rsb *r)
539 {
540         put_rsb(r);
541 }
542
543 /* See comment for unhold_lkb */
544
545 static void unhold_rsb(struct dlm_rsb *r)
546 {
547         int rv;
548         rv = kref_put(&r->res_ref, toss_rsb);
549         DLM_ASSERT(!rv, dlm_dump_rsb(r););
550 }
551
552 static void kill_rsb(struct kref *kref)
553 {
554         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555
556         /* All work is done after the return from kref_put() so we
557            can release the write_lock before the remove and free. */
558
559         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
565 }
566
567 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
568    The rsb must exist as long as any lkb's for it do. */
569
570 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
571 {
572         hold_rsb(r);
573         lkb->lkb_resource = r;
574 }
575
576 static void detach_lkb(struct dlm_lkb *lkb)
577 {
578         if (lkb->lkb_resource) {
579                 put_rsb(lkb->lkb_resource);
580                 lkb->lkb_resource = NULL;
581         }
582 }
583
584 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
585 {
586         struct dlm_lkb *lkb, *tmp;
587         uint32_t lkid = 0;
588         uint16_t bucket;
589
590         lkb = dlm_allocate_lkb(ls);
591         if (!lkb)
592                 return -ENOMEM;
593
594         lkb->lkb_nodeid = -1;
595         lkb->lkb_grmode = DLM_LOCK_IV;
596         kref_init(&lkb->lkb_ref);
597         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
598         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
599         INIT_LIST_HEAD(&lkb->lkb_time_list);
600
601         get_random_bytes(&bucket, sizeof(bucket));
602         bucket &= (ls->ls_lkbtbl_size - 1);
603
604         write_lock(&ls->ls_lkbtbl[bucket].lock);
605
606         /* counter can roll over so we must verify lkid is not in use */
607
608         while (lkid == 0) {
609                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
610
611                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
612                                     lkb_idtbl_list) {
613                         if (tmp->lkb_id != lkid)
614                                 continue;
615                         lkid = 0;
616                         break;
617                 }
618         }
619
620         lkb->lkb_id = lkid;
621         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
622         write_unlock(&ls->ls_lkbtbl[bucket].lock);
623
624         *lkb_ret = lkb;
625         return 0;
626 }
627
628 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
629 {
630         struct dlm_lkb *lkb;
631         uint16_t bucket = (lkid >> 16);
632
633         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
634                 if (lkb->lkb_id == lkid)
635                         return lkb;
636         }
637         return NULL;
638 }
639
640 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
641 {
642         struct dlm_lkb *lkb;
643         uint16_t bucket = (lkid >> 16);
644
645         if (bucket >= ls->ls_lkbtbl_size)
646                 return -EBADSLT;
647
648         read_lock(&ls->ls_lkbtbl[bucket].lock);
649         lkb = __find_lkb(ls, lkid);
650         if (lkb)
651                 kref_get(&lkb->lkb_ref);
652         read_unlock(&ls->ls_lkbtbl[bucket].lock);
653
654         *lkb_ret = lkb;
655         return lkb ? 0 : -ENOENT;
656 }
657
658 static void kill_lkb(struct kref *kref)
659 {
660         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
661
662         /* All work is done after the return from kref_put() so we
663            can release the write_lock before the detach_lkb */
664
665         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666 }
667
668 /* __put_lkb() is used when an lkb may not have an rsb attached to
669    it so we need to provide the lockspace explicitly */
670
671 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
672 {
673         uint16_t bucket = (lkb->lkb_id >> 16);
674
675         write_lock(&ls->ls_lkbtbl[bucket].lock);
676         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
677                 list_del(&lkb->lkb_idtbl_list);
678                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
679
680                 detach_lkb(lkb);
681
682                 /* for local/process lkbs, lvbptr points to caller's lksb */
683                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
684                         dlm_free_lvb(lkb->lkb_lvbptr);
685                 dlm_free_lkb(lkb);
686                 return 1;
687         } else {
688                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
689                 return 0;
690         }
691 }
692
693 int dlm_put_lkb(struct dlm_lkb *lkb)
694 {
695         struct dlm_ls *ls;
696
697         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
698         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
699
700         ls = lkb->lkb_resource->res_ls;
701         return __put_lkb(ls, lkb);
702 }
703
704 /* This is only called to add a reference when the code already holds
705    a valid reference to the lkb, so there's no need for locking. */
706
707 static inline void hold_lkb(struct dlm_lkb *lkb)
708 {
709         kref_get(&lkb->lkb_ref);
710 }
711
712 /* This is called when we need to remove a reference and are certain
713    it's not the last ref.  e.g. del_lkb is always called between a
714    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
715    put_lkb would work fine, but would involve unnecessary locking */
716
717 static inline void unhold_lkb(struct dlm_lkb *lkb)
718 {
719         int rv;
720         rv = kref_put(&lkb->lkb_ref, kill_lkb);
721         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
722 }
723
724 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
725                             int mode)
726 {
727         struct dlm_lkb *lkb = NULL;
728
729         list_for_each_entry(lkb, head, lkb_statequeue)
730                 if (lkb->lkb_rqmode < mode)
731                         break;
732
733         if (!lkb)
734                 list_add_tail(new, head);
735         else
736                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740
741 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
742 {
743         kref_get(&lkb->lkb_ref);
744
745         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
746
747         lkb->lkb_timestamp = ktime_get();
748
749         lkb->lkb_status = status;
750
751         switch (status) {
752         case DLM_LKSTS_WAITING:
753                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755                 else
756                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 break;
758         case DLM_LKSTS_GRANTED:
759                 /* convention says granted locks kept in order of grmode */
760                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761                                 lkb->lkb_grmode);
762                 break;
763         case DLM_LKSTS_CONVERT:
764                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766                 else
767                         list_add_tail(&lkb->lkb_statequeue,
768                                       &r->res_convertqueue);
769                 break;
770         default:
771                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772         }
773 }
774
775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777         lkb->lkb_status = 0;
778         list_del(&lkb->lkb_statequeue);
779         unhold_lkb(lkb);
780 }
781
782 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
783 {
784         hold_lkb(lkb);
785         del_lkb(r, lkb);
786         add_lkb(r, lkb, sts);
787         unhold_lkb(lkb);
788 }
789
790 static int msg_reply_type(int mstype)
791 {
792         switch (mstype) {
793         case DLM_MSG_REQUEST:
794                 return DLM_MSG_REQUEST_REPLY;
795         case DLM_MSG_CONVERT:
796                 return DLM_MSG_CONVERT_REPLY;
797         case DLM_MSG_UNLOCK:
798                 return DLM_MSG_UNLOCK_REPLY;
799         case DLM_MSG_CANCEL:
800                 return DLM_MSG_CANCEL_REPLY;
801         case DLM_MSG_LOOKUP:
802                 return DLM_MSG_LOOKUP_REPLY;
803         }
804         return -1;
805 }
806
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809
810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813         int error = 0;
814
815         mutex_lock(&ls->ls_waiters_mutex);
816
817         if (is_overlap_unlock(lkb) ||
818             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819                 error = -EINVAL;
820                 goto out;
821         }
822
823         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824                 switch (mstype) {
825                 case DLM_MSG_UNLOCK:
826                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827                         break;
828                 case DLM_MSG_CANCEL:
829                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830                         break;
831                 default:
832                         error = -EBUSY;
833                         goto out;
834                 }
835                 lkb->lkb_wait_count++;
836                 hold_lkb(lkb);
837
838                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
839                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
840                           lkb->lkb_wait_count, lkb->lkb_flags);
841                 goto out;
842         }
843
844         DLM_ASSERT(!lkb->lkb_wait_count,
845                    dlm_print_lkb(lkb);
846                    printk("wait_count %d\n", lkb->lkb_wait_count););
847
848         lkb->lkb_wait_count++;
849         lkb->lkb_wait_type = mstype;
850         hold_lkb(lkb);
851         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853         if (error)
854                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
855                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
856                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857         mutex_unlock(&ls->ls_waiters_mutex);
858         return error;
859 }
860
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865
866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
867                                 struct dlm_message *ms)
868 {
869         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
870         int overlap_done = 0;
871
872         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
873                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
874                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
875                 overlap_done = 1;
876                 goto out_del;
877         }
878
879         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
880                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
881                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
882                 overlap_done = 1;
883                 goto out_del;
884         }
885
886         /* Cancel state was preemptively cleared by a successful convert,
887            see next comment, nothing to do. */
888
889         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
890             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
891                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
892                           lkb->lkb_id, lkb->lkb_wait_type);
893                 return -1;
894         }
895
896         /* Remove for the convert reply, and premptively remove for the
897            cancel reply.  A convert has been granted while there's still
898            an outstanding cancel on it (the cancel is moot and the result
899            in the cancel reply should be 0).  We preempt the cancel reply
900            because the app gets the convert result and then can follow up
901            with another op, like convert.  This subsequent op would see the
902            lingering state of the cancel and fail with -EBUSY. */
903
904         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
905             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
906             is_overlap_cancel(lkb) && ms && !ms->m_result) {
907                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
908                           lkb->lkb_id);
909                 lkb->lkb_wait_type = 0;
910                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
911                 lkb->lkb_wait_count--;
912                 goto out_del;
913         }
914
915         /* N.B. type of reply may not always correspond to type of original
916            msg due to lookup->request optimization, verify others? */
917
918         if (lkb->lkb_wait_type) {
919                 lkb->lkb_wait_type = 0;
920                 goto out_del;
921         }
922
923         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
924                   lkb->lkb_id, mstype, lkb->lkb_flags);
925         return -1;
926
927  out_del:
928         /* the force-unlock/cancel has completed and we haven't recvd a reply
929            to the op that was in progress prior to the unlock/cancel; we
930            give up on any reply to the earlier op.  FIXME: not sure when/how
931            this would happen */
932
933         if (overlap_done && lkb->lkb_wait_type) {
934                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
935                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
936                 lkb->lkb_wait_count--;
937                 lkb->lkb_wait_type = 0;
938         }
939
940         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
941
942         lkb->lkb_flags &= ~DLM_IFL_RESEND;
943         lkb->lkb_wait_count--;
944         if (!lkb->lkb_wait_count)
945                 list_del_init(&lkb->lkb_wait_reply);
946         unhold_lkb(lkb);
947         return 0;
948 }
949
950 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
951 {
952         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
953         int error;
954
955         mutex_lock(&ls->ls_waiters_mutex);
956         error = _remove_from_waiters(lkb, mstype, NULL);
957         mutex_unlock(&ls->ls_waiters_mutex);
958         return error;
959 }
960
961 /* Handles situations where we might be processing a "fake" or "stub" reply in
962    which we can't try to take waiters_mutex again. */
963
964 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
965 {
966         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
967         int error;
968
969         if (ms != &ls->ls_stub_ms)
970                 mutex_lock(&ls->ls_waiters_mutex);
971         error = _remove_from_waiters(lkb, ms->m_type, ms);
972         if (ms != &ls->ls_stub_ms)
973                 mutex_unlock(&ls->ls_waiters_mutex);
974         return error;
975 }
976
977 static void dir_remove(struct dlm_rsb *r)
978 {
979         int to_nodeid;
980
981         if (dlm_no_directory(r->res_ls))
982                 return;
983
984         to_nodeid = dlm_dir_nodeid(r);
985         if (to_nodeid != dlm_our_nodeid())
986                 send_remove(r);
987         else
988                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
989                                      r->res_name, r->res_length);
990 }
991
992 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
993    found since they are in order of newest to oldest? */
994
995 static int shrink_bucket(struct dlm_ls *ls, int b)
996 {
997         struct dlm_rsb *r;
998         int count = 0, found;
999
1000         for (;;) {
1001                 found = 0;
1002                 spin_lock(&ls->ls_rsbtbl[b].lock);
1003                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1004                                             res_hashchain) {
1005                         if (!time_after_eq(jiffies, r->res_toss_time +
1006                                            dlm_config.ci_toss_secs * HZ))
1007                                 continue;
1008                         found = 1;
1009                         break;
1010                 }
1011
1012                 if (!found) {
1013                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1014                         break;
1015                 }
1016
1017                 if (kref_put(&r->res_ref, kill_rsb)) {
1018                         list_del(&r->res_hashchain);
1019                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1020
1021                         if (is_master(r))
1022                                 dir_remove(r);
1023                         dlm_free_rsb(r);
1024                         count++;
1025                 } else {
1026                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1027                         log_error(ls, "tossed rsb in use %s", r->res_name);
1028                 }
1029         }
1030
1031         return count;
1032 }
1033
1034 void dlm_scan_rsbs(struct dlm_ls *ls)
1035 {
1036         int i;
1037
1038         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039                 shrink_bucket(ls, i);
1040                 if (dlm_locking_stopped(ls))
1041                         break;
1042                 cond_resched();
1043         }
1044 }
1045
1046 static void add_timeout(struct dlm_lkb *lkb)
1047 {
1048         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049
1050         if (is_master_copy(lkb))
1051                 return;
1052
1053         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056                 goto add_it;
1057         }
1058         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059                 goto add_it;
1060         return;
1061
1062  add_it:
1063         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064         mutex_lock(&ls->ls_timeout_mutex);
1065         hold_lkb(lkb);
1066         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067         mutex_unlock(&ls->ls_timeout_mutex);
1068 }
1069
1070 static void del_timeout(struct dlm_lkb *lkb)
1071 {
1072         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1073
1074         mutex_lock(&ls->ls_timeout_mutex);
1075         if (!list_empty(&lkb->lkb_time_list)) {
1076                 list_del_init(&lkb->lkb_time_list);
1077                 unhold_lkb(lkb);
1078         }
1079         mutex_unlock(&ls->ls_timeout_mutex);
1080 }
1081
1082 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084    and then lock rsb because of lock ordering in add_timeout.  We may need
1085    to specify some special timeout-related bits in the lkb that are just to
1086    be accessed under the timeout_mutex. */
1087
1088 void dlm_scan_timeout(struct dlm_ls *ls)
1089 {
1090         struct dlm_rsb *r;
1091         struct dlm_lkb *lkb;
1092         int do_cancel, do_warn;
1093         s64 wait_us;
1094
1095         for (;;) {
1096                 if (dlm_locking_stopped(ls))
1097                         break;
1098
1099                 do_cancel = 0;
1100                 do_warn = 0;
1101                 mutex_lock(&ls->ls_timeout_mutex);
1102                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1103
1104                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1105                                                         lkb->lkb_timestamp));
1106
1107                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1108                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1109                                 do_cancel = 1;
1110
1111                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1112                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1113                                 do_warn = 1;
1114
1115                         if (!do_cancel && !do_warn)
1116                                 continue;
1117                         hold_lkb(lkb);
1118                         break;
1119                 }
1120                 mutex_unlock(&ls->ls_timeout_mutex);
1121
1122                 if (!do_cancel && !do_warn)
1123                         break;
1124
1125                 r = lkb->lkb_resource;
1126                 hold_rsb(r);
1127                 lock_rsb(r);
1128
1129                 if (do_warn) {
1130                         /* clear flag so we only warn once */
1131                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1132                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1133                                 del_timeout(lkb);
1134                         dlm_timeout_warn(lkb);
1135                 }
1136
1137                 if (do_cancel) {
1138                         log_debug(ls, "timeout cancel %x node %d %s",
1139                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1140                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1141                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1142                         del_timeout(lkb);
1143                         _cancel_lock(r, lkb);
1144                 }
1145
1146                 unlock_rsb(r);
1147                 unhold_rsb(r);
1148                 dlm_put_lkb(lkb);
1149         }
1150 }
1151
1152 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153    dlm_recoverd before checking/setting ls_recover_begin. */
1154
1155 void dlm_adjust_timeouts(struct dlm_ls *ls)
1156 {
1157         struct dlm_lkb *lkb;
1158         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159
1160         ls->ls_recover_begin = 0;
1161         mutex_lock(&ls->ls_timeout_mutex);
1162         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164         mutex_unlock(&ls->ls_timeout_mutex);
1165 }
1166
1167 /* lkb is master or local copy */
1168
1169 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170 {
1171         int b, len = r->res_ls->ls_lvblen;
1172
1173         /* b=1 lvb returned to caller
1174            b=0 lvb written to rsb or invalidated
1175            b=-1 do nothing */
1176
1177         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1178
1179         if (b == 1) {
1180                 if (!lkb->lkb_lvbptr)
1181                         return;
1182
1183                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1184                         return;
1185
1186                 if (!r->res_lvbptr)
1187                         return;
1188
1189                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1190                 lkb->lkb_lvbseq = r->res_lvbseq;
1191
1192         } else if (b == 0) {
1193                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                         rsb_set_flag(r, RSB_VALNOTVALID);
1195                         return;
1196                 }
1197
1198                 if (!lkb->lkb_lvbptr)
1199                         return;
1200
1201                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                         return;
1203
1204                 if (!r->res_lvbptr)
1205                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206
1207                 if (!r->res_lvbptr)
1208                         return;
1209
1210                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1211                 r->res_lvbseq++;
1212                 lkb->lkb_lvbseq = r->res_lvbseq;
1213                 rsb_clear_flag(r, RSB_VALNOTVALID);
1214         }
1215
1216         if (rsb_flag(r, RSB_VALNOTVALID))
1217                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1218 }
1219
1220 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221 {
1222         if (lkb->lkb_grmode < DLM_LOCK_PW)
1223                 return;
1224
1225         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226                 rsb_set_flag(r, RSB_VALNOTVALID);
1227                 return;
1228         }
1229
1230         if (!lkb->lkb_lvbptr)
1231                 return;
1232
1233         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234                 return;
1235
1236         if (!r->res_lvbptr)
1237                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238
1239         if (!r->res_lvbptr)
1240                 return;
1241
1242         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243         r->res_lvbseq++;
1244         rsb_clear_flag(r, RSB_VALNOTVALID);
1245 }
1246
1247 /* lkb is process copy (pc) */
1248
1249 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250                             struct dlm_message *ms)
1251 {
1252         int b;
1253
1254         if (!lkb->lkb_lvbptr)
1255                 return;
1256
1257         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258                 return;
1259
1260         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261         if (b == 1) {
1262                 int len = receive_extralen(ms);
1263                 if (len > DLM_RESNAME_MAXLEN)
1264                         len = DLM_RESNAME_MAXLEN;
1265                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266                 lkb->lkb_lvbseq = ms->m_lvbseq;
1267         }
1268 }
1269
1270 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1271    remove_lock -- used for unlock, removes lkb from granted
1272    revert_lock -- used for cancel, moves lkb from convert to granted
1273    grant_lock  -- used for request and convert, adds lkb to granted or
1274                   moves lkb from convert or waiting to granted
1275
1276    Each of these is used for master or local copy lkb's.  There is
1277    also a _pc() variation used to make the corresponding change on
1278    a process copy (pc) lkb. */
1279
1280 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1281 {
1282         del_lkb(r, lkb);
1283         lkb->lkb_grmode = DLM_LOCK_IV;
1284         /* this unhold undoes the original ref from create_lkb()
1285            so this leads to the lkb being freed */
1286         unhold_lkb(lkb);
1287 }
1288
1289 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1290 {
1291         set_lvb_unlock(r, lkb);
1292         _remove_lock(r, lkb);
1293 }
1294
1295 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1296 {
1297         _remove_lock(r, lkb);
1298 }
1299
1300 /* returns: 0 did nothing
1301             1 moved lock to granted
1302            -1 removed lock */
1303
1304 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305 {
1306         int rv = 0;
1307
1308         lkb->lkb_rqmode = DLM_LOCK_IV;
1309
1310         switch (lkb->lkb_status) {
1311         case DLM_LKSTS_GRANTED:
1312                 break;
1313         case DLM_LKSTS_CONVERT:
1314                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315                 rv = 1;
1316                 break;
1317         case DLM_LKSTS_WAITING:
1318                 del_lkb(r, lkb);
1319                 lkb->lkb_grmode = DLM_LOCK_IV;
1320                 /* this unhold undoes the original ref from create_lkb()
1321                    so this leads to the lkb being freed */
1322                 unhold_lkb(lkb);
1323                 rv = -1;
1324                 break;
1325         default:
1326                 log_print("invalid status for revert %d", lkb->lkb_status);
1327         }
1328         return rv;
1329 }
1330
1331 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1332 {
1333         return revert_lock(r, lkb);
1334 }
1335
1336 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337 {
1338         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339                 lkb->lkb_grmode = lkb->lkb_rqmode;
1340                 if (lkb->lkb_status)
1341                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342                 else
1343                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344         }
1345
1346         lkb->lkb_rqmode = DLM_LOCK_IV;
1347 }
1348
1349 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350 {
1351         set_lvb_lock(r, lkb);
1352         _grant_lock(r, lkb);
1353         lkb->lkb_highbast = 0;
1354 }
1355
1356 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1357                           struct dlm_message *ms)
1358 {
1359         set_lvb_lock_pc(r, lkb, ms);
1360         _grant_lock(r, lkb);
1361 }
1362
1363 /* called by grant_pending_locks() which means an async grant message must
1364    be sent to the requesting node in addition to granting the lock if the
1365    lkb belongs to a remote node. */
1366
1367 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1368 {
1369         grant_lock(r, lkb);
1370         if (is_master_copy(lkb))
1371                 send_grant(r, lkb);
1372         else
1373                 queue_cast(r, lkb, 0);
1374 }
1375
1376 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377    change the granted/requested modes.  We're munging things accordingly in
1378    the process copy.
1379    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380    conversion deadlock
1381    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382    compatible with other granted locks */
1383
1384 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385 {
1386         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387                 log_print("munge_demoted %x invalid reply type %d",
1388                           lkb->lkb_id, ms->m_type);
1389                 return;
1390         }
1391
1392         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1394                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395                 return;
1396         }
1397
1398         lkb->lkb_grmode = DLM_LOCK_NL;
1399 }
1400
1401 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402 {
1403         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404             ms->m_type != DLM_MSG_GRANT) {
1405                 log_print("munge_altmode %x invalid reply type %d",
1406                           lkb->lkb_id, ms->m_type);
1407                 return;
1408         }
1409
1410         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411                 lkb->lkb_rqmode = DLM_LOCK_PR;
1412         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413                 lkb->lkb_rqmode = DLM_LOCK_CW;
1414         else {
1415                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416                 dlm_print_lkb(lkb);
1417         }
1418 }
1419
1420 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421 {
1422         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423                                            lkb_statequeue);
1424         if (lkb->lkb_id == first->lkb_id)
1425                 return 1;
1426
1427         return 0;
1428 }
1429
1430 /* Check if the given lkb conflicts with another lkb on the queue. */
1431
1432 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433 {
1434         struct dlm_lkb *this;
1435
1436         list_for_each_entry(this, head, lkb_statequeue) {
1437                 if (this == lkb)
1438                         continue;
1439                 if (!modes_compat(this, lkb))
1440                         return 1;
1441         }
1442         return 0;
1443 }
1444
1445 /*
1446  * "A conversion deadlock arises with a pair of lock requests in the converting
1447  * queue for one resource.  The granted mode of each lock blocks the requested
1448  * mode of the other lock."
1449  *
1450  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451  * convert queue from being granted, then deadlk/demote lkb.
1452  *
1453  * Example:
1454  * Granted Queue: empty
1455  * Convert Queue: NL->EX (first lock)
1456  *                PR->EX (second lock)
1457  *
1458  * The first lock can't be granted because of the granted mode of the second
1459  * lock and the second lock can't be granted because it's not first in the
1460  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462  * flag set and return DEMOTED in the lksb flags.
1463  *
1464  * Originally, this function detected conv-deadlk in a more limited scope:
1465  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466  * - if lkb1 was the first entry in the queue (not just earlier), and was
1467  *   blocked by the granted mode of lkb2, and there was nothing on the
1468  *   granted queue preventing lkb1 from being granted immediately, i.e.
1469  *   lkb2 was the only thing preventing lkb1 from being granted.
1470  *
1471  * That second condition meant we'd only say there was conv-deadlk if
1472  * resolving it (by demotion) would lead to the first lock on the convert
1473  * queue being granted right away.  It allowed conversion deadlocks to exist
1474  * between locks on the convert queue while they couldn't be granted anyway.
1475  *
1476  * Now, we detect and take action on conversion deadlocks immediately when
1477  * they're created, even if they may not be immediately consequential.  If
1478  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479  * mode that would prevent lkb1's conversion from being granted, we do a
1480  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481  * I think this means that the lkb_is_ahead condition below should always
1482  * be zero, i.e. there will never be conv-deadlk between two locks that are
1483  * both already on the convert queue.
1484  */
1485
1486 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1487 {
1488         struct dlm_lkb *lkb1;
1489         int lkb_is_ahead = 0;
1490
1491         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1492                 if (lkb1 == lkb2) {
1493                         lkb_is_ahead = 1;
1494                         continue;
1495                 }
1496
1497                 if (!lkb_is_ahead) {
1498                         if (!modes_compat(lkb2, lkb1))
1499                                 return 1;
1500                 } else {
1501                         if (!modes_compat(lkb2, lkb1) &&
1502                             !modes_compat(lkb1, lkb2))
1503                                 return 1;
1504                 }
1505         }
1506         return 0;
1507 }
1508
1509 /*
1510  * Return 1 if the lock can be granted, 0 otherwise.
1511  * Also detect and resolve conversion deadlocks.
1512  *
1513  * lkb is the lock to be granted
1514  *
1515  * now is 1 if the function is being called in the context of the
1516  * immediate request, it is 0 if called later, after the lock has been
1517  * queued.
1518  *
1519  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520  */
1521
1522 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1523 {
1524         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1525
1526         /*
1527          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1528          * a new request for a NL mode lock being blocked.
1529          *
1530          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1531          * request, then it would be granted.  In essence, the use of this flag
1532          * tells the Lock Manager to expedite theis request by not considering
1533          * what may be in the CONVERTING or WAITING queues...  As of this
1534          * writing, the EXPEDITE flag can be used only with new requests for NL
1535          * mode locks.  This flag is not valid for conversion requests.
1536          *
1537          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1538          * conversion or used with a non-NL requested mode.  We also know an
1539          * EXPEDITE request is always granted immediately, so now must always
1540          * be 1.  The full condition to grant an expedite request: (now &&
1541          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1542          * therefore be shortened to just checking the flag.
1543          */
1544
1545         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1546                 return 1;
1547
1548         /*
1549          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1550          * added to the remaining conditions.
1551          */
1552
1553         if (queue_conflict(&r->res_grantqueue, lkb))
1554                 goto out;
1555
1556         /*
1557          * 6-3: By default, a conversion request is immediately granted if the
1558          * requested mode is compatible with the modes of all other granted
1559          * locks
1560          */
1561
1562         if (queue_conflict(&r->res_convertqueue, lkb))
1563                 goto out;
1564
1565         /*
1566          * 6-5: But the default algorithm for deciding whether to grant or
1567          * queue conversion requests does not by itself guarantee that such
1568          * requests are serviced on a "first come first serve" basis.  This, in
1569          * turn, can lead to a phenomenon known as "indefinate postponement".
1570          *
1571          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1572          * the system service employed to request a lock conversion.  This flag
1573          * forces certain conversion requests to be queued, even if they are
1574          * compatible with the granted modes of other locks on the same
1575          * resource.  Thus, the use of this flag results in conversion requests
1576          * being ordered on a "first come first servce" basis.
1577          *
1578          * DCT: This condition is all about new conversions being able to occur
1579          * "in place" while the lock remains on the granted queue (assuming
1580          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1581          * doesn't _have_ to go onto the convert queue where it's processed in
1582          * order.  The "now" variable is necessary to distinguish converts
1583          * being received and processed for the first time now, because once a
1584          * convert is moved to the conversion queue the condition below applies
1585          * requiring fifo granting.
1586          */
1587
1588         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1589                 return 1;
1590
1591         /*
1592          * The NOORDER flag is set to avoid the standard vms rules on grant
1593          * order.
1594          */
1595
1596         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1597                 return 1;
1598
1599         /*
1600          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1601          * granted until all other conversion requests ahead of it are granted
1602          * and/or canceled.
1603          */
1604
1605         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1606                 return 1;
1607
1608         /*
1609          * 6-4: By default, a new request is immediately granted only if all
1610          * three of the following conditions are satisfied when the request is
1611          * issued:
1612          * - The queue of ungranted conversion requests for the resource is
1613          *   empty.
1614          * - The queue of ungranted new requests for the resource is empty.
1615          * - The mode of the new request is compatible with the most
1616          *   restrictive mode of all granted locks on the resource.
1617          */
1618
1619         if (now && !conv && list_empty(&r->res_convertqueue) &&
1620             list_empty(&r->res_waitqueue))
1621                 return 1;
1622
1623         /*
1624          * 6-4: Once a lock request is in the queue of ungranted new requests,
1625          * it cannot be granted until the queue of ungranted conversion
1626          * requests is empty, all ungranted new requests ahead of it are
1627          * granted and/or canceled, and it is compatible with the granted mode
1628          * of the most restrictive lock granted on the resource.
1629          */
1630
1631         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1632             first_in_list(lkb, &r->res_waitqueue))
1633                 return 1;
1634  out:
1635         return 0;
1636 }
1637
1638 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1639                           int *err)
1640 {
1641         int rv;
1642         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1643         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1644
1645         if (err)
1646                 *err = 0;
1647
1648         rv = _can_be_granted(r, lkb, now);
1649         if (rv)
1650                 goto out;
1651
1652         /*
1653          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1654          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1655          * cancels one of the locks.
1656          */
1657
1658         if (is_convert && can_be_queued(lkb) &&
1659             conversion_deadlock_detect(r, lkb)) {
1660                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1661                         lkb->lkb_grmode = DLM_LOCK_NL;
1662                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1663                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1664                         if (err)
1665                                 *err = -EDEADLK;
1666                         else {
1667                                 log_print("can_be_granted deadlock %x now %d",
1668                                           lkb->lkb_id, now);
1669                                 dlm_dump_rsb(r);
1670                         }
1671                 }
1672                 goto out;
1673         }
1674
1675         /*
1676          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1677          * to grant a request in a mode other than the normal rqmode.  It's a
1678          * simple way to provide a big optimization to applications that can
1679          * use them.
1680          */
1681
1682         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1683                 alt = DLM_LOCK_PR;
1684         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1685                 alt = DLM_LOCK_CW;
1686
1687         if (alt) {
1688                 lkb->lkb_rqmode = alt;
1689                 rv = _can_be_granted(r, lkb, now);
1690                 if (rv)
1691                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1692                 else
1693                         lkb->lkb_rqmode = rqmode;
1694         }
1695  out:
1696         return rv;
1697 }
1698
1699 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700    for locks pending on the convert list.  Once verified (watch for these
1701    log_prints), we should be able to just call _can_be_granted() and not
1702    bother with the demote/deadlk cases here (and there's no easy way to deal
1703    with a deadlk here, we'd have to generate something like grant_lock with
1704    the deadlk error.) */
1705
1706 /* Returns the highest requested mode of all blocked conversions; sets
1707    cw if there's a blocked conversion to DLM_LOCK_CW. */
1708
1709 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1710 {
1711         struct dlm_lkb *lkb, *s;
1712         int hi, demoted, quit, grant_restart, demote_restart;
1713         int deadlk;
1714
1715         quit = 0;
1716  restart:
1717         grant_restart = 0;
1718         demote_restart = 0;
1719         hi = DLM_LOCK_IV;
1720
1721         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1722                 demoted = is_demoted(lkb);
1723                 deadlk = 0;
1724
1725                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1726                         grant_lock_pending(r, lkb);
1727                         grant_restart = 1;
1728                         continue;
1729                 }
1730
1731                 if (!demoted && is_demoted(lkb)) {
1732                         log_print("WARN: pending demoted %x node %d %s",
1733                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1734                         demote_restart = 1;
1735                         continue;
1736                 }
1737
1738                 if (deadlk) {
1739                         log_print("WARN: pending deadlock %x node %d %s",
1740                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741                         dlm_dump_rsb(r);
1742                         continue;
1743                 }
1744
1745                 hi = max_t(int, lkb->lkb_rqmode, hi);
1746
1747                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1748                         *cw = 1;
1749         }
1750
1751         if (grant_restart)
1752                 goto restart;
1753         if (demote_restart && !quit) {
1754                 quit = 1;
1755                 goto restart;
1756         }
1757
1758         return max_t(int, high, hi);
1759 }
1760
1761 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762 {
1763         struct dlm_lkb *lkb, *s;
1764
1765         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766                 if (can_be_granted(r, lkb, 0, NULL))
1767                         grant_lock_pending(r, lkb);
1768                 else {
1769                         high = max_t(int, lkb->lkb_rqmode, high);
1770                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771                                 *cw = 1;
1772                 }
1773         }
1774
1775         return high;
1776 }
1777
1778 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779    on either the convert or waiting queue.
1780    high is the largest rqmode of all locks blocked on the convert or
1781    waiting queue. */
1782
1783 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784 {
1785         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786                 if (gr->lkb_highbast < DLM_LOCK_EX)
1787                         return 1;
1788                 return 0;
1789         }
1790
1791         if (gr->lkb_highbast < high &&
1792             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793                 return 1;
1794         return 0;
1795 }
1796
1797 static void grant_pending_locks(struct dlm_rsb *r)
1798 {
1799         struct dlm_lkb *lkb, *s;
1800         int high = DLM_LOCK_IV;
1801         int cw = 0;
1802
1803         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1804
1805         high = grant_pending_convert(r, high, &cw);
1806         high = grant_pending_wait(r, high, &cw);
1807
1808         if (high == DLM_LOCK_IV)
1809                 return;
1810
1811         /*
1812          * If there are locks left on the wait/convert queue then send blocking
1813          * ASTs to granted locks based on the largest requested mode (high)
1814          * found above.
1815          */
1816
1817         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1818                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1819                         if (cw && high == DLM_LOCK_PR &&
1820                             lkb->lkb_grmode == DLM_LOCK_PR)
1821                                 queue_bast(r, lkb, DLM_LOCK_CW);
1822                         else
1823                                 queue_bast(r, lkb, high);
1824                         lkb->lkb_highbast = high;
1825                 }
1826         }
1827 }
1828
1829 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830 {
1831         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833                 if (gr->lkb_highbast < DLM_LOCK_EX)
1834                         return 1;
1835                 return 0;
1836         }
1837
1838         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839                 return 1;
1840         return 0;
1841 }
1842
1843 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844                             struct dlm_lkb *lkb)
1845 {
1846         struct dlm_lkb *gr;
1847
1848         list_for_each_entry(gr, head, lkb_statequeue) {
1849                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1850                         queue_bast(r, gr, lkb->lkb_rqmode);
1851                         gr->lkb_highbast = lkb->lkb_rqmode;
1852                 }
1853         }
1854 }
1855
1856 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1857 {
1858         send_bast_queue(r, &r->res_grantqueue, lkb);
1859 }
1860
1861 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1862 {
1863         send_bast_queue(r, &r->res_grantqueue, lkb);
1864         send_bast_queue(r, &r->res_convertqueue, lkb);
1865 }
1866
1867 /* set_master(r, lkb) -- set the master nodeid of a resource
1868
1869    The purpose of this function is to set the nodeid field in the given
1870    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1871    known, it can just be copied to the lkb and the function will return
1872    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1873    before it can be copied to the lkb.
1874
1875    When the rsb nodeid is being looked up remotely, the initial lkb
1876    causing the lookup is kept on the ls_waiters list waiting for the
1877    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1878    on the rsb's res_lookup list until the master is verified.
1879
1880    Return values:
1881    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1882    1: the rsb master is not available and the lkb has been placed on
1883       a wait queue
1884 */
1885
1886 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1887 {
1888         struct dlm_ls *ls = r->res_ls;
1889         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1890
1891         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1892                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1893                 r->res_first_lkid = lkb->lkb_id;
1894                 lkb->lkb_nodeid = r->res_nodeid;
1895                 return 0;
1896         }
1897
1898         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1899                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1900                 return 1;
1901         }
1902
1903         if (r->res_nodeid == 0) {
1904                 lkb->lkb_nodeid = 0;
1905                 return 0;
1906         }
1907
1908         if (r->res_nodeid > 0) {
1909                 lkb->lkb_nodeid = r->res_nodeid;
1910                 return 0;
1911         }
1912
1913         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1914
1915         dir_nodeid = dlm_dir_nodeid(r);
1916
1917         if (dir_nodeid != our_nodeid) {
1918                 r->res_first_lkid = lkb->lkb_id;
1919                 send_lookup(r, lkb);
1920                 return 1;
1921         }
1922
1923         for (i = 0; i < 2; i++) {
1924                 /* It's possible for dlm_scand to remove an old rsb for
1925                    this same resource from the toss list, us to create
1926                    a new one, look up the master locally, and find it
1927                    already exists just before dlm_scand does the
1928                    dir_remove() on the previous rsb. */
1929
1930                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1931                                        r->res_length, &ret_nodeid);
1932                 if (!error)
1933                         break;
1934                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1935                 schedule();
1936         }
1937         if (error && error != -EEXIST)
1938                 return error;
1939
1940         if (ret_nodeid == our_nodeid) {
1941                 r->res_first_lkid = 0;
1942                 r->res_nodeid = 0;
1943                 lkb->lkb_nodeid = 0;
1944         } else {
1945                 r->res_first_lkid = lkb->lkb_id;
1946                 r->res_nodeid = ret_nodeid;
1947                 lkb->lkb_nodeid = ret_nodeid;
1948         }
1949         return 0;
1950 }
1951
1952 static void process_lookup_list(struct dlm_rsb *r)
1953 {
1954         struct dlm_lkb *lkb, *safe;
1955
1956         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1957                 list_del_init(&lkb->lkb_rsb_lookup);
1958                 _request_lock(r, lkb);
1959                 schedule();
1960         }
1961 }
1962
1963 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1964
1965 static void confirm_master(struct dlm_rsb *r, int error)
1966 {
1967         struct dlm_lkb *lkb;
1968
1969         if (!r->res_first_lkid)
1970                 return;
1971
1972         switch (error) {
1973         case 0:
1974         case -EINPROGRESS:
1975                 r->res_first_lkid = 0;
1976                 process_lookup_list(r);
1977                 break;
1978
1979         case -EAGAIN:
1980         case -EBADR:
1981         case -ENOTBLK:
1982                 /* the remote request failed and won't be retried (it was
1983                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1984                    lkb the first_lkid */
1985
1986                 r->res_first_lkid = 0;
1987
1988                 if (!list_empty(&r->res_lookup)) {
1989                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1990                                          lkb_rsb_lookup);
1991                         list_del_init(&lkb->lkb_rsb_lookup);
1992                         r->res_first_lkid = lkb->lkb_id;
1993                         _request_lock(r, lkb);
1994                 }
1995                 break;
1996
1997         default:
1998                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1999         }
2000 }
2001
2002 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2003                          int namelen, unsigned long timeout_cs,
2004                          void (*ast) (void *astparam),
2005                          void *astparam,
2006                          void (*bast) (void *astparam, int mode),
2007                          struct dlm_args *args)
2008 {
2009         int rv = -EINVAL;
2010
2011         /* check for invalid arg usage */
2012
2013         if (mode < 0 || mode > DLM_LOCK_EX)
2014                 goto out;
2015
2016         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2017                 goto out;
2018
2019         if (flags & DLM_LKF_CANCEL)
2020                 goto out;
2021
2022         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2023                 goto out;
2024
2025         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2026                 goto out;
2027
2028         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2029                 goto out;
2030
2031         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2032                 goto out;
2033
2034         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2035                 goto out;
2036
2037         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2038                 goto out;
2039
2040         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2041                 goto out;
2042
2043         if (!ast || !lksb)
2044                 goto out;
2045
2046         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2047                 goto out;
2048
2049         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2050                 goto out;
2051
2052         /* these args will be copied to the lkb in validate_lock_args,
2053            it cannot be done now because when converting locks, fields in
2054            an active lkb cannot be modified before locking the rsb */
2055
2056         args->flags = flags;
2057         args->astfn = ast;
2058         args->astparam = astparam;
2059         args->bastfn = bast;
2060         args->timeout = timeout_cs;
2061         args->mode = mode;
2062         args->lksb = lksb;
2063         rv = 0;
2064  out:
2065         return rv;
2066 }
2067
2068 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2069 {
2070         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2071                       DLM_LKF_FORCEUNLOCK))
2072                 return -EINVAL;
2073
2074         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2075                 return -EINVAL;
2076
2077         args->flags = flags;
2078         args->astparam = astarg;
2079         return 0;
2080 }
2081
2082 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2083                               struct dlm_args *args)
2084 {
2085         int rv = -EINVAL;
2086
2087         if (args->flags & DLM_LKF_CONVERT) {
2088                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2089                         goto out;
2090
2091                 if (args->flags & DLM_LKF_QUECVT &&
2092                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2093                         goto out;
2094
2095                 rv = -EBUSY;
2096                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2097                         goto out;
2098
2099                 if (lkb->lkb_wait_type)
2100                         goto out;
2101
2102                 if (is_overlap(lkb))
2103                         goto out;
2104         }
2105
2106         lkb->lkb_exflags = args->flags;
2107         lkb->lkb_sbflags = 0;
2108         lkb->lkb_astfn = args->astfn;
2109         lkb->lkb_astparam = args->astparam;
2110         lkb->lkb_bastfn = args->bastfn;
2111         lkb->lkb_rqmode = args->mode;
2112         lkb->lkb_lksb = args->lksb;
2113         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2114         lkb->lkb_ownpid = (int) current->pid;
2115         lkb->lkb_timeout_cs = args->timeout;
2116         rv = 0;
2117  out:
2118         if (rv)
2119                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2120                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2121                           lkb->lkb_status, lkb->lkb_wait_type,
2122                           lkb->lkb_resource->res_name);
2123         return rv;
2124 }
2125
2126 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2127    for success */
2128
2129 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2130    because there may be a lookup in progress and it's valid to do
2131    cancel/unlockf on it */
2132
2133 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2134 {
2135         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2136         int rv = -EINVAL;
2137
2138         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2139                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2140                 dlm_print_lkb(lkb);
2141                 goto out;
2142         }
2143
2144         /* an lkb may still exist even though the lock is EOL'ed due to a
2145            cancel, unlock or failed noqueue request; an app can't use these
2146            locks; return same error as if the lkid had not been found at all */
2147
2148         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2149                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2150                 rv = -ENOENT;
2151                 goto out;
2152         }
2153
2154         /* an lkb may be waiting for an rsb lookup to complete where the
2155            lookup was initiated by another lock */
2156
2157         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2158                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2159                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2160                         list_del_init(&lkb->lkb_rsb_lookup);
2161                         queue_cast(lkb->lkb_resource, lkb,
2162                                    args->flags & DLM_LKF_CANCEL ?
2163                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2164                         unhold_lkb(lkb); /* undoes create_lkb() */
2165                 }
2166                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2167                 rv = -EBUSY;
2168                 goto out;
2169         }
2170
2171         /* cancel not allowed with another cancel/unlock in progress */
2172
2173         if (args->flags & DLM_LKF_CANCEL) {
2174                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2175                         goto out;
2176
2177                 if (is_overlap(lkb))
2178                         goto out;
2179
2180                 /* don't let scand try to do a cancel */
2181                 del_timeout(lkb);
2182
2183                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2184                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2185                         rv = -EBUSY;
2186                         goto out;
2187                 }
2188
2189                 /* there's nothing to cancel */
2190                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2191                     !lkb->lkb_wait_type) {
2192                         rv = -EBUSY;
2193                         goto out;
2194                 }
2195
2196                 switch (lkb->lkb_wait_type) {
2197                 case DLM_MSG_LOOKUP:
2198                 case DLM_MSG_REQUEST:
2199                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2200                         rv = -EBUSY;
2201                         goto out;
2202                 case DLM_MSG_UNLOCK:
2203                 case DLM_MSG_CANCEL:
2204                         goto out;
2205                 }
2206                 /* add_to_waiters() will set OVERLAP_CANCEL */
2207                 goto out_ok;
2208         }
2209
2210         /* do we need to allow a force-unlock if there's a normal unlock
2211            already in progress?  in what conditions could the normal unlock
2212            fail such that we'd want to send a force-unlock to be sure? */
2213
2214         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2215                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2216                         goto out;
2217
2218                 if (is_overlap_unlock(lkb))
2219                         goto out;
2220
2221                 /* don't let scand try to do a cancel */
2222                 del_timeout(lkb);
2223
2224                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2225                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2226                         rv = -EBUSY;
2227                         goto out;
2228                 }
2229
2230                 switch (lkb->lkb_wait_type) {
2231                 case DLM_MSG_LOOKUP:
2232                 case DLM_MSG_REQUEST:
2233                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2234                         rv = -EBUSY;
2235                         goto out;
2236                 case DLM_MSG_UNLOCK:
2237                         goto out;
2238                 }
2239                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2240                 goto out_ok;
2241         }
2242
2243         /* normal unlock not allowed if there's any op in progress */
2244         rv = -EBUSY;
2245         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2246                 goto out;
2247
2248  out_ok:
2249         /* an overlapping op shouldn't blow away exflags from other op */
2250         lkb->lkb_exflags |= args->flags;
2251         lkb->lkb_sbflags = 0;
2252         lkb->lkb_astparam = args->astparam;
2253         rv = 0;
2254  out:
2255         if (rv)
2256                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2257                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2258                           args->flags, lkb->lkb_wait_type,
2259                           lkb->lkb_resource->res_name);
2260         return rv;
2261 }
2262
2263 /*
2264  * Four stage 4 varieties:
2265  * do_request(), do_convert(), do_unlock(), do_cancel()
2266  * These are called on the master node for the given lock and
2267  * from the central locking logic.
2268  */
2269
2270 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2271 {
2272         int error = 0;
2273
2274         if (can_be_granted(r, lkb, 1, NULL)) {
2275                 grant_lock(r, lkb);
2276                 queue_cast(r, lkb, 0);
2277                 goto out;
2278         }
2279
2280         if (can_be_queued(lkb)) {
2281                 error = -EINPROGRESS;
2282                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2283                 add_timeout(lkb);
2284                 goto out;
2285         }
2286
2287         error = -EAGAIN;
2288         queue_cast(r, lkb, -EAGAIN);
2289  out:
2290         return error;
2291 }
2292
2293 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2294                                int error)
2295 {
2296         switch (error) {
2297         case -EAGAIN:
2298                 if (force_blocking_asts(lkb))
2299                         send_blocking_asts_all(r, lkb);
2300                 break;
2301         case -EINPROGRESS:
2302                 send_blocking_asts(r, lkb);
2303                 break;
2304         }
2305 }
2306
2307 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2308 {
2309         int error = 0;
2310         int deadlk = 0;
2311
2312         /* changing an existing lock may allow others to be granted */
2313
2314         if (can_be_granted(r, lkb, 1, &deadlk)) {
2315                 grant_lock(r, lkb);
2316                 queue_cast(r, lkb, 0);
2317                 goto out;
2318         }
2319
2320         /* can_be_granted() detected that this lock would block in a conversion
2321            deadlock, so we leave it on the granted queue and return EDEADLK in
2322            the ast for the convert. */
2323
2324         if (deadlk) {
2325                 /* it's left on the granted queue */
2326                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2327                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2328                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2329                 revert_lock(r, lkb);
2330                 queue_cast(r, lkb, -EDEADLK);
2331                 error = -EDEADLK;
2332                 goto out;
2333         }
2334
2335         /* is_demoted() means the can_be_granted() above set the grmode
2336            to NL, and left us on the granted queue.  This auto-demotion
2337            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2338            now grantable.  We have to try to grant other converting locks
2339            before we try again to grant this one. */
2340
2341         if (is_demoted(lkb)) {
2342                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2343                 if (_can_be_granted(r, lkb, 1)) {
2344                         grant_lock(r, lkb);
2345                         queue_cast(r, lkb, 0);
2346                         goto out;
2347                 }
2348                 /* else fall through and move to convert queue */
2349         }
2350
2351         if (can_be_queued(lkb)) {
2352                 error = -EINPROGRESS;
2353                 del_lkb(r, lkb);
2354                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2355                 add_timeout(lkb);
2356                 goto out;
2357         }
2358
2359         error = -EAGAIN;
2360         queue_cast(r, lkb, -EAGAIN);
2361  out:
2362         return error;
2363 }
2364
2365 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2366                                int error)
2367 {
2368         switch (error) {
2369         case 0:
2370                 grant_pending_locks(r);
2371                 /* grant_pending_locks also sends basts */
2372                 break;
2373         case -EAGAIN:
2374                 if (force_blocking_asts(lkb))
2375                         send_blocking_asts_all(r, lkb);
2376                 break;
2377         case -EINPROGRESS:
2378                 send_blocking_asts(r, lkb);
2379                 break;
2380         }
2381 }
2382
2383 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2384 {
2385         remove_lock(r, lkb);
2386         queue_cast(r, lkb, -DLM_EUNLOCK);
2387         return -DLM_EUNLOCK;
2388 }
2389
2390 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2391                               int error)
2392 {
2393         grant_pending_locks(r);
2394 }
2395
2396 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2397  
2398 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2399 {
2400         int error;
2401
2402         error = revert_lock(r, lkb);
2403         if (error) {
2404                 queue_cast(r, lkb, -DLM_ECANCEL);
2405                 return -DLM_ECANCEL;
2406         }
2407         return 0;
2408 }
2409
2410 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2411                               int error)
2412 {
2413         if (error)
2414                 grant_pending_locks(r);
2415 }
2416
2417 /*
2418  * Four stage 3 varieties:
2419  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2420  */
2421
2422 /* add a new lkb to a possibly new rsb, called by requesting process */
2423
2424 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2425 {
2426         int error;
2427
2428         /* set_master: sets lkb nodeid from r */
2429
2430         error = set_master(r, lkb);
2431         if (error < 0)
2432                 goto out;
2433         if (error) {
2434                 error = 0;
2435                 goto out;
2436         }
2437
2438         if (is_remote(r)) {
2439                 /* receive_request() calls do_request() on remote node */
2440                 error = send_request(r, lkb);
2441         } else {
2442                 error = do_request(r, lkb);
2443                 /* for remote locks the request_reply is sent
2444                    between do_request and do_request_effects */
2445                 do_request_effects(r, lkb, error);
2446         }
2447  out:
2448         return error;
2449 }
2450
2451 /* change some property of an existing lkb, e.g. mode */
2452
2453 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2454 {
2455         int error;
2456
2457         if (is_remote(r)) {
2458                 /* receive_convert() calls do_convert() on remote node */
2459                 error = send_convert(r, lkb);
2460         } else {
2461                 error = do_convert(r, lkb);
2462                 /* for remote locks the convert_reply is sent
2463                    between do_convert and do_convert_effects */
2464                 do_convert_effects(r, lkb, error);
2465         }
2466
2467         return error;
2468 }
2469
2470 /* remove an existing lkb from the granted queue */
2471
2472 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2473 {
2474         int error;
2475
2476         if (is_remote(r)) {
2477                 /* receive_unlock() calls do_unlock() on remote node */
2478                 error = send_unlock(r, lkb);
2479         } else {
2480                 error = do_unlock(r, lkb);
2481                 /* for remote locks the unlock_reply is sent
2482                    between do_unlock and do_unlock_effects */
2483                 do_unlock_effects(r, lkb, error);
2484         }
2485
2486         return error;
2487 }
2488
2489 /* remove an existing lkb from the convert or wait queue */
2490
2491 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2492 {
2493         int error;
2494
2495         if (is_remote(r)) {
2496                 /* receive_cancel() calls do_cancel() on remote node */
2497                 error = send_cancel(r, lkb);
2498         } else {
2499                 error = do_cancel(r, lkb);
2500                 /* for remote locks the cancel_reply is sent
2501                    between do_cancel and do_cancel_effects */
2502                 do_cancel_effects(r, lkb, error);
2503         }
2504
2505         return error;
2506 }
2507
2508 /*
2509  * Four stage 2 varieties:
2510  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2511  */
2512
2513 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2514                         int len, struct dlm_args *args)
2515 {
2516         struct dlm_rsb *r;
2517         int error;
2518
2519         error = validate_lock_args(ls, lkb, args);
2520         if (error)
2521                 goto out;
2522
2523         error = find_rsb(ls, name, len, R_CREATE, &r);
2524         if (error)
2525                 goto out;
2526
2527         lock_rsb(r);
2528
2529         attach_lkb(r, lkb);
2530         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2531
2532         error = _request_lock(r, lkb);
2533
2534         unlock_rsb(r);
2535         put_rsb(r);
2536
2537  out:
2538         return error;
2539 }
2540
2541 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2542                         struct dlm_args *args)
2543 {
2544         struct dlm_rsb *r;
2545         int error;
2546
2547         r = lkb->lkb_resource;
2548
2549         hold_rsb(r);
2550         lock_rsb(r);
2551
2552         error = validate_lock_args(ls, lkb, args);
2553         if (error)
2554                 goto out;
2555
2556         error = _convert_lock(r, lkb);
2557  out:
2558         unlock_rsb(r);
2559         put_rsb(r);
2560         return error;
2561 }
2562
2563 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2564                        struct dlm_args *args)
2565 {
2566         struct dlm_rsb *r;
2567         int error;
2568
2569         r = lkb->lkb_resource;
2570
2571         hold_rsb(r);
2572         lock_rsb(r);
2573
2574         error = validate_unlock_args(lkb, args);
2575         if (error)
2576                 goto out;
2577
2578         error = _unlock_lock(r, lkb);
2579  out:
2580         unlock_rsb(r);
2581         put_rsb(r);
2582         return error;
2583 }
2584
2585 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2586                        struct dlm_args *args)
2587 {
2588         struct dlm_rsb *r;
2589         int error;
2590
2591         r = lkb->lkb_resource;
2592
2593         hold_rsb(r);
2594         lock_rsb(r);
2595
2596         error = validate_unlock_args(lkb, args);
2597         if (error)
2598                 goto out;
2599
2600         error = _cancel_lock(r, lkb);
2601  out:
2602         unlock_rsb(r);
2603         put_rsb(r);
2604         return error;
2605 }
2606
2607 /*
2608  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2609  */
2610
2611 int dlm_lock(dlm_lockspace_t *lockspace,
2612              int mode,
2613              struct dlm_lksb *lksb,
2614              uint32_t flags,
2615              void *name,
2616              unsigned int namelen,
2617              uint32_t parent_lkid,
2618              void (*ast) (void *astarg),
2619              void *astarg,
2620              void (*bast) (void *astarg, int mode))
2621 {
2622         struct dlm_ls *ls;
2623         struct dlm_lkb *lkb;
2624         struct dlm_args args;
2625         int error, convert = flags & DLM_LKF_CONVERT;
2626
2627         ls = dlm_find_lockspace_local(lockspace);
2628         if (!ls)
2629                 return -EINVAL;
2630
2631         dlm_lock_recovery(ls);
2632
2633         if (convert)
2634                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2635         else
2636                 error = create_lkb(ls, &lkb);
2637
2638         if (error)
2639                 goto out;
2640
2641         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2642                               astarg, bast, &args);
2643         if (error)
2644                 goto out_put;
2645
2646         if (convert)
2647                 error = convert_lock(ls, lkb, &args);
2648         else
2649                 error = request_lock(ls, lkb, name, namelen, &args);
2650
2651         if (error == -EINPROGRESS)
2652                 error = 0;
2653  out_put:
2654         if (convert || error)
2655                 __put_lkb(ls, lkb);
2656         if (error == -EAGAIN || error == -EDEADLK)
2657                 error = 0;
2658  out:
2659         dlm_unlock_recovery(ls);
2660         dlm_put_lockspace(ls);
2661         return error;
2662 }
2663
2664 int dlm_unlock(dlm_lockspace_t *lockspace,
2665                uint32_t lkid,
2666                uint32_t flags,
2667                struct dlm_lksb *lksb,
2668                void *astarg)
2669 {
2670         struct dlm_ls *ls;
2671         struct dlm_lkb *lkb;
2672         struct dlm_args args;
2673         int error;
2674
2675         ls = dlm_find_lockspace_local(lockspace);
2676         if (!ls)
2677                 return -EINVAL;
2678
2679         dlm_lock_recovery(ls);
2680
2681         error = find_lkb(ls, lkid, &lkb);
2682         if (error)
2683                 goto out;
2684
2685         error = set_unlock_args(flags, astarg, &args);
2686         if (error)
2687                 goto out_put;
2688
2689         if (flags & DLM_LKF_CANCEL)
2690                 error = cancel_lock(ls, lkb, &args);
2691         else
2692                 error = unlock_lock(ls, lkb, &args);
2693
2694         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2695                 error = 0;
2696         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2697                 error = 0;
2698  out_put:
2699         dlm_put_lkb(lkb);
2700  out:
2701         dlm_unlock_recovery(ls);
2702         dlm_put_lockspace(ls);
2703         return error;
2704 }
2705
2706 /*
2707  * send/receive routines for remote operations and replies
2708  *
2709  * send_args
2710  * send_common
2711  * send_request                 receive_request
2712  * send_convert                 receive_convert
2713  * send_unlock                  receive_unlock
2714  * send_cancel                  receive_cancel
2715  * send_grant                   receive_grant
2716  * send_bast                    receive_bast
2717  * send_lookup                  receive_lookup
2718  * send_remove                  receive_remove
2719  *
2720  *                              send_common_reply
2721  * receive_request_reply        send_request_reply
2722  * receive_convert_reply        send_convert_reply
2723  * receive_unlock_reply         send_unlock_reply
2724  * receive_cancel_reply         send_cancel_reply
2725  * receive_lookup_reply         send_lookup_reply
2726  */
2727
2728 static int _create_message(struct dlm_ls *ls, int mb_len,
2729                            int to_nodeid, int mstype,
2730                            struct dlm_message **ms_ret,
2731                            struct dlm_mhandle **mh_ret)
2732 {
2733         struct dlm_message *ms;
2734         struct dlm_mhandle *mh;
2735         char *mb;
2736
2737         /* get_buffer gives us a message handle (mh) that we need to
2738            pass into lowcomms_commit and a message buffer (mb) that we
2739            write our data into */
2740
2741         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2742         if (!mh)
2743                 return -ENOBUFS;
2744
2745         memset(mb, 0, mb_len);
2746
2747         ms = (struct dlm_message *) mb;
2748
2749         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2750         ms->m_header.h_lockspace = ls->ls_global_id;
2751         ms->m_header.h_nodeid = dlm_our_nodeid();
2752         ms->m_header.h_length = mb_len;
2753         ms->m_header.h_cmd = DLM_MSG;
2754
2755         ms->m_type = mstype;
2756
2757         *mh_ret = mh;
2758         *ms_ret = ms;
2759         return 0;
2760 }
2761
2762 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2763                           int to_nodeid, int mstype,
2764                           struct dlm_message **ms_ret,
2765                           struct dlm_mhandle **mh_ret)
2766 {
2767         int mb_len = sizeof(struct dlm_message);
2768
2769         switch (mstype) {
2770         case DLM_MSG_REQUEST:
2771         case DLM_MSG_LOOKUP:
2772         case DLM_MSG_REMOVE:
2773                 mb_len += r->res_length;
2774                 break;
2775         case DLM_MSG_CONVERT:
2776         case DLM_MSG_UNLOCK:
2777         case DLM_MSG_REQUEST_REPLY:
2778         case DLM_MSG_CONVERT_REPLY:
2779         case DLM_MSG_GRANT:
2780                 if (lkb && lkb->lkb_lvbptr)
2781                         mb_len += r->res_ls->ls_lvblen;
2782                 break;
2783         }
2784
2785         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2786                                ms_ret, mh_ret);
2787 }
2788
2789 /* further lowcomms enhancements or alternate implementations may make
2790    the return value from this function useful at some point */
2791
2792 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2793 {
2794         dlm_message_out(ms);
2795         dlm_lowcomms_commit_buffer(mh);
2796         return 0;
2797 }
2798
2799 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2800                       struct dlm_message *ms)
2801 {
2802         ms->m_nodeid   = lkb->lkb_nodeid;
2803         ms->m_pid      = lkb->lkb_ownpid;
2804         ms->m_lkid     = lkb->lkb_id;
2805         ms->m_remid    = lkb->lkb_remid;
2806         ms->m_exflags  = lkb->lkb_exflags;
2807         ms->m_sbflags  = lkb->lkb_sbflags;
2808         ms->m_flags    = lkb->lkb_flags;
2809         ms->m_lvbseq   = lkb->lkb_lvbseq;
2810         ms->m_status   = lkb->lkb_status;
2811         ms->m_grmode   = lkb->lkb_grmode;
2812         ms->m_rqmode   = lkb->lkb_rqmode;
2813         ms->m_hash     = r->res_hash;
2814
2815         /* m_result and m_bastmode are set from function args,
2816            not from lkb fields */
2817
2818         if (lkb->lkb_bastfn)
2819                 ms->m_asts |= AST_BAST;
2820         if (lkb->lkb_astfn)
2821                 ms->m_asts |= AST_COMP;
2822
2823         /* compare with switch in create_message; send_remove() doesn't
2824            use send_args() */
2825
2826         switch (ms->m_type) {
2827         case DLM_MSG_REQUEST:
2828         case DLM_MSG_LOOKUP:
2829                 memcpy(ms->m_extra, r->res_name, r->res_length);
2830                 break;
2831         case DLM_MSG_CONVERT:
2832         case DLM_MSG_UNLOCK:
2833         case DLM_MSG_REQUEST_REPLY:
2834         case DLM_MSG_CONVERT_REPLY:
2835         case DLM_MSG_GRANT:
2836                 if (!lkb->lkb_lvbptr)
2837                         break;
2838                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2839                 break;
2840         }
2841 }
2842
2843 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2844 {
2845         struct dlm_message *ms;
2846         struct dlm_mhandle *mh;
2847         int to_nodeid, error;
2848
2849         error = add_to_waiters(lkb, mstype);
2850         if (error)
2851                 return error;
2852
2853         to_nodeid = r->res_nodeid;
2854
2855         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2856         if (error)
2857                 goto fail;
2858
2859         send_args(r, lkb, ms);
2860
2861         error = send_message(mh, ms);
2862         if (error)
2863                 goto fail;
2864         return 0;
2865
2866  fail:
2867         remove_from_waiters(lkb, msg_reply_type(mstype));
2868         return error;
2869 }
2870
2871 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2872 {
2873         return send_common(r, lkb, DLM_MSG_REQUEST);
2874 }
2875
2876 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2877 {
2878         int error;
2879
2880         error = send_common(r, lkb, DLM_MSG_CONVERT);
2881
2882         /* down conversions go without a reply from the master */
2883         if (!error && down_conversion(lkb)) {
2884                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2885                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2886                 r->res_ls->ls_stub_ms.m_result = 0;
2887                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2888                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2889         }
2890
2891         return error;
2892 }
2893
2894 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2895    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2896    that the master is still correct. */
2897
2898 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2899 {
2900         return send_common(r, lkb, DLM_MSG_UNLOCK);
2901 }
2902
2903 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2904 {
2905         return send_common(r, lkb, DLM_MSG_CANCEL);
2906 }
2907
2908 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2909 {
2910         struct dlm_message *ms;
2911         struct dlm_mhandle *mh;
2912         int to_nodeid, error;
2913
2914         to_nodeid = lkb->lkb_nodeid;
2915
2916         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2917         if (error)
2918                 goto out;
2919
2920         send_args(r, lkb, ms);
2921
2922         ms->m_result = 0;
2923
2924         error = send_message(mh, ms);
2925  out:
2926         return error;
2927 }
2928
2929 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2930 {
2931         struct dlm_message *ms;
2932         struct dlm_mhandle *mh;
2933         int to_nodeid, error;
2934
2935         to_nodeid = lkb->lkb_nodeid;
2936
2937         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2938         if (error)
2939                 goto out;
2940
2941         send_args(r, lkb, ms);
2942
2943         ms->m_bastmode = mode;
2944
2945         error = send_message(mh, ms);
2946  out:
2947         return error;
2948 }
2949
2950 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2951 {
2952         struct dlm_message *ms;
2953         struct dlm_mhandle *mh;
2954         int to_nodeid, error;
2955
2956         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2957         if (error)
2958                 return error;
2959
2960         to_nodeid = dlm_dir_nodeid(r);
2961
2962         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2963         if (error)
2964                 goto fail;
2965
2966         send_args(r, lkb, ms);
2967
2968         error = send_message(mh, ms);
2969         if (error)
2970                 goto fail;
2971         return 0;
2972
2973  fail:
2974         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2975         return error;
2976 }
2977
2978 static int send_remove(struct dlm_rsb *r)
2979 {
2980         struct dlm_message *ms;
2981         struct dlm_mhandle *mh;
2982         int to_nodeid, error;
2983
2984         to_nodeid = dlm_dir_nodeid(r);
2985
2986         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2987         if (error)
2988                 goto out;
2989
2990         memcpy(ms->m_extra, r->res_name, r->res_length);
2991         ms->m_hash = r->res_hash;
2992
2993         error = send_message(mh, ms);
2994  out:
2995         return error;
2996 }
2997
2998 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2999                              int mstype, int rv)
3000 {
3001         struct dlm_message *ms;
3002         struct dlm_mhandle *mh;
3003         int to_nodeid, error;
3004
3005         to_nodeid = lkb->lkb_nodeid;
3006
3007         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3008         if (error)
3009                 goto out;
3010
3011         send_args(r, lkb, ms);
3012
3013         ms->m_result = rv;
3014
3015         error = send_message(mh, ms);
3016  out:
3017         return error;
3018 }
3019
3020 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3021 {
3022         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3023 }
3024
3025 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3026 {
3027         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3028 }
3029
3030 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3031 {
3032         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3033 }
3034
3035 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3036 {
3037         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3038 }
3039
3040 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3041                              int ret_nodeid, int rv)
3042 {
3043         struct dlm_rsb *r = &ls->ls_stub_rsb;
3044         struct dlm_message *ms;
3045         struct dlm_mhandle *mh;
3046         int error, nodeid = ms_in->m_header.h_nodeid;
3047
3048         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3049         if (error)
3050                 goto out;
3051
3052         ms->m_lkid = ms_in->m_lkid;
3053         ms->m_result = rv;
3054         ms->m_nodeid = ret_nodeid;
3055
3056         error = send_message(mh, ms);
3057  out:
3058         return error;
3059 }
3060
3061 /* which args we save from a received message depends heavily on the type
3062    of message, unlike the send side where we can safely send everything about
3063    the lkb for any type of message */
3064
3065 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3066 {
3067         lkb->lkb_exflags = ms->m_exflags;
3068         lkb->lkb_sbflags = ms->m_sbflags;
3069         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3070                          (ms->m_flags & 0x0000FFFF);
3071 }
3072
3073 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3074 {
3075         lkb->lkb_sbflags = ms->m_sbflags;
3076         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3077                          (ms->m_flags & 0x0000FFFF);
3078 }
3079
3080 static int receive_extralen(struct dlm_message *ms)
3081 {
3082         return (ms->m_header.h_length - sizeof(struct dlm_message));
3083 }
3084
3085 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3086                        struct dlm_message *ms)
3087 {
3088         int len;
3089
3090         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3091                 if (!lkb->lkb_lvbptr)
3092                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3093                 if (!lkb->lkb_lvbptr)
3094                         return -ENOMEM;
3095                 len = receive_extralen(ms);
3096                 if (len > DLM_RESNAME_MAXLEN)
3097                         len = DLM_RESNAME_MAXLEN;
3098                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3099         }
3100         return 0;
3101 }
3102
3103 static void fake_bastfn(void *astparam, int mode)
3104 {
3105         log_print("fake_bastfn should not be called");
3106 }
3107
3108 static void fake_astfn(void *astparam)
3109 {
3110         log_print("fake_astfn should not be called");
3111 }
3112
3113 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3114                                 struct dlm_message *ms)
3115 {
3116         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3117         lkb->lkb_ownpid = ms->m_pid;
3118         lkb->lkb_remid = ms->m_lkid;
3119         lkb->lkb_grmode = DLM_LOCK_IV;
3120         lkb->lkb_rqmode = ms->m_rqmode;
3121
3122         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3123         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3124
3125         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3126                 /* lkb was just created so there won't be an lvb yet */
3127                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3128                 if (!lkb->lkb_lvbptr)
3129                         return -ENOMEM;
3130         }
3131
3132         return 0;
3133 }
3134
3135 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3136                                 struct dlm_message *ms)
3137 {
3138         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3139                 return -EBUSY;
3140
3141         if (receive_lvb(ls, lkb, ms))
3142                 return -ENOMEM;
3143
3144         lkb->lkb_rqmode = ms->m_rqmode;
3145         lkb->lkb_lvbseq = ms->m_lvbseq;
3146
3147         return 0;
3148 }
3149
3150 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3151                                struct dlm_message *ms)
3152 {
3153         if (receive_lvb(ls, lkb, ms))
3154                 return -ENOMEM;
3155         return 0;
3156 }
3157
3158 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3159    uses to send a reply and that the remote end uses to process the reply. */
3160
3161 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3162 {
3163         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3164         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3165         lkb->lkb_remid = ms->m_lkid;
3166 }
3167
3168 /* This is called after the rsb is locked so that we can safely inspect
3169    fields in the lkb. */
3170
3171 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3172 {
3173         int from = ms->m_header.h_nodeid;
3174         int error = 0;
3175
3176         switch (ms->m_type) {
3177         case DLM_MSG_CONVERT:
3178         case DLM_MSG_UNLOCK:
3179         case DLM_MSG_CANCEL:
3180                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3181                         error = -EINVAL;
3182                 break;
3183
3184         case DLM_MSG_CONVERT_REPLY:
3185         case DLM_MSG_UNLOCK_REPLY:
3186         case DLM_MSG_CANCEL_REPLY:
3187         case DLM_MSG_GRANT:
3188         case DLM_MSG_BAST:
3189                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3190                         error = -EINVAL;
3191                 break;
3192
3193         case DLM_MSG_REQUEST_REPLY:
3194                 if (!is_process_copy(lkb))
3195                         error = -EINVAL;
3196                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3197                         error = -EINVAL;
3198                 break;
3199
3200         default:
3201                 error = -EINVAL;
3202         }
3203
3204         if (error)
3205                 log_error(lkb->lkb_resource->res_ls,
3206                           "ignore invalid message %d from %d %x %x %x %d",
3207                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3208                           lkb->lkb_flags, lkb->lkb_nodeid);
3209         return error;
3210 }
3211
3212 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3213 {
3214         struct dlm_lkb *lkb;
3215         struct dlm_rsb *r;
3216         int error, namelen;
3217
3218         error = create_lkb(ls, &lkb);
3219         if (error)
3220                 goto fail;
3221
3222         receive_flags(lkb, ms);
3223         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3224         error = receive_request_args(ls, lkb, ms);
3225         if (error) {
3226                 __put_lkb(ls, lkb);
3227                 goto fail;
3228         }
3229
3230         namelen = receive_extralen(ms);
3231
3232         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3233         if (error) {
3234                 __put_lkb(ls, lkb);
3235                 goto fail;
3236         }
3237
3238         lock_rsb(r);
3239
3240         attach_lkb(r, lkb);
3241         error = do_request(r, lkb);
3242         send_request_reply(r, lkb, error);
3243         do_request_effects(r, lkb, error);
3244
3245         unlock_rsb(r);
3246         put_rsb(r);
3247
3248         if (error == -EINPROGRESS)
3249                 error = 0;
3250         if (error)
3251                 dlm_put_lkb(lkb);
3252         return;
3253
3254  fail:
3255         setup_stub_lkb(ls, ms);
3256         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3257 }
3258
3259 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3260 {
3261         struct dlm_lkb *lkb;
3262         struct dlm_rsb *r;
3263         int error, reply = 1;
3264
3265         error = find_lkb(ls, ms->m_remid, &lkb);
3266         if (error)
3267                 goto fail;
3268
3269         r = lkb->lkb_resource;
3270
3271         hold_rsb(r);
3272         lock_rsb(r);
3273
3274         error = validate_message(lkb, ms);
3275         if (error)
3276                 goto out;
3277
3278         receive_flags(lkb, ms);
3279
3280         error = receive_convert_args(ls, lkb, ms);
3281         if (error) {
3282                 send_convert_reply(r, lkb, error);
3283                 goto out;
3284         }
3285
3286         reply = !down_conversion(lkb);
3287
3288         error = do_convert(r, lkb);
3289         if (reply)
3290                 send_convert_reply(r, lkb, error);
3291         do_convert_effects(r, lkb, error);
3292  out:
3293         unlock_rsb(r);
3294         put_rsb(r);
3295         dlm_put_lkb(lkb);
3296         return;
3297
3298  fail:
3299         setup_stub_lkb(ls, ms);
3300         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3301 }
3302
3303 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3304 {
3305         struct dlm_lkb *lkb;
3306         struct dlm_rsb *r;
3307         int error;
3308
3309         error = find_lkb(ls, ms->m_remid, &lkb);
3310         if (error)
3311                 goto fail;
3312
3313         r = lkb->lkb_resource;
3314
3315         hold_rsb(r);
3316         lock_rsb(r);
3317
3318         error = validate_message(lkb, ms);
3319         if (error)
3320                 goto out;
3321
3322         receive_flags(lkb, ms);
3323
3324         error = receive_unlock_args(ls, lkb, ms);
3325         if (error) {
3326                 send_unlock_reply(r, lkb, error);
3327                 goto out;
3328         }
3329
3330         error = do_unlock(r, lkb);
3331         send_unlock_reply(r, lkb, error);
3332         do_unlock_effects(r, lkb, error);
3333  out:
3334         unlock_rsb(r);
3335         put_rsb(r);
3336         dlm_put_lkb(lkb);
3337         return;
3338
3339  fail:
3340         setup_stub_lkb(ls, ms);
3341         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3342 }
3343
3344 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3345 {
3346         struct dlm_lkb *lkb;
3347         struct dlm_rsb *r;
3348         int error;
3349
3350         error = find_lkb(ls, ms->m_remid, &lkb);
3351         if (error)
3352                 goto fail;
3353
3354         receive_flags(lkb, ms);
3355
3356         r = lkb->lkb_resource;
3357
3358         hold_rsb(r);
3359         lock_rsb(r);
3360
3361         error = validate_message(lkb, ms);
3362         if (error)
3363                 goto out;
3364
3365         error = do_cancel(r, lkb);
3366         send_cancel_reply(r, lkb, error);
3367         do_cancel_effects(r, lkb, error);
3368  out:
3369         unlock_rsb(r);
3370         put_rsb(r);
3371         dlm_put_lkb(lkb);
3372         return;
3373
3374  fail:
3375         setup_stub_lkb(ls, ms);
3376         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3377 }
3378
3379 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3380 {
3381         struct dlm_lkb *lkb;
3382         struct dlm_rsb *r;
3383         int error;
3384
3385         error = find_lkb(ls, ms->m_remid, &lkb);
3386         if (error) {
3387                 log_debug(ls, "receive_grant from %d no lkb %x",
3388                           ms->m_header.h_nodeid, ms->m_remid);
3389                 return;
3390         }
3391
3392         r = lkb->lkb_resource;
3393
3394         hold_rsb(r);
3395         lock_rsb(r);
3396
3397         error = validate_message(lkb, ms);
3398         if (error)
3399                 goto out;
3400
3401         receive_flags_reply(lkb, ms);
3402         if (is_altmode(lkb))
3403                 munge_altmode(lkb, ms);
3404         grant_lock_pc(r, lkb, ms);
3405         queue_cast(r, lkb, 0);
3406  out:
3407         unlock_rsb(r);
3408         put_rsb(r);
3409         dlm_put_lkb(lkb);
3410 }
3411
3412 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3413 {
3414         struct dlm_lkb *lkb;
3415         struct dlm_rsb *r;
3416         int error;
3417
3418         error = find_lkb(ls, ms->m_remid, &lkb);
3419         if (error) {
3420                 log_debug(ls, "receive_bast from %d no lkb %x",
3421                           ms->m_header.h_nodeid, ms->m_remid);
3422                 return;
3423         }
3424
3425         r = lkb->lkb_resource;
3426
3427         hold_rsb(r);
3428         lock_rsb(r);
3429
3430         error = validate_message(lkb, ms);
3431         if (error)
3432                 goto out;
3433
3434         queue_bast(r, lkb, ms->m_bastmode);
3435  out:
3436         unlock_rsb(r);
3437         put_rsb(r);
3438         dlm_put_lkb(lkb);
3439 }
3440
3441 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3442 {
3443         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3444
3445         from_nodeid = ms->m_header.h_nodeid;
3446         our_nodeid = dlm_our_nodeid();
3447
3448         len = receive_extralen(ms);
3449
3450         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3451         if (dir_nodeid != our_nodeid) {
3452                 log_error(ls, "lookup dir_nodeid %d from %d",
3453                           dir_nodeid, from_nodeid);
3454                 error = -EINVAL;
3455                 ret_nodeid = -1;
3456                 goto out;
3457         }
3458
3459         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3460
3461         /* Optimization: we're master so treat lookup as a request */
3462         if (!error && ret_nodeid == our_nodeid) {
3463                 receive_request(ls, ms);
3464                 return;
3465         }
3466  out:
3467         send_lookup_reply(ls, ms, ret_nodeid, error);
3468 }
3469
3470 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3471 {
3472         int len, dir_nodeid, from_nodeid;
3473
3474         from_nodeid = ms->m_header.h_nodeid;
3475
3476         len = receive_extralen(ms);
3477
3478         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3479         if (dir_nodeid != dlm_our_nodeid()) {
3480                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3481                           dir_nodeid, from_nodeid);
3482                 return;
3483         }
3484
3485         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3486 }
3487
3488 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3489 {
3490         do_purge(ls, ms->m_nodeid, ms->m_pid);
3491 }
3492
3493 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3494 {
3495         struct dlm_lkb *lkb;
3496         struct dlm_rsb *r;
3497         int error, mstype, result;
3498
3499         error = find_lkb(ls, ms->m_remid, &lkb);
3500         if (error) {
3501                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3502                           ms->m_header.h_nodeid, ms->m_remid);
3503                 return;
3504         }
3505
3506         r = lkb->lkb_resource;
3507         hold_rsb(r);
3508         lock_rsb(r);
3509
3510         error = validate_message(lkb, ms);
3511         if (error)
3512                 goto out;
3513
3514         mstype = lkb->lkb_wait_type;
3515         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3516         if (error)
3517                 goto out;
3518
3519         /* Optimization: the dir node was also the master, so it took our
3520            lookup as a request and sent request reply instead of lookup reply */
3521         if (mstype == DLM_MSG_LOOKUP) {
3522                 r->res_nodeid = ms->m_header.h_nodeid;
3523                 lkb->lkb_nodeid = r->res_nodeid;
3524         }
3525
3526         /* this is the value returned from do_request() on the master */
3527         result = ms->m_result;
3528
3529         switch (result) {
3530         case -EAGAIN:
3531                 /* request would block (be queued) on remote master */
3532                 queue_cast(r, lkb, -EAGAIN);
3533                 confirm_master(r, -EAGAIN);
3534                 unhold_lkb(lkb); /* undoes create_lkb() */
3535                 break;
3536
3537         case -EINPROGRESS:
3538         case 0:
3539                 /* request was queued or granted on remote master */
3540                 receive_flags_reply(lkb, ms);
3541                 lkb->lkb_remid = ms->m_lkid;
3542                 if (is_altmode(lkb))
3543                         munge_altmode(lkb, ms);
3544                 if (result) {
3545                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3546                         add_timeout(lkb);
3547                 } else {
3548                         grant_lock_pc(r, lkb, ms);
3549                         queue_cast(r, lkb, 0);
3550                 }
3551                 confirm_master(r, result);
3552                 break;
3553
3554         case -EBADR:
3555         case -ENOTBLK:
3556                 /* find_rsb failed to find rsb or rsb wasn't master */
3557                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3558                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3559                 r->res_nodeid = -1;
3560                 lkb->lkb_nodeid = -1;
3561
3562                 if (is_overlap(lkb)) {
3563                         /* we'll ignore error in cancel/unlock reply */
3564                         queue_cast_overlap(r, lkb);
3565                         confirm_master(r, result);
3566                         unhold_lkb(lkb); /* undoes create_lkb() */
3567                 } else
3568                         _request_lock(r, lkb);
3569                 break;
3570
3571         default:
3572                 log_error(ls, "receive_request_reply %x error %d",
3573                           lkb->lkb_id, result);
3574         }
3575
3576         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3577                 log_debug(ls, "receive_request_reply %x result %d unlock",