fs/dlm/lockspace.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"

#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;


static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
};

static struct kset dlm_kset = {
        .subsys = &kernel_subsys,
        .kobj   = {.name = "dlm",},
        .ktype  = &dlm_ktype,
};

static int kobject_setup(struct dlm_ls *ls)
{
        char lsname[DLM_LOCKSPACE_LEN];
        int error;

        memset(lsname, 0, DLM_LOCKSPACE_LEN);
        snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);

        error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
        if (error)
                return error;

        ls->ls_kobj.kset = &dlm_kset;
        ls->ls_kobj.ktype = &dlm_ktype;
        return 0;
}

static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        return error;
}
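
/*
 * Illustrative sketch, not part of this file: the userspace half of the
 * handshake above.  The lockspace kobject set up in kobject_setup() and
 * registered in new_lockspace() below hangs off the "dlm" kset in
 * kernel_subsys, so the attributes defined earlier should appear as
 * /sys/kernel/dlm/<lockspace name>/{control,event_done,id,recover_status,
 * recover_nodeid}.  After do_uevent() emits KOBJ_ONLINE, a userspace cluster
 * manager (typically dlm_controld) is expected to assign the global id and
 * then report the outcome by writing to "event_done", which sets
 * LSFL_UEVENT_WAIT and wakes ls_uevent_wait.  The helper names and the exact
 * ordering here are assumptions made for illustration only.
 */
#if 0
#include <stdio.h>

static int dlm_sysfs_write(const char *ls_name, const char *attr,
                           const char *val)
{
        char path[256];
        FILE *f;

        snprintf(path, sizeof(path), "/sys/kernel/dlm/%s/%s", ls_name, attr);
        f = fopen(path, "w");
        if (!f)
                return -1;
        fputs(val, f);          /* lands in the dlm_*_store() handlers above */
        fclose(f);
        return 0;
}

static void answer_online_uevent(const char *ls_name, unsigned int global_id)
{
        char id[16];

        snprintf(id, sizeof(id), "%u", global_id);
        dlm_sysfs_write(ls_name, "id", id);          /* dlm_id_store() */
        dlm_sysfs_write(ls_name, "event_done", "0"); /* wakes do_uevent() */
}
#endif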


int dlm_lockspace_init(void)
{
        int error;

        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        error = kset_register(&dlm_kset);
        if (error)
                printk("dlm_lockspace_init: cannot register kset %d\n", error);
        return error;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(&dlm_kset);
}

static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                list_for_each_entry(ls, &lslist, ls_list)
                        dlm_scan_rsbs(ls);
                schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_namelen == namelen &&
                    memcmp(ls->ls_name, name, namelen) == 0)
                        goto out;
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
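
/*
 * Illustrative sketch, not part of this file: the lookup helpers above take
 * a reference by bumping ls_count under lslist_lock, and dlm_put_lockspace()
 * drops it again; remove_lockspace() below spins until the count reaches
 * zero, so every successful find must be paired with a put.  The function
 * below is made up for illustration.
 */
#if 0
static void example_inspect_lockspace(uint32_t global_id)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_global(global_id);
        if (!ls)
                return;                 /* nothing found, nothing referenced */

        log_print("lockspace %s has %d nodes", ls->ls_name, ls->ls_num_nodes);

        dlm_put_lockspace(ls);          /* let release_lockspace() proceed */
}
#endif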

static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

static int threads_start(void)
{
        int error;

        /* Thread which processes lock requests for all lockspaces */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                goto fail;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto astd_fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 astd_fail:
        dlm_astd_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}

static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error = -ENOMEM;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        ls = dlm_find_lockspace_name(name, namelen);
        if (ls) {
                *lockspace = ls;
                module_put(THIS_MODULE);
                return -EEXIST;
        }

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_exflags = flags;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        size = dlm_config.rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        size = dlm_config.lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        size = dlm_config.dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        down_write(&ls->ls_in_recovery);

        spin_lock(&lslist_lock);
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        /* needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_rcomfree;
        }

        dlm_create_debug_file(ls);

        error = kobject_setup(ls);
        if (error)
                goto out_del;

        error = kobject_register(&ls->ls_kobj);
        if (error)
                goto out_del;

        error = do_uevent(ls, 1);
        if (error)
                goto out_unreg;

        *lockspace = ls;
        return 0;

 out_unreg:
        kobject_unregister(&ls->ls_kobj);
 out_del:
        dlm_delete_debug_file(ls);
        dlm_recoverd_stop(ls);
 out_rcomfree:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

int dlm_new_lockspace(char *name, int namelen, void **lockspace,
                      uint32_t flags, int lvblen)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, namelen, lockspace, flags, lvblen);
        if (!error)
                ls_count++;
 out:
        mutex_unlock(&ls_lock);
        return error;
}
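
/*
 * Illustrative sketch, not part of this file: how a lockspace user (a cluster
 * filesystem, for example) might call dlm_new_lockspace().  Per the checks in
 * new_lockspace(), the name must fit in DLM_LOCKSPACE_LEN and lvblen must be
 * a non-zero multiple of 8; the "example" name and 32-byte LVB size here are
 * arbitrary choices for illustration.
 */
#if 0
static void *example_ls;

static int example_create_lockspace(void)
{
        int error;

        error = dlm_new_lockspace("example", strlen("example"), &example_ls,
                                  0, 32);
        if (error)
                log_print("dlm_new_lockspace failed %d", error);
        return error;
}
#endif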

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks,
 *        0 if it has no locks at all.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
        int i, lkb_found = 0;
        struct dlm_lkb *lkb;

        /* NOTE: We check the lockidtbl here rather than the resource table.
           This is because there may be LKBs queued as ASTs that have been
           unlinked from their RSBs and are pending deletion once the AST has
           been delivered */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                read_lock(&ls->ls_lkbtbl[i].lock);
                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
                        lkb_found = 1;
                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
                                            lkb_idtbl_list) {
                                if (!lkb->lkb_nodeid) {
                                        read_unlock(&ls->ls_lkbtbl[i].lock);
                                        return 2;
                                }
                        }
                }
                read_unlock(&ls->ls_lkbtbl[i].lock);
        }
        return lkb_found;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i;
        int busy = lockspace_busy(ls);

        if (busy > force)
                return -EBUSY;

        if (force < 3)
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                free_lvb(lkb->lkb_lvbptr);

                        free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }

                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        kobject_unregister(&ls->ls_kobj);
        kfree(ls);

        mutex_lock(&ls_lock);
        ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);
        return release_lockspace(ls, force);
}

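/*
 * Illustrative sketch, not part of this file: tearing a lockspace down again.
 * With force 0 the call fails with -EBUSY while any LKBs remain
 * (lockspace_busy() > force); force 2 proceeds even with local locks still
 * held; force 3 additionally skips the offline uevent for a forced shutdown.
 * The helper and its retry-with-force policy are made up for illustration.
 */
#if 0
static void example_close_lockspace(void *lockspace)
{
        int error;

        /* polite attempt first: refuse if any locks are still held */
        error = dlm_release_lockspace(lockspace, 0);
        if (error == -EBUSY) {
                log_print("lockspace busy, forcing release");
                error = dlm_release_lockspace(lockspace, 2);
        }
        if (error)
                log_print("dlm_release_lockspace failed %d", error);
}
#endif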