ocfs2: Create the lock status block union.
[linux-2.6.git] / fs / ocfs2 / dlmglue.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/crc32.h>
31 #include <linux/kthread.h>
32 #include <linux/pagemap.h>
33 #include <linux/debugfs.h>
34 #include <linux/seq_file.h>
35
36 #include <cluster/heartbeat.h>
37 #include <cluster/nodemanager.h>
38 #include <cluster/tcp.h>
39
40 #define MLOG_MASK_PREFIX ML_DLM_GLUE
41 #include <cluster/masklog.h>
42
43 #include "ocfs2.h"
44 #include "ocfs2_lockingver.h"
45
46 #include "alloc.h"
47 #include "dcache.h"
48 #include "dlmglue.h"
49 #include "extent_map.h"
50 #include "file.h"
51 #include "heartbeat.h"
52 #include "inode.h"
53 #include "journal.h"
54 #include "stackglue.h"
55 #include "slot_map.h"
56 #include "super.h"
57 #include "uptodate.h"
58
59 #include "buffer_head_io.h"
60
61 struct ocfs2_mask_waiter {
62         struct list_head        mw_item;
63         int                     mw_status;
64         struct completion       mw_complete;
65         unsigned long           mw_mask;
66         unsigned long           mw_goal;
67 };
68
69 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
70 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
71 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
72
73 /*
74  * Return value from ->downconvert_worker functions.
75  *
76  * These control the precise actions of ocfs2_unblock_lock()
77  * and ocfs2_process_blocked_lock()
78  *
79  */
80 enum ocfs2_unblock_action {
81         UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
82         UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
83                                       * ->post_unlock callback */
84         UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
85                                       * ->post_unlock() callback. */
86 };
87
88 struct ocfs2_unblock_ctl {
89         int requeue;
90         enum ocfs2_unblock_action unblock_action;
91 };
92
93 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
94                                         int new_level);
95 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
96
97 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
98                                      int blocking);
99
100 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
101                                        int blocking);
102
103 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
104                                      struct ocfs2_lock_res *lockres);
105
106
107 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
108
109 /* This aids in debugging situations where a bad LVB might be involved. */
110 static void ocfs2_dump_meta_lvb_info(u64 level,
111                                      const char *function,
112                                      unsigned int line,
113                                      struct ocfs2_lock_res *lockres)
114 {
115         struct ocfs2_meta_lvb *lvb =
116                 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
117
118         mlog(level, "LVB information for %s (called from %s:%u):\n",
119              lockres->l_name, function, line);
120         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
121              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
122              be32_to_cpu(lvb->lvb_igeneration));
123         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
124              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
125              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
126              be16_to_cpu(lvb->lvb_imode));
127         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
128              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
129              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
130              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
131              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
132              be32_to_cpu(lvb->lvb_iattr));
133 }
134
135
136 /*
137  * OCFS2 Lock Resource Operations
138  *
139  * These fine tune the behavior of the generic dlmglue locking infrastructure.
140  *
141  * The most basic of lock types can point ->l_priv to their respective
142  * struct ocfs2_super and allow the default actions to manage things.
143  *
144  * Right now, each lock type also needs to implement an init function,
145  * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
146  * should be called when the lock is no longer needed (i.e., object
147  * destruction time).
148  */
149 struct ocfs2_lock_res_ops {
150         /*
151          * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
152          * this callback if ->l_priv is not an ocfs2_super pointer
153          */
154         struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
155
156         /*
157          * Optionally called in the downconvert thread after a
158          * successful downconvert. The lockres will not be referenced
159          * after this callback is called, so it is safe to free
160          * memory, etc.
161          *
162          * The exact semantics of when this is called are controlled
163          * by ->downconvert_worker()
164          */
165         void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
166
167         /*
168          * Allow a lock type to add checks to determine whether it is
169          * safe to downconvert a lock. Return 0 to re-queue the
170          * downconvert at a later time, nonzero to continue.
171          *
172          * For most locks, the default checks that there are no
173          * incompatible holders are sufficient.
174          *
175          * Called with the lockres spinlock held.
176          */
177         int (*check_downconvert)(struct ocfs2_lock_res *, int);
178
179         /*
180          * Allows a lock type to populate the lock value block. This
181          * is called on downconvert, and when we drop a lock.
182          *
183          * Locks that want to use this should set LOCK_TYPE_USES_LVB
184          * in the flags field.
185          *
186          * Called with the lockres spinlock held.
187          */
188         void (*set_lvb)(struct ocfs2_lock_res *);
189
190         /*
191          * Called from the downconvert thread when it is determined
192          * that a lock will be downconverted. This is called without
193          * any locks held so the function can do work that might
194          * schedule (syncing out data, etc).
195          *
196          * This should return any one of the ocfs2_unblock_action
197          * values, depending on what it wants the thread to do.
198          */
199         int (*downconvert_worker)(struct ocfs2_lock_res *, int);
200
201         /*
202          * LOCK_TYPE_* flags which describe the specific requirements
203          * of a lock type. Descriptions of each individual flag follow.
204          */
205         int flags;
206 };
207
208 /*
209  * Some locks want to "refresh" potentially stale data when a
210  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
211  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
212  * individual lockres l_flags member from the ast function. It is
213  * expected that the locking wrapper will clear the
214  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
215  */
216 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
217
218 /*
219  * Indicate that a lock type makes use of the lock value block. The
220  * ->set_lvb lock type callback must be defined.
221  */
222 #define LOCK_TYPE_USES_LVB              0x2
223
224 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
225         .get_osb        = ocfs2_get_inode_osb,
226         .flags          = 0,
227 };
228
229 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
230         .get_osb        = ocfs2_get_inode_osb,
231         .check_downconvert = ocfs2_check_meta_downconvert,
232         .set_lvb        = ocfs2_set_meta_lvb,
233         .downconvert_worker = ocfs2_data_convert_worker,
234         .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
235 };
236
237 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
238         .flags          = LOCK_TYPE_REQUIRES_REFRESH,
239 };
240
241 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
242         .flags          = 0,
243 };
244
245 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
246         .get_osb        = ocfs2_get_dentry_osb,
247         .post_unlock    = ocfs2_dentry_post_unlock,
248         .downconvert_worker = ocfs2_dentry_convert_worker,
249         .flags          = 0,
250 };
251
252 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
253         .get_osb        = ocfs2_get_inode_osb,
254         .flags          = 0,
255 };
256
257 static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
258         .get_osb        = ocfs2_get_file_osb,
259         .flags          = 0,
260 };
261
262 /*
263  * This is the filesystem locking protocol version.
264  *
265  * Whenever the filesystem does new things with locks (adds or removes a
266  * lock, orders them differently, does different things underneath a lock),
267  * the version must be changed.  The protocol is negotiated when joining
268  * the dlm domain.  A node may join the domain if its major version is
269  * identical to all other nodes and its minor version is greater than
270  * or equal to all other nodes.  When its minor version is greater than
271  * the other nodes, it will run at the minor version specified by the
272  * other nodes.
273  *
274  * If a locking change is made that will not be compatible with older
275  * versions, the major number must be increased and the minor version set
276  * to zero.  If a change merely adds a behavior that can be disabled when
277  * speaking to older versions, the minor version must be increased.  If a
278  * change adds a fully backwards compatible change (eg, LVB changes that
279  * are just ignored by older versions), the version does not need to be
280  * updated.
281  */
282 const struct dlm_protocol_version ocfs2_locking_protocol = {
283         .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
284         .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
285 };
286
287 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
288 {
289         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
290                 lockres->l_type == OCFS2_LOCK_TYPE_RW ||
291                 lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
292 }
293
294 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
295 {
296         BUG_ON(!ocfs2_is_inode_lock(lockres));
297
298         return (struct inode *) lockres->l_priv;
299 }
300
301 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
302 {
303         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
304
305         return (struct ocfs2_dentry_lock *)lockres->l_priv;
306 }
307
308 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
309 {
310         if (lockres->l_ops->get_osb)
311                 return lockres->l_ops->get_osb(lockres);
312
313         return (struct ocfs2_super *)lockres->l_priv;
314 }
315
316 static int ocfs2_lock_create(struct ocfs2_super *osb,
317                              struct ocfs2_lock_res *lockres,
318                              int level,
319                              u32 dlm_flags);
320 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
321                                                      int wanted);
322 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
323                                  struct ocfs2_lock_res *lockres,
324                                  int level);
325 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
326 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
327 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
328 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
329 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
330                                         struct ocfs2_lock_res *lockres);
331 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
332                                                 int convert);
333 #define ocfs2_log_dlm_error(_func, _err, _lockres) do {                 \
334         mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
335              _err, _func, _lockres->l_name);                            \
336 } while (0)
337 static int ocfs2_downconvert_thread(void *arg);
338 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
339                                         struct ocfs2_lock_res *lockres);
340 static int ocfs2_inode_lock_update(struct inode *inode,
341                                   struct buffer_head **bh);
342 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
343 static inline int ocfs2_highest_compat_lock_level(int level);
344 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
345                                       int new_level);
346 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
347                                   struct ocfs2_lock_res *lockres,
348                                   int new_level,
349                                   int lvb);
350 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
351                                         struct ocfs2_lock_res *lockres);
352 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
353                                 struct ocfs2_lock_res *lockres);
354
355
356 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
357                                   u64 blkno,
358                                   u32 generation,
359                                   char *name)
360 {
361         int len;
362
363         mlog_entry_void();
364
365         BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
366
367         len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
368                        ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
369                        (long long)blkno, generation);
370
371         BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
372
373         mlog(0, "built lock resource with name: %s\n", name);
374
375         mlog_exit_void();
376 }
377
378 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
379
380 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
381                                        struct ocfs2_dlm_debug *dlm_debug)
382 {
383         mlog(0, "Add tracking for lockres %s\n", res->l_name);
384
385         spin_lock(&ocfs2_dlm_tracking_lock);
386         list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
387         spin_unlock(&ocfs2_dlm_tracking_lock);
388 }
389
390 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
391 {
392         spin_lock(&ocfs2_dlm_tracking_lock);
393         if (!list_empty(&res->l_debug_list))
394                 list_del_init(&res->l_debug_list);
395         spin_unlock(&ocfs2_dlm_tracking_lock);
396 }
397
398 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
399                                        struct ocfs2_lock_res *res,
400                                        enum ocfs2_lock_type type,
401                                        struct ocfs2_lock_res_ops *ops,
402                                        void *priv)
403 {
404         res->l_type          = type;
405         res->l_ops           = ops;
406         res->l_priv          = priv;
407
408         res->l_level         = DLM_LOCK_IV;
409         res->l_requested     = DLM_LOCK_IV;
410         res->l_blocking      = DLM_LOCK_IV;
411         res->l_action        = OCFS2_AST_INVALID;
412         res->l_unlock_action = OCFS2_UNLOCK_INVALID;
413
414         res->l_flags         = OCFS2_LOCK_INITIALIZED;
415
416         ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
417 }
418
419 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
420 {
421         /* This also clears out the lock status block */
422         memset(res, 0, sizeof(struct ocfs2_lock_res));
423         spin_lock_init(&res->l_lock);
424         init_waitqueue_head(&res->l_event);
425         INIT_LIST_HEAD(&res->l_blocked_list);
426         INIT_LIST_HEAD(&res->l_mask_waiters);
427 }
428
429 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
430                                enum ocfs2_lock_type type,
431                                unsigned int generation,
432                                struct inode *inode)
433 {
434         struct ocfs2_lock_res_ops *ops;
435
436         switch(type) {
437                 case OCFS2_LOCK_TYPE_RW:
438                         ops = &ocfs2_inode_rw_lops;
439                         break;
440                 case OCFS2_LOCK_TYPE_META:
441                         ops = &ocfs2_inode_inode_lops;
442                         break;
443                 case OCFS2_LOCK_TYPE_OPEN:
444                         ops = &ocfs2_inode_open_lops;
445                         break;
446                 default:
447                         mlog_bug_on_msg(1, "type: %d\n", type);
448                         ops = NULL; /* thanks, gcc */
449                         break;
450         };
451
452         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
453                               generation, res->l_name);
454         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
455 }
456
457 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
458 {
459         struct inode *inode = ocfs2_lock_res_inode(lockres);
460
461         return OCFS2_SB(inode->i_sb);
462 }
463
464 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
465 {
466         struct ocfs2_file_private *fp = lockres->l_priv;
467
468         return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
469 }
470
471 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
472 {
473         __be64 inode_blkno_be;
474
475         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
476                sizeof(__be64));
477
478         return be64_to_cpu(inode_blkno_be);
479 }
480
481 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
482 {
483         struct ocfs2_dentry_lock *dl = lockres->l_priv;
484
485         return OCFS2_SB(dl->dl_inode->i_sb);
486 }
487
488 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
489                                 u64 parent, struct inode *inode)
490 {
491         int len;
492         u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
493         __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
494         struct ocfs2_lock_res *lockres = &dl->dl_lockres;
495
496         ocfs2_lock_res_init_once(lockres);
497
498         /*
499          * Unfortunately, the standard lock naming scheme won't work
500          * here because we have two 16 byte values to use. Instead,
501          * we'll stuff the inode number as a binary value. We still
502          * want error prints to show something without garbling the
503          * display, so drop a null byte in there before the inode
504          * number. A future version of OCFS2 will likely use all
505          * binary lock names. The stringified names have been a
506          * tremendous aid in debugging, but now that the debugfs
507          * interface exists, we can mangle things there if need be.
508          *
509          * NOTE: We also drop the standard "pad" value (the total lock
510          * name size stays the same though - the last part is all
511          * zeros due to the memset in ocfs2_lock_res_init_once()
512          */
513         len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
514                        "%c%016llx",
515                        ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
516                        (long long)parent);
517
518         BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
519
520         memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
521                sizeof(__be64));
522
523         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
524                                    OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
525                                    dl);
526 }
527
528 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
529                                       struct ocfs2_super *osb)
530 {
531         /* Superblock lockres doesn't come from a slab so we call init
532          * once on it manually.  */
533         ocfs2_lock_res_init_once(res);
534         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
535                               0, res->l_name);
536         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
537                                    &ocfs2_super_lops, osb);
538 }
539
540 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
541                                        struct ocfs2_super *osb)
542 {
543         /* Rename lockres doesn't come from a slab so we call init
544          * once on it manually.  */
545         ocfs2_lock_res_init_once(res);
546         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
547         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
548                                    &ocfs2_rename_lops, osb);
549 }
550
551 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
552                               struct ocfs2_file_private *fp)
553 {
554         struct inode *inode = fp->fp_file->f_mapping->host;
555         struct ocfs2_inode_info *oi = OCFS2_I(inode);
556
557         ocfs2_lock_res_init_once(lockres);
558         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
559                               inode->i_generation, lockres->l_name);
560         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
561                                    OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
562                                    fp);
563         lockres->l_flags |= OCFS2_LOCK_NOCACHE;
564 }
565
566 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
567 {
568         mlog_entry_void();
569
570         if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
571                 return;
572
573         ocfs2_remove_lockres_tracking(res);
574
575         mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
576                         "Lockres %s is on the blocked list\n",
577                         res->l_name);
578         mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
579                         "Lockres %s has mask waiters pending\n",
580                         res->l_name);
581         mlog_bug_on_msg(spin_is_locked(&res->l_lock),
582                         "Lockres %s is locked\n",
583                         res->l_name);
584         mlog_bug_on_msg(res->l_ro_holders,
585                         "Lockres %s has %u ro holders\n",
586                         res->l_name, res->l_ro_holders);
587         mlog_bug_on_msg(res->l_ex_holders,
588                         "Lockres %s has %u ex holders\n",
589                         res->l_name, res->l_ex_holders);
590
591         /* Need to clear out the lock status block for the dlm */
592         memset(&res->l_lksb, 0, sizeof(res->l_lksb));
593
594         res->l_flags = 0UL;
595         mlog_exit_void();
596 }
597
598 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
599                                      int level)
600 {
601         mlog_entry_void();
602
603         BUG_ON(!lockres);
604
605         switch(level) {
606         case DLM_LOCK_EX:
607                 lockres->l_ex_holders++;
608                 break;
609         case DLM_LOCK_PR:
610                 lockres->l_ro_holders++;
611                 break;
612         default:
613                 BUG();
614         }
615
616         mlog_exit_void();
617 }
618
619 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
620                                      int level)
621 {
622         mlog_entry_void();
623
624         BUG_ON(!lockres);
625
626         switch(level) {
627         case DLM_LOCK_EX:
628                 BUG_ON(!lockres->l_ex_holders);
629                 lockres->l_ex_holders--;
630                 break;
631         case DLM_LOCK_PR:
632                 BUG_ON(!lockres->l_ro_holders);
633                 lockres->l_ro_holders--;
634                 break;
635         default:
636                 BUG();
637         }
638         mlog_exit_void();
639 }
640
641 /* WARNING: This function lives in a world where the only three lock
642  * levels are EX, PR, and NL. It *will* have to be adjusted when more
643  * lock types are added. */
644 static inline int ocfs2_highest_compat_lock_level(int level)
645 {
646         int new_level = DLM_LOCK_EX;
647
648         if (level == DLM_LOCK_EX)
649                 new_level = DLM_LOCK_NL;
650         else if (level == DLM_LOCK_PR)
651                 new_level = DLM_LOCK_PR;
652         return new_level;
653 }
654
655 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
656                               unsigned long newflags)
657 {
658         struct ocfs2_mask_waiter *mw, *tmp;
659
660         assert_spin_locked(&lockres->l_lock);
661
662         lockres->l_flags = newflags;
663
664         list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
665                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
666                         continue;
667
668                 list_del_init(&mw->mw_item);
669                 mw->mw_status = 0;
670                 complete(&mw->mw_complete);
671         }
672 }
673 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
674 {
675         lockres_set_flags(lockres, lockres->l_flags | or);
676 }
677 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
678                                 unsigned long clear)
679 {
680         lockres_set_flags(lockres, lockres->l_flags & ~clear);
681 }
682
683 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
684 {
685         mlog_entry_void();
686
687         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
688         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
689         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
690         BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
691
692         lockres->l_level = lockres->l_requested;
693         if (lockres->l_level <=
694             ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
695                 lockres->l_blocking = DLM_LOCK_NL;
696                 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
697         }
698         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
699
700         mlog_exit_void();
701 }
702
703 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
704 {
705         mlog_entry_void();
706
707         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
708         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
709
710         /* Convert from RO to EX doesn't really need anything as our
711          * information is already up to data. Convert from NL to
712          * *anything* however should mark ourselves as needing an
713          * update */
714         if (lockres->l_level == DLM_LOCK_NL &&
715             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
716                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
717
718         lockres->l_level = lockres->l_requested;
719         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
720
721         mlog_exit_void();
722 }
723
724 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
725 {
726         mlog_entry_void();
727
728         BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
729         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
730
731         if (lockres->l_requested > DLM_LOCK_NL &&
732             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
733             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
734                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
735
736         lockres->l_level = lockres->l_requested;
737         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
738         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
739
740         mlog_exit_void();
741 }
742
743 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
744                                      int level)
745 {
746         int needs_downconvert = 0;
747         mlog_entry_void();
748
749         assert_spin_locked(&lockres->l_lock);
750
751         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
752
753         if (level > lockres->l_blocking) {
754                 /* only schedule a downconvert if we haven't already scheduled
755                  * one that goes low enough to satisfy the level we're
756                  * blocking.  this also catches the case where we get
757                  * duplicate BASTs */
758                 if (ocfs2_highest_compat_lock_level(level) <
759                     ocfs2_highest_compat_lock_level(lockres->l_blocking))
760                         needs_downconvert = 1;
761
762                 lockres->l_blocking = level;
763         }
764
765         mlog_exit(needs_downconvert);
766         return needs_downconvert;
767 }
768
769 static void ocfs2_blocking_ast(void *opaque, int level)
770 {
771         struct ocfs2_lock_res *lockres = opaque;
772         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
773         int needs_downconvert;
774         unsigned long flags;
775
776         BUG_ON(level <= DLM_LOCK_NL);
777
778         mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
779              lockres->l_name, level, lockres->l_level,
780              ocfs2_lock_type_string(lockres->l_type));
781
782         /*
783          * We can skip the bast for locks which don't enable caching -
784          * they'll be dropped at the earliest possible time anyway.
785          */
786         if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
787                 return;
788
789         spin_lock_irqsave(&lockres->l_lock, flags);
790         needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
791         if (needs_downconvert)
792                 ocfs2_schedule_blocked_lock(osb, lockres);
793         spin_unlock_irqrestore(&lockres->l_lock, flags);
794
795         wake_up(&lockres->l_event);
796
797         ocfs2_wake_downconvert_thread(osb);
798 }
799
800 static void ocfs2_locking_ast(void *opaque)
801 {
802         struct ocfs2_lock_res *lockres = opaque;
803         unsigned long flags;
804
805         spin_lock_irqsave(&lockres->l_lock, flags);
806
807         if (ocfs2_dlm_lock_status(&lockres->l_lksb)) {
808                 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
809                      lockres->l_name,
810                      ocfs2_dlm_lock_status(&lockres->l_lksb));
811                 spin_unlock_irqrestore(&lockres->l_lock, flags);
812                 return;
813         }
814
815         switch(lockres->l_action) {
816         case OCFS2_AST_ATTACH:
817                 ocfs2_generic_handle_attach_action(lockres);
818                 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
819                 break;
820         case OCFS2_AST_CONVERT:
821                 ocfs2_generic_handle_convert_action(lockres);
822                 break;
823         case OCFS2_AST_DOWNCONVERT:
824                 ocfs2_generic_handle_downconvert_action(lockres);
825                 break;
826         default:
827                 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
828                      "lockres flags = 0x%lx, unlock action: %u\n",
829                      lockres->l_name, lockres->l_action, lockres->l_flags,
830                      lockres->l_unlock_action);
831                 BUG();
832         }
833
834         /* set it to something invalid so if we get called again we
835          * can catch it. */
836         lockres->l_action = OCFS2_AST_INVALID;
837
838         wake_up(&lockres->l_event);
839         spin_unlock_irqrestore(&lockres->l_lock, flags);
840 }
841
842 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
843                                                 int convert)
844 {
845         unsigned long flags;
846
847         mlog_entry_void();
848         spin_lock_irqsave(&lockres->l_lock, flags);
849         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
850         if (convert)
851                 lockres->l_action = OCFS2_AST_INVALID;
852         else
853                 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
854         spin_unlock_irqrestore(&lockres->l_lock, flags);
855
856         wake_up(&lockres->l_event);
857         mlog_exit_void();
858 }
859
860 /* Note: If we detect another process working on the lock (i.e.,
861  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
862  * to do the right thing in that case.
863  */
864 static int ocfs2_lock_create(struct ocfs2_super *osb,
865                              struct ocfs2_lock_res *lockres,
866                              int level,
867                              u32 dlm_flags)
868 {
869         int ret = 0;
870         unsigned long flags;
871
872         mlog_entry_void();
873
874         mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
875              dlm_flags);
876
877         spin_lock_irqsave(&lockres->l_lock, flags);
878         if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
879             (lockres->l_flags & OCFS2_LOCK_BUSY)) {
880                 spin_unlock_irqrestore(&lockres->l_lock, flags);
881                 goto bail;
882         }
883
884         lockres->l_action = OCFS2_AST_ATTACH;
885         lockres->l_requested = level;
886         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
887         spin_unlock_irqrestore(&lockres->l_lock, flags);
888
889         ret = ocfs2_dlm_lock(osb->dlm,
890                              level,
891                              &lockres->l_lksb,
892                              dlm_flags,
893                              lockres->l_name,
894                              OCFS2_LOCK_ID_MAX_LEN - 1,
895                              lockres);
896         if (ret) {
897                 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
898                 ocfs2_recover_from_dlm_error(lockres, 1);
899         }
900
901         mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
902
903 bail:
904         mlog_exit(ret);
905         return ret;
906 }
907
908 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
909                                         int flag)
910 {
911         unsigned long flags;
912         int ret;
913
914         spin_lock_irqsave(&lockres->l_lock, flags);
915         ret = lockres->l_flags & flag;
916         spin_unlock_irqrestore(&lockres->l_lock, flags);
917
918         return ret;
919 }
920
921 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
922
923 {
924         wait_event(lockres->l_event,
925                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
926 }
927
928 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
929
930 {
931         wait_event(lockres->l_event,
932                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
933 }
934
935 /* predict what lock level we'll be dropping down to on behalf
936  * of another node, and return true if the currently wanted
937  * level will be compatible with it. */
938 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
939                                                      int wanted)
940 {
941         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
942
943         return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
944 }
945
946 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
947 {
948         INIT_LIST_HEAD(&mw->mw_item);
949         init_completion(&mw->mw_complete);
950 }
951
952 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
953 {
954         wait_for_completion(&mw->mw_complete);
955         /* Re-arm the completion in case we want to wait on it again */
956         INIT_COMPLETION(mw->mw_complete);
957         return mw->mw_status;
958 }
959
960 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
961                                     struct ocfs2_mask_waiter *mw,
962                                     unsigned long mask,
963                                     unsigned long goal)
964 {
965         BUG_ON(!list_empty(&mw->mw_item));
966
967         assert_spin_locked(&lockres->l_lock);
968
969         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
970         mw->mw_mask = mask;
971         mw->mw_goal = goal;
972 }
973
974 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
975  * if the mask still hadn't reached its goal */
976 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
977                                       struct ocfs2_mask_waiter *mw)
978 {
979         unsigned long flags;
980         int ret = 0;
981
982         spin_lock_irqsave(&lockres->l_lock, flags);
983         if (!list_empty(&mw->mw_item)) {
984                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
985                         ret = -EBUSY;
986
987                 list_del_init(&mw->mw_item);
988                 init_completion(&mw->mw_complete);
989         }
990         spin_unlock_irqrestore(&lockres->l_lock, flags);
991
992         return ret;
993
994 }
995
996 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
997                                              struct ocfs2_lock_res *lockres)
998 {
999         int ret;
1000
1001         ret = wait_for_completion_interruptible(&mw->mw_complete);
1002         if (ret)
1003                 lockres_remove_mask_waiter(lockres, mw);
1004         else
1005                 ret = mw->mw_status;
1006         /* Re-arm the completion in case we want to wait on it again */
1007         INIT_COMPLETION(mw->mw_complete);
1008         return ret;
1009 }
1010
1011 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
1012                               struct ocfs2_lock_res *lockres,
1013                               int level,
1014                               u32 lkm_flags,
1015                               int arg_flags)
1016 {
1017         struct ocfs2_mask_waiter mw;
1018         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1019         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1020         unsigned long flags;
1021
1022         mlog_entry_void();
1023
1024         ocfs2_init_mask_waiter(&mw);
1025
1026         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1027                 lkm_flags |= DLM_LKF_VALBLK;
1028
1029 again:
1030         wait = 0;
1031
1032         if (catch_signals && signal_pending(current)) {
1033                 ret = -ERESTARTSYS;
1034                 goto out;
1035         }
1036
1037         spin_lock_irqsave(&lockres->l_lock, flags);
1038
1039         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1040                         "Cluster lock called on freeing lockres %s! flags "
1041                         "0x%lx\n", lockres->l_name, lockres->l_flags);
1042
1043         /* We only compare against the currently granted level
1044          * here. If the lock is blocked waiting on a downconvert,
1045          * we'll get caught below. */
1046         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1047             level > lockres->l_level) {
1048                 /* is someone sitting in dlm_lock? If so, wait on
1049                  * them. */
1050                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1051                 wait = 1;
1052                 goto unlock;
1053         }
1054
1055         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1056             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1057                 /* is the lock is currently blocked on behalf of
1058                  * another node */
1059                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1060                 wait = 1;
1061                 goto unlock;
1062         }
1063
1064         if (level > lockres->l_level) {
1065                 if (lockres->l_action != OCFS2_AST_INVALID)
1066                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
1067                              lockres->l_name, lockres->l_action);
1068
1069                 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1070                         lockres->l_action = OCFS2_AST_ATTACH;
1071                         lkm_flags &= ~DLM_LKF_CONVERT;
1072                 } else {
1073                         lockres->l_action = OCFS2_AST_CONVERT;
1074                         lkm_flags |= DLM_LKF_CONVERT;
1075                 }
1076
1077                 lockres->l_requested = level;
1078                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1079                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1080
1081                 BUG_ON(level == DLM_LOCK_IV);
1082                 BUG_ON(level == DLM_LOCK_NL);
1083
1084                 mlog(0, "lock %s, convert from %d to level = %d\n",
1085                      lockres->l_name, lockres->l_level, level);
1086
1087                 /* call dlm_lock to upgrade lock now */
1088                 ret = ocfs2_dlm_lock(osb->dlm,
1089                                      level,
1090                                      &lockres->l_lksb,
1091                                      lkm_flags,
1092                                      lockres->l_name,
1093                                      OCFS2_LOCK_ID_MAX_LEN - 1,
1094                                      lockres);
1095                 if (ret) {
1096                         if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1097                             (ret != -EAGAIN)) {
1098                                 ocfs2_log_dlm_error("ocfs2_dlm_lock",
1099                                                     ret, lockres);
1100                         }
1101                         ocfs2_recover_from_dlm_error(lockres, 1);
1102                         goto out;
1103                 }
1104
1105                 mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n",
1106                      lockres->l_name);
1107
1108                 /* At this point we've gone inside the dlm and need to
1109                  * complete our work regardless. */
1110                 catch_signals = 0;
1111
1112                 /* wait for busy to clear and carry on */
1113                 goto again;
1114         }
1115
1116         /* Ok, if we get here then we're good to go. */
1117         ocfs2_inc_holders(lockres, level);
1118
1119         ret = 0;
1120 unlock:
1121         spin_unlock_irqrestore(&lockres->l_lock, flags);
1122 out:
1123         /*
1124          * This is helping work around a lock inversion between the page lock
1125          * and dlm locks.  One path holds the page lock while calling aops
1126          * which block acquiring dlm locks.  The voting thread holds dlm
1127          * locks while acquiring page locks while down converting data locks.
1128          * This block is helping an aop path notice the inversion and back
1129          * off to unlock its page lock before trying the dlm lock again.
1130          */
1131         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1132             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1133                 wait = 0;
1134                 if (lockres_remove_mask_waiter(lockres, &mw))
1135                         ret = -EAGAIN;
1136                 else
1137                         goto again;
1138         }
1139         if (wait) {
1140                 ret = ocfs2_wait_for_mask(&mw);
1141                 if (ret == 0)
1142                         goto again;
1143                 mlog_errno(ret);
1144         }
1145
1146         mlog_exit(ret);
1147         return ret;
1148 }
1149
1150 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1151                                  struct ocfs2_lock_res *lockres,
1152                                  int level)
1153 {
1154         unsigned long flags;
1155
1156         mlog_entry_void();
1157         spin_lock_irqsave(&lockres->l_lock, flags);
1158         ocfs2_dec_holders(lockres, level);
1159         ocfs2_downconvert_on_unlock(osb, lockres);
1160         spin_unlock_irqrestore(&lockres->l_lock, flags);
1161         mlog_exit_void();
1162 }
1163
1164 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1165                                  struct ocfs2_lock_res *lockres,
1166                                  int ex,
1167                                  int local)
1168 {
1169         int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1170         unsigned long flags;
1171         u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1172
1173         spin_lock_irqsave(&lockres->l_lock, flags);
1174         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1175         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1176         spin_unlock_irqrestore(&lockres->l_lock, flags);
1177
1178         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1179 }
1180
1181 /* Grants us an EX lock on the data and metadata resources, skipping
1182  * the normal cluster directory lookup. Use this ONLY on newly created
1183  * inodes which other nodes can't possibly see, and which haven't been
1184  * hashed in the inode hash yet. This can give us a good performance
1185  * increase as it'll skip the network broadcast normally associated
1186  * with creating a new lock resource. */
1187 int ocfs2_create_new_inode_locks(struct inode *inode)
1188 {
1189         int ret;
1190         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1191
1192         BUG_ON(!inode);
1193         BUG_ON(!ocfs2_inode_is_new(inode));
1194
1195         mlog_entry_void();
1196
1197         mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1198
1199         /* NOTE: That we don't increment any of the holder counts, nor
1200          * do we add anything to a journal handle. Since this is
1201          * supposed to be a new inode which the cluster doesn't know
1202          * about yet, there is no need to.  As far as the LVB handling
1203          * is concerned, this is basically like acquiring an EX lock
1204          * on a resource which has an invalid one -- we'll set it
1205          * valid when we release the EX. */
1206
1207         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1208         if (ret) {
1209                 mlog_errno(ret);
1210                 goto bail;
1211         }
1212
1213         /*
1214          * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
1215          * don't use a generation in their lock names.
1216          */
1217         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1218         if (ret) {
1219                 mlog_errno(ret);
1220                 goto bail;
1221         }
1222
1223         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1224         if (ret) {
1225                 mlog_errno(ret);
1226                 goto bail;
1227         }
1228
1229 bail:
1230         mlog_exit(ret);
1231         return ret;
1232 }
1233
1234 int ocfs2_rw_lock(struct inode *inode, int write)
1235 {
1236         int status, level;
1237         struct ocfs2_lock_res *lockres;
1238         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1239
1240         BUG_ON(!inode);
1241
1242         mlog_entry_void();
1243
1244         mlog(0, "inode %llu take %s RW lock\n",
1245              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1246              write ? "EXMODE" : "PRMODE");
1247
1248         if (ocfs2_mount_local(osb))
1249                 return 0;
1250
1251         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1252
1253         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1254
1255         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1256                                     0);
1257         if (status < 0)
1258                 mlog_errno(status);
1259
1260         mlog_exit(status);
1261         return status;
1262 }
1263
1264 void ocfs2_rw_unlock(struct inode *inode, int write)
1265 {
1266         int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1267         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1268         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1269
1270         mlog_entry_void();
1271
1272         mlog(0, "inode %llu drop %s RW lock\n",
1273              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1274              write ? "EXMODE" : "PRMODE");
1275
1276         if (!ocfs2_mount_local(osb))
1277                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1278
1279         mlog_exit_void();
1280 }
1281
1282 /*
1283  * ocfs2_open_lock always get PR mode lock.
1284  */
1285 int ocfs2_open_lock(struct inode *inode)
1286 {
1287         int status = 0;
1288         struct ocfs2_lock_res *lockres;
1289         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1290
1291         BUG_ON(!inode);
1292
1293         mlog_entry_void();
1294
1295         mlog(0, "inode %llu take PRMODE open lock\n",
1296              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1297
1298         if (ocfs2_mount_local(osb))
1299                 goto out;
1300
1301         lockres = &OCFS2_I(inode)->ip_open_lockres;
1302
1303         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1304                                     DLM_LOCK_PR, 0, 0);
1305         if (status < 0)
1306                 mlog_errno(status);
1307
1308 out:
1309         mlog_exit(status);
1310         return status;
1311 }
1312
1313 int ocfs2_try_open_lock(struct inode *inode, int write)
1314 {
1315         int status = 0, level;
1316         struct ocfs2_lock_res *lockres;
1317         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1318
1319         BUG_ON(!inode);
1320
1321         mlog_entry_void();
1322
1323         mlog(0, "inode %llu try to take %s open lock\n",
1324              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1325              write ? "EXMODE" : "PRMODE");
1326
1327         if (ocfs2_mount_local(osb))
1328                 goto out;
1329
1330         lockres = &OCFS2_I(inode)->ip_open_lockres;
1331
1332         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1333
1334         /*
1335          * The file system may already holding a PRMODE/EXMODE open lock.
1336          * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1337          * other nodes and the -EAGAIN will indicate to the caller that
1338          * this inode is still in use.
1339          */
1340         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1341                                     level, DLM_LKF_NOQUEUE, 0);
1342
1343 out:
1344         mlog_exit(status);
1345         return status;
1346 }
1347
1348 /*
1349  * ocfs2_open_unlock unlock PR and EX mode open locks.
1350  */
1351 void ocfs2_open_unlock(struct inode *inode)
1352 {
1353         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1354         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1355
1356         mlog_entry_void();
1357
1358         mlog(0, "inode %llu drop open lock\n",
1359              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1360
1361         if (ocfs2_mount_local(osb))
1362                 goto out;
1363
1364         if(lockres->l_ro_holders)
1365                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1366                                      DLM_LOCK_PR);
1367         if(lockres->l_ex_holders)
1368                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1369                                      DLM_LOCK_EX);
1370
1371 out:
1372         mlog_exit_void();
1373 }
1374
1375 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1376                                      int level)
1377 {
1378         int ret;
1379         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1380         unsigned long flags;
1381         struct ocfs2_mask_waiter mw;
1382
1383         ocfs2_init_mask_waiter(&mw);
1384
1385 retry_cancel:
1386         spin_lock_irqsave(&lockres->l_lock, flags);
1387         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1388                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
1389                 if (ret) {
1390                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1391                         ret = ocfs2_cancel_convert(osb, lockres);
1392                         if (ret < 0) {
1393                                 mlog_errno(ret);
1394                                 goto out;
1395                         }
1396                         goto retry_cancel;
1397                 }
1398                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1399                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1400
1401                 ocfs2_wait_for_mask(&mw);
1402                 goto retry_cancel;
1403         }
1404
1405         ret = -ERESTARTSYS;
1406         /*
1407          * We may still have gotten the lock, in which case there's no
1408          * point to restarting the syscall.
1409          */
1410         if (lockres->l_level == level)
1411                 ret = 0;
1412
1413         mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1414              lockres->l_flags, lockres->l_level, lockres->l_action);
1415
1416         spin_unlock_irqrestore(&lockres->l_lock, flags);
1417
1418 out:
1419         return ret;
1420 }
1421
1422 /*
1423  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1424  * flock() calls. The locking approach this requires is sufficiently
1425  * different from all other cluster lock types that we implement a
1426  * seperate path to the "low-level" dlm calls. In particular:
1427  *
1428  * - No optimization of lock levels is done - we take at exactly
1429  *   what's been requested.
1430  *
1431  * - No lock caching is employed. We immediately downconvert to
1432  *   no-lock at unlock time. This also means flock locks never go on
1433  *   the blocking list).
1434  *
1435  * - Since userspace can trivially deadlock itself with flock, we make
1436  *   sure to allow cancellation of a misbehaving applications flock()
1437  *   request.
1438  *
1439  * - Access to any flock lockres doesn't require concurrency, so we
1440  *   can simplify the code by requiring the caller to guarantee
1441  *   serialization of dlmglue flock calls.
1442  */
1443 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1444 {
1445         int ret, level = ex ? LKM_EXMODE : LKM_PRMODE;
1446         unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0;
1447         unsigned long flags;
1448         struct ocfs2_file_private *fp = file->private_data;
1449         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1450         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1451         struct ocfs2_mask_waiter mw;
1452
1453         ocfs2_init_mask_waiter(&mw);
1454
1455         if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1456             (lockres->l_level > DLM_LOCK_NL)) {
1457                 mlog(ML_ERROR,
1458                      "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1459                      "level: %u\n", lockres->l_name, lockres->l_flags,
1460                      lockres->l_level);
1461                 return -EINVAL;
1462         }
1463
1464         spin_lock_irqsave(&lockres->l_lock, flags);
1465         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1466                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1467                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1468
1469                 /*
1470                  * Get the lock at NLMODE to start - that way we
1471                  * can cancel the upconvert request if need be.
1472                  */
1473                 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
1474                 if (ret < 0) {
1475                         mlog_errno(ret);
1476                         goto out;
1477                 }
1478
1479                 ret = ocfs2_wait_for_mask(&mw);
1480                 if (ret) {
1481                         mlog_errno(ret);
1482                         goto out;
1483                 }
1484                 spin_lock_irqsave(&lockres->l_lock, flags);
1485         }
1486
1487         lockres->l_action = OCFS2_AST_CONVERT;
1488         lkm_flags |= LKM_CONVERT;
1489         lockres->l_requested = level;
1490         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1491
1492         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1493         spin_unlock_irqrestore(&lockres->l_lock, flags);
1494
1495         ret = ocfs2_dlm_lock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
1496                              lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
1497                              lockres);
1498         if (ret) {
1499                 if (!trylock || (ret != -EAGAIN)) {
1500                         ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1501                         ret = -EINVAL;
1502                 }
1503
1504                 ocfs2_recover_from_dlm_error(lockres, 1);
1505                 lockres_remove_mask_waiter(lockres, &mw);
1506                 goto out;
1507         }
1508
1509         ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1510         if (ret == -ERESTARTSYS) {
1511                 /*
1512                  * Userspace can cause deadlock itself with
1513                  * flock(). Current behavior locally is to allow the
1514                  * deadlock, but abort the system call if a signal is
1515                  * received. We follow this example, otherwise a
1516                  * poorly written program could sit in kernel until
1517                  * reboot.
1518                  *
1519                  * Handling this is a bit more complicated for Ocfs2
1520                  * though. We can't exit this function with an
1521                  * outstanding lock request, so a cancel convert is
1522                  * required. We intentionally overwrite 'ret' - if the
1523                  * cancel fails and the lock was granted, it's easier
1524                  * to just bubble sucess back up to the user.
1525                  */
1526                 ret = ocfs2_flock_handle_signal(lockres, level);
1527         }
1528
1529 out:
1530
1531         mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1532              lockres->l_name, ex, trylock, ret);
1533         return ret;
1534 }
1535
1536 void ocfs2_file_unlock(struct file *file)
1537 {
1538         int ret;
1539         unsigned long flags;
1540         struct ocfs2_file_private *fp = file->private_data;
1541         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1542         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1543         struct ocfs2_mask_waiter mw;
1544
1545         ocfs2_init_mask_waiter(&mw);
1546
1547         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
1548                 return;
1549
1550         if (lockres->l_level == LKM_NLMODE)
1551                 return;
1552
1553         mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
1554              lockres->l_name, lockres->l_flags, lockres->l_level,
1555              lockres->l_action);
1556
1557         spin_lock_irqsave(&lockres->l_lock, flags);
1558         /*
1559          * Fake a blocking ast for the downconvert code.
1560          */
1561         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1562         lockres->l_blocking = DLM_LOCK_EX;
1563
1564         ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
1565         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1566         spin_unlock_irqrestore(&lockres->l_lock, flags);
1567
1568         ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0);
1569         if (ret) {
1570                 mlog_errno(ret);
1571                 return;
1572         }
1573
1574         ret = ocfs2_wait_for_mask(&mw);
1575         if (ret)
1576                 mlog_errno(ret);
1577 }
1578
1579 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1580                                         struct ocfs2_lock_res *lockres)
1581 {
1582         int kick = 0;
1583
1584         mlog_entry_void();
1585
1586         /* If we know that another node is waiting on our lock, kick
1587          * the downconvert thread * pre-emptively when we reach a release
1588          * condition. */
1589         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1590                 switch(lockres->l_blocking) {
1591                 case DLM_LOCK_EX:
1592                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1593                                 kick = 1;
1594                         break;
1595                 case DLM_LOCK_PR:
1596                         if (!lockres->l_ex_holders)
1597                                 kick = 1;
1598                         break;
1599                 default:
1600                         BUG();
1601                 }
1602         }
1603
1604         if (kick)
1605                 ocfs2_wake_downconvert_thread(osb);
1606
1607         mlog_exit_void();
1608 }
1609
1610 #define OCFS2_SEC_BITS   34
1611 #define OCFS2_SEC_SHIFT  (64 - 34)
1612 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1613
1614 /* LVB only has room for 64 bits of time here so we pack it for
1615  * now. */
1616 static u64 ocfs2_pack_timespec(struct timespec *spec)
1617 {
1618         u64 res;
1619         u64 sec = spec->tv_sec;
1620         u32 nsec = spec->tv_nsec;
1621
1622         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1623
1624         return res;
1625 }
1626
1627 /* Call this with the lockres locked. I am reasonably sure we don't
1628  * need ip_lock in this function as anyone who would be changing those
1629  * values is supposed to be blocked in ocfs2_inode_lock right now. */
1630 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1631 {
1632         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1633         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1634         struct ocfs2_meta_lvb *lvb;
1635
1636         mlog_entry_void();
1637
1638         lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
1639
1640         /*
1641          * Invalidate the LVB of a deleted inode - this way other
1642          * nodes are forced to go to disk and discover the new inode
1643          * status.
1644          */
1645         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1646                 lvb->lvb_version = 0;
1647                 goto out;
1648         }
1649
1650         lvb->lvb_version   = OCFS2_LVB_VERSION;
1651         lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
1652         lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1653         lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1654         lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1655         lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1656         lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1657         lvb->lvb_iatime_packed  =
1658                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1659         lvb->lvb_ictime_packed =
1660                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1661         lvb->lvb_imtime_packed =
1662                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1663         lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1664         lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
1665         lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1666
1667 out:
1668         mlog_meta_lvb(0, lockres);
1669
1670         mlog_exit_void();
1671 }
1672
1673 static void ocfs2_unpack_timespec(struct timespec *spec,
1674                                   u64 packed_time)
1675 {
1676         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1677         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1678 }
1679
1680 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1681 {
1682         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1683         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1684         struct ocfs2_meta_lvb *lvb;
1685
1686         mlog_entry_void();
1687
1688         mlog_meta_lvb(0, lockres);
1689
1690         lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
1691
1692         /* We're safe here without the lockres lock... */
1693         spin_lock(&oi->ip_lock);
1694         oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1695         i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1696
1697         oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1698         oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
1699         ocfs2_set_inode_flags(inode);
1700
1701         /* fast-symlinks are a special case */
1702         if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1703                 inode->i_blocks = 0;
1704         else
1705                 inode->i_blocks = ocfs2_inode_sector_count(inode);
1706
1707         inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
1708         inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
1709         inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
1710         inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
1711         ocfs2_unpack_timespec(&inode->i_atime,
1712                               be64_to_cpu(lvb->lvb_iatime_packed));
1713         ocfs2_unpack_timespec(&inode->i_mtime,
1714                               be64_to_cpu(lvb->lvb_imtime_packed));
1715         ocfs2_unpack_timespec(&inode->i_ctime,
1716                               be64_to_cpu(lvb->lvb_ictime_packed));
1717         spin_unlock(&oi->ip_lock);
1718
1719         mlog_exit_void();
1720 }
1721
1722 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1723                                               struct ocfs2_lock_res *lockres)
1724 {
1725         struct ocfs2_meta_lvb *lvb =
1726                 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
1727
1728         if (lvb->lvb_version == OCFS2_LVB_VERSION
1729             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1730                 return 1;
1731         return 0;
1732 }
1733
1734 /* Determine whether a lock resource needs to be refreshed, and
1735  * arbitrate who gets to refresh it.
1736  *
1737  *   0 means no refresh needed.
1738  *
1739  *   > 0 means you need to refresh this and you MUST call
1740  *   ocfs2_complete_lock_res_refresh afterwards. */
1741 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1742 {
1743         unsigned long flags;
1744         int status = 0;
1745
1746         mlog_entry_void();
1747
1748 refresh_check:
1749         spin_lock_irqsave(&lockres->l_lock, flags);
1750         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1751                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1752                 goto bail;
1753         }
1754
1755         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1756                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1757
1758                 ocfs2_wait_on_refreshing_lock(lockres);
1759                 goto refresh_check;
1760         }
1761
1762         /* Ok, I'll be the one to refresh this lock. */
1763         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1764         spin_unlock_irqrestore(&lockres->l_lock, flags);
1765
1766         status = 1;
1767 bail:
1768         mlog_exit(status);
1769         return status;
1770 }
1771
1772 /* If status is non zero, I'll mark it as not being in refresh
1773  * anymroe, but i won't clear the needs refresh flag. */
1774 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1775                                                    int status)
1776 {
1777         unsigned long flags;
1778         mlog_entry_void();
1779
1780         spin_lock_irqsave(&lockres->l_lock, flags);
1781         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1782         if (!status)
1783                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1784         spin_unlock_irqrestore(&lockres->l_lock, flags);
1785
1786         wake_up(&lockres->l_event);
1787
1788         mlog_exit_void();
1789 }
1790
1791 /* may or may not return a bh if it went to disk. */
1792 static int ocfs2_inode_lock_update(struct inode *inode,
1793                                   struct buffer_head **bh)
1794 {
1795         int status = 0;
1796         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1797         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1798         struct ocfs2_dinode *fe;
1799         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1800
1801         mlog_entry_void();
1802
1803         if (ocfs2_mount_local(osb))
1804                 goto bail;
1805
1806         spin_lock(&oi->ip_lock);
1807         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1808                 mlog(0, "Orphaned inode %llu was deleted while we "
1809                      "were waiting on a lock. ip_flags = 0x%x\n",
1810                      (unsigned long long)oi->ip_blkno, oi->ip_flags);
1811                 spin_unlock(&oi->ip_lock);
1812                 status = -ENOENT;
1813                 goto bail;
1814         }
1815         spin_unlock(&oi->ip_lock);
1816
1817         if (!ocfs2_should_refresh_lock_res(lockres))
1818                 goto bail;
1819
1820         /* This will discard any caching information we might have had
1821          * for the inode metadata. */
1822         ocfs2_metadata_cache_purge(inode);
1823
1824         ocfs2_extent_map_trunc(inode, 0);
1825
1826         if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
1827                 mlog(0, "Trusting LVB on inode %llu\n",
1828                      (unsigned long long)oi->ip_blkno);
1829                 ocfs2_refresh_inode_from_lvb(inode);
1830         } else {
1831                 /* Boo, we have to go to disk. */
1832                 /* read bh, cast, ocfs2_refresh_inode */
1833                 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
1834                                           bh, OCFS2_BH_CACHED, inode);
1835                 if (status < 0) {
1836                         mlog_errno(status);
1837                         goto bail_refresh;
1838                 }
1839                 fe = (struct ocfs2_dinode *) (*bh)->b_data;
1840
1841                 /* This is a good chance to make sure we're not
1842                  * locking an invalid object.
1843                  *
1844                  * We bug on a stale inode here because we checked
1845                  * above whether it was wiped from disk. The wiping
1846                  * node provides a guarantee that we receive that
1847                  * message and can mark the inode before dropping any
1848                  * locks associated with it. */
1849                 if (!OCFS2_IS_VALID_DINODE(fe)) {
1850                         OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
1851                         status = -EIO;
1852                         goto bail_refresh;
1853                 }
1854                 mlog_bug_on_msg(inode->i_generation !=
1855                                 le32_to_cpu(fe->i_generation),
1856                                 "Invalid dinode %llu disk generation: %u "
1857                                 "inode->i_generation: %u\n",
1858                                 (unsigned long long)oi->ip_blkno,
1859                                 le32_to_cpu(fe->i_generation),
1860                                 inode->i_generation);
1861                 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
1862                                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
1863                                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
1864                                 (unsigned long long)oi->ip_blkno,
1865                                 (unsigned long long)le64_to_cpu(fe->i_dtime),
1866                                 le32_to_cpu(fe->i_flags));
1867
1868                 ocfs2_refresh_inode(inode, fe);
1869         }
1870
1871         status = 0;
1872 bail_refresh:
1873         ocfs2_complete_lock_res_refresh(lockres, status);
1874 bail:
1875         mlog_exit(status);
1876         return status;
1877 }
1878
1879 static int ocfs2_assign_bh(struct inode *inode,
1880                            struct buffer_head **ret_bh,
1881                            struct buffer_head *passed_bh)
1882 {
1883         int status;
1884
1885         if (passed_bh) {
1886                 /* Ok, the update went to disk for us, use the
1887                  * returned bh. */
1888                 *ret_bh = passed_bh;
1889                 get_bh(*ret_bh);
1890
1891                 return 0;
1892         }
1893
1894         status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1895                                   OCFS2_I(inode)->ip_blkno,
1896                                   ret_bh,
1897                                   OCFS2_BH_CACHED,
1898                                   inode);
1899         if (status < 0)
1900                 mlog_errno(status);
1901
1902         return status;
1903 }
1904
1905 /*
1906  * returns < 0 error if the callback will never be called, otherwise
1907  * the result of the lock will be communicated via the callback.
1908  */
1909 int ocfs2_inode_lock_full(struct inode *inode,
1910                          struct buffer_head **ret_bh,
1911                          int ex,
1912                          int arg_flags)
1913 {
1914         int status, level, acquired;
1915         u32 dlm_flags;
1916         struct ocfs2_lock_res *lockres = NULL;
1917         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1918         struct buffer_head *local_bh = NULL;
1919
1920         BUG_ON(!inode);
1921
1922         mlog_entry_void();
1923
1924         mlog(0, "inode %llu, take %s META lock\n",
1925              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1926              ex ? "EXMODE" : "PRMODE");
1927
1928         status = 0;
1929         acquired = 0;
1930         /* We'll allow faking a readonly metadata lock for
1931          * rodevices. */
1932         if (ocfs2_is_hard_readonly(osb)) {
1933                 if (ex)
1934                         status = -EROFS;
1935                 goto bail;
1936         }
1937
1938         if (ocfs2_mount_local(osb))
1939                 goto local;
1940
1941         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1942                 ocfs2_wait_for_recovery(osb);
1943
1944         lockres = &OCFS2_I(inode)->ip_inode_lockres;
1945         level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1946         dlm_flags = 0;
1947         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1948                 dlm_flags |= DLM_LKF_NOQUEUE;
1949
1950         status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1951         if (status < 0) {
1952                 if (status != -EAGAIN && status != -EIOCBRETRY)
1953                         mlog_errno(status);
1954                 goto bail;
1955         }
1956
1957         /* Notify the error cleanup path to drop the cluster lock. */
1958         acquired = 1;
1959
1960         /* We wait twice because a node may have died while we were in
1961          * the lower dlm layers. The second time though, we've
1962          * committed to owning this lock so we don't allow signals to
1963          * abort the operation. */
1964         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1965                 ocfs2_wait_for_recovery(osb);
1966
1967 local:
1968         /*
1969          * We only see this flag if we're being called from
1970          * ocfs2_read_locked_inode(). It means we're locking an inode
1971          * which hasn't been populated yet, so clear the refresh flag
1972          * and let the caller handle it.
1973          */
1974         if (inode->i_state & I_NEW) {
1975                 status = 0;
1976                 if (lockres)
1977                         ocfs2_complete_lock_res_refresh(lockres, 0);
1978                 goto bail;
1979         }
1980
1981         /* This is fun. The caller may want a bh back, or it may
1982          * not. ocfs2_inode_lock_update definitely wants one in, but
1983          * may or may not read one, depending on what's in the
1984          * LVB. The result of all of this is that we've *only* gone to
1985          * disk if we have to, so the complexity is worthwhile. */
1986         status = ocfs2_inode_lock_update(inode, &local_bh);
1987         if (status < 0) {
1988                 if (status != -ENOENT)
1989                         mlog_errno(status);
1990                 goto bail;
1991         }
1992
1993         if (ret_bh) {
1994                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1995                 if (status < 0) {
1996                         mlog_errno(status);
1997                         goto bail;
1998                 }
1999         }
2000
2001 bail:
2002         if (status < 0) {
2003                 if (ret_bh && (*ret_bh)) {
2004                         brelse(*ret_bh);
2005                         *ret_bh = NULL;
2006                 }
2007                 if (acquired)
2008                         ocfs2_inode_unlock(inode, ex);
2009         }
2010
2011         if (local_bh)
2012                 brelse(local_bh);
2013
2014         mlog_exit(status);
2015         return status;
2016 }
2017
2018 /*
2019  * This is working around a lock inversion between tasks acquiring DLM
2020  * locks while holding a page lock and the downconvert thread which
2021  * blocks dlm lock acquiry while acquiring page locks.
2022  *
2023  * ** These _with_page variantes are only intended to be called from aop
2024  * methods that hold page locks and return a very specific *positive* error
2025  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2026  *
2027  * The DLM is called such that it returns -EAGAIN if it would have
2028  * blocked waiting for the downconvert thread.  In that case we unlock
2029  * our page so the downconvert thread can make progress.  Once we've
2030  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2031  * that called us can bubble that back up into the VFS who will then
2032  * immediately retry the aop call.
2033  *
2034  * We do a blocking lock and immediate unlock before returning, though, so that
2035  * the lock has a great chance of being cached on this node by the time the VFS
2036  * calls back to retry the aop.    This has a potential to livelock as nodes
2037  * ping locks back and forth, but that's a risk we're willing to take to avoid
2038  * the lock inversion simply.
2039  */
2040 int ocfs2_inode_lock_with_page(struct inode *inode,
2041                               struct buffer_head **ret_bh,
2042                               int ex,
2043                               struct page *page)
2044 {
2045         int ret;
2046
2047         ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2048         if (ret == -EAGAIN) {
2049                 unlock_page(page);
2050                 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2051                         ocfs2_inode_unlock(inode, ex);
2052                 ret = AOP_TRUNCATED_PAGE;
2053         }
2054
2055         return ret;
2056 }
2057
2058 int ocfs2_inode_lock_atime(struct inode *inode,
2059                           struct vfsmount *vfsmnt,
2060                           int *level)
2061 {
2062         int ret;
2063
2064         mlog_entry_void();
2065         ret = ocfs2_inode_lock(inode, NULL, 0);
2066         if (ret < 0) {
2067                 mlog_errno(ret);
2068                 return ret;
2069         }
2070
2071         /*
2072          * If we should update atime, we will get EX lock,
2073          * otherwise we just get PR lock.
2074          */
2075         if (ocfs2_should_update_atime(inode, vfsmnt)) {
2076                 struct buffer_head *bh = NULL;
2077
2078                 ocfs2_inode_unlock(inode, 0);
2079                 ret = ocfs2_inode_lock(inode, &bh, 1);
2080                 if (ret < 0) {
2081                         mlog_errno(ret);
2082                         return ret;
2083                 }
2084                 *level = 1;
2085                 if (ocfs2_should_update_atime(inode, vfsmnt))
2086                         ocfs2_update_inode_atime(inode, bh);
2087                 if (bh)
2088                         brelse(bh);
2089         } else
2090                 *level = 0;
2091
2092         mlog_exit(ret);
2093         return ret;
2094 }
2095
2096 void ocfs2_inode_unlock(struct inode *inode,
2097                        int ex)
2098 {
2099         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2100         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2101         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2102
2103         mlog_entry_void();
2104
2105         mlog(0, "inode %llu drop %s META lock\n",
2106              (unsigned long long)OCFS2_I(inode)->ip_blkno,
2107              ex ? "EXMODE" : "PRMODE");
2108
2109         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2110             !ocfs2_mount_local(osb))
2111                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2112
2113         mlog_exit_void();
2114 }
2115
2116 int ocfs2_super_lock(struct ocfs2_super *osb,
2117                      int ex)
2118 {
2119         int status = 0;
2120         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2121         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2122
2123         mlog_entry_void();
2124
2125         if (ocfs2_is_hard_readonly(osb))
2126                 return -EROFS;
2127
2128         if (ocfs2_mount_local(osb))
2129                 goto bail;
2130
2131         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2132         if (status < 0) {
2133                 mlog_errno(status);
2134                 goto bail;
2135         }
2136
2137         /* The super block lock path is really in the best position to
2138          * know when resources covered by the lock need to be
2139          * refreshed, so we do it here. Of course, making sense of
2140          * everything is up to the caller :) */
2141         status = ocfs2_should_refresh_lock_res(lockres);
2142         if (status < 0) {
2143                 mlog_errno(status);
2144                 goto bail;
2145         }
2146         if (status) {
2147                 status = ocfs2_refresh_slot_info(osb);
2148
2149                 ocfs2_complete_lock_res_refresh(lockres, status);
2150
2151                 if (status < 0)
2152                         mlog_errno(status);
2153         }
2154 bail:
2155         mlog_exit(status);
2156         return status;
2157 }
2158
2159 void ocfs2_super_unlock(struct ocfs2_super *osb,
2160                         int ex)
2161 {
2162         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2163         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2164
2165         if (!ocfs2_mount_local(osb))
2166                 ocfs2_cluster_unlock(osb, lockres, level);
2167 }
2168
2169 int ocfs2_rename_lock(struct ocfs2_super *osb)
2170 {
2171         int status;
2172         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2173
2174         if (ocfs2_is_hard_readonly(osb))
2175                 return -EROFS;
2176
2177         if (ocfs2_mount_local(osb))
2178                 return 0;
2179
2180         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2181         if (status < 0)
2182                 mlog_errno(status);
2183
2184         return status;
2185 }
2186
2187 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2188 {
2189         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2190
2191         if (!ocfs2_mount_local(osb))
2192                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2193 }
2194
2195 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2196 {
2197         int ret;
2198         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2199         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2200         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2201
2202         BUG_ON(!dl);
2203
2204         if (ocfs2_is_hard_readonly(osb))
2205                 return -EROFS;
2206
2207         if (ocfs2_mount_local(osb))
2208                 return 0;
2209
2210         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2211         if (ret < 0)
2212                 mlog_errno(ret);
2213
2214         return ret;
2215 }
2216
2217 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2218 {
2219         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2220         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2221         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2222
2223         if (!ocfs2_mount_local(osb))
2224                 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2225 }
2226
2227 /* Reference counting of the dlm debug structure. We want this because
2228  * open references on the debug inodes can live on after a mount, so
2229  * we can't rely on the ocfs2_super to always exist. */
2230 static void ocfs2_dlm_debug_free(struct kref *kref)
2231 {
2232         struct ocfs2_dlm_debug *dlm_debug;
2233
2234         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2235
2236         kfree(dlm_debug);
2237 }
2238
2239 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2240 {
2241         if (dlm_debug)
2242                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2243 }
2244
2245 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2246 {
2247         kref_get(&debug->d_refcnt);
2248 }
2249
2250 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2251 {
2252         struct ocfs2_dlm_debug *dlm_debug;
2253
2254         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2255         if (!dlm_debug) {
2256                 mlog_errno(-ENOMEM);
2257                 goto out;
2258         }
2259
2260         kref_init(&dlm_debug->d_refcnt);
2261         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2262         dlm_debug->d_locking_state = NULL;
2263 out:
2264         return dlm_debug;
2265 }
2266
2267 /* Access to this is arbitrated for us via seq_file->sem. */
2268 struct ocfs2_dlm_seq_priv {
2269         struct ocfs2_dlm_debug *p_dlm_debug;
2270         struct ocfs2_lock_res p_iter_res;
2271         struct ocfs2_lock_res p_tmp_res;
2272 };
2273
2274 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2275                                                  struct ocfs2_dlm_seq_priv *priv)
2276 {
2277         struct ocfs2_lock_res *iter, *ret = NULL;
2278         struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2279
2280         assert_spin_locked(&ocfs2_dlm_tracking_lock);
2281
2282         list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2283                 /* discover the head of the list */
2284                 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2285                         mlog(0, "End of list found, %p\n", ret);
2286                         break;
2287                 }
2288
2289                 /* We track our "dummy" iteration lockres' by a NULL
2290                  * l_ops field. */
2291                 if (iter->l_ops != NULL) {
2292                         ret = iter;
2293                         break;
2294                 }
2295         }
2296
2297         return ret;
2298 }
2299
2300 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2301 {
2302         struct ocfs2_dlm_seq_priv *priv = m->private;
2303         struct ocfs2_lock_res *iter;
2304
2305         spin_lock(&ocfs2_dlm_tracking_lock);
2306         iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2307         if (iter) {
2308                 /* Since lockres' have the lifetime of their container
2309                  * (which can be inodes, ocfs2_supers, etc) we want to
2310                  * copy this out to a temporary lockres while still
2311                  * under the spinlock. Obviously after this we can't
2312                  * trust any pointers on the copy returned, but that's
2313                  * ok as the information we want isn't typically held
2314                  * in them. */
2315                 priv->p_tmp_res = *iter;
2316                 iter = &priv->p_tmp_res;
2317         }
2318         spin_unlock(&ocfs2_dlm_tracking_lock);
2319
2320         return iter;
2321 }
2322
2323 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2324 {
2325 }
2326
2327 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2328 {
2329         struct ocfs2_dlm_seq_priv *priv = m->private;
2330         struct ocfs2_lock_res *iter = v;
2331         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2332
2333         spin_lock(&ocfs2_dlm_tracking_lock);
2334         iter = ocfs2_dlm_next_res(iter, priv);
2335         list_del_init(&dummy->l_debug_list);
2336         if (iter) {
2337                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
2338                 priv->p_tmp_res = *iter;
2339                 iter = &priv->p_tmp_res;
2340         }
2341         spin_unlock(&ocfs2_dlm_tracking_lock);
2342
2343         return iter;
2344 }
2345
2346 /* So that debugfs.ocfs2 can determine which format is being used */
2347 #define OCFS2_DLM_DEBUG_STR_VERSION 1
2348 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2349 {
2350         int i;
2351         char *lvb;
2352         struct ocfs2_lock_res *lockres = v;
2353
2354         if (!lockres)
2355                 return -EINVAL;
2356
2357         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2358
2359         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2360                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2361                            lockres->l_name,
2362                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2363         else
2364                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2365
2366         seq_printf(m, "%d\t"
2367                    "0x%lx\t"
2368                    "0x%x\t"
2369                    "0x%x\t"
2370                    "%u\t"
2371                    "%u\t"
2372                    "%d\t"
2373                    "%d\t",
2374                    lockres->l_level,
2375                    lockres->l_flags,
2376                    lockres->l_action,
2377                    lockres->l_unlock_action,
2378                    lockres->l_ro_holders,
2379                    lockres->l_ex_holders,
2380                    lockres->l_requested,
2381                    lockres->l_blocking);
2382
2383         /* Dump the raw LVB */
2384         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2385         for(i = 0; i < DLM_LVB_LEN; i++)
2386                 seq_printf(m, "0x%x\t", lvb[i]);
2387
2388         /* End the line */
2389         seq_printf(m, "\n");
2390         return 0;
2391 }
2392
2393 static const struct seq_operations ocfs2_dlm_seq_ops = {
2394         .start =        ocfs2_dlm_seq_start,
2395         .stop =         ocfs2_dlm_seq_stop,
2396         .next =         ocfs2_dlm_seq_next,
2397         .show =         ocfs2_dlm_seq_show,
2398 };
2399
2400 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2401 {
2402         struct seq_file *seq = (struct seq_file *) file->private_data;
2403         struct ocfs2_dlm_seq_priv *priv = seq->private;
2404         struct ocfs2_lock_res *res = &priv->p_iter_res;
2405
2406         ocfs2_remove_lockres_tracking(res);
2407         ocfs2_put_dlm_debug(priv->p_dlm_debug);
2408         return seq_release_private(inode, file);
2409 }
2410
2411 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2412 {
2413         int ret;
2414         struct ocfs2_dlm_seq_priv *priv;
2415         struct seq_file *seq;
2416         struct ocfs2_super *osb;
2417
2418         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2419         if (!priv) {
2420                 ret = -ENOMEM;
2421                 mlog_errno(ret);
2422                 goto out;
2423         }
2424         osb = inode->i_private;
2425         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2426         priv->p_dlm_debug = osb->osb_dlm_debug;
2427         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2428
2429         ret = seq_open(file, &ocfs2_dlm_seq_ops);
2430         if (ret) {
2431                 kfree(priv);
2432                 mlog_errno(ret);
2433                 goto out;
2434         }
2435
2436         seq = (struct seq_file *) file->private_data;
2437         seq->private = priv;
2438
2439         ocfs2_add_lockres_tracking(&priv->p_iter_res,
2440                                    priv->p_dlm_debug);
2441
2442 out:
2443         return ret;
2444 }
2445
2446 static const struct file_operations ocfs2_dlm_debug_fops = {
2447         .open =         ocfs2_dlm_debug_open,
2448         .release =      ocfs2_dlm_debug_release,
2449         .read =         seq_read,
2450         .llseek =       seq_lseek,
2451 };
2452
2453 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2454 {
2455         int ret = 0;
2456         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2457
2458         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2459                                                          S_IFREG|S_IRUSR,
2460                                                          osb->osb_debug_root,
2461                                                          osb,
2462                                                          &ocfs2_dlm_debug_fops);
2463         if (!dlm_debug->d_locking_state) {
2464                 ret = -EINVAL;
2465                 mlog(ML_ERROR,
2466                      "Unable to create locking state debugfs file.\n");
2467                 goto out;
2468         }
2469
2470         ocfs2_get_dlm_debug(dlm_debug);
2471 out:
2472         return ret;
2473 }
2474
2475 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2476 {
2477         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2478
2479         if (dlm_debug) {
2480                 debugfs_remove(dlm_debug->d_locking_state);
2481                 ocfs2_put_dlm_debug(dlm_debug);
2482         }
2483 }
2484
2485 int ocfs2_dlm_init(struct ocfs2_super *osb)
2486 {
2487         int status = 0;
2488         u32 dlm_key;
2489         struct dlm_ctxt *dlm = NULL;
2490
2491         mlog_entry_void();
2492
2493         if (ocfs2_mount_local(osb))
2494                 goto local;
2495
2496         status = ocfs2_dlm_init_debug(osb);
2497         if (status < 0) {
2498                 mlog_errno(status);
2499                 goto bail;
2500         }
2501
2502         /* launch downconvert thread */
2503         osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
2504         if (IS_ERR(osb->dc_task)) {
2505                 status = PTR_ERR(osb->dc_task);
2506                 osb->dc_task = NULL;
2507                 mlog_errno(status);
2508                 goto bail;
2509         }
2510
2511         /* used by the dlm code to make message headers unique, each
2512          * node in this domain must agree on this. */
2513         dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2514
2515         /* for now, uuid == domain */
2516         dlm = dlm_register_domain(osb->uuid_str, dlm_key,
2517                                   &osb->osb_locking_proto);
2518         if (IS_ERR(dlm)) {
2519                 status = PTR_ERR(dlm);
2520                 mlog_errno(status);
2521                 goto bail;
2522         }
2523
2524         dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2525
2526 local:
2527         ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2528         ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2529
2530         osb->dlm = dlm;
2531
2532         status = 0;
2533 bail:
2534         if (status < 0) {
2535                 ocfs2_dlm_shutdown_debug(osb);
2536                 if (osb->dc_task)
2537                         kthread_stop(osb->dc_task);
2538         }
2539
2540         mlog_exit(status);
2541         return status;
2542 }
2543
2544 void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2545 {
2546         mlog_entry_void();
2547
2548         dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2549
2550         ocfs2_drop_osb_locks(osb);
2551
2552         if (osb->dc_task) {
2553                 kthread_stop(osb->dc_task);
2554                 osb->dc_task = NULL;
2555         }
2556
2557         ocfs2_lock_res_free(&osb->osb_super_lockres);
2558         ocfs2_lock_res_free(&osb->osb_rename_lockres);
2559
2560         dlm_unregister_domain(osb->dlm);
2561         osb->dlm = NULL;
2562
2563         ocfs2_dlm_shutdown_debug(osb);
2564
2565         mlog_exit_void();
2566 }
2567
2568 static void ocfs2_unlock_ast(void *opaque, int error)
2569 {
2570         struct ocfs2_lock_res *lockres = opaque;
2571         unsigned long flags;
2572
2573         mlog_entry_void();
2574
2575         mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2576              lockres->l_unlock_action);
2577
2578         spin_lock_irqsave(&lockres->l_lock, flags);
2579         /* We tried to cancel a convert request, but it was already
2580          * granted. All we want to do here is clear our unlock
2581          * state. The wake_up call done at the bottom is redundant
2582          * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2583          * hurt anything anyway */
2584         if (error == -DLM_ECANCEL &&
2585             lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2586                 mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2587
2588                 /* We don't clear the busy flag in this case as it
2589                  * should have been cleared by the ast which the dlm
2590                  * has called. */
2591                 goto complete_unlock;
2592         }
2593
2594         /* DLM_EUNLOCK is the success code for unlock */
2595         if (error != -DLM_EUNLOCK) {
2596                 mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
2597                      "unlock_action %d\n", error, lockres->l_name,
2598                      lockres->l_unlock_action);
2599                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2600                 return;
2601         }
2602
2603         switch(lockres->l_unlock_action) {
2604         case OCFS2_UNLOCK_CANCEL_CONVERT:
2605                 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2606                 lockres->l_action = OCFS2_AST_INVALID;
2607                 break;
2608         case OCFS2_UNLOCK_DROP_LOCK:
2609                 lockres->l_level = DLM_LOCK_IV;
2610                 break;
2611         default:
2612                 BUG();
2613         }
2614
2615         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2616 complete_unlock:
2617         lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2618         spin_unlock_irqrestore(&lockres->l_lock, flags);
2619
2620         wake_up(&lockres->l_event);
2621
2622         mlog_exit_void();
2623 }
2624
2625 static int ocfs2_drop_lock(struct ocfs2_super *osb,
2626                            struct ocfs2_lock_res *lockres)
2627 {
2628         int ret;
2629         unsigned long flags;
2630         u32 lkm_flags = 0;
2631
2632         /* We didn't get anywhere near actually using this lockres. */
2633         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2634                 goto out;
2635
2636         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2637                 lkm_flags |= DLM_LKF_VALBLK;
2638
2639         spin_lock_irqsave(&lockres->l_lock, flags);
2640
2641         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2642                         "lockres %s, flags 0x%lx\n",
2643                         lockres->l_name, lockres->l_flags);
2644
2645         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2646                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2647                      "%u, unlock_action = %u\n",
2648                      lockres->l_name, lockres->l_flags, lockres->l_action,
2649                      lockres->l_unlock_action);
2650
2651                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2652
2653                 /* XXX: Today we just wait on any busy
2654                  * locks... Perhaps we need to cancel converts in the
2655                  * future? */
2656                 ocfs2_wait_on_busy_lock(lockres);
2657
2658                 spin_lock_irqsave(&lockres->l_lock, flags);
2659         }
2660
2661         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2662                 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2663                     lockres->l_level == DLM_LOCK_EX &&
2664                     !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2665                         lockres->l_ops->set_lvb(lockres);
2666         }
2667
2668         if (lockres->l_flags & OCFS2_LOCK_BUSY)
2669                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2670                      lockres->l_name);
2671         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2672                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2673
2674         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2675                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2676                 goto out;
2677         }
2678
2679         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2680
2681         /* make sure we never get here while waiting for an ast to
2682          * fire. */
2683         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2684
2685         /* is this necessary? */
2686         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2687         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2688         spin_unlock_irqrestore(&lockres->l_lock, flags);
2689
2690         mlog(0, "lock %s\n", lockres->l_name);
2691
2692         ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2693                                lockres);
2694         if (ret) {
2695                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
2696                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2697                 /* XXX Need to abstract this */
2698                 dlm_print_one_lock(lockres->l_lksb.lksb_o2dlm.lockid);
2699                 BUG();
2700         }
2701         mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n",
2702              lockres->l_name);
2703
2704         ocfs2_wait_on_busy_lock(lockres);
2705 out:
2706         mlog_exit(0);
2707         return 0;
2708 }
2709
2710 /* Mark the lockres as being dropped. It will no longer be
2711  * queued if blocking, but we still may have to wait on it
2712  * being dequeued from the downconvert thread before we can consider
2713  * it safe to drop. 
2714  *
2715  * You can *not* attempt to call cluster_lock on this lockres anymore. */
2716 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2717 {
2718         int status;
2719         struct ocfs2_mask_waiter mw;
2720         unsigned long flags;
2721
2722         ocfs2_init_mask_waiter(&mw);
2723
2724         spin_lock_irqsave(&lockres->l_lock, flags);
2725         lockres->l_flags |= OCFS2_LOCK_FREEING;
2726         while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2727                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2728                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2729
2730                 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2731
2732                 status = ocfs2_wait_for_mask(&mw);
2733                 if (status)
2734                         mlog_errno(status);
2735
2736                 spin_lock_irqsave(&lockres->l_lock, flags);
2737         }
2738         spin_unlock_irqrestore(&lockres->l_lock, flags);
2739 }
2740
2741 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2742                                struct ocfs2_lock_res *lockres)
2743 {
2744         int ret;
2745
2746         ocfs2_mark_lockres_freeing(lockres);
2747         ret = ocfs2_drop_lock(osb, lockres);
2748         if (ret)
2749                 mlog_errno(ret);
2750 }
2751
2752 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2753 {
2754         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2755         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2756 }
2757
2758 int ocfs2_drop_inode_locks(struct inode *inode)
2759 {
2760         int status, err;
2761
2762         mlog_entry_void();
2763
2764         /* No need to call ocfs2_mark_lockres_freeing here -
2765          * ocfs2_clear_inode has done it for us. */
2766
2767         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2768                               &OCFS2_I(inode)->ip_open_lockres);
2769         if (err < 0)
2770                 mlog_errno(err);
2771
2772         status = err;
2773
2774         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2775                               &OCFS2_I(inode)->ip_inode_lockres);
2776         if (err < 0)
2777                 mlog_errno(err);
2778         if (err < 0 && !status)
2779                 status = err;
2780
2781         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2782                               &OCFS2_I(inode)->ip_rw_lockres);
2783         if (err < 0)
2784                 mlog_errno(err);
2785         if (err < 0 && !status)
2786                 status = err;
2787
2788         mlog_exit(status);
2789         return status;
2790 }
2791
2792 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2793                                       int new_level)
2794 {
2795         assert_spin_locked(&lockres->l_lock);
2796
2797         BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
2798
2799         if (lockres->l_level <= new_level) {
2800                 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
2801                      lockres->l_level, new_level);
2802                 BUG();
2803         }
2804
2805         mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2806              lockres->l_name, new_level, lockres->l_blocking);
2807
2808         lockres->l_action = OCFS2_AST_DOWNCONVERT;
2809         lockres->l_requested = new_level;
2810         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2811 }
2812
2813 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2814                                   struct ocfs2_lock_res *lockres,
2815                                   int new_level,
2816                                   int lvb)
2817 {
2818         int ret;
2819         u32 dlm_flags = DLM_LKF_CONVERT;
2820
2821         mlog_entry_void();
2822
2823         if (lvb)
2824                 dlm_flags |= DLM_LKF_VALBLK;
2825
2826         ret = ocfs2_dlm_lock(osb->dlm,
2827                              new_level,
2828                              &lockres->l_lksb,
2829                              dlm_flags,
2830                              lockres->l_name,
2831                              OCFS2_LOCK_ID_MAX_LEN - 1,
2832                              lockres);
2833         if (ret) {
2834                 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
2835                 ocfs2_recover_from_dlm_error(lockres, 1);
2836                 goto bail;
2837         }
2838
2839         ret = 0;
2840 bail:
2841         mlog_exit(ret);
2842         return ret;
2843 }
2844
2845 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
2846 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2847                                         struct ocfs2_lock_res *lockres)
2848 {
2849         assert_spin_locked(&lockres->l_lock);
2850
2851         mlog_entry_void();
2852         mlog(0, "lock %s\n", lockres->l_name);
2853
2854         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2855                 /* If we're already trying to cancel a lock conversion
2856                  * then just drop the spinlock and allow the caller to
2857                  * requeue this lock. */
2858
2859                 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2860                 return 0;
2861         }
2862
2863         /* were we in a convert when we got the bast fire? */
2864         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2865                lockres->l_action != OCFS2_AST_DOWNCONVERT);
2866         /* set things up for the unlockast to know to just
2867          * clear out the ast_action and unset busy, etc. */
2868         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2869
2870         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2871                         "lock %s, invalid flags: 0x%lx\n",
2872                         lockres->l_name, lockres->l_flags);
2873
2874         return 1;
2875 }
2876
2877 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2878                                 struct ocfs2_lock_res *lockres)
2879 {
2880         int ret;
2881
2882         mlog_entry_void();
2883         mlog(0, "lock %s\n", lockres->l_name);
2884
2885         ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb,
2886                                DLM_LKF_CANCEL, lockres);
2887         if (ret) {
2888                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
2889                 ocfs2_recover_from_dlm_error(lockres, 0);
2890         }
2891
2892         mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
2893
2894         mlog_exit(ret);
2895         return ret;
2896 }
2897
2898 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2899                               struct ocfs2_lock_res *lockres,
2900                               struct ocfs2_unblock_ctl *ctl)
2901 {
2902         unsigned long flags;
2903         int blocking;
2904         int new_level;
2905         int ret = 0;
2906         int set_lvb = 0;
2907
2908         mlog_entry_void();
2909
2910         spin_lock_irqsave(&lockres->l_lock, flags);
2911
2912         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2913
2914 recheck:
2915         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2916                 ctl->requeue = 1;
2917                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2918                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2919                 if (ret) {
2920                         ret = ocfs2_cancel_convert(osb, lockres);
2921                         if (ret < 0)
2922                                 mlog_errno(ret);
2923                 }
2924                 goto leave;
2925         }
2926
2927         /* if we're blocking an exclusive and we have *any* holders,
2928          * then requeue. */
2929         if ((lockres->l_blocking == DLM_LOCK_EX)
2930             && (lockres->l_ex_holders || lockres->l_ro_holders))
2931                 goto leave_requeue;
2932
2933         /* If it's a PR we're blocking, then only
2934          * requeue if we've got any EX holders */
2935         if (lockres->l_blocking == DLM_LOCK_PR &&
2936             lockres->l_ex_holders)
2937                 goto leave_requeue;
2938
2939         /*
2940          * Can we get a lock in this state if the holder counts are
2941          * zero? The meta data unblock code used to check this.
2942          */
2943         if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2944             && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2945                 goto leave_requeue;
2946
2947         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2948
2949         if (lockres->l_ops->check_downconvert
2950             && !lockres->l_ops->check_downconvert(lockres, new_level))
2951                 goto leave_requeue;
2952
2953         /* If we get here, then we know that there are no more
2954          * incompatible holders (and anyone asking for an incompatible
2955          * lock is blocked). We can now downconvert the lock */
2956         if (!lockres->l_ops->downconvert_worker)
2957                 goto downconvert;
2958
2959         /* Some lockres types want to do a bit of work before
2960          * downconverting a lock. Allow that here. The worker function
2961          * may sleep, so we save off a copy of what we're blocking as
2962          * it may change while we're not holding the spin lock. */
2963         blocking = lockres->l_blocking;
2964         spin_unlock_irqrestore(&lockres->l_lock, flags);
2965
2966         ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
2967
2968         if (ctl->unblock_action == UNBLOCK_STOP_POST)
2969                 goto leave;
2970
2971         spin_lock_irqsave(&lockres->l_lock, flags);
2972         if (blocking != lockres->l_blocking) {
2973                 /* If this changed underneath us, then we can't drop
2974                  * it just yet. */
2975                 goto recheck;
2976         }
2977
2978 downconvert:
2979         ctl->requeue = 0;
2980
2981         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2982                 if (lockres->l_level == DLM_LOCK_EX)
2983                         set_lvb = 1;
2984
2985                 /*
2986                  * We only set the lvb if the lock has been fully
2987                  * refreshed - otherwise we risk setting stale
2988                  * data. Otherwise, there's no need to actually clear
2989                  * out the lvb here as it's value is still valid.
2990                  */
2991                 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2992                         lockres->l_ops->set_lvb(lockres);
2993         }
2994
2995         ocfs2_prepare_downconvert(lockres, new_level);
2996         spin_unlock_irqrestore(&lockres->l_lock, flags);
2997         ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2998 leave:
2999         mlog_exit(ret);
3000         return ret;
3001
3002 leave_requeue:
3003         spin_unlock_irqrestore(&lockres->l_lock, flags);
3004         ctl->requeue = 1;
3005
3006         mlog_exit(0);
3007         return 0;
3008 }
3009
3010 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3011                                      int blocking)
3012 {
3013         struct inode *inode;
3014         struct address_space *mapping;
3015
3016         inode = ocfs2_lock_res_inode(lockres);
3017         mapping = inode->i_mapping;
3018
3019         if (!S_ISREG(inode->i_mode))
3020                 goto out;
3021
3022         /*
3023          * We need this before the filemap_fdatawrite() so that it can
3024          * transfer the dirty bit from the PTE to the
3025          * page. Unfortunately this means that even for EX->PR
3026          * downconverts, we'll lose our mappings and have to build
3027          * them up again.
3028          */
3029         unmap_mapping_range(mapping, 0, 0, 0);
3030
3031         if (filemap_fdatawrite(mapping)) {
3032                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3033                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
3034         }
3035         sync_mapping_buffers(mapping);
3036         if (blocking == DLM_LOCK_EX) {
3037                 truncate_inode_pages(mapping, 0);
3038         } else {
3039                 /* We only need to wait on the I/O if we're not also
3040                  * truncating pages because truncate_inode_pages waits
3041                  * for us above. We don't truncate pages if we're
3042                  * blocking anything < EXMODE because we want to keep
3043                  * them around in that case. */
3044                 filemap_fdatawait(mapping);
3045         }
3046
3047 out:
3048         return UNBLOCK_CONTINUE;
3049 }
3050
3051 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3052                                         int new_level)
3053 {
3054         struct inode *inode = ocfs2_lock_res_inode(lockres);
3055         int checkpointed = ocfs2_inode_fully_checkpointed(inode);
3056
3057         BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3058         BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3059
3060         if (checkpointed)
3061                 return 1;
3062
3063         ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
3064         return 0;
3065 }
3066
3067 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3068 {
3069         struct inode *inode = ocfs2_lock_res_inode(lockres);
3070
3071         __ocfs2_stuff_meta_lvb(inode);
3072 }
3073
3074 /*
3075  * Does the final reference drop on our dentry lock. Right now this
3076  * happens in the downconvert thread, but we could choose to simplify the
3077  * dlmglue API and push these off to the ocfs2_wq in the future.
3078  */
3079 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3080                                      struct ocfs2_lock_res *lockres)
3081 {
3082         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3083         ocfs2_dentry_lock_put(osb, dl);
3084 }
3085
3086 /*
3087  * d_delete() matching dentries before the lock downconvert.
3088  *
3089  * At this point, any process waiting to destroy the
3090  * dentry_lock due to last ref count is stopped by the
3091  * OCFS2_LOCK_QUEUED flag.
3092  *
3093  * We have two potential problems
3094  *
3095  * 1) If we do the last reference drop on our dentry_lock (via dput)
3096  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3097  *    the downconvert to finish. Instead we take an elevated
3098  *    reference and push the drop until after we've completed our
3099  *    unblock processing.
3100  *
3101  * 2) There might be another process with a final reference,
3102  *    waiting on us to finish processing. If this is the case, we
3103  *    detect it and exit out - there's no more dentries anyway.
3104  */
3105 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3106                                        int blocking)
3107 {
3108         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3109         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3110         struct dentry *dentry;
3111         unsigned long flags;
3112         int extra_ref = 0;
3113
3114         /*
3115          * This node is blocking another node from getting a read
3116          * lock. This happens when we've renamed within a
3117          * directory. We've forced the other nodes to d_delete(), but
3118          * we never actually dropped our lock because it's still
3119          * valid. The downconvert code will retain a PR for this node,
3120          * so there's no further work to do.
3121          */
3122         if (blocking == DLM_LOCK_PR)
3123                 return UNBLOCK_CONTINUE;
3124
3125         /*
3126          * Mark this inode as potentially orphaned. The code in
3127          * ocfs2_delete_inode() will figure out whether it actually
3128          * needs to be freed or not.
3129          */
3130         spin_lock(&oi->ip_lock);
3131         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3132         spin_unlock(&oi->ip_lock);
3133
3134         /*
3135          * Yuck. We need to make sure however that the check of
3136          * OCFS2_LOCK_FREEING and the extra reference are atomic with
3137          * respect to a reference decrement or the setting of that
3138          * flag.
3139          */
3140         spin_lock_irqsave(&lockres->l_lock, flags);
3141         spin_lock(&dentry_attach_lock);
3142         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3143             && dl->dl_count) {
3144                 dl->dl_count++;
3145                 extra_ref = 1;
3146         }
3147         spin_unlock(&dentry_attach_lock);
3148         spin_unlock_irqrestore(&lockres->l_lock, flags);
3149
3150         mlog(0, "extra_ref = %d\n", extra_ref);
3151
3152         /*
3153          * We have a process waiting on us in ocfs2_dentry_iput(),
3154          * which means we can't have any more outstanding
3155          * aliases. There's no need to do any more work.
3156          */
3157         if (!extra_ref)
3158                 return UNBLOCK_CONTINUE;
3159
3160         spin_lock(&dentry_attach_lock);
3161         while (1) {
3162                 dentry = ocfs2_find_local_alias(dl->dl_inode,
3163                                                 dl->dl_parent_blkno, 1);
3164                 if (!dentry)
3165                         break;
3166                 spin_unlock(&dentry_attach_lock);
3167
3168                 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3169                      dentry->d_name.name);
3170
3171                 /*
3172                  * The following dcache calls may do an
3173                  * iput(). Normally we don't want that from the
3174                  * downconverting thread, but in this case it's ok
3175                  * because the requesting node already has an
3176                  * exclusive lock on the inode, so it can't be queued
3177                  * for a downconvert.
3178                  */
3179                 d_delete(dentry);
3180                 dput(dentry);
3181
3182                 spin_lock(&dentry_attach_lock);
3183         }
3184         spin_unlock(&dentry_attach_lock);
3185
3186         /*
3187          * If we are the last holder of this dentry lock, there is no
3188          * reason to downconvert so skip straight to the unlock.
3189          */
3190         if (dl->dl_count == 1)
3191                 return UNBLOCK_STOP_POST;
3192
3193         return UNBLOCK_CONTINUE_POST;
3194 }
3195
3196 static struct ocfs2_locking_protocol lproto = {
3197         .lp_lock_ast            = ocfs2_locking_ast,
3198         .lp_blocking_ast        = ocfs2_blocking_ast,
3199         .lp_unlock_ast          = ocfs2_unlock_ast,
3200 };
3201
3202 /* This interface isn't the final one, hence the less-than-perfect names */
3203 void dlmglue_init_stack(void)
3204 {
3205         o2cb_get_stack(&lproto);
3206 }
3207
3208 void dlmglue_exit_stack(void)
3209 {
3210         o2cb_put_stack();
3211 }
3212
3213 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3214                                        struct ocfs2_lock_res *lockres)
3215 {
3216         int status;
3217         struct ocfs2_unblock_ctl ctl = {0, 0,};
3218         unsigned long flags;
3219
3220         /* Our reference to the lockres in this function can be
3221          * considered valid until we remove the OCFS2_LOCK_QUEUED
3222          * flag. */
3223
3224         mlog_entry_void();
3225
3226         BUG_ON(!lockres);
3227         BUG_ON(!lockres->l_ops);
3228
3229         mlog(0, "lockres %s blocked.\n", lockres->l_name);
3230
3231         /* Detect whether a lock has been marked as going away while
3232          * the downconvert thread was processing other things. A lock can
3233          * still be marked with OCFS2_LOCK_FREEING after this check,
3234          * but short circuiting here will still save us some
3235          * performance. */
3236         spin_lock_irqsave(&lockres->l_lock, flags);
3237         if (lockres->l_flags & OCFS2_LOCK_FREEING)
3238                 goto unqueue;
3239         spin_unlock_irqrestore(&lockres->l_lock, flags);
3240
3241         status = ocfs2_unblock_lock(osb, lockres, &ctl);
3242         if (status < 0)
3243                 mlog_errno(status);
3244
3245         spin_lock_irqsave(&lockres->l_lock, flags);
3246 unqueue:
3247         if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3248                 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3249         } else
3250                 ocfs2_schedule_blocked_lock(osb, lockres);
3251
3252         mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3253              ctl.requeue ? "yes" : "no");
3254         spin_unlock_irqrestore(&lockres->l_lock, flags);
3255
3256         if (ctl.unblock_action != UNBLOCK_CONTINUE
3257             && lockres->l_ops->post_unlock)
3258                 lockres->l_ops->post_unlock(osb, lockres);
3259
3260         mlog_exit_void();
3261 }
3262
3263 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3264                                         struct ocfs2_lock_res *lockres)
3265 {
3266         mlog_entry_void();
3267
3268         assert_spin_locked(&lockres->l_lock);
3269
3270         if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3271                 /* Do not schedule a lock for downconvert when it's on
3272                  * the way to destruction - any nodes wanting access
3273                  * to the resource will get it soon. */
3274                 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3275                      lockres->l_name, lockres->l_flags);
3276                 return;
3277         }
3278
3279         lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3280
3281         spin_lock(&osb->dc_task_lock);
3282         if (list_empty(&lockres->l_blocked_list)) {
3283                 list_add_tail(&lockres->l_blocked_list,
3284                               &osb->blocked_lock_list);
3285                 osb->blocked_lock_count++;
3286         }
3287         spin_unlock(&osb->dc_task_lock);
3288
3289         mlog_exit_void();
3290 }
3291
3292 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
3293 {
3294         unsigned long processed;
3295         struct ocfs2_lock_res *lockres;
3296
3297         mlog_entry_void();
3298
3299         spin_lock(&osb->dc_task_lock);
3300         /* grab this early so we know to try again if a state change and
3301          * wake happens part-way through our work  */
3302         osb->dc_work_sequence = osb->dc_wake_sequence;
3303
3304         processed = osb->blocked_lock_count;
3305         while (processed) {
3306                 BUG_ON(list_empty(&osb->blocked_lock_list));
3307
3308                 lockres = list_entry(osb->blocked_lock_list.next,
3309                                      struct ocfs2_lock_res, l_blocked_list);
3310                 list_del_init(&lockres->l_blocked_list);
3311                 osb->blocked_lock_count--;
3312                 spin_unlock(&osb->dc_task_lock);
3313
3314                 BUG_ON(!processed);
3315                 processed--;
3316
3317                 ocfs2_process_blocked_lock(osb, lockres);
3318
3319                 spin_lock(&osb->dc_task_lock);
3320         }
3321         spin_unlock(&osb->dc_task_lock);
3322
3323         mlog_exit_void();
3324 }
3325
3326 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3327 {
3328         int empty = 0;
3329
3330         spin_lock(&osb->dc_task_lock);
3331         if (list_empty(&osb->blocked_lock_list))
3332                 empty = 1;
3333
3334         spin_unlock(&osb->dc_task_lock);
3335         return empty;
3336 }
3337
3338 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3339 {
3340         int should_wake = 0;
3341
3342         spin_lock(&osb->dc_task_lock);
3343         if (osb->dc_work_sequence != osb->dc_wake_sequence)
3344                 should_wake = 1;
3345         spin_unlock(&osb->dc_task_lock);
3346
3347         return should_wake;
3348 }
3349
3350 static int ocfs2_downconvert_thread(void *arg)
3351 {
3352         int status = 0;
3353         struct ocfs2_super *osb = arg;
3354
3355         /* only quit once we've been asked to stop and there is no more
3356          * work available */
3357         while (!(kthread_should_stop() &&
3358                 ocfs2_downconvert_thread_lists_empty(osb))) {
3359
3360                 wait_event_interruptible(osb->dc_event,
3361                                          ocfs2_downconvert_thread_should_wake(osb) ||
3362                                          kthread_should_stop());
3363
3364                 mlog(0, "downconvert_thread: awoken\n");
3365
3366                 ocfs2_downconvert_thread_do_work(osb);
3367         }
3368
3369         osb->dc_task = NULL;
3370         return status;
3371 }
3372
3373 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
3374 {
3375         spin_lock(&osb->dc_task_lock);
3376         /* make sure the voting thread gets a swipe at whatever changes
3377          * the caller may have made to the voting state */
3378         osb->dc_wake_sequence++;
3379         spin_unlock(&osb->dc_task_lock);
3380         wake_up(&osb->dc_event);
3381 }