dlm: send reply before bast
[linux-2.6.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         lkb->lkb_time_bast = ktime_get();
322
323         if (is_master_copy(lkb))
324                 send_bast(r, lkb, rqmode);
325         else
326                 dlm_add_ast(lkb, AST_BAST, rqmode);
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335         struct dlm_rsb *r;
336
337         r = dlm_allocate_rsb(ls, len);
338         if (!r)
339                 return NULL;
340
341         r->res_ls = ls;
342         r->res_length = len;
343         memcpy(r->res_name, name, len);
344         mutex_init(&r->res_mutex);
345
346         INIT_LIST_HEAD(&r->res_lookup);
347         INIT_LIST_HEAD(&r->res_grantqueue);
348         INIT_LIST_HEAD(&r->res_convertqueue);
349         INIT_LIST_HEAD(&r->res_waitqueue);
350         INIT_LIST_HEAD(&r->res_root_list);
351         INIT_LIST_HEAD(&r->res_recover_list);
352
353         return r;
354 }
355
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357                            unsigned int flags, struct dlm_rsb **r_ret)
358 {
359         struct dlm_rsb *r;
360         int error = 0;
361
362         list_for_each_entry(r, head, res_hashchain) {
363                 if (len == r->res_length && !memcmp(name, r->res_name, len))
364                         goto found;
365         }
366         *r_ret = NULL;
367         return -EBADR;
368
369  found:
370         if (r->res_nodeid && (flags & R_MASTER))
371                 error = -ENOTBLK;
372         *r_ret = r;
373         return error;
374 }
375
376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377                        unsigned int flags, struct dlm_rsb **r_ret)
378 {
379         struct dlm_rsb *r;
380         int error;
381
382         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383         if (!error) {
384                 kref_get(&r->res_ref);
385                 goto out;
386         }
387         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388         if (error)
389                 goto out;
390
391         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392
393         if (dlm_no_directory(ls))
394                 goto out;
395
396         if (r->res_nodeid == -1) {
397                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else if (r->res_nodeid > 0) {
400                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401                 r->res_first_lkid = 0;
402         } else {
403                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405         }
406  out:
407         *r_ret = r;
408         return error;
409 }
410
411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412                       unsigned int flags, struct dlm_rsb **r_ret)
413 {
414         int error;
415         spin_lock(&ls->ls_rsbtbl[b].lock);
416         error = _search_rsb(ls, name, len, b, flags, r_ret);
417         spin_unlock(&ls->ls_rsbtbl[b].lock);
418         return error;
419 }
420
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434
435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436                     unsigned int flags, struct dlm_rsb **r_ret)
437 {
438         struct dlm_rsb *r = NULL, *tmp;
439         uint32_t hash, bucket;
440         int error = -EINVAL;
441
442         if (namelen > DLM_RESNAME_MAXLEN)
443                 goto out;
444
445         if (dlm_no_directory(ls))
446                 flags |= R_CREATE;
447
448         error = 0;
449         hash = jhash(name, namelen, 0);
450         bucket = hash & (ls->ls_rsbtbl_size - 1);
451
452         error = search_rsb(ls, name, namelen, bucket, flags, &r);
453         if (!error)
454                 goto out;
455
456         if (error == -EBADR && !(flags & R_CREATE))
457                 goto out;
458
459         /* the rsb was found but wasn't a master copy */
460         if (error == -ENOTBLK)
461                 goto out;
462
463         error = -ENOMEM;
464         r = create_rsb(ls, name, namelen);
465         if (!r)
466                 goto out;
467
468         r->res_hash = hash;
469         r->res_bucket = bucket;
470         r->res_nodeid = -1;
471         kref_init(&r->res_ref);
472
473         /* With no directory, the master can be set immediately */
474         if (dlm_no_directory(ls)) {
475                 int nodeid = dlm_dir_nodeid(r);
476                 if (nodeid == dlm_our_nodeid())
477                         nodeid = 0;
478                 r->res_nodeid = nodeid;
479         }
480
481         spin_lock(&ls->ls_rsbtbl[bucket].lock);
482         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
483         if (!error) {
484                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
485                 dlm_free_rsb(r);
486                 r = tmp;
487                 goto out;
488         }
489         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
490         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
491         error = 0;
492  out:
493         *r_ret = r;
494         return error;
495 }
496
497 /* This is only called to add a reference when the code already holds
498    a valid reference to the rsb, so there's no need for locking. */
499
500 static inline void hold_rsb(struct dlm_rsb *r)
501 {
502         kref_get(&r->res_ref);
503 }
504
505 void dlm_hold_rsb(struct dlm_rsb *r)
506 {
507         hold_rsb(r);
508 }
509
510 static void toss_rsb(struct kref *kref)
511 {
512         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
513         struct dlm_ls *ls = r->res_ls;
514
515         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
516         kref_init(&r->res_ref);
517         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
518         r->res_toss_time = jiffies;
519         if (r->res_lvbptr) {
520                 dlm_free_lvb(r->res_lvbptr);
521                 r->res_lvbptr = NULL;
522         }
523 }
524
525 /* When all references to the rsb are gone it's transfered to
526    the tossed list for later disposal. */
527
528 static void put_rsb(struct dlm_rsb *r)
529 {
530         struct dlm_ls *ls = r->res_ls;
531         uint32_t bucket = r->res_bucket;
532
533         spin_lock(&ls->ls_rsbtbl[bucket].lock);
534         kref_put(&r->res_ref, toss_rsb);
535         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
536 }
537
538 void dlm_put_rsb(struct dlm_rsb *r)
539 {
540         put_rsb(r);
541 }
542
543 /* See comment for unhold_lkb */
544
545 static void unhold_rsb(struct dlm_rsb *r)
546 {
547         int rv;
548         rv = kref_put(&r->res_ref, toss_rsb);
549         DLM_ASSERT(!rv, dlm_dump_rsb(r););
550 }
551
552 static void kill_rsb(struct kref *kref)
553 {
554         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555
556         /* All work is done after the return from kref_put() so we
557            can release the write_lock before the remove and free. */
558
559         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
565 }
566
567 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
568    The rsb must exist as long as any lkb's for it do. */
569
570 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
571 {
572         hold_rsb(r);
573         lkb->lkb_resource = r;
574 }
575
576 static void detach_lkb(struct dlm_lkb *lkb)
577 {
578         if (lkb->lkb_resource) {
579                 put_rsb(lkb->lkb_resource);
580                 lkb->lkb_resource = NULL;
581         }
582 }
583
584 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
585 {
586         struct dlm_lkb *lkb, *tmp;
587         uint32_t lkid = 0;
588         uint16_t bucket;
589
590         lkb = dlm_allocate_lkb(ls);
591         if (!lkb)
592                 return -ENOMEM;
593
594         lkb->lkb_nodeid = -1;
595         lkb->lkb_grmode = DLM_LOCK_IV;
596         kref_init(&lkb->lkb_ref);
597         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
598         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
599         INIT_LIST_HEAD(&lkb->lkb_time_list);
600
601         get_random_bytes(&bucket, sizeof(bucket));
602         bucket &= (ls->ls_lkbtbl_size - 1);
603
604         write_lock(&ls->ls_lkbtbl[bucket].lock);
605
606         /* counter can roll over so we must verify lkid is not in use */
607
608         while (lkid == 0) {
609                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
610
611                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
612                                     lkb_idtbl_list) {
613                         if (tmp->lkb_id != lkid)
614                                 continue;
615                         lkid = 0;
616                         break;
617                 }
618         }
619
620         lkb->lkb_id = lkid;
621         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
622         write_unlock(&ls->ls_lkbtbl[bucket].lock);
623
624         *lkb_ret = lkb;
625         return 0;
626 }
627
628 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
629 {
630         struct dlm_lkb *lkb;
631         uint16_t bucket = (lkid >> 16);
632
633         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
634                 if (lkb->lkb_id == lkid)
635                         return lkb;
636         }
637         return NULL;
638 }
639
640 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
641 {
642         struct dlm_lkb *lkb;
643         uint16_t bucket = (lkid >> 16);
644
645         if (bucket >= ls->ls_lkbtbl_size)
646                 return -EBADSLT;
647
648         read_lock(&ls->ls_lkbtbl[bucket].lock);
649         lkb = __find_lkb(ls, lkid);
650         if (lkb)
651                 kref_get(&lkb->lkb_ref);
652         read_unlock(&ls->ls_lkbtbl[bucket].lock);
653
654         *lkb_ret = lkb;
655         return lkb ? 0 : -ENOENT;
656 }
657
658 static void kill_lkb(struct kref *kref)
659 {
660         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
661
662         /* All work is done after the return from kref_put() so we
663            can release the write_lock before the detach_lkb */
664
665         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666 }
667
668 /* __put_lkb() is used when an lkb may not have an rsb attached to
669    it so we need to provide the lockspace explicitly */
670
671 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
672 {
673         uint16_t bucket = (lkb->lkb_id >> 16);
674
675         write_lock(&ls->ls_lkbtbl[bucket].lock);
676         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
677                 list_del(&lkb->lkb_idtbl_list);
678                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
679
680                 detach_lkb(lkb);
681
682                 /* for local/process lkbs, lvbptr points to caller's lksb */
683                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
684                         dlm_free_lvb(lkb->lkb_lvbptr);
685                 dlm_free_lkb(lkb);
686                 return 1;
687         } else {
688                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
689                 return 0;
690         }
691 }
692
693 int dlm_put_lkb(struct dlm_lkb *lkb)
694 {
695         struct dlm_ls *ls;
696
697         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
698         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
699
700         ls = lkb->lkb_resource->res_ls;
701         return __put_lkb(ls, lkb);
702 }
703
704 /* This is only called to add a reference when the code already holds
705    a valid reference to the lkb, so there's no need for locking. */
706
707 static inline void hold_lkb(struct dlm_lkb *lkb)
708 {
709         kref_get(&lkb->lkb_ref);
710 }
711
712 /* This is called when we need to remove a reference and are certain
713    it's not the last ref.  e.g. del_lkb is always called between a
714    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
715    put_lkb would work fine, but would involve unnecessary locking */
716
717 static inline void unhold_lkb(struct dlm_lkb *lkb)
718 {
719         int rv;
720         rv = kref_put(&lkb->lkb_ref, kill_lkb);
721         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
722 }
723
724 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
725                             int mode)
726 {
727         struct dlm_lkb *lkb = NULL;
728
729         list_for_each_entry(lkb, head, lkb_statequeue)
730                 if (lkb->lkb_rqmode < mode)
731                         break;
732
733         if (!lkb)
734                 list_add_tail(new, head);
735         else
736                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740
741 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
742 {
743         kref_get(&lkb->lkb_ref);
744
745         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
746
747         lkb->lkb_timestamp = ktime_get();
748
749         lkb->lkb_status = status;
750
751         switch (status) {
752         case DLM_LKSTS_WAITING:
753                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755                 else
756                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 break;
758         case DLM_LKSTS_GRANTED:
759                 /* convention says granted locks kept in order of grmode */
760                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761                                 lkb->lkb_grmode);
762                 break;
763         case DLM_LKSTS_CONVERT:
764                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766                 else
767                         list_add_tail(&lkb->lkb_statequeue,
768                                       &r->res_convertqueue);
769                 break;
770         default:
771                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772         }
773 }
774
775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777         lkb->lkb_status = 0;
778         list_del(&lkb->lkb_statequeue);
779         unhold_lkb(lkb);
780 }
781
782 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
783 {
784         hold_lkb(lkb);
785         del_lkb(r, lkb);
786         add_lkb(r, lkb, sts);
787         unhold_lkb(lkb);
788 }
789
790 static int msg_reply_type(int mstype)
791 {
792         switch (mstype) {
793         case DLM_MSG_REQUEST:
794                 return DLM_MSG_REQUEST_REPLY;
795         case DLM_MSG_CONVERT:
796                 return DLM_MSG_CONVERT_REPLY;
797         case DLM_MSG_UNLOCK:
798                 return DLM_MSG_UNLOCK_REPLY;
799         case DLM_MSG_CANCEL:
800                 return DLM_MSG_CANCEL_REPLY;
801         case DLM_MSG_LOOKUP:
802                 return DLM_MSG_LOOKUP_REPLY;
803         }
804         return -1;
805 }
806
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809
810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813         int error = 0;
814
815         mutex_lock(&ls->ls_waiters_mutex);
816
817         if (is_overlap_unlock(lkb) ||
818             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819                 error = -EINVAL;
820                 goto out;
821         }
822
823         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824                 switch (mstype) {
825                 case DLM_MSG_UNLOCK:
826                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827                         break;
828                 case DLM_MSG_CANCEL:
829                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830                         break;
831                 default:
832                         error = -EBUSY;
833                         goto out;
834                 }
835                 lkb->lkb_wait_count++;
836                 hold_lkb(lkb);
837
838                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
839                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
840                           lkb->lkb_wait_count, lkb->lkb_flags);
841                 goto out;
842         }
843
844         DLM_ASSERT(!lkb->lkb_wait_count,
845                    dlm_print_lkb(lkb);
846                    printk("wait_count %d\n", lkb->lkb_wait_count););
847
848         lkb->lkb_wait_count++;
849         lkb->lkb_wait_type = mstype;
850         hold_lkb(lkb);
851         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853         if (error)
854                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
855                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
856                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857         mutex_unlock(&ls->ls_waiters_mutex);
858         return error;
859 }
860
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865
866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
867                                 struct dlm_message *ms)
868 {
869         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
870         int overlap_done = 0;
871
872         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
873                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
874                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
875                 overlap_done = 1;
876                 goto out_del;
877         }
878
879         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
880                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
881                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
882                 overlap_done = 1;
883                 goto out_del;
884         }
885
886         /* Cancel state was preemptively cleared by a successful convert,
887            see next comment, nothing to do. */
888
889         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
890             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
891                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
892                           lkb->lkb_id, lkb->lkb_wait_type);
893                 return -1;
894         }
895
896         /* Remove for the convert reply, and premptively remove for the
897            cancel reply.  A convert has been granted while there's still
898            an outstanding cancel on it (the cancel is moot and the result
899            in the cancel reply should be 0).  We preempt the cancel reply
900            because the app gets the convert result and then can follow up
901            with another op, like convert.  This subsequent op would see the
902            lingering state of the cancel and fail with -EBUSY. */
903
904         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
905             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
906             is_overlap_cancel(lkb) && ms && !ms->m_result) {
907                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
908                           lkb->lkb_id);
909                 lkb->lkb_wait_type = 0;
910                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
911                 lkb->lkb_wait_count--;
912                 goto out_del;
913         }
914
915         /* N.B. type of reply may not always correspond to type of original
916            msg due to lookup->request optimization, verify others? */
917
918         if (lkb->lkb_wait_type) {
919                 lkb->lkb_wait_type = 0;
920                 goto out_del;
921         }
922
923         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
924                   lkb->lkb_id, mstype, lkb->lkb_flags);
925         return -1;
926
927  out_del:
928         /* the force-unlock/cancel has completed and we haven't recvd a reply
929            to the op that was in progress prior to the unlock/cancel; we
930            give up on any reply to the earlier op.  FIXME: not sure when/how
931            this would happen */
932
933         if (overlap_done && lkb->lkb_wait_type) {
934                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
935                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
936                 lkb->lkb_wait_count--;
937                 lkb->lkb_wait_type = 0;
938         }
939
940         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
941
942         lkb->lkb_flags &= ~DLM_IFL_RESEND;
943         lkb->lkb_wait_count--;
944         if (!lkb->lkb_wait_count)
945                 list_del_init(&lkb->lkb_wait_reply);
946         unhold_lkb(lkb);
947         return 0;
948 }
949
950 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
951 {
952         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
953         int error;
954
955         mutex_lock(&ls->ls_waiters_mutex);
956         error = _remove_from_waiters(lkb, mstype, NULL);
957         mutex_unlock(&ls->ls_waiters_mutex);
958         return error;
959 }
960
961 /* Handles situations where we might be processing a "fake" or "stub" reply in
962    which we can't try to take waiters_mutex again. */
963
964 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
965 {
966         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
967         int error;
968
969         if (ms != &ls->ls_stub_ms)
970                 mutex_lock(&ls->ls_waiters_mutex);
971         error = _remove_from_waiters(lkb, ms->m_type, ms);
972         if (ms != &ls->ls_stub_ms)
973                 mutex_unlock(&ls->ls_waiters_mutex);
974         return error;
975 }
976
977 static void dir_remove(struct dlm_rsb *r)
978 {
979         int to_nodeid;
980
981         if (dlm_no_directory(r->res_ls))
982                 return;
983
984         to_nodeid = dlm_dir_nodeid(r);
985         if (to_nodeid != dlm_our_nodeid())
986                 send_remove(r);
987         else
988                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
989                                      r->res_name, r->res_length);
990 }
991
992 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
993    found since they are in order of newest to oldest? */
994
995 static int shrink_bucket(struct dlm_ls *ls, int b)
996 {
997         struct dlm_rsb *r;
998         int count = 0, found;
999
1000         for (;;) {
1001                 found = 0;
1002                 spin_lock(&ls->ls_rsbtbl[b].lock);
1003                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1004                                             res_hashchain) {
1005                         if (!time_after_eq(jiffies, r->res_toss_time +
1006                                            dlm_config.ci_toss_secs * HZ))
1007                                 continue;
1008                         found = 1;
1009                         break;
1010                 }
1011
1012                 if (!found) {
1013                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1014                         break;
1015                 }
1016
1017                 if (kref_put(&r->res_ref, kill_rsb)) {
1018                         list_del(&r->res_hashchain);
1019                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1020
1021                         if (is_master(r))
1022                                 dir_remove(r);
1023                         dlm_free_rsb(r);
1024                         count++;
1025                 } else {
1026                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1027                         log_error(ls, "tossed rsb in use %s", r->res_name);
1028                 }
1029         }
1030
1031         return count;
1032 }
1033
1034 void dlm_scan_rsbs(struct dlm_ls *ls)
1035 {
1036         int i;
1037
1038         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039                 shrink_bucket(ls, i);
1040                 if (dlm_locking_stopped(ls))
1041                         break;
1042                 cond_resched();
1043         }
1044 }
1045
1046 static void add_timeout(struct dlm_lkb *lkb)
1047 {
1048         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049
1050         if (is_master_copy(lkb))
1051                 return;
1052
1053         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056                 goto add_it;
1057         }
1058         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059                 goto add_it;
1060         return;
1061
1062  add_it:
1063         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064         mutex_lock(&ls->ls_timeout_mutex);
1065         hold_lkb(lkb);
1066         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067         mutex_unlock(&ls->ls_timeout_mutex);
1068 }
1069
1070 static void del_timeout(struct dlm_lkb *lkb)
1071 {
1072         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1073
1074         mutex_lock(&ls->ls_timeout_mutex);
1075         if (!list_empty(&lkb->lkb_time_list)) {
1076                 list_del_init(&lkb->lkb_time_list);
1077                 unhold_lkb(lkb);
1078         }
1079         mutex_unlock(&ls->ls_timeout_mutex);
1080 }
1081
1082 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084    and then lock rsb because of lock ordering in add_timeout.  We may need
1085    to specify some special timeout-related bits in the lkb that are just to
1086    be accessed under the timeout_mutex. */
1087
1088 void dlm_scan_timeout(struct dlm_ls *ls)
1089 {
1090         struct dlm_rsb *r;
1091         struct dlm_lkb *lkb;
1092         int do_cancel, do_warn;
1093         s64 wait_us;
1094
1095         for (;;) {
1096                 if (dlm_locking_stopped(ls))
1097                         break;
1098
1099                 do_cancel = 0;
1100                 do_warn = 0;
1101                 mutex_lock(&ls->ls_timeout_mutex);
1102                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1103
1104                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1105                                                         lkb->lkb_timestamp));
1106
1107                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1108                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1109                                 do_cancel = 1;
1110
1111                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1112                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1113                                 do_warn = 1;
1114
1115                         if (!do_cancel && !do_warn)
1116                                 continue;
1117                         hold_lkb(lkb);
1118                         break;
1119                 }
1120                 mutex_unlock(&ls->ls_timeout_mutex);
1121
1122                 if (!do_cancel && !do_warn)
1123                         break;
1124
1125                 r = lkb->lkb_resource;
1126                 hold_rsb(r);
1127                 lock_rsb(r);
1128
1129                 if (do_warn) {
1130                         /* clear flag so we only warn once */
1131                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1132                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1133                                 del_timeout(lkb);
1134                         dlm_timeout_warn(lkb);
1135                 }
1136
1137                 if (do_cancel) {
1138                         log_debug(ls, "timeout cancel %x node %d %s",
1139                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1140                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1141                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1142                         del_timeout(lkb);
1143                         _cancel_lock(r, lkb);
1144                 }
1145
1146                 unlock_rsb(r);
1147                 unhold_rsb(r);
1148                 dlm_put_lkb(lkb);
1149         }
1150 }
1151
1152 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153    dlm_recoverd before checking/setting ls_recover_begin. */
1154
1155 void dlm_adjust_timeouts(struct dlm_ls *ls)
1156 {
1157         struct dlm_lkb *lkb;
1158         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159
1160         ls->ls_recover_begin = 0;
1161         mutex_lock(&ls->ls_timeout_mutex);
1162         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164         mutex_unlock(&ls->ls_timeout_mutex);
1165 }
1166
1167 /* lkb is master or local copy */
1168
1169 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170 {
1171         int b, len = r->res_ls->ls_lvblen;
1172
1173         /* b=1 lvb returned to caller
1174            b=0 lvb written to rsb or invalidated
1175            b=-1 do nothing */
1176
1177         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1178
1179         if (b == 1) {
1180                 if (!lkb->lkb_lvbptr)
1181                         return;
1182
1183                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1184                         return;
1185
1186                 if (!r->res_lvbptr)
1187                         return;
1188
1189                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1190                 lkb->lkb_lvbseq = r->res_lvbseq;
1191
1192         } else if (b == 0) {
1193                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                         rsb_set_flag(r, RSB_VALNOTVALID);
1195                         return;
1196                 }
1197
1198                 if (!lkb->lkb_lvbptr)
1199                         return;
1200
1201                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                         return;
1203
1204                 if (!r->res_lvbptr)
1205                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206
1207                 if (!r->res_lvbptr)
1208                         return;
1209
1210                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1211                 r->res_lvbseq++;
1212                 lkb->lkb_lvbseq = r->res_lvbseq;
1213                 rsb_clear_flag(r, RSB_VALNOTVALID);
1214         }
1215
1216         if (rsb_flag(r, RSB_VALNOTVALID))
1217                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1218 }
1219
1220 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221 {
1222         if (lkb->lkb_grmode < DLM_LOCK_PW)
1223                 return;
1224
1225         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226                 rsb_set_flag(r, RSB_VALNOTVALID);
1227                 return;
1228         }
1229
1230         if (!lkb->lkb_lvbptr)
1231                 return;
1232
1233         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234                 return;
1235
1236         if (!r->res_lvbptr)
1237                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238
1239         if (!r->res_lvbptr)
1240                 return;
1241
1242         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243         r->res_lvbseq++;
1244         rsb_clear_flag(r, RSB_VALNOTVALID);
1245 }
1246
1247 /* lkb is process copy (pc) */
1248
1249 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250                             struct dlm_message *ms)
1251 {
1252         int b;
1253
1254         if (!lkb->lkb_lvbptr)
1255                 return;
1256
1257         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258                 return;
1259
1260         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261         if (b == 1) {
1262                 int len = receive_extralen(ms);
1263                 if (len > DLM_RESNAME_MAXLEN)
1264                         len = DLM_RESNAME_MAXLEN;
1265                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266                 lkb->lkb_lvbseq = ms->m_lvbseq;
1267         }
1268 }
1269
1270 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1271    remove_lock -- used for unlock, removes lkb from granted
1272    revert_lock -- used for cancel, moves lkb from convert to granted
1273    grant_lock  -- used for request and convert, adds lkb to granted or
1274                   moves lkb from convert or waiting to granted
1275
1276    Each of these is used for master or local copy lkb's.  There is
1277    also a _pc() variation used to make the corresponding change on
1278    a process copy (pc) lkb. */
1279
1280 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1281 {
1282         del_lkb(r, lkb);
1283         lkb->lkb_grmode = DLM_LOCK_IV;
1284         /* this unhold undoes the original ref from create_lkb()
1285            so this leads to the lkb being freed */
1286         unhold_lkb(lkb);
1287 }
1288
1289 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1290 {
1291         set_lvb_unlock(r, lkb);
1292         _remove_lock(r, lkb);
1293 }
1294
1295 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1296 {
1297         _remove_lock(r, lkb);
1298 }
1299
1300 /* returns: 0 did nothing
1301             1 moved lock to granted
1302            -1 removed lock */
1303
1304 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305 {
1306         int rv = 0;
1307
1308         lkb->lkb_rqmode = DLM_LOCK_IV;
1309
1310         switch (lkb->lkb_status) {
1311         case DLM_LKSTS_GRANTED:
1312                 break;
1313         case DLM_LKSTS_CONVERT:
1314                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315                 rv = 1;
1316                 break;
1317         case DLM_LKSTS_WAITING:
1318                 del_lkb(r, lkb);
1319                 lkb->lkb_grmode = DLM_LOCK_IV;
1320                 /* this unhold undoes the original ref from create_lkb()
1321                    so this leads to the lkb being freed */
1322                 unhold_lkb(lkb);
1323                 rv = -1;
1324                 break;
1325         default:
1326                 log_print("invalid status for revert %d", lkb->lkb_status);
1327         }
1328         return rv;
1329 }
1330
1331 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1332 {
1333         return revert_lock(r, lkb);
1334 }
1335
1336 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337 {
1338         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339                 lkb->lkb_grmode = lkb->lkb_rqmode;
1340                 if (lkb->lkb_status)
1341                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342                 else
1343                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344         }
1345
1346         lkb->lkb_rqmode = DLM_LOCK_IV;
1347 }
1348
1349 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350 {
1351         set_lvb_lock(r, lkb);
1352         _grant_lock(r, lkb);
1353         lkb->lkb_highbast = 0;
1354 }
1355
1356 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1357                           struct dlm_message *ms)
1358 {
1359         set_lvb_lock_pc(r, lkb, ms);
1360         _grant_lock(r, lkb);
1361 }
1362
1363 /* called by grant_pending_locks() which means an async grant message must
1364    be sent to the requesting node in addition to granting the lock if the
1365    lkb belongs to a remote node. */
1366
1367 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1368 {
1369         grant_lock(r, lkb);
1370         if (is_master_copy(lkb))
1371                 send_grant(r, lkb);
1372         else
1373                 queue_cast(r, lkb, 0);
1374 }
1375
1376 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377    change the granted/requested modes.  We're munging things accordingly in
1378    the process copy.
1379    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380    conversion deadlock
1381    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382    compatible with other granted locks */
1383
1384 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385 {
1386         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387                 log_print("munge_demoted %x invalid reply type %d",
1388                           lkb->lkb_id, ms->m_type);
1389                 return;
1390         }
1391
1392         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1394                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395                 return;
1396         }
1397
1398         lkb->lkb_grmode = DLM_LOCK_NL;
1399 }
1400
1401 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402 {
1403         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404             ms->m_type != DLM_MSG_GRANT) {
1405                 log_print("munge_altmode %x invalid reply type %d",
1406                           lkb->lkb_id, ms->m_type);
1407                 return;
1408         }
1409
1410         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411                 lkb->lkb_rqmode = DLM_LOCK_PR;
1412         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413                 lkb->lkb_rqmode = DLM_LOCK_CW;
1414         else {
1415                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416                 dlm_print_lkb(lkb);
1417         }
1418 }
1419
1420 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421 {
1422         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423                                            lkb_statequeue);
1424         if (lkb->lkb_id == first->lkb_id)
1425                 return 1;
1426
1427         return 0;
1428 }
1429
1430 /* Check if the given lkb conflicts with another lkb on the queue. */
1431
1432 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433 {
1434         struct dlm_lkb *this;
1435
1436         list_for_each_entry(this, head, lkb_statequeue) {
1437                 if (this == lkb)
1438                         continue;
1439                 if (!modes_compat(this, lkb))
1440                         return 1;
1441         }
1442         return 0;
1443 }
1444
1445 /*
1446  * "A conversion deadlock arises with a pair of lock requests in the converting
1447  * queue for one resource.  The granted mode of each lock blocks the requested
1448  * mode of the other lock."
1449  *
1450  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451  * convert queue from being granted, then deadlk/demote lkb.
1452  *
1453  * Example:
1454  * Granted Queue: empty
1455  * Convert Queue: NL->EX (first lock)
1456  *                PR->EX (second lock)
1457  *
1458  * The first lock can't be granted because of the granted mode of the second
1459  * lock and the second lock can't be granted because it's not first in the
1460  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462  * flag set and return DEMOTED in the lksb flags.
1463  *
1464  * Originally, this function detected conv-deadlk in a more limited scope:
1465  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466  * - if lkb1 was the first entry in the queue (not just earlier), and was
1467  *   blocked by the granted mode of lkb2, and there was nothing on the
1468  *   granted queue preventing lkb1 from being granted immediately, i.e.
1469  *   lkb2 was the only thing preventing lkb1 from being granted.
1470  *
1471  * That second condition meant we'd only say there was conv-deadlk if
1472  * resolving it (by demotion) would lead to the first lock on the convert
1473  * queue being granted right away.  It allowed conversion deadlocks to exist
1474  * between locks on the convert queue while they couldn't be granted anyway.
1475  *
1476  * Now, we detect and take action on conversion deadlocks immediately when
1477  * they're created, even if they may not be immediately consequential.  If
1478  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479  * mode that would prevent lkb1's conversion from being granted, we do a
1480  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481  * I think this means that the lkb_is_ahead condition below should always
1482  * be zero, i.e. there will never be conv-deadlk between two locks that are
1483  * both already on the convert queue.
1484  */
1485
1486 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1487 {
1488         struct dlm_lkb *lkb1;
1489         int lkb_is_ahead = 0;
1490
1491         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1492                 if (lkb1 == lkb2) {
1493                         lkb_is_ahead = 1;
1494                         continue;
1495                 }
1496
1497                 if (!lkb_is_ahead) {
1498                         if (!modes_compat(lkb2, lkb1))
1499                                 return 1;
1500                 } else {
1501                         if (!modes_compat(lkb2, lkb1) &&
1502                             !modes_compat(lkb1, lkb2))
1503                                 return 1;
1504                 }
1505         }
1506         return 0;
1507 }
1508
1509 /*
1510  * Return 1 if the lock can be granted, 0 otherwise.
1511  * Also detect and resolve conversion deadlocks.
1512  *
1513  * lkb is the lock to be granted
1514  *
1515  * now is 1 if the function is being called in the context of the
1516  * immediate request, it is 0 if called later, after the lock has been
1517  * queued.
1518  *
1519  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520  */
1521
1522 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1523 {
1524         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1525
1526         /*
1527          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1528          * a new request for a NL mode lock being blocked.
1529          *
1530          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1531          * request, then it would be granted.  In essence, the use of this flag
1532          * tells the Lock Manager to expedite theis request by not considering
1533          * what may be in the CONVERTING or WAITING queues...  As of this
1534          * writing, the EXPEDITE flag can be used only with new requests for NL
1535          * mode locks.  This flag is not valid for conversion requests.
1536          *
1537          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1538          * conversion or used with a non-NL requested mode.  We also know an
1539          * EXPEDITE request is always granted immediately, so now must always
1540          * be 1.  The full condition to grant an expedite request: (now &&
1541          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1542          * therefore be shortened to just checking the flag.
1543          */
1544
1545         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1546                 return 1;
1547
1548         /*
1549          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1550          * added to the remaining conditions.
1551          */
1552
1553         if (queue_conflict(&r->res_grantqueue, lkb))
1554                 goto out;
1555
1556         /*
1557          * 6-3: By default, a conversion request is immediately granted if the
1558          * requested mode is compatible with the modes of all other granted
1559          * locks
1560          */
1561
1562         if (queue_conflict(&r->res_convertqueue, lkb))
1563                 goto out;
1564
1565         /*
1566          * 6-5: But the default algorithm for deciding whether to grant or
1567          * queue conversion requests does not by itself guarantee that such
1568          * requests are serviced on a "first come first serve" basis.  This, in
1569          * turn, can lead to a phenomenon known as "indefinate postponement".
1570          *
1571          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1572          * the system service employed to request a lock conversion.  This flag
1573          * forces certain conversion requests to be queued, even if they are
1574          * compatible with the granted modes of other locks on the same
1575          * resource.  Thus, the use of this flag results in conversion requests
1576          * being ordered on a "first come first servce" basis.
1577          *
1578          * DCT: This condition is all about new conversions being able to occur
1579          * "in place" while the lock remains on the granted queue (assuming
1580          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1581          * doesn't _have_ to go onto the convert queue where it's processed in
1582          * order.  The "now" variable is necessary to distinguish converts
1583          * being received and processed for the first time now, because once a
1584          * convert is moved to the conversion queue the condition below applies
1585          * requiring fifo granting.
1586          */
1587
1588         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1589                 return 1;
1590
1591         /*
1592          * The NOORDER flag is set to avoid the standard vms rules on grant
1593          * order.
1594          */
1595
1596         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1597                 return 1;
1598
1599         /*
1600          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1601          * granted until all other conversion requests ahead of it are granted
1602          * and/or canceled.
1603          */
1604
1605         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1606                 return 1;
1607
1608         /*
1609          * 6-4: By default, a new request is immediately granted only if all
1610          * three of the following conditions are satisfied when the request is
1611          * issued:
1612          * - The queue of ungranted conversion requests for the resource is
1613          *   empty.
1614          * - The queue of ungranted new requests for the resource is empty.
1615          * - The mode of the new request is compatible with the most
1616          *   restrictive mode of all granted locks on the resource.
1617          */
1618
1619         if (now && !conv && list_empty(&r->res_convertqueue) &&
1620             list_empty(&r->res_waitqueue))
1621                 return 1;
1622
1623         /*
1624          * 6-4: Once a lock request is in the queue of ungranted new requests,
1625          * it cannot be granted until the queue of ungranted conversion
1626          * requests is empty, all ungranted new requests ahead of it are
1627          * granted and/or canceled, and it is compatible with the granted mode
1628          * of the most restrictive lock granted on the resource.
1629          */
1630
1631         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1632             first_in_list(lkb, &r->res_waitqueue))
1633                 return 1;
1634  out:
1635         return 0;
1636 }
1637
1638 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1639                           int *err)
1640 {
1641         int rv;
1642         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1643         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1644
1645         if (err)
1646                 *err = 0;
1647
1648         rv = _can_be_granted(r, lkb, now);
1649         if (rv)
1650                 goto out;
1651
1652         /*
1653          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1654          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1655          * cancels one of the locks.
1656          */
1657
1658         if (is_convert && can_be_queued(lkb) &&
1659             conversion_deadlock_detect(r, lkb)) {
1660                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1661                         lkb->lkb_grmode = DLM_LOCK_NL;
1662                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1663                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1664                         if (err)
1665                                 *err = -EDEADLK;
1666                         else {
1667                                 log_print("can_be_granted deadlock %x now %d",
1668                                           lkb->lkb_id, now);
1669                                 dlm_dump_rsb(r);
1670                         }
1671                 }
1672                 goto out;
1673         }
1674
1675         /*
1676          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1677          * to grant a request in a mode other than the normal rqmode.  It's a
1678          * simple way to provide a big optimization to applications that can
1679          * use them.
1680          */
1681
1682         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1683                 alt = DLM_LOCK_PR;
1684         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1685                 alt = DLM_LOCK_CW;
1686
1687         if (alt) {
1688                 lkb->lkb_rqmode = alt;
1689                 rv = _can_be_granted(r, lkb, now);
1690                 if (rv)
1691                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1692                 else
1693                         lkb->lkb_rqmode = rqmode;
1694         }
1695  out:
1696         return rv;
1697 }
1698
1699 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700    for locks pending on the convert list.  Once verified (watch for these
1701    log_prints), we should be able to just call _can_be_granted() and not
1702    bother with the demote/deadlk cases here (and there's no easy way to deal
1703    with a deadlk here, we'd have to generate something like grant_lock with
1704    the deadlk error.) */
1705
1706 /* Returns the highest requested mode of all blocked conversions; sets
1707    cw if there's a blocked conversion to DLM_LOCK_CW. */
1708
1709 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1710 {
1711         struct dlm_lkb *lkb, *s;
1712         int hi, demoted, quit, grant_restart, demote_restart;
1713         int deadlk;
1714
1715         quit = 0;
1716  restart:
1717         grant_restart = 0;
1718         demote_restart = 0;
1719         hi = DLM_LOCK_IV;
1720
1721         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1722                 demoted = is_demoted(lkb);
1723                 deadlk = 0;
1724
1725                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1726                         grant_lock_pending(r, lkb);
1727                         grant_restart = 1;
1728                         continue;
1729                 }
1730
1731                 if (!demoted && is_demoted(lkb)) {
1732                         log_print("WARN: pending demoted %x node %d %s",
1733                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1734                         demote_restart = 1;
1735                         continue;
1736                 }
1737
1738                 if (deadlk) {
1739                         log_print("WARN: pending deadlock %x node %d %s",
1740                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741                         dlm_dump_rsb(r);
1742                         continue;
1743                 }
1744
1745                 hi = max_t(int, lkb->lkb_rqmode, hi);
1746
1747                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1748                         *cw = 1;
1749         }
1750
1751         if (grant_restart)
1752                 goto restart;
1753         if (demote_restart && !quit) {
1754                 quit = 1;
1755                 goto restart;
1756         }
1757
1758         return max_t(int, high, hi);
1759 }
1760
1761 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762 {
1763         struct dlm_lkb *lkb, *s;
1764
1765         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766                 if (can_be_granted(r, lkb, 0, NULL))
1767                         grant_lock_pending(r, lkb);
1768                 else {
1769                         high = max_t(int, lkb->lkb_rqmode, high);
1770                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771                                 *cw = 1;
1772                 }
1773         }
1774
1775         return high;
1776 }
1777
1778 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779    on either the convert or waiting queue.
1780    high is the largest rqmode of all locks blocked on the convert or
1781    waiting queue. */
1782
1783 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784 {
1785         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786                 if (gr->lkb_highbast < DLM_LOCK_EX)
1787                         return 1;
1788                 return 0;
1789         }
1790
1791         if (gr->lkb_highbast < high &&
1792             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793                 return 1;
1794         return 0;
1795 }
1796
1797 static void grant_pending_locks(struct dlm_rsb *r)
1798 {
1799         struct dlm_lkb *lkb, *s;
1800         int high = DLM_LOCK_IV;
1801         int cw = 0;
1802
1803         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1804
1805         high = grant_pending_convert(r, high, &cw);
1806         high = grant_pending_wait(r, high, &cw);
1807
1808         if (high == DLM_LOCK_IV)
1809                 return;
1810
1811         /*
1812          * If there are locks left on the wait/convert queue then send blocking
1813          * ASTs to granted locks based on the largest requested mode (high)
1814          * found above.
1815          */
1816
1817         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1818                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1819                         if (cw && high == DLM_LOCK_PR &&
1820                             lkb->lkb_grmode == DLM_LOCK_PR)
1821                                 queue_bast(r, lkb, DLM_LOCK_CW);
1822                         else
1823                                 queue_bast(r, lkb, high);
1824                         lkb->lkb_highbast = high;
1825                 }
1826         }
1827 }
1828
1829 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830 {
1831         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833                 if (gr->lkb_highbast < DLM_LOCK_EX)
1834                         return 1;
1835                 return 0;
1836         }
1837
1838         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839                 return 1;
1840         return 0;
1841 }
1842
1843 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844                             struct dlm_lkb *lkb)
1845 {
1846         struct dlm_lkb *gr;
1847
1848         list_for_each_entry(gr, head, lkb_statequeue) {
1849                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1850                         queue_bast(r, gr, lkb->lkb_rqmode);
1851                         gr->lkb_highbast = lkb->lkb_rqmode;
1852                 }
1853         }
1854 }
1855
1856 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1857 {
1858         send_bast_queue(r, &r->res_grantqueue, lkb);
1859 }
1860
1861 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1862 {
1863         send_bast_queue(r, &r->res_grantqueue, lkb);
1864         send_bast_queue(r, &r->res_convertqueue, lkb);
1865 }
1866
1867 /* set_master(r, lkb) -- set the master nodeid of a resource
1868
1869    The purpose of this function is to set the nodeid field in the given
1870    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1871    known, it can just be copied to the lkb and the function will return
1872    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1873    before it can be copied to the lkb.
1874
1875    When the rsb nodeid is being looked up remotely, the initial lkb
1876    causing the lookup is kept on the ls_waiters list waiting for the
1877    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1878    on the rsb's res_lookup list until the master is verified.
1879
1880    Return values:
1881    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1882    1: the rsb master is not available and the lkb has been placed on
1883       a wait queue
1884 */
1885
1886 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1887 {
1888         struct dlm_ls *ls = r->res_ls;
1889         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1890
1891         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1892                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1893                 r->res_first_lkid = lkb->lkb_id;
1894                 lkb->lkb_nodeid = r->res_nodeid;
1895                 return 0;
1896         }
1897
1898         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1899                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1900                 return 1;
1901         }
1902
1903         if (r->res_nodeid == 0) {
1904                 lkb->lkb_nodeid = 0;
1905                 return 0;
1906         }
1907
1908         if (r->res_nodeid > 0) {
1909                 lkb->lkb_nodeid = r->res_nodeid;
1910                 return 0;
1911         }
1912
1913         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1914
1915         dir_nodeid = dlm_dir_nodeid(r);
1916
1917         if (dir_nodeid != our_nodeid) {
1918                 r->res_first_lkid = lkb->lkb_id;
1919                 send_lookup(r, lkb);
1920                 return 1;
1921         }
1922
1923         for (i = 0; i < 2; i++) {
1924                 /* It's possible for dlm_scand to remove an old rsb for
1925                    this same resource from the toss list, us to create
1926                    a new one, look up the master locally, and find it
1927                    already exists just before dlm_scand does the
1928                    dir_remove() on the previous rsb. */
1929
1930                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1931                                        r->res_length, &ret_nodeid);
1932                 if (!error)
1933                         break;
1934                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1935                 schedule();
1936         }
1937         if (error && error != -EEXIST)
1938                 return error;
1939
1940         if (ret_nodeid == our_nodeid) {
1941                 r->res_first_lkid = 0;
1942                 r->res_nodeid = 0;
1943                 lkb->lkb_nodeid = 0;
1944         } else {
1945                 r->res_first_lkid = lkb->lkb_id;
1946                 r->res_nodeid = ret_nodeid;
1947                 lkb->lkb_nodeid = ret_nodeid;
1948         }
1949         return 0;
1950 }
1951
1952 static void process_lookup_list(struct dlm_rsb *r)
1953 {
1954         struct dlm_lkb *lkb, *safe;
1955
1956         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1957                 list_del_init(&lkb->lkb_rsb_lookup);
1958                 _request_lock(r, lkb);
1959                 schedule();
1960         }
1961 }
1962
1963 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1964
1965 static void confirm_master(struct dlm_rsb *r, int error)
1966 {
1967         struct dlm_lkb *lkb;
1968
1969         if (!r->res_first_lkid)
1970                 return;
1971
1972         switch (error) {
1973         case 0:
1974         case -EINPROGRESS:
1975                 r->res_first_lkid = 0;
1976                 process_lookup_list(r);
1977                 break;
1978
1979         case -EAGAIN:
1980         case -EBADR:
1981         case -ENOTBLK:
1982                 /* the remote request failed and won't be retried (it was
1983                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1984                    lkb the first_lkid */
1985
1986                 r->res_first_lkid = 0;
1987
1988                 if (!list_empty(&r->res_lookup)) {
1989                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1990                                          lkb_rsb_lookup);
1991                         list_del_init(&lkb->lkb_rsb_lookup);
1992                         r->res_first_lkid = lkb->lkb_id;
1993                         _request_lock(r, lkb);
1994                 }
1995                 break;
1996
1997         default:
1998                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1999         }
2000 }
2001
2002 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2003                          int namelen, unsigned long timeout_cs,
2004                          void (*ast) (void *astparam),
2005                          void *astparam,
2006                          void (*bast) (void *astparam, int mode),
2007                          struct dlm_args *args)
2008 {
2009         int rv = -EINVAL;
2010
2011         /* check for invalid arg usage */
2012
2013         if (mode < 0 || mode > DLM_LOCK_EX)
2014                 goto out;
2015
2016         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2017                 goto out;
2018
2019         if (flags & DLM_LKF_CANCEL)
2020                 goto out;
2021
2022         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2023                 goto out;
2024
2025         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2026                 goto out;
2027
2028         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2029                 goto out;
2030
2031         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2032                 goto out;
2033
2034         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2035                 goto out;
2036
2037         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2038                 goto out;
2039
2040         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2041                 goto out;
2042
2043         if (!ast || !lksb)
2044                 goto out;
2045
2046         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2047                 goto out;
2048
2049         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2050                 goto out;
2051
2052         /* these args will be copied to the lkb in validate_lock_args,
2053            it cannot be done now because when converting locks, fields in
2054            an active lkb cannot be modified before locking the rsb */
2055
2056         args->flags = flags;
2057         args->astfn = ast;
2058         args->astparam = astparam;
2059         args->bastfn = bast;
2060         args->timeout = timeout_cs;
2061         args->mode = mode;
2062         args->lksb = lksb;
2063         rv = 0;
2064  out:
2065         return rv;
2066 }
2067
2068 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2069 {
2070         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2071                       DLM_LKF_FORCEUNLOCK))
2072                 return -EINVAL;
2073
2074         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2075                 return -EINVAL;
2076
2077         args->flags = flags;
2078         args->astparam = astarg;
2079         return 0;
2080 }
2081
2082 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2083                               struct dlm_args *args)
2084 {
2085         int rv = -EINVAL;
2086
2087         if (args->flags & DLM_LKF_CONVERT) {
2088                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2089                         goto out;
2090
2091                 if (args->flags & DLM_LKF_QUECVT &&
2092                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2093                         goto out;
2094
2095                 rv = -EBUSY;
2096                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2097                         goto out;
2098
2099                 if (lkb->lkb_wait_type)
2100                         goto out;
2101
2102                 if (is_overlap(lkb))
2103                         goto out;
2104         }
2105
2106         lkb->lkb_exflags = args->flags;
2107         lkb->lkb_sbflags = 0;
2108         lkb->lkb_astfn = args->astfn;
2109         lkb->lkb_astparam = args->astparam;
2110         lkb->lkb_bastfn = args->bastfn;
2111         lkb->lkb_rqmode = args->mode;
2112         lkb->lkb_lksb = args->lksb;
2113         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2114         lkb->lkb_ownpid = (int) current->pid;
2115         lkb->lkb_timeout_cs = args->timeout;
2116         rv = 0;
2117  out:
2118         if (rv)
2119                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2120                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2121                           lkb->lkb_status, lkb->lkb_wait_type,
2122                           lkb->lkb_resource->res_name);
2123         return rv;
2124 }
2125
2126 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2127    for success */
2128
2129 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2130    because there may be a lookup in progress and it's valid to do
2131    cancel/unlockf on it */
2132
2133 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2134 {
2135         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2136         int rv = -EINVAL;
2137
2138         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2139                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2140                 dlm_print_lkb(lkb);
2141                 goto out;
2142         }
2143
2144         /* an lkb may still exist even though the lock is EOL'ed due to a
2145            cancel, unlock or failed noqueue request; an app can't use these
2146            locks; return same error as if the lkid had not been found at all */
2147
2148         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2149                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2150                 rv = -ENOENT;
2151                 goto out;
2152         }
2153
2154         /* an lkb may be waiting for an rsb lookup to complete where the
2155            lookup was initiated by another lock */
2156
2157         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2158                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2159                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2160                         list_del_init(&lkb->lkb_rsb_lookup);
2161                         queue_cast(lkb->lkb_resource, lkb,
2162                                    args->flags & DLM_LKF_CANCEL ?
2163                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2164                         unhold_lkb(lkb); /* undoes create_lkb() */
2165                 }
2166                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2167                 rv = -EBUSY;
2168                 goto out;
2169         }
2170
2171         /* cancel not allowed with another cancel/unlock in progress */
2172
2173         if (args->flags & DLM_LKF_CANCEL) {
2174                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2175                         goto out;
2176
2177                 if (is_overlap(lkb))
2178                         goto out;
2179
2180                 /* don't let scand try to do a cancel */
2181                 del_timeout(lkb);
2182
2183                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2184                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2185                         rv = -EBUSY;
2186                         goto out;
2187                 }
2188
2189                 /* there's nothing to cancel */
2190                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2191                     !lkb->lkb_wait_type) {
2192                         rv = -EBUSY;
2193                         goto out;
2194                 }
2195
2196                 switch (lkb->lkb_wait_type) {
2197                 case DLM_MSG_LOOKUP:
2198                 case DLM_MSG_REQUEST:
2199                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2200                         rv = -EBUSY;
2201                         goto out;
2202                 case DLM_MSG_UNLOCK:
2203                 case DLM_MSG_CANCEL:
2204                         goto out;
2205                 }
2206                 /* add_to_waiters() will set OVERLAP_CANCEL */
2207                 goto out_ok;
2208         }
2209
2210         /* do we need to allow a force-unlock if there's a normal unlock
2211            already in progress?  in what conditions could the normal unlock
2212            fail such that we'd want to send a force-unlock to be sure? */
2213
2214         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2215                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2216                         goto out;
2217
2218                 if (is_overlap_unlock(lkb))
2219                         goto out;
2220
2221                 /* don't let scand try to do a cancel */
2222                 del_timeout(lkb);
2223
2224                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2225                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2226                         rv = -EBUSY;
2227                         goto out;
2228                 }
2229
2230                 switch (lkb->lkb_wait_type) {
2231                 case DLM_MSG_LOOKUP:
2232                 case DLM_MSG_REQUEST:
2233                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2234                         rv = -EBUSY;
2235                         goto out;
2236                 case DLM_MSG_UNLOCK:
2237                         goto out;
2238                 }
2239                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2240                 goto out_ok;
2241         }
2242
2243         /* normal unlock not allowed if there's any op in progress */
2244         rv = -EBUSY;
2245         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2246                 goto out;
2247
2248  out_ok:
2249         /* an overlapping op shouldn't blow away exflags from other op */
2250         lkb->lkb_exflags |= args->flags;
2251         lkb->lkb_sbflags = 0;
2252         lkb->lkb_astparam = args->astparam;
2253         rv = 0;
2254  out:
2255         if (rv)
2256                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2257                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2258                           args->flags, lkb->lkb_wait_type,
2259                           lkb->lkb_resource->res_name);
2260         return rv;
2261 }
2262
2263 /*
2264  * Four stage 4 varieties:
2265  * do_request(), do_convert(), do_unlock(), do_cancel()
2266  * These are called on the master node for the given lock and
2267  * from the central locking logic.
2268  */
2269
2270 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2271 {
2272         int error = 0;
2273
2274         if (can_be_granted(r, lkb, 1, NULL)) {
2275                 grant_lock(r, lkb);
2276                 queue_cast(r, lkb, 0);
2277                 goto out;
2278         }
2279
2280         if (can_be_queued(lkb)) {
2281                 error = -EINPROGRESS;
2282                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2283                 add_timeout(lkb);
2284                 goto out;
2285         }
2286
2287         error = -EAGAIN;
2288         queue_cast(r, lkb, -EAGAIN);
2289  out:
2290         return error;
2291 }
2292
2293 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2294                                int error)
2295 {
2296         switch (error) {
2297         case -EAGAIN:
2298                 if (force_blocking_asts(lkb))
2299                         send_blocking_asts_all(r, lkb);
2300                 break;
2301         case -EINPROGRESS:
2302                 send_blocking_asts(r, lkb);
2303                 break;
2304         }
2305 }
2306
2307 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2308 {
2309         int error = 0;
2310         int deadlk = 0;
2311
2312         /* changing an existing lock may allow others to be granted */
2313
2314         if (can_be_granted(r, lkb, 1, &deadlk)) {
2315                 grant_lock(r, lkb);
2316                 queue_cast(r, lkb, 0);
2317                 goto out;
2318         }
2319
2320         /* can_be_granted() detected that this lock would block in a conversion
2321            deadlock, so we leave it on the granted queue and return EDEADLK in
2322            the ast for the convert. */
2323
2324         if (deadlk) {
2325                 /* it's left on the granted queue */
2326                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2327                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2328                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2329                 revert_lock(r, lkb);
2330                 queue_cast(r, lkb, -EDEADLK);
2331                 error = -EDEADLK;
2332                 goto out;
2333         }
2334
2335         /* is_demoted() means the can_be_granted() above set the grmode
2336            to NL, and left us on the granted queue.  This auto-demotion
2337            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2338            now grantable.  We have to try to grant other converting locks
2339            before we try again to grant this one. */
2340
2341         if (is_demoted(lkb)) {
2342                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2343                 if (_can_be_granted(r, lkb, 1)) {
2344                         grant_lock(r, lkb);
2345                         queue_cast(r, lkb, 0);
2346                         goto out;
2347                 }
2348                 /* else fall through and move to convert queue */
2349         }
2350
2351         if (can_be_queued(lkb)) {
2352                 error = -EINPROGRESS;
2353                 del_lkb(r, lkb);
2354                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2355                 add_timeout(lkb);
2356                 goto out;
2357         }
2358
2359         error = -EAGAIN;
2360         queue_cast(r, lkb, -EAGAIN);
2361  out:
2362         return error;
2363 }
2364
2365 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2366                                int error)
2367 {
2368         switch (error) {
2369         case 0:
2370                 grant_pending_locks(r);
2371                 /* grant_pending_locks also sends basts */
2372                 break;
2373         case -EAGAIN:
2374                 if (force_blocking_asts(lkb))
2375                         send_blocking_asts_all(r, lkb);
2376                 break;
2377         case -EINPROGRESS:
2378                 send_blocking_asts(r, lkb);
2379                 break;
2380         }
2381 }
2382
2383 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2384 {
2385         remove_lock(r, lkb);
2386         queue_cast(r, lkb, -DLM_EUNLOCK);
2387         return -DLM_EUNLOCK;
2388 }
2389
2390 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2391                               int error)
2392 {
2393         grant_pending_locks(r);
2394 }
2395
2396 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2397  
2398 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2399 {
2400         int error;
2401
2402         error = revert_lock(r, lkb);
2403         if (error) {
2404                 queue_cast(r, lkb, -DLM_ECANCEL);
2405                 return -DLM_ECANCEL;
2406         }
2407         return 0;
2408 }
2409
2410 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2411                               int error)
2412 {
2413         if (error)
2414                 grant_pending_locks(r);
2415 }
2416
2417 /*
2418  * Four stage 3 varieties:
2419  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2420  */
2421
2422 /* add a new lkb to a possibly new rsb, called by requesting process */
2423
2424 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2425 {
2426         int error;
2427
2428         /* set_master: sets lkb nodeid from r */
2429
2430         error = set_master(r, lkb);
2431         if (error < 0)
2432                 goto out;
2433         if (error) {
2434                 error = 0;
2435                 goto out;
2436         }
2437
2438         if (is_remote(r)) {
2439                 /* receive_request() calls do_request() on remote node */
2440                 error = send_request(r, lkb);
2441         } else {
2442                 error = do_request(r, lkb);
2443                 /* for remote locks the request_reply is sent
2444                    between do_request and do_request_effects */
2445                 do_request_effects(r, lkb, error);
2446         }
2447  out:
2448         return error;
2449 }
2450
2451 /* change some property of an existing lkb, e.g. mode */
2452
2453 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2454 {
2455         int error;
2456
2457         if (is_remote(r)) {
2458                 /* receive_convert() calls do_convert() on remote node */
2459                 error = send_convert(r, lkb);
2460         } else {
2461                 error = do_convert(r, lkb);
2462                 /* for remote locks the convert_reply is sent
2463                    between do_convert and do_convert_effects */
2464                 do_convert_effects(r, lkb, error);
2465         }
2466
2467         return error;
2468 }
2469
2470 /* remove an existing lkb from the granted queue */
2471
2472 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2473 {
2474         int error;
2475
2476         if (is_remote(r)) {
2477                 /* receive_unlock() calls do_unlock() on remote node */
2478                 error = send_unlock(r, lkb);
2479         } else {
2480                 error = do_unlock(r, lkb);
2481                 /* for remote locks the unlock_reply is sent
2482                    between do_unlock and do_unlock_effects */
2483                 do_unlock_effects(r, lkb, error);
2484         }
2485
2486         return error;
2487 }
2488
2489 /* remove an existing lkb from the convert or wait queue */
2490
2491 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2492 {
2493         int error;
2494
2495         if (is_remote(r)) {
2496                 /* receive_cancel() calls do_cancel() on remote node */
2497                 error = send_cancel(r, lkb);
2498         } else {
2499                 error = do_cancel(r, lkb);
2500                 /* for remote locks the cancel_reply is sent
2501                    between do_cancel and do_cancel_effects */
2502                 do_cancel_effects(r, lkb, error);
2503         }
2504
2505         return error;
2506 }
2507
2508 /*
2509  * Four stage 2 varieties:
2510  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2511  */
2512
2513 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2514                         int len, struct dlm_args *args)
2515 {
2516         struct dlm_rsb *r;
2517         int error;
2518
2519         error = validate_lock_args(ls, lkb, args);
2520         if (error)
2521                 goto out;
2522
2523         error = find_rsb(ls, name, len, R_CREATE, &r);
2524         if (error)
2525                 goto out;
2526
2527         lock_rsb(r);
2528
2529         attach_lkb(r, lkb);
2530         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2531
2532         error = _request_lock(r, lkb);
2533
2534         unlock_rsb(r);
2535         put_rsb(r);
2536
2537  out:
2538         return error;
2539 }
2540
2541 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2542                         struct dlm_args *args)
2543 {
2544         struct dlm_rsb *r;
2545         int error;
2546
2547         r = lkb->lkb_resource;
2548
2549         hold_rsb(r);
2550         lock_rsb(r);
2551
2552         error = validate_lock_args(ls, lkb, args);
2553         if (error)
2554                 goto out;
2555
2556         error = _convert_lock(r, lkb);
2557  out:
2558         unlock_rsb(r);
2559         put_rsb(r);
2560         return error;
2561 }
2562
2563 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2564                        struct dlm_args *args)
2565 {
2566         struct dlm_rsb *r;
2567         int error;
2568
2569         r = lkb->lkb_resource;
2570
2571         hold_rsb(r);
2572         lock_rsb(r);
2573
2574         error = validate_unlock_args(lkb, args);
2575         if (error)
2576                 goto out;
2577
2578         error = _unlock_lock(r, lkb);
2579  out:
2580         unlock_rsb(r);
2581         put_rsb(r);
2582         return error;
2583 }
2584
2585 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2586                        struct dlm_args *args)
2587 {
2588         struct dlm_rsb *r;
2589         int error;
2590
2591         r = lkb->lkb_resource;
2592
2593         hold_rsb(r);
2594         lock_rsb(r);
2595
2596         error = validate_unlock_args(lkb, args);
2597         if (error)
2598                 goto out;
2599
2600         error = _cancel_lock(r, lkb);
2601  out:
2602         unlock_rsb(r);
2603         put_rsb(r);
2604         return error;
2605 }
2606
2607 /*
2608  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2609  */
2610
2611 int dlm_lock(dlm_lockspace_t *lockspace,
2612              int mode,
2613              struct dlm_lksb *lksb,
2614              uint32_t flags,
2615              void *name,
2616              unsigned int namelen,
2617              uint32_t parent_lkid,
2618              void (*ast) (void *astarg),
2619              void *astarg,
2620              void (*bast) (void *astarg, int mode))
2621 {
2622         struct dlm_ls *ls;
2623         struct dlm_lkb *lkb;
2624         struct dlm_args args;
2625         int error, convert = flags & DLM_LKF_CONVERT;
2626
2627         ls = dlm_find_lockspace_local(lockspace);
2628         if (!ls)
2629                 return -EINVAL;
2630
2631         dlm_lock_recovery(ls);
2632
2633         if (convert)
2634                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2635         else
2636                 error = create_lkb(ls, &lkb);
2637
2638         if (error)
2639                 goto out;
2640
2641         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2642                               astarg, bast, &args);
2643         if (error)
2644                 goto out_put;
2645
2646         if (convert)
2647                 error = convert_lock(ls, lkb, &args);
2648         else
2649                 error = request_lock(ls, lkb, name, namelen, &args);
2650
2651         if (error == -EINPROGRESS)
2652                 error = 0;
2653  out_put:
2654         if (convert || error)
2655                 __put_lkb(ls, lkb);
2656         if (error == -EAGAIN || error == -EDEADLK)
2657                 error = 0;
2658  out:
2659         dlm_unlock_recovery(ls);
2660         dlm_put_lockspace(ls);
2661         return error;
2662 }
2663
2664 int dlm_unlock(dlm_lockspace_t *lockspace,
2665                uint32_t lkid,
2666                uint32_t flags,
2667                struct dlm_lksb *lksb,
2668                void *astarg)
2669 {
2670         struct dlm_ls *ls;
2671         struct dlm_lkb *lkb;
2672         struct dlm_args args;
2673         int error;
2674
2675         ls = dlm_find_lockspace_local(lockspace);
2676         if (!ls)
2677                 return -EINVAL;
2678
2679         dlm_lock_recovery(ls);
2680
2681         error = find_lkb(ls, lkid, &lkb);
2682         if (error)
2683                 goto out;
2684
2685         error = set_unlock_args(flags, astarg, &args);
2686         if (error)
2687                 goto out_put;
2688
2689         if (flags & DLM_LKF_CANCEL)
2690                 error = cancel_lock(ls, lkb, &args);
2691         else
2692                 error = unlock_lock(ls, lkb, &args);
2693
2694         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2695                 error = 0;
2696         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2697                 error = 0;
2698  out_put:
2699         dlm_put_lkb(lkb);
2700  out:
2701         dlm_unlock_recovery(ls);
2702         dlm_put_lockspace(ls);
2703         return error;
2704 }
2705
2706 /*
2707  * send/receive routines for remote operations and replies
2708  *
2709  * send_args
2710  * send_common
2711  * send_request                 receive_request
2712  * send_convert                 receive_convert
2713  * send_unlock                  receive_unlock
2714  * send_cancel                  receive_cancel
2715  * send_grant                   receive_grant
2716  * send_bast                    receive_bast
2717  * send_lookup                  receive_lookup
2718  * send_remove                  receive_remove
2719  *
2720  *                              send_common_reply
2721  * receive_request_reply        send_request_reply
2722  * receive_convert_reply        send_convert_reply
2723  * receive_unlock_reply         send_unlock_reply
2724  * receive_cancel_reply         send_cancel_reply
2725  * receive_lookup_reply         send_lookup_reply
2726  */
2727
2728 static int _create_message(struct dlm_ls *ls, int mb_len,
2729                            int to_nodeid, int mstype,
2730                            struct dlm_message **ms_ret,
2731                            struct dlm_mhandle **mh_ret)
2732 {
2733         struct dlm_message *ms;
2734         struct dlm_mhandle *mh;
2735         char *mb;
2736
2737         /* get_buffer gives us a message handle (mh) that we need to
2738            pass into lowcomms_commit and a message buffer (mb) that we
2739            write our data into */
2740
2741         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2742         if (!mh)
2743                 return -ENOBUFS;
2744
2745         memset(mb, 0, mb_len);
2746
2747         ms = (struct dlm_message *) mb;
2748
2749         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2750         ms->m_header.h_lockspace = ls->ls_global_id;
2751         ms->m_header.h_nodeid = dlm_our_nodeid();
2752         ms->m_header.h_length = mb_len;
2753         ms->m_header.h_cmd = DLM_MSG;
2754
2755         ms->m_type = mstype;
2756
2757         *mh_ret = mh;
2758         *ms_ret = ms;
2759         return 0;
2760 }
2761
2762 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2763                           int to_nodeid, int mstype,
2764                           struct dlm_message **ms_ret,
2765                           struct dlm_mhandle **mh_ret)
2766 {
2767         int mb_len = sizeof(struct dlm_message);
2768
2769         switch (mstype) {
2770         case DLM_MSG_REQUEST:
2771         case DLM_MSG_LOOKUP:
2772         case DLM_MSG_REMOVE:
2773                 mb_len += r->res_length;
2774                 break;
2775         case DLM_MSG_CONVERT:
2776         case DLM_MSG_UNLOCK:
2777         case DLM_MSG_REQUEST_REPLY:
2778         case DLM_MSG_CONVERT_REPLY:
2779         case DLM_MSG_GRANT:
2780                 if (lkb && lkb->lkb_lvbptr)
2781                         mb_len += r->res_ls->ls_lvblen;
2782                 break;
2783         }
2784
2785         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2786                                ms_ret, mh_ret);
2787 }
2788
2789 /* further lowcomms enhancements or alternate implementations may make
2790    the return value from this function useful at some point */
2791
2792 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2793 {
2794         dlm_message_out(ms);
2795         dlm_lowcomms_commit_buffer(mh);
2796         return 0;
2797 }
2798
2799 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2800                       struct dlm_message *ms)
2801 {
2802         ms->m_nodeid   = lkb->lkb_nodeid;
2803         ms->m_pid      = lkb->lkb_ownpid;
2804         ms->m_lkid     = lkb->lkb_id;
2805         ms->m_remid    = lkb->lkb_remid;
2806         ms->m_exflags  = lkb->lkb_exflags;
2807         ms->m_sbflags  = lkb->lkb_sbflags;
2808         ms->m_flags    = lkb->lkb_flags;
2809         ms->m_lvbseq   = lkb->lkb_lvbseq;
2810         ms->m_status   = lkb->lkb_status;
2811         ms->m_grmode   = lkb->lkb_grmode;
2812         ms->m_rqmode   = lkb->lkb_rqmode;
2813         ms->m_hash     = r->res_hash;
2814
2815         /* m_result and m_bastmode are set from function args,
2816            not from lkb fields */
2817
2818         if (lkb->lkb_bastfn)
2819                 ms->m_asts |= AST_BAST;
2820         if (lkb->lkb_astfn)
2821                 ms->m_asts |= AST_COMP;
2822
2823         /* compare with switch in create_message; send_remove() doesn't
2824            use send_args() */
2825
2826         switch (ms->m_type) {
2827         case DLM_MSG_REQUEST:
2828         case DLM_MSG_LOOKUP:
2829                 memcpy(ms->m_extra, r->res_name, r->res_length);
2830                 break;
2831         case DLM_MSG_CONVERT:
2832         case DLM_MSG_UNLOCK:
2833         case DLM_MSG_REQUEST_REPLY:
2834         case DLM_MSG_CONVERT_REPLY:
2835         case DLM_MSG_GRANT:
2836                 if (!lkb->lkb_lvbptr)
2837                         break;
2838                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2839                 break;
2840         }
2841 }
2842
2843 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2844 {
2845         struct dlm_message *ms;
2846         struct dlm_mhandle *mh;
2847         int to_nodeid, error;
2848
2849         error = add_to_waiters(lkb, mstype);
2850         if (error)
2851                 return error;
2852
2853         to_nodeid = r->res_nodeid;
2854
2855         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2856         if (error)
2857                 goto fail;
2858
2859         send_args(r, lkb, ms);
2860
2861         error = send_message(mh, ms);
2862         if (error)
2863                 goto fail;
2864         return 0;
2865
2866  fail:
2867         remove_from_waiters(lkb, msg_reply_type(mstype));
2868         return error;
2869 }
2870
2871 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2872 {
2873         return send_common(r, lkb, DLM_MSG_REQUEST);
2874 }
2875
2876 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2877 {
2878         int error;
2879
2880         error = send_common(r, lkb, DLM_MSG_CONVERT);
2881
2882         /* down conversions go without a reply from the master */
2883         if (!error && down_conversion(lkb)) {
2884                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2885                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2886                 r->res_ls->ls_stub_ms.m_result = 0;
2887                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2888                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2889         }
2890
2891         return error;
2892 }
2893
2894 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2895    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2896    that the master is still correct. */
2897
2898 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2899 {
2900         return send_common(r, lkb, DLM_MSG_UNLOCK);
2901 }
2902
2903 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2904 {
2905         return send_common(r, lkb, DLM_MSG_CANCEL);
2906 }
2907
2908 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2909 {
2910         struct dlm_message *ms;
2911         struct dlm_mhandle *mh;
2912         int to_nodeid, error;
2913
2914         to_nodeid = lkb->lkb_nodeid;
2915
2916         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2917         if (error)
2918                 goto out;
2919
2920         send_args(r, lkb, ms);
2921
2922         ms->m_result = 0;
2923
2924         error = send_message(mh, ms);
2925  out:
2926         return error;
2927 }
2928
2929 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2930 {
2931         struct dlm_message *ms;
2932         struct dlm_mhandle *mh;
2933         int to_nodeid, error;
2934
2935         to_nodeid = lkb->lkb_nodeid;
2936
2937         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2938         if (error)
2939                 goto out;
2940
2941         send_args(r, lkb, ms);
2942
2943         ms->m_bastmode = mode;
2944
2945         error = send_message(mh, ms);
2946  out:
2947         return error;
2948 }
2949
2950 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2951 {
2952         struct dlm_message *ms;
2953         struct dlm_mhandle *mh;
2954         int to_nodeid, error;
2955
2956         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2957         if (error)
2958                 return error;
2959
2960         to_nodeid = dlm_dir_nodeid(r);
2961
2962         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2963         if (error)
2964                 goto fail;
2965
2966         send_args(r, lkb, ms);
2967
2968         error = send_message(mh, ms);
2969         if (error)
2970                 goto fail;
2971         return 0;
2972
2973  fail:
2974         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2975         return error;
2976 }
2977
2978 static int send_remove(struct dlm_rsb *r)
2979 {
2980         struct dlm_message *ms;
2981         struct dlm_mhandle *mh;
2982         int to_nodeid, error;
2983
2984         to_nodeid = dlm_dir_nodeid(r);
2985
2986         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2987         if (error)
2988                 goto out;
2989
2990         memcpy(ms->m_extra, r->res_name, r->res_length);
2991         ms->m_hash = r->res_hash;
2992
2993         error = send_message(mh, ms);
2994  out:
2995         return error;
2996 }
2997
2998 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2999                              int mstype, int rv)
3000 {
3001         struct dlm_message *ms;
3002         struct dlm_mhandle *mh;
3003         int to_nodeid, error;
3004
3005         to_nodeid = lkb->lkb_nodeid;
3006
3007         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3008         if (error)
3009                 goto out;
3010
3011         send_args(r, lkb, ms);
3012
3013         ms->m_result = rv;
3014
3015         error = send_message(mh, ms);
3016  out:
3017         return error;
3018 }
3019
3020 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3021 {
3022         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3023 }
3024
3025 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3026 {
3027         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3028 }
3029
3030 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3031 {
3032         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3033 }
3034
3035 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3036 {
3037         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3038 }
3039
3040 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3041                              int ret_nodeid, int rv)
3042 {
3043         struct dlm_rsb *r = &ls->ls_stub_rsb;
3044         struct dlm_message *ms;
3045         struct dlm_mhandle *mh;
3046         int error, nodeid = ms_in->m_header.h_nodeid;
3047
3048         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3049         if (error)
3050                 goto out;
3051
3052         ms->m_lkid = ms_in->m_lkid;
3053         ms->m_result = rv;
3054         ms->m_nodeid = ret_nodeid;
3055
3056         error = send_message(mh, ms);
3057  out:
3058         return error;
3059 }
3060
3061 /* which args we save from a received message depends heavily on the type
3062    of message, unlike the send side where we can safely send everything about
3063    the lkb for any type of message */
3064
3065 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3066 {
3067         lkb->lkb_exflags = ms->m_exflags;
3068         lkb->lkb_sbflags = ms->m_sbflags;
3069         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3070                          (ms->m_flags & 0x0000FFFF);
3071 }
3072
3073 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3074 {
3075         lkb->lkb_sbflags = ms->m_sbflags;
3076         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3077                          (ms->m_flags & 0x0000FFFF);
3078 }
3079
3080 static int receive_extralen(struct dlm_message *ms)
3081 {
3082         return (ms->m_header.h_length - sizeof(struct dlm_message));
3083 }
3084
3085 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3086                        struct dlm_message *ms)
3087 {
3088         int len;
3089
3090         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3091                 if (!lkb->lkb_lvbptr)
3092                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3093                 if (!lkb->lkb_lvbptr)
3094                         return -ENOMEM;
3095                 len = receive_extralen(ms);
3096                 if (len > DLM_RESNAME_MAXLEN)
3097                         len = DLM_RESNAME_MAXLEN;
3098                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3099         }
3100         return 0;
3101 }
3102
3103 static void fake_bastfn(void *astparam, int mode)
3104 {
3105         log_print("fake_bastfn should not be called");
3106 }
3107
3108 static void fake_astfn(void *astparam)
3109 {
3110         log_print("fake_astfn should not be called");
3111 }
3112
3113 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3114                                 struct dlm_message *ms)
3115 {
3116         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3117         lkb->lkb_ownpid = ms->m_pid;
3118         lkb->lkb_remid = ms->m_lkid;
3119         lkb->lkb_grmode = DLM_LOCK_IV;
3120         lkb->lkb_rqmode = ms->m_rqmode;
3121
3122         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3123         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3124
3125         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3126                 /* lkb was just created so there won't be an lvb yet */
3127                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3128                 if (!lkb->lkb_lvbptr)
3129                         return -ENOMEM;
3130         }
3131
3132         return 0;
3133 }
3134
3135 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3136                                 struct dlm_message *ms)
3137 {
3138         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3139                 return -EBUSY;
3140
3141         if (receive_lvb(ls, lkb, ms))
3142                 return -ENOMEM;
3143
3144         lkb->lkb_rqmode = ms->m_rqmode;
3145         lkb->lkb_lvbseq = ms->m_lvbseq;
3146
3147         return 0;
3148 }
3149
3150 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3151                                struct dlm_message *ms)
3152 {
3153         if (receive_lvb(ls, lkb, ms))
3154                 return -ENOMEM;
3155         return 0;
3156 }
3157
3158 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3159    uses to send a reply and that the remote end uses to process the reply. */
3160
3161 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3162 {
3163         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3164         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3165         lkb->lkb_remid = ms->m_lkid;
3166 }
3167
3168 /* This is called after the rsb is locked so that we can safely inspect
3169    fields in the lkb. */
3170
3171 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3172 {
3173         int from = ms->m_header.h_nodeid;
3174         int error = 0;
3175
3176         switch (ms->m_type) {
3177         case DLM_MSG_CONVERT:
3178         case DLM_MSG_UNLOCK:
3179         case DLM_MSG_CANCEL:
3180                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3181                         error = -EINVAL;
3182                 break;
3183
3184         case DLM_MSG_CONVERT_REPLY:
3185         case DLM_MSG_UNLOCK_REPLY:
3186         case DLM_MSG_CANCEL_REPLY:
3187         case DLM_MSG_GRANT:
3188         case DLM_MSG_BAST:
3189                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3190                         error = -EINVAL;
3191                 break;
3192
3193         case DLM_MSG_REQUEST_REPLY:
3194                 if (!is_process_copy(lkb))
3195                         error = -EINVAL;
3196                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3197                         error = -EINVAL;
3198                 break;
3199
3200         default:
3201                 error = -EINVAL;
3202         }
3203
3204         if (error)
3205                 log_error(lkb->lkb_resource->res_ls,
3206                           "ignore invalid message %d from %d %x %x %x %d",
3207                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3208                           lkb->lkb_flags, lkb->lkb_nodeid);
3209         return error;
3210 }
3211
3212 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3213 {
3214         struct dlm_lkb *lkb;
3215         struct dlm_rsb *r;
3216         int error, namelen;
3217
3218         error = create_lkb(ls, &lkb);
3219         if (error)
3220                 goto fail;
3221
3222         receive_flags(lkb, ms);
3223         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3224         error = receive_request_args(ls, lkb, ms);
3225         if (error) {
3226                 __put_lkb(ls, lkb);
3227                 goto fail;
3228         }
3229
3230         namelen = receive_extralen(ms);
3231
3232         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3233         if (error) {
3234                 __put_lkb(ls, lkb);
3235                 goto fail;
3236         }
3237
3238         lock_rsb(r);
3239
3240         attach_lkb(r, lkb);
3241         error = do_request(r, lkb);
3242         send_request_reply(r, lkb, error);
3243         do_request_effects(r, lkb, error);
3244
3245         unlock_rsb(r);
3246         put_rsb(r);
3247
3248         if (error == -EINPROGRESS)
3249                 error = 0;
3250         if (error)
3251                 dlm_put_lkb(lkb);
3252         return;
3253
3254  fail:
3255         setup_stub_lkb(ls, ms);
3256         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3257 }
3258
3259 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3260 {
3261         struct dlm_lkb *lkb;
3262         struct dlm_rsb *r;
3263         int error, reply = 1;
3264
3265         error = find_lkb(ls, ms->m_remid, &lkb);
3266         if (error)
3267                 goto fail;
3268
3269         r = lkb->lkb_resource;
3270
3271         hold_rsb(r);
3272         lock_rsb(r);
3273
3274         error = validate_message(lkb, ms);
3275         if (error)
3276                 goto out;
3277
3278         receive_flags(lkb, ms);
3279
3280         error = receive_convert_args(ls, lkb, ms);
3281         if (error) {
3282                 send_convert_reply(r, lkb, error);
3283                 goto out;
3284         }
3285
3286         reply = !down_conversion(lkb);
3287
3288         error = do_convert(r, lkb);
3289         if (reply)
3290                 send_convert_reply(r, lkb, error);
3291         do_convert_effects(r, lkb, error);
3292  out:
3293         unlock_rsb(r);
3294         put_rsb(r);
3295         dlm_put_lkb(lkb);
3296         return;
3297
3298  fail:
3299         setup_stub_lkb(ls, ms);
3300         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3301 }
3302
3303 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3304 {
3305         struct dlm_lkb *lkb;
3306         struct dlm_rsb *r;
3307         int error;
3308
3309         error = find_lkb(ls, ms->m_remid, &lkb);
3310         if (error)
3311                 goto fail;
3312
3313         r = lkb->lkb_resource;
3314
3315         hold_rsb(r);
3316         lock_rsb(r);
3317
3318         error = validate_message(lkb, ms);
3319         if (error)
3320                 goto out;
3321
3322         receive_flags(lkb, ms);
3323
3324         error = receive_unlock_args(ls, lkb, ms);
3325         if (error) {
3326                 send_unlock_reply(r, lkb, error);
3327                 goto out;
3328         }
3329
3330         error = do_unlock(r, lkb);
3331         send_unlock_reply(r, lkb, error);
3332         do_unlock_effects(r, lkb, error);
3333  out:
3334         unlock_rsb(r);
3335         put_rsb(r);
3336         dlm_put_lkb(lkb);
3337         return;
3338
3339  fail:
3340         setup_stub_lkb(ls, ms);
3341         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3342 }
3343
3344 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3345 {
3346         struct dlm_lkb *lkb;
3347         struct dlm_rsb *r;
3348         int error;
3349
3350         error = find_lkb(ls, ms->m_remid, &lkb);
3351         if (error)
3352                 goto fail;
3353
3354         receive_flags(lkb, ms);
3355
3356         r = lkb->lkb_resource;
3357
3358         hold_rsb(r);
3359         lock_rsb(r);
3360
3361         error = validate_message(lkb, ms);
3362         if (error)
3363                 goto out;
3364
3365         error = do_cancel(r, lkb);
3366         send_cancel_reply(r, lkb, error);
3367         do_cancel_effects(r, lkb, error);
3368  out:
3369         unlock_rsb(r);
3370         put_rsb(r);
3371         dlm_put_lkb(lkb);
3372         return;
3373
3374  fail:
3375         setup_stub_lkb(ls, ms);
3376         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3377 }
3378
3379 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3380 {
3381         struct dlm_lkb *lkb;
3382         struct dlm_rsb *r;
3383         int error;
3384
3385         error = find_lkb(ls, ms->m_remid, &lkb);
3386         if (error) {
3387                 log_debug(ls, "receive_grant from %d no lkb %x",
3388                           ms->m_header.h_nodeid, ms->m_remid);
3389                 return;
3390         }
3391
3392         r = lkb->lkb_resource;
3393
3394         hold_rsb(r);
3395         lock_rsb(r);
3396
3397         error = validate_message(lkb, ms);
3398         if (error)
3399                 goto out;
3400
3401         receive_flags_reply(lkb, ms);
3402         if (is_altmode(lkb))
3403                 munge_altmode(lkb, ms);
3404         grant_lock_pc(r, lkb, ms);
3405         queue_cast(r, lkb, 0);
3406  out:
3407         unlock_rsb(r);
3408         put_rsb(r);
3409         dlm_put_lkb(lkb);
3410 }
3411
3412 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3413 {
3414         struct dlm_lkb *lkb;
3415         struct dlm_rsb *r;
3416         int error;
3417
3418         error = find_lkb(ls, ms->m_remid, &lkb);
3419         if (error) {
3420                 log_debug(ls, "receive_bast from %d no lkb %x",
3421                           ms->m_header.h_nodeid, ms->m_remid);
3422                 return;
3423         }
3424
3425         r = lkb->lkb_resource;
3426
3427         hold_rsb(r);
3428         lock_rsb(r);
3429
3430         error = validate_message(lkb, ms);
3431         if (error)
3432                 goto out;
3433
3434         queue_bast(r, lkb, ms->m_bastmode);
3435  out:
3436         unlock_rsb(r);
3437         put_rsb(r);
3438         dlm_put_lkb(lkb);
3439 }
3440
3441 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3442 {
3443         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3444
3445         from_nodeid = ms->m_header.h_nodeid;
3446         our_nodeid = dlm_our_nodeid();
3447
3448         len = receive_extralen(ms);
3449
3450         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3451         if (dir_nodeid != our_nodeid) {
3452                 log_error(ls, "lookup dir_nodeid %d from %d",
3453                           dir_nodeid, from_nodeid);
3454                 error = -EINVAL;
3455                 ret_nodeid = -1;
3456                 goto out;
3457         }
3458
3459         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3460
3461         /* Optimization: we're master so treat lookup as a request */
3462         if (!error && ret_nodeid == our_nodeid) {
3463                 receive_request(ls, ms);
3464                 return;
3465         }
3466  out:
3467         send_lookup_reply(ls, ms, ret_nodeid, error);
3468 }
3469
3470 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3471 {
3472         int len, dir_nodeid, from_nodeid;
3473
3474         from_nodeid = ms->m_header.h_nodeid;
3475
3476         len = receive_extralen(ms);
3477
3478         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3479         if (dir_nodeid != dlm_our_nodeid()) {
3480                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3481                           dir_nodeid, from_nodeid);
3482                 return;
3483         }
3484
3485         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3486 }
3487
3488 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3489 {
3490         do_purge(ls, ms->m_nodeid, ms->m_pid);
3491 }
3492
3493 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3494 {
3495         struct dlm_lkb *lkb;
3496         struct dlm_rsb *r;
3497         int error, mstype, result;
3498
3499         error = find_lkb(ls, ms->m_remid, &lkb);
3500         if (error) {
3501                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3502                           ms->m_header.h_nodeid, ms->m_remid);
3503                 return;
3504         }
3505
3506         r = lkb->lkb_resource;
3507         hold_rsb(r);
3508         lock_rsb(r);
3509
3510         error = validate_message(lkb, ms);
3511         if (error)
3512                 goto out;
3513
3514         mstype = lkb->lkb_wait_type;
3515         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3516         if (error)
3517                 goto out;
3518
3519         /* Optimization: the dir node was also the master, so it took our
3520            lookup as a request and sent request reply instead of lookup reply */
3521         if (mstype == DLM_MSG_LOOKUP) {
3522                 r->res_nodeid = ms->m_header.h_nodeid;
3523                 lkb->lkb_nodeid = r->res_nodeid;
3524         }
3525
3526         /* this is the value returned from do_request() on the master */
3527         result = ms->m_result;
3528
3529         switch (result) {
3530         case -EAGAIN:
3531                 /* request would block (be queued) on remote master */
3532                 queue_cast(r, lkb, -EAGAIN);
3533                 confirm_master(r, -EAGAIN);
3534                 unhold_lkb(lkb); /* undoes create_lkb() */
3535                 break;
3536
3537         case -EINPROGRESS:
3538         case 0:
3539                 /* request was queued or granted on remote master */
3540                 receive_flags_reply(lkb, ms);
3541                 lkb->lkb_remid = ms->m_lkid;
3542                 if (is_altmode(lkb))
3543                         munge_altmode(lkb, ms);
3544                 if (result) {
3545                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3546                         add_timeout(lkb);
3547                 } else {
3548                         grant_lock_pc(r, lkb, ms);
3549                         queue_cast(r, lkb, 0);
3550                 }
3551                 confirm_master(r, result);
3552                 break;
3553
3554         case -EBADR:
3555         case -ENOTBLK:
3556                 /* find_rsb failed to find rsb or rsb wasn't master */
3557                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3558                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3559                 r->res_nodeid = -1;
3560                 lkb->lkb_nodeid = -1;
3561
3562                 if (is_overlap(lkb)) {
3563                         /* we'll ignore error in cancel/unlock reply */
3564                         queue_cast_overlap(r, lkb);
3565                         confirm_master(r, result);
3566                         unhold_lkb(lkb); /* undoes create_lkb() */
3567                 } else
3568                         _request_lock(r, lkb);
3569                 break;
3570
3571         default:
3572                 log_error(ls, "receive_request_reply %x error %d",
3573                           lkb->lkb_id, result);
3574         }
3575
3576         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3577                 log_debug(ls, "receive_request_reply %x result %d unlock",
3578                           lkb->lkb_id, result);
3579                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3580                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3581                 send_unlock(r, lkb);
3582         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3583                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3584                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3585                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3586                 send_cancel(r, lkb);
3587         } else {
3588                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3589                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3590         }
3591  out:
3592         unlock_rsb(r);
3593         put_rsb(r);
3594         dlm_put_lkb(lkb);
3595 }
3596
3597 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3598                                     struct dlm_message *ms)
3599 {
3600         /* this is the value returned from do_convert() on the master */
3601         switch (ms->m_result) {
3602         case -EAGAIN:
3603                 /* convert would block (be queued) on remote master */
3604                 queue_cast(r, lkb, -EAGAIN);
3605                 break;
3606
3607         case -EDEADLK:
3608                 receive_flags_reply(lkb, ms);
3609                 revert_lock_pc(r, lkb);
3610                 queue_cast(r, lkb, -EDEADLK);
3611                 break;
3612
3613         case -EINPROGRESS:
3614                 /* convert was queued on remote master */
3615                 receive_flags_reply(lkb, ms);
3616                 if (is_demoted(lkb))
3617                         munge_demoted(lkb, ms);
3618                 del_lkb(r, lkb);
3619                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3620                 add_timeout(lkb);
3621                 break;
3622
3623         case 0:
3624                 /* convert was granted on remote master */
3625                 receive_flags_reply(lkb, ms);
3626                 if (is_demoted(lkb))
3627                         munge_demoted(lkb, ms);
3628                 grant_lock_pc(r, lkb, ms);
3629                 queue_cast(r, lkb, 0);
3630                 break;
3631
3632         default:
3633                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3634                           lkb->lkb_id, ms->m_result);
3635         }
3636 }
3637
3638 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3639 {
3640         struct dlm_rsb *r = lkb->lkb_resource;
3641         int error;
3642
3643         hold_rsb(r);
3644         lock_rsb(r);
3645
3646         error = validate_message(lkb, ms);
3647         if (error)
3648                 goto out;
3649
3650         /* stub reply can happen with waiters_mutex held */
3651         error = remove_from_waiters_ms(lkb, ms);
3652         if (error)
3653                 goto out;
3654
3655         __receive_convert_reply(r, lkb, ms);
3656  out:
3657         unlock_rsb(r);
3658         put_rsb(r);
3659 }
3660
3661 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3662 {
3663         struct dlm_lkb *lkb;
3664         int error;
3665
3666         error = find_lkb(ls, ms->m_remid, &lkb);
3667         if (error) {
3668                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3669                           ms->m_header.h_nodeid, ms->m_remid);
3670                 return;
3671         }
3672
3673         _receive_convert_reply(lkb, ms);
3674         dlm_put_lkb(lkb);
3675 }
3676
3677 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3678 {
3679         struct dlm_rsb *r = lkb->lkb_resource;
3680         int error;
3681
3682         hold_rsb(r);
3683         lock_rsb(r);
3684
3685         error = validate_message(lkb, ms);
3686         if (error)
3687                 goto out;
3688
3689         /* stub reply can happen with waiters_mutex held */
3690         error = remove_from_waiters_ms(lkb, ms);
3691         if (error)
3692                 goto out;
3693
3694         /* this is the value returned from do_unlock() on the master */
3695
3696         switch (ms->m_result) {
3697         case -DLM_EUNLOCK:
3698                 receive_flags_reply(lkb, ms);
3699                 remove_lock_pc(r, lkb);
3700                 queue_cast(r, lkb, -DLM_EUNLOCK);
3701                 break;
3702         case -ENOENT:
3703                 break;
3704         default:
3705                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3706                           lkb->lkb_id, ms->m_result);
3707         }
3708  out:
3709         unlock_rsb(r);
3710         put_rsb(r);
3711 }
3712
3713 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3714 {
3715         struct dlm_lkb *lkb;
3716         int error;
3717
3718         error = find_lkb(ls, ms->m_remid, &lkb);
3719         if (error) {
3720                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3721                           ms->m_header.h_nodeid, ms->m_remid);
3722                 return;
3723         }
3724
3725         _receive_unlock_reply(lkb, ms);
3726         dlm_put_lkb(lkb);
3727 }
3728
3729 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3730 {
3731         struct dlm_rsb *r = lkb->lkb_resource;
3732         int error;
3733
3734         hold_rsb(r);
3735         lock_rsb(r);
3736
3737         error = validate_message(lkb, ms);
3738         if (error)
3739                 goto out;
3740
3741         /* stub reply can happen with waiters_mutex held */
3742         error = remove_from_waiters_ms(lkb, ms);
3743         if (error)
3744                 goto out;
3745
3746         /* this is the value returned from do_cancel() on the master */
3747
3748         switch (ms->m_result) {
3749         case -DLM_ECANCEL:
3750                 receive_flags_reply(lkb, ms);
3751                 revert_lock_pc(r, lkb);
3752                 queue_cast(r, lkb, -DLM_ECANCEL);
3753                 break;
3754         case 0:
3755                 break;
3756         default:
3757                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3758                           lkb->lkb_id, ms->m_result);
3759         }
3760  out:
3761         unlock_rsb(r);
3762         put_rsb(r);
3763 }
3764
3765 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3766 {
3767         struct dlm_lkb *lkb;
3768         int error;
3769
3770         error = find_lkb(ls, ms->m_remid, &lkb);
3771         if (error) {
3772                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3773                           ms->m_header.h_nodeid, ms->m_remid);
3774                 return;
3775         }
3776
3777         _receive_cancel_reply(lkb, ms);
3778         dlm_put_lkb(lkb);
3779 }
3780
3781 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3782 {
3783         struct dlm_lkb *lkb;
3784         struct dlm_rsb *r;
3785         int error, ret_nodeid;
3786
3787         error = find_lkb(ls, ms->m_lkid, &lkb);
3788         if (error) {
3789                 log_error(ls, "receive_lookup_reply no lkb");
3790                 return;
3791         }
3792
3793         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3794            FIXME: will a non-zero error ever be returned? */
3795
3796         r = lkb->lkb_resource;
3797         hold_rsb(r);
3798         lock_rsb(r);
3799
3800         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3801         if (error)
3802                 goto out;
3803
3804         ret_nodeid = ms->m_nodeid;
3805         if (ret_nodeid == dlm_our_nodeid()) {
3806                 r->res_nodeid = 0;
3807                 ret_nodeid = 0;
3808                 r->res_first_lkid = 0;
3809         } else {
3810                 /* set_master() will copy res_nodeid to lkb_nodeid */
3811                 r->res_nodeid = ret_nodeid;
3812         }
3813
3814         if (is_overlap(lkb)) {
3815                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3816                           lkb->lkb_id, lkb->lkb_flags);
3817                 queue_cast_overlap(r, lkb);
3818                 unhold_lkb(lkb); /* undoes create_lkb() */
3819                 goto out_list;
3820         }
3821
3822         _request_lock(r, lkb);
3823
3824  out_list:
3825         if (!ret_nodeid)
3826                 process_lookup_list(r);
3827  out:
3828         unlock_rsb(r);
3829         put_rsb(r);
3830         dlm_put_lkb(lkb);
3831 }
3832
3833 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3834 {
3835         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3836                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3837                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3838                           ms->m_remid, ms->m_result);
3839                 return;
3840         }
3841
3842         switch (ms->m_type) {
3843
3844         /* messages sent to a master node */
3845
3846         case DLM_MSG_REQUEST:
3847                 receive_request(ls, ms);
3848                 break;
3849
3850         case DLM_MSG_CONVERT:
3851                 receive_convert(ls, ms);
3852                 break;
3853
3854         case DLM_MSG_UNLOCK:
3855                 receive_unlock(ls, ms);
3856                 break;
3857
3858         case DLM_MSG_CANCEL:
3859                 receive_cancel(ls, ms);
3860                 break;
3861
3862         /* messages sent from a master node (replies to above) */
3863
3864         case DLM_MSG_REQUEST_REPLY:
3865                 receive_request_reply(ls, ms);
3866                 break;
3867
3868         case DLM_MSG_CONVERT_REPLY:
3869                 receive_convert_reply(ls, ms);
3870                 break;
3871
3872         case DLM_MSG_UNLOCK_REPLY:
3873                 receive_unlock_reply(ls, ms);
3874                 break;
3875
3876         case DLM_MSG_CANCEL_REPLY:
3877                 receive_cancel_reply(ls, ms);
3878                 break;
3879
3880         /* messages sent from a master node (only two types of async msg) */
3881
3882         case DLM_MSG_GRANT:
3883                 receive_grant(ls, ms);
3884                 break;
3885
3886         case DLM_MSG_BAST:
3887                 receive_bast(ls, ms);
3888                 break;
3889
3890         /* messages sent to a dir node */
3891
3892         case DLM_MSG_LOOKUP:
3893                 receive_lookup(ls, ms);
3894                 break;
3895
3896         case DLM_MSG_REMOVE:
3897                 receive_remove(ls, ms);
3898                 break;
3899
3900         /* messages sent from a dir node (remove has no reply) */
3901
3902         case DLM_MSG_LOOKUP_REPLY:
3903                 receive_lookup_reply(ls, ms);
3904                 break;
3905
3906         /* other messages */
3907
3908         case DLM_MSG_PURGE:
3909                 receive_purge(ls, ms);
3910                 break;
3911
3912         default:
3913                 log_error(ls, "unknown message type %d", ms->m_type);
3914         }
3915
3916         dlm_astd_wake();
3917 }
3918
3919 /* If the lockspace is in recovery mode (locking stopped), then normal
3920    messages are saved on the requestqueue for processing after recovery is
3921    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3922    messages off the requestqueue before we process new ones. This occurs right
3923    after recovery completes when we transition from saving all messages on
3924    requestqueue, to processing all the saved messages, to processing new
3925    messages as they arrive. */
3926
3927 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3928                                 int nodeid)
3929 {
3930         if (dlm_locking_stopped(ls)) {
3931                 dlm_add_requestqueue(ls, nodeid, ms);
3932         } else {
3933                 dlm_wait_requestqueue(ls);
3934                 _receive_message(ls, ms);
3935         }
3936 }
3937
3938 /* This is called by dlm_recoverd to process messages that were saved on
3939    the requestqueue. */
3940
3941 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3942 {
3943         _receive_message(ls, ms);
3944 }
3945
3946 /* This is called by the midcomms layer when something is received for
3947    the lockspace.  It could be either a MSG (normal message sent as part of
3948    standard locking activity) or an RCOM (recovery message sent as part of
3949    lockspace recovery). */
3950
3951 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3952 {
3953         struct dlm_header *hd = &p->header;
3954         struct dlm_ls *ls;
3955         int type = 0;
3956
3957         switch (hd->h_cmd) {
3958         case DLM_MSG:
3959                 dlm_message_in(&p->message);
3960                 type = p->message.m_type;
3961                 break;
3962         case DLM_RCOM:
3963                 dlm_rcom_in(&p->rcom);
3964                 type = p->rcom.rc_type;
3965                 break;
3966         default:
3967                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3968                 return;
3969         }
3970
3971         if (hd->h_nodeid != nodeid) {
3972                 log_print("invalid h_nodeid %d from %d lockspace %x",
3973                           hd->h_nodeid, nodeid, hd->h_lockspace);
3974                 return;
3975         }
3976
3977         ls = dlm_find_lockspace_global(hd->h_lockspace);
3978         if (!ls) {
3979                 if (dlm_config.ci_log_debug)
3980                         log_print("invalid lockspace %x from %d cmd %d type %d",
3981                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3982
3983                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3984                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3985                 return;
3986         }
3987
3988         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3989            be inactive (in this ls) before transitioning to recovery mode */
3990
3991         down_read(&ls->ls_recv_active);
3992         if (hd->h_cmd == DLM_MSG)
3993                 dlm_receive_message(ls, &p->message, nodeid);
3994         else
3995                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3996         up_read(&ls->ls_recv_active);
3997
3998         dlm_put_lockspace(ls);
3999 }
4000
4001 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
4002 {
4003         if (middle_conversion(lkb)) {
4004                 hold_lkb(lkb);
4005                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
4006                 ls->ls_stub_ms.m_result = -EINPROGRESS;
4007                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4008                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4009                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
4010
4011                 /* Same special case as in receive_rcom_lock_args() */
4012                 lkb->lkb_grmode = DLM_LOCK_IV;
4013                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4014                 unhold_lkb(lkb);
4015
4016         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4017                 lkb->lkb_flags |= DLM_IFL_RESEND;
4018         }
4019
4020         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4021            conversions are async; there's no reply from the remote master */
4022 }
4023
4024 /* A waiting lkb needs recovery if the master node has failed, or
4025    the master node is changing (only when no directory is used) */
4026
4027 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4028 {
4029         if (dlm_is_removed(ls, lkb->lkb_nodeid))
4030                 return 1;
4031
4032         if (!dlm_no_directory(ls))
4033                 return 0;
4034
4035         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4036                 return 1;
4037
4038         return 0;
4039 }
4040
4041 /* Recovery for locks that are waiting for replies from nodes that are now
4042    gone.  We can just complete unlocks and cancels by faking a reply from the
4043    dead node.  Requests and up-conversions we flag to be resent after
4044    recovery.  Down-conversions can just be completed with a fake reply like
4045    unlocks.  Conversions between PR and CW need special attention. */
4046
4047 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4048 {
4049         struct dlm_lkb *lkb, *safe;
4050         int wait_type, stub_unlock_result, stub_cancel_result;
4051
4052         mutex_lock(&ls->ls_waiters_mutex);
4053
4054         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4055                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
4056                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
4057
4058                 /* all outstanding lookups, regardless of destination  will be
4059                    resent after recovery is done */
4060
4061                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4062                         lkb->lkb_flags |= DLM_IFL_RESEND;
4063                         continue;
4064                 }
4065
4066                 if (!waiter_needs_recovery(ls, lkb))
4067                         continue;
4068
4069                 wait_type = lkb->lkb_wait_type;
4070                 stub_unlock_result = -DLM_EUNLOCK;
4071                 stub_cancel_result = -DLM_ECANCEL;
4072
4073                 /* Main reply may have been received leaving a zero wait_type,
4074                    but a reply for the overlapping op may not have been
4075                    received.  In that case we need to fake the appropriate
4076                    reply for the overlap op. */
4077
4078                 if (!wait_type) {
4079                         if (is_overlap_cancel(lkb)) {
4080                                 wait_type = DLM_MSG_CANCEL;
4081                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4082                                         stub_cancel_result = 0;
4083                         }
4084                         if (is_overlap_unlock(lkb)) {
4085                                 wait_type = DLM_MSG_UNLOCK;
4086                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4087                                         stub_unlock_result = -ENOENT;
4088                         }
4089
4090                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4091                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4092                                   stub_cancel_result, stub_unlock_result);
4093                 }
4094
4095                 switch (wait_type) {
4096
4097                 case DLM_MSG_REQUEST:
4098                         lkb->lkb_flags |= DLM_IFL_RESEND;
4099                         break;
4100
4101                 case DLM_MSG_CONVERT:
4102                         recover_convert_waiter(ls, lkb);
4103                         break;
4104
4105                 case DLM_MSG_UNLOCK:
4106                         hold_lkb(lkb);
4107                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4108                         ls->ls_stub_ms.m_result = stub_unlock_result;
4109                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4110                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4111                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4112                         dlm_put_lkb(lkb);
4113                         break;
4114
4115                 case DLM_MSG_CANCEL:
4116                         hold_lkb(lkb);
4117                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4118                         ls->ls_stub_ms.m_result = stub_cancel_result;
4119                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4120                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4121                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4122                         dlm_put_lkb(lkb);
4123                         break;
4124
4125                 default:
4126                         log_error(ls, "invalid lkb wait_type %d %d",
4127                                   lkb->lkb_wait_type, wait_type);
4128                 }
4129                 schedule();
4130         }
4131         mutex_unlock(&ls->ls_waiters_mutex);
4132 }
4133
4134 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4135 {
4136         struct dlm_lkb *lkb;
4137         int found = 0;
4138
4139         mutex_lock(&ls->ls_waiters_mutex);
4140         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4141                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4142                         hold_lkb(lkb);
4143                         found = 1;
4144                         break;
4145                 }
4146         }
4147         mutex_unlock(&ls->ls_waiters_mutex);
4148
4149         if (!found)
4150                 lkb = NULL;
4151         return lkb;
4152 }
4153
4154 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4155    master or dir-node for r.  Processing the lkb may result in it being placed
4156    back on waiters. */
4157
4158 /* We do this after normal locking has been enabled and any saved messages
4159    (in requestqueue) have been processed.  We should be confident that at
4160    this point we won't get or process a reply to any of these waiting
4161    operations.  But, new ops may be coming in on the rsbs/locks here from
4162    userspace or remotely. */
4163
4164 /* there may have been an overlap unlock/cancel prior to recovery or after
4165    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4166    overlap flag would just have been set and nothing new sent.  we can be
4167    confident here than any replies to either the initial op or overlap ops
4168    prior to recovery have been received. */
4169
4170 int dlm_recover_waiters_post(struct dlm_ls *ls)
4171 {
4172         struct dlm_lkb *lkb;
4173         struct dlm_rsb *r;
4174         int error = 0, mstype, err, oc, ou;
4175
4176         while (1) {
4177                 if (dlm_locking_stopped(ls)) {
4178                         log_debug(ls, "recover_waiters_post aborted");
4179                         error = -EINTR;
4180                         break;
4181                 }
4182
4183                 lkb = find_resend_waiter(ls);
4184                 if (!lkb)
4185                         break;
4186
4187                 r = lkb->lkb_resource;
4188                 hold_rsb(r);
4189                 lock_rsb(r);
4190
4191                 mstype = lkb->lkb_wait_type;
4192                 oc = is_overlap_cancel(lkb);
4193                 ou = is_overlap_unlock(lkb);
4194                 err = 0;
4195
4196                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4197                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4198
4199                 /* At this point we assume that we won't get a reply to any
4200                    previous op or overlap op on this lock.  First, do a big
4201                    remove_from_waiters() for all previous ops. */
4202
4203                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4204                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4205                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4206                 lkb->lkb_wait_type = 0;
4207                 lkb->lkb_wait_count = 0;
4208                 mutex_lock(&ls->ls_waiters_mutex);
4209                 list_del_init(&lkb->lkb_wait_reply);
4210                 mutex_unlock(&ls->ls_waiters_mutex);
4211                 unhold_lkb(lkb); /* for waiters list */
4212
4213                 if (oc || ou) {
4214                         /* do an unlock or cancel instead of resending */
4215                         switch (mstype) {
4216                         case DLM_MSG_LOOKUP:
4217                         case DLM_MSG_REQUEST:
4218                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4219                                                         -DLM_ECANCEL);
4220                                 unhold_lkb(lkb); /* undoes create_lkb() */
4221                                 break;
4222                         case DLM_MSG_CONVERT:
4223                                 if (oc) {
4224                                         queue_cast(r, lkb, -DLM_ECANCEL);
4225                                 } else {
4226                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4227                                         _unlock_lock(r, lkb);
4228                                 }
4229                                 break;
4230                         default:
4231                                 err = 1;
4232                         }
4233                 } else {
4234                         switch (mstype) {
4235                         case DLM_MSG_LOOKUP:
4236                         case DLM_MSG_REQUEST:
4237                                 _request_lock(r, lkb);
4238                                 if (is_master(r))
4239                                         confirm_master(r, 0);
4240                                 break;
4241                         case DLM_MSG_CONVERT:
4242                                 _convert_lock(r, lkb);
4243                                 break;
4244                         default:
4245                                 err = 1;
4246                         }
4247                 }
4248
4249                 if (err)
4250                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4251                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4252                 unlock_rsb(r);
4253                 put_rsb(r);
4254                 dlm_put_lkb(lkb);
4255         }
4256
4257         return error;
4258 }
4259
4260 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4261                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4262 {
4263         struct dlm_ls *ls = r->res_ls;
4264         struct dlm_lkb *lkb, *safe;
4265
4266         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4267                 if (test(ls, lkb)) {
4268                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4269                         del_lkb(r, lkb);
4270                         /* this put should free the lkb */
4271                         if (!dlm_put_lkb(lkb))
4272                                 log_error(ls, "purged lkb not released");
4273                 }
4274         }
4275 }
4276
4277 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4278 {
4279         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4280 }
4281
4282 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4283 {
4284         return is_master_copy(lkb);
4285 }
4286
4287 static void purge_dead_locks(struct dlm_rsb *r)
4288 {
4289         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4290         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4291         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4292 }
4293
4294 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4295 {
4296         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4297         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4298         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4299 }
4300
4301 /* Get rid of locks held by nodes that are gone. */
4302
4303 int dlm_purge_locks(struct dlm_ls *ls)
4304 {
4305         struct dlm_rsb *r;
4306
4307         log_debug(ls, "dlm_purge_locks");
4308
4309         down_write(&ls->ls_root_sem);
4310         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4311                 hold_rsb(r);
4312                 lock_rsb(r);
4313                 if (is_master(r))
4314                         purge_dead_locks(r);
4315                 unlock_rsb(r);
4316                 unhold_rsb(r);
4317
4318                 schedule();
4319         }
4320         up_write(&ls->ls_root_sem);
4321
4322         return 0;
4323 }
4324
4325 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4326 {
4327         struct dlm_rsb *r, *r_ret = NULL;
4328
4329         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4330         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4331                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4332                         continue;
4333                 hold_rsb(r);
4334                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4335                 r_ret = r;
4336                 break;
4337         }
4338         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4339         return r_ret;
4340 }
4341
4342 void dlm_grant_after_purge(struct dlm_ls *ls)
4343 {
4344         struct dlm_rsb *r;
4345         int bucket = 0;
4346
4347         while (1) {
4348                 r = find_purged_rsb(ls, bucket);
4349                 if (!r) {
4350                         if (bucket == ls->ls_rsbtbl_size - 1)
4351                                 break;
4352                         bucket++;
4353                         continue;
4354                 }
4355                 lock_rsb(r);
4356                 if (is_master(r)) {
4357                         grant_pending_locks(r);
4358                         confirm_master(r, 0);
4359                 }
4360                 unlock_rsb(r);
4361                 put_rsb(r);
4362                 schedule();
4363         }
4364 }
4365
4366 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4367                                          uint32_t remid)
4368 {
4369         struct dlm_lkb *lkb;
4370
4371         list_for_each_entry(lkb, head, lkb_statequeue) {
4372                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4373                         return lkb;
4374         }
4375         return NULL;
4376 }
4377
4378 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4379                                     uint32_t remid)
4380 {
4381         struct dlm_lkb *lkb;
4382
4383         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4384         if (lkb)
4385                 return lkb;
4386         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4387         if (lkb)
4388                 return lkb;
4389         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4390         if (lkb)
4391                 return lkb;
4392         return NULL;
4393 }
4394
4395 /* needs at least dlm_rcom + rcom_lock */
4396 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4397                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4398 {
4399         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4400
4401         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4402         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4403         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4404         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4405         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4406         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4407         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4408         lkb->lkb_rqmode = rl->rl_rqmode;
4409         lkb->lkb_grmode = rl->rl_grmode;
4410         /* don't set lkb_status because add_lkb wants to itself */
4411
4412         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4413         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4414
4415         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4416                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4417                          sizeof(struct rcom_lock);
4418                 if (lvblen > ls->ls_lvblen)
4419                         return -EINVAL;
4420                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4421                 if (!lkb->lkb_lvbptr)
4422                         return -ENOMEM;
4423                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4424         }
4425
4426         /* Conversions between PR and CW (middle modes) need special handling.
4427            The real granted mode of these converting locks cannot be determined
4428            until all locks have been rebuilt on the rsb (recover_conversion) */
4429
4430         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4431             middle_conversion(lkb)) {
4432                 rl->rl_status = DLM_LKSTS_CONVERT;
4433                 lkb->lkb_grmode = DLM_LOCK_IV;
4434                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4435         }
4436
4437         return 0;
4438 }
4439
4440 /* This lkb may have been recovered in a previous aborted recovery so we need
4441    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4442    If so we just send back a standard reply.  If not, we create a new lkb with
4443    the given values and send back our lkid.  We send back our lkid by sending
4444    back the rcom_lock struct we got but with the remid field filled in. */
4445
4446 /* needs at least dlm_rcom + rcom_lock */
4447 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4448 {
4449         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4450         struct dlm_rsb *r;
4451         struct dlm_lkb *lkb;
4452         int error;
4453
4454         if (rl->rl_parent_lkid) {
4455                 error = -EOPNOTSUPP;
4456                 goto out;
4457         }
4458
4459         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4460                          R_MASTER, &r);
4461         if (error)
4462                 goto out;
4463
4464         lock_rsb(r);
4465
4466         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4467         if (lkb) {
4468                 error = -EEXIST;
4469                 goto out_remid;
4470         }
4471
4472         error = create_lkb(ls, &lkb);
4473         if (error)
4474                 goto out_unlock;
4475
4476         error = receive_rcom_lock_args(ls, lkb, r, rc);
4477         if (error) {
4478                 __put_lkb(ls, lkb);
4479                 goto out_unlock;
4480         }
4481
4482         attach_lkb(r, lkb);
4483         add_lkb(r, lkb, rl->rl_status);
4484         error = 0;
4485
4486  out_remid:
4487         /* this is the new value returned to the lock holder for
4488            saving in its process-copy lkb */
4489         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4490
4491  out_unlock:
4492         unlock_rsb(r);
4493         put_rsb(r);
4494  out:
4495         if (error)
4496                 log_debug(ls, "recover_master_copy %d %x", error,
4497                           le32_to_cpu(rl->rl_lkid));
4498         rl->rl_result = cpu_to_le32(error);
4499         return error;
4500 }
4501
4502 /* needs at least dlm_rcom + rcom_lock */
4503 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4504 {
4505         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4506         struct dlm_rsb *r;
4507         struct dlm_lkb *lkb;
4508         int error;
4509
4510         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4511         if (error) {
4512                 log_error(ls, "recover_process_copy no lkid %x",
4513                                 le32_to_cpu(rl->rl_lkid));
4514                 return error;
4515         }
4516
4517         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4518
4519         error = le32_to_cpu(rl->rl_result);
4520
4521         r = lkb->lkb_resource;
4522         hold_rsb(r);
4523         lock_rsb(r);
4524
4525         switch (error) {
4526         case -EBADR:
4527                 /* There's a chance the new master received our lock before
4528                    dlm_recover_master_reply(), this wouldn't happen if we did
4529                    a barrier between recover_masters and recover_locks. */
4530                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4531                           (unsigned long)r, r->res_name);
4532                 dlm_send_rcom_lock(r, lkb);
4533                 goto out;
4534         case -EEXIST:
4535                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4536                 /* fall through */
4537         case 0:
4538                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4539                 break;
4540         default:
4541                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4542                           error, lkb->lkb_id);
4543         }
4544
4545         /* an ack for dlm_recover_locks() which waits for replies from
4546            all the locks it sends to new masters */
4547         dlm_recovered_lock(r);
4548  out:
4549         unlock_rsb(r);
4550         put_rsb(r);
4551         dlm_put_lkb(lkb);
4552
4553         return 0;
4554 }
4555
4556 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4557                      int mode, uint32_t flags, void *name, unsigned int namelen,
4558                      unsigned long timeout_cs)
4559 {
4560         struct dlm_lkb *lkb;
4561         struct dlm_args args;
4562         int error;
4563
4564         dlm_lock_recovery(ls);
4565
4566         error = create_lkb(ls, &lkb);
4567         if (error) {
4568                 kfree(ua);
4569                 goto out;
4570         }
4571
4572         if (flags & DLM_LKF_VALBLK) {
4573                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4574                 if (!ua->lksb.sb_lvbptr) {
4575                         kfree(ua);
4576                         __put_lkb(ls, lkb);
4577                         error = -ENOMEM;
4578                         goto out;
4579                 }
4580         }
4581
4582         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4583            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4584            lock and that lkb_astparam is the dlm_user_args structure. */
4585
4586         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4587                               fake_astfn, ua, fake_bastfn, &args);
4588         lkb->lkb_flags |= DLM_IFL_USER;
4589         ua->old_mode = DLM_LOCK_IV;
4590
4591         if (error) {
4592                 __put_lkb(ls, lkb);
4593                 goto out;
4594         }
4595
4596         error = request_lock(ls, lkb, name, namelen, &args);
4597
4598         switch (error) {
4599         case 0:
4600                 break;
4601         case -EINPROGRESS:
4602                 error = 0;
4603                 break;
4604         case -EAGAIN:
4605                 error = 0;
4606                 /* fall through */
4607         default:
4608                 __put_lkb(ls, lkb);
4609                 goto out;
4610         }
4611
4612         /* add this new lkb to the per-process list of locks */
4613         spin_lock(&ua->proc->locks_spin);
4614         hold_lkb(lkb);
4615         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4616         spin_unlock(&ua->proc->locks_spin);
4617  out:
4618         dlm_unlock_recovery(ls);
4619         return error;
4620 }
4621
4622 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4623                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4624                      unsigned long timeout_cs)
4625 {
4626         struct dlm_lkb *lkb;
4627         struct dlm_args args;
4628         struct dlm_user_args *ua;
4629         int error;
4630
4631         dlm_lock_recovery(ls);
4632
4633         error = find_lkb(ls, lkid, &lkb);
4634         if (error)
4635                 goto out;
4636
4637         /* user can change the params on its lock when it converts it, or
4638            add an lvb that didn't exist before */
4639
4640         ua = lkb->lkb_ua;
4641
4642         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4643                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4644                 if (!ua->lksb.sb_lvbptr) {
4645                         error = -ENOMEM;
4646                         goto out_put;
4647                 }
4648         }
4649         if (lvb_in && ua->lksb.sb_lvbptr)
4650                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4651
4652         ua->xid = ua_tmp->xid;
4653         ua->castparam = ua_tmp->castparam;
4654         ua->castaddr = ua_tmp->castaddr;
4655         ua->bastparam = ua_tmp->bastparam;
4656         ua->bastaddr = ua_tmp->bastaddr;
4657         ua->user_lksb = ua_tmp->user_lksb;
4658         ua->old_mode = lkb->lkb_grmode;
4659
4660         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4661                               fake_astfn, ua, fake_bastfn, &args);
4662         if (error)
4663                 goto out_put;
4664
4665         error = convert_lock(ls, lkb, &args);
4666
4667         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4668                 error = 0;
4669  out_put:
4670         dlm_put_lkb(lkb);
4671  out:
4672         dlm_unlock_recovery(ls);
4673         kfree(ua_tmp);
4674         return error;
4675 }
4676
4677 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4678                     uint32_t flags, uint32_t lkid, char *lvb_in)
4679 {
4680         struct dlm_lkb *lkb;
4681         struct dlm_args args;
4682         struct dlm_user_args *ua;
4683         int error;
4684
4685         dlm_lock_recovery(ls);
4686
4687         error = find_lkb(ls, lkid, &lkb);
4688         if (error)
4689                 goto out;
4690
4691         ua = lkb->lkb_ua;
4692
4693         if (lvb_in && ua->lksb.sb_lvbptr)
4694                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4695         if (ua_tmp->castparam)
4696                 ua->castparam = ua_tmp->castparam;
4697         ua->user_lksb = ua_tmp->user_lksb;
4698
4699         error = set_unlock_args(flags, ua, &args);
4700         if (error)
4701                 goto out_put;
4702
4703         error = unlock_lock(ls, lkb, &args);
4704
4705         if (error == -DLM_EUNLOCK)
4706                 error = 0;
4707         /* from validate_unlock_args() */
4708         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4709                 error = 0;
4710         if (error)
4711                 goto out_put;
4712
4713         spin_lock(&ua->proc->locks_spin);
4714         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4715         if (!list_empty(&lkb->lkb_ownqueue))
4716                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4717         spin_unlock(&ua->proc->locks_spin);
4718  out_put:
4719         dlm_put_lkb(lkb);
4720  out:
4721         dlm_unlock_recovery(ls);
4722         kfree(ua_tmp);
4723         return error;
4724 }
4725
4726 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4727                     uint32_t flags, uint32_t lkid)
4728 {
4729         struct dlm_lkb *lkb;
4730         struct dlm_args args;
4731         struct dlm_user_args *ua;
4732         int error;
4733
4734         dlm_lock_recovery(ls);
4735
4736         error = find_lkb(ls, lkid, &lkb);
4737         if (error)
4738                 goto out;
4739
4740         ua = lkb->lkb_ua;
4741         if (ua_tmp->castparam)
4742                 ua->castparam = ua_tmp->castparam;
4743         ua->user_lksb = ua_tmp->user_lksb;
4744
4745         error = set_unlock_args(flags, ua, &args);
4746         if (error)
4747                 goto out_put;
4748
4749         error = cancel_lock(ls, lkb, &args);
4750
4751         if (error == -DLM_ECANCEL)
4752                 error = 0;
4753         /* from validate_unlock_args() */
4754         if (error == -EBUSY)
4755                 error = 0;
4756  out_put:
4757         dlm_put_lkb(lkb);
4758  out:
4759         dlm_unlock_recovery(ls);
4760         kfree(ua_tmp);
4761         return error;
4762 }
4763
4764 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4765 {
4766         struct dlm_lkb *lkb;
4767         struct dlm_args args;
4768         struct dlm_user_args *ua;
4769         struct dlm_rsb *r;
4770         int error;
4771
4772         dlm_lock_recovery(ls);
4773
4774         error = find_lkb(ls, lkid, &lkb);
4775         if (error)
4776                 goto out;
4777
4778         ua = lkb->lkb_ua;
4779
4780         error = set_unlock_args(flags, ua, &args);
4781         if (error)
4782                 goto out_put;
4783
4784         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4785
4786         r = lkb->lkb_resource;
4787         hold_rsb(r);
4788         lock_rsb(r);
4789
4790         error = validate_unlock_args(lkb, &args);
4791         if (error)
4792                 goto out_r;
4793         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4794
4795         error = _cancel_lock(r, lkb);
4796  out_r:
4797         unlock_rsb(r);
4798         put_rsb(r);
4799
4800         if (error == -DLM_ECANCEL)
4801                 error = 0;
4802         /* from validate_unlock_args() */
4803         if (error == -EBUSY)
4804                 error = 0;
4805  out_put:
4806         dlm_put_lkb(lkb);
4807  out:
4808         dlm_unlock_recovery(ls);
4809         return error;
4810 }
4811
4812 /* lkb's that are removed from the waiters list by revert are just left on the
4813    orphans list with the granted orphan locks, to be freed by purge */
4814
4815 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4816 {
4817         struct dlm_args args;
4818         int error;
4819
4820         hold_lkb(lkb);
4821         mutex_lock(&ls->ls_orphans_mutex);
4822         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4823         mutex_unlock(&ls->ls_orphans_mutex);
4824
4825         set_unlock_args(0, lkb->lkb_ua, &args);
4826
4827         error = cancel_lock(ls, lkb, &args);
4828         if (error == -DLM_ECANCEL)
4829                 error = 0;
4830         return error;
4831 }
4832
4833 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4834    Regardless of what rsb queue the lock is on, it's removed and freed. */
4835
4836 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4837 {
4838         struct dlm_args args;
4839         int error;
4840
4841         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4842
4843         error = unlock_lock(ls, lkb, &args);
4844         if (error == -DLM_EUNLOCK)
4845                 error = 0;
4846         return error;
4847 }
4848
4849 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4850    (which does lock_rsb) due to deadlock with receiving a message that does
4851    lock_rsb followed by dlm_user_add_ast() */
4852
4853 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4854                                      struct dlm_user_proc *proc)
4855 {
4856         struct dlm_lkb *lkb = NULL;
4857
4858         mutex_lock(&ls->ls_clear_proc_locks);
4859         if (list_empty(&proc->locks))
4860                 goto out;
4861
4862         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4863         list_del_init(&lkb->lkb_ownqueue);
4864
4865         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4866                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4867         else
4868                 lkb->lkb_flags |= DLM_IFL_DEAD;
4869  out:
4870         mutex_unlock(&ls->ls_clear_proc_locks);
4871         return lkb;
4872 }
4873
4874 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4875    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4876    which we clear here. */
4877
4878 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4879    list, and no more device_writes should add lkb's to proc->locks list; so we
4880    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4881    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4882    them ourself. */
4883
4884 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4885 {
4886         struct dlm_lkb *lkb, *safe;
4887
4888         dlm_lock_recovery(ls);
4889
4890         while (1) {
4891                 lkb = del_proc_lock(ls, proc);
4892                 if (!lkb)
4893                         break;
4894                 del_timeout(lkb);
4895                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4896                         orphan_proc_lock(ls, lkb);
4897                 else
4898                         unlock_proc_lock(ls, lkb);
4899
4900                 /* this removes the reference for the proc->locks list
4901                    added by dlm_user_request, it may result in the lkb
4902                    being freed */
4903
4904                 dlm_put_lkb(lkb);
4905         }
4906
4907         mutex_lock(&ls->ls_clear_proc_locks);
4908
4909         /* in-progress unlocks */
4910         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4911                 list_del_init(&lkb->lkb_ownqueue);
4912                 lkb->lkb_flags |= DLM_IFL_DEAD;
4913                 dlm_put_lkb(lkb);
4914         }
4915
4916         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4917                 lkb->lkb_ast_type = 0;
4918                 list_del(&lkb->lkb_astqueue);
4919                 dlm_put_lkb(lkb);
4920         }
4921
4922         mutex_unlock(&ls->ls_clear_proc_locks);
4923         dlm_unlock_recovery(ls);
4924 }
4925
4926 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4927 {
4928         struct dlm_lkb *lkb, *safe;
4929
4930         while (1) {
4931                 lkb = NULL;
4932                 spin_lock(&proc->locks_spin);
4933                 if (!list_empty(&proc->locks)) {
4934                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4935                                          lkb_ownqueue);
4936                         list_del_init(&lkb->lkb_ownqueue);
4937                 }
4938                 spin_unlock(&proc->locks_spin);
4939
4940                 if (!lkb)
4941                         break;
4942
4943                 lkb->lkb_flags |= DLM_IFL_DEAD;
4944                 unlock_proc_lock(ls, lkb);
4945                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4946         }
4947
4948         spin_lock(&proc->locks_spin);
4949         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4950                 list_del_init(&lkb->lkb_ownqueue);
4951                 lkb->lkb_flags |= DLM_IFL_DEAD;
4952                 dlm_put_lkb(lkb);
4953         }
4954         spin_unlock(&proc->locks_spin);
4955
4956         spin_lock(&proc->asts_spin);
4957         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4958                 list_del(&lkb->lkb_astqueue);
4959                 dlm_put_lkb(lkb);
4960         }
4961         spin_unlock(&proc->asts_spin);
4962 }
4963
4964 /* pid of 0 means purge all orphans */
4965
4966 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4967 {
4968         struct dlm_lkb *lkb, *safe;
4969
4970         mutex_lock(&ls->ls_orphans_mutex);
4971         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4972                 if (pid && lkb->lkb_ownpid != pid)
4973                         continue;
4974                 unlock_proc_lock(ls, lkb);
4975                 list_del_init(&lkb->lkb_ownqueue);
4976                 dlm_put_lkb(lkb);
4977         }
4978         mutex_unlock(&ls->ls_orphans_mutex);
4979 }
4980
4981 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4982 {
4983         struct dlm_message *ms;
4984         struct dlm_mhandle *mh;
4985         int error;
4986
4987         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4988                                 DLM_MSG_PURGE, &ms, &mh);
4989         if (error)
4990                 return error;
4991         ms->m_nodeid = nodeid;
4992         ms->m_pid = pid;
4993
4994         return send_message(mh, ms);
4995 }
4996
4997 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4998                    int nodeid, int pid)
4999 {
5000         int error = 0;
5001
5002         if (nodeid != dlm_our_nodeid()) {
5003                 error = send_purge(ls, nodeid, pid);
5004         } else {
5005                 dlm_lock_recovery(ls);
5006                 if (pid == current->pid)
5007                         purge_proc_locks(ls, proc);
5008                 else
5009                         do_purge(ls, nodeid, pid);
5010                 dlm_unlock_recovery(ls);
5011         }
5012         return error;
5013 }
5014