414a108df934870c63b50abc2be273bb332e917f
[linux-2.6.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26
/* Debugfs hooks: real implementations are provided elsewhere when
   CONFIG_DLM_DEBUG is set; otherwise these no-op stubs are used so
   callers need no #ifdefs. */
#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif
34
static int                      ls_count;	/* number of live lockspaces; protected by ls_lock */
static struct mutex             ls_lock;	/* serializes lockspace create/release and thread start/stop */
static struct list_head         lslist;		/* all lockspaces, linked via dlm_ls.ls_list */
static spinlock_t               lslist_lock;	/* protects lslist and each lockspace's ls_count refcount */
static struct task_struct *     scand_task;	/* kthread running dlm_scand() */
40
41
42 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
43 {
44         ssize_t ret = len;
45         int n = simple_strtol(buf, NULL, 0);
46
47         ls = dlm_find_lockspace_local(ls->ls_local_handle);
48         if (!ls)
49                 return -EINVAL;
50
51         switch (n) {
52         case 0:
53                 dlm_ls_stop(ls);
54                 break;
55         case 1:
56                 dlm_ls_start(ls);
57                 break;
58         default:
59                 ret = -EINVAL;
60         }
61         dlm_put_lockspace(ls);
62         return ret;
63 }
64
65 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
66 {
67         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
68         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69         wake_up(&ls->ls_uevent_wait);
70         return len;
71 }
72
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
81         return len;
82 }
83
84 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
85 {
86         uint32_t status = dlm_recover_status(ls);
87         return snprintf(buf, PAGE_SIZE, "%x\n", status);
88 }
89
90 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
91 {
92         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
93 }
94
/*
 * Per-lockspace sysfs attribute: pairs an attribute name/mode with
 * optional show/store handlers that receive the owning dlm_ls.
 */
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

/* "control": write-only; 0 stops the lockspace, 1 starts it */
static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

/* "event_done": write-only; userspace reports the uevent result */
static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

/* "id": read/write global lockspace id */
static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

/* "recover_status": read-only recovery status bitmask */
static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

/* "recover_nodeid": read-only nodeid under recovery */
static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

/* default attributes created for every lockspace kobject */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
135
136 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
137                              char *buf)
138 {
139         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
140         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141         return a->show ? a->show(ls, buf) : 0;
142 }
143
144 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
145                               const char *buf, size_t len)
146 {
147         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
148         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
149         return a->store ? a->store(ls, buf, len) : len;
150 }
151
/* kobject release callback: frees the dlm_ls once the last sysfs
   reference is dropped (see the comment after kobject_unregister in
   release_lockspace). */
static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}
157
/* Dispatch all sysfs reads/writes through the dlm_attr handlers. */
static struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

/* kobject type for a lockspace; release() frees the dlm_ls itself. */
static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

/* The "dlm" kset, registered at module init (dlm_lockspace_init). */
static struct kset dlm_kset = {
	.kobj   = {.name = "dlm",},
	.ktype  = &dlm_ktype,
};
173
174 static int kobject_setup(struct dlm_ls *ls)
175 {
176         char lsname[DLM_LOCKSPACE_LEN];
177         int error;
178
179         memset(lsname, 0, DLM_LOCKSPACE_LEN);
180         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
181
182         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
183         if (error)
184                 return error;
185
186         ls->ls_kobj.kset = &dlm_kset;
187         ls->ls_kobj.ktype = &dlm_ktype;
188         return 0;
189 }
190
191 static int do_uevent(struct dlm_ls *ls, int in)
192 {
193         int error;
194
195         if (in)
196                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
197         else
198                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
199
200         error = wait_event_interruptible(ls->ls_uevent_wait,
201                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
202         if (error)
203                 goto out;
204
205         error = ls->ls_uevent_result;
206  out:
207         return error;
208 }
209
210
211 int dlm_lockspace_init(void)
212 {
213         int error;
214
215         ls_count = 0;
216         mutex_init(&ls_lock);
217         INIT_LIST_HEAD(&lslist);
218         spin_lock_init(&lslist_lock);
219
220         kobj_set_kset_s(&dlm_kset, kernel_subsys);
221         error = kset_register(&dlm_kset);
222         if (error)
223                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
224         return error;
225 }
226
/* Module exit: remove the "dlm" kset from sysfs. */
void dlm_lockspace_exit(void)
{
	kset_unregister(&dlm_kset);
}
231
/*
 * Scanner kthread: periodically walks every lockspace and reclaims
 * unused rsbs (dlm_scan_rsbs), skipping any lockspace currently in
 * recovery (dlm_lock_recovery_try fails).  Sleeps ci_scan_secs between
 * passes; exits when kthread_stop() is called (dlm_scand_stop).
 */
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		/* NOTE(review): lslist is traversed here without taking
		   lslist_lock; presumably safe only because lockspace
		   removal waits elsewhere — verify against
		   remove_lockspace()/release_lockspace(). */
		list_for_each_entry(ls, &lslist, ls_list) {
			if (dlm_lock_recovery_try(ls)) {
				dlm_scan_rsbs(ls);
				dlm_unlock_recovery(ls);
			}
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}
247
248 static int dlm_scand_start(void)
249 {
250         struct task_struct *p;
251         int error = 0;
252
253         p = kthread_run(dlm_scand, NULL, "dlm_scand");
254         if (IS_ERR(p))
255                 error = PTR_ERR(p);
256         else
257                 scand_task = p;
258         return error;
259 }
260
/* Stop the scanner; kthread_stop() blocks until dlm_scand() returns. */
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
265
266 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
267 {
268         struct dlm_ls *ls;
269
270         spin_lock(&lslist_lock);
271
272         list_for_each_entry(ls, &lslist, ls_list) {
273                 if (ls->ls_namelen == namelen &&
274                     memcmp(ls->ls_name, name, namelen) == 0)
275                         goto out;
276         }
277         ls = NULL;
278  out:
279         spin_unlock(&lslist_lock);
280         return ls;
281 }
282
283 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
284 {
285         struct dlm_ls *ls;
286
287         spin_lock(&lslist_lock);
288
289         list_for_each_entry(ls, &lslist, ls_list) {
290                 if (ls->ls_global_id == id) {
291                         ls->ls_count++;
292                         goto out;
293                 }
294         }
295         ls = NULL;
296  out:
297         spin_unlock(&lslist_lock);
298         return ls;
299 }
300
301 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
302 {
303         struct dlm_ls *ls;
304
305         spin_lock(&lslist_lock);
306         list_for_each_entry(ls, &lslist, ls_list) {
307                 if (ls->ls_local_handle == lockspace) {
308                         ls->ls_count++;
309                         goto out;
310                 }
311         }
312         ls = NULL;
313  out:
314         spin_unlock(&lslist_lock);
315         return ls;
316 }
317
318 struct dlm_ls *dlm_find_lockspace_device(int minor)
319 {
320         struct dlm_ls *ls;
321
322         spin_lock(&lslist_lock);
323         list_for_each_entry(ls, &lslist, ls_list) {
324                 if (ls->ls_device.minor == minor) {
325                         ls->ls_count++;
326                         goto out;
327                 }
328         }
329         ls = NULL;
330  out:
331         spin_unlock(&lslist_lock);
332         return ls;
333 }
334
/* Drop a reference taken by one of the dlm_find_lockspace_* functions. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
341
/*
 * Unlink the lockspace from lslist once every outstanding reference
 * (ls->ls_count, taken by the dlm_find_lockspace_* functions) has been
 * dropped.  Busy-waits, sleeping one second between checks.
 */
static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}
355
/*
 * Start the global helper threads (astd, scand, lowcomms), unwinding
 * the ones already started if a later one fails.  Called under ls_lock
 * when the first lockspace is created (dlm_new_lockspace).
 */
static int threads_start(void)
{
	int error;

	/* Thread which process lock requests for all lockspace's */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		goto fail;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto astd_fail;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 astd_fail:
	dlm_astd_stop();
 fail:
	return error;
}
389
/* Stop all global helper threads; called under ls_lock when the last
   lockspace is released (release_lockspace). */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
396
/*
 * Allocate and initialize a lockspace, add it to lslist, start its
 * recovery daemon, register its sysfs kobject and wait for userspace to
 * acknowledge the ONLINE uevent.
 *
 * Returns 0 with *lockspace set on success; -EEXIST (with *lockspace
 * set to the existing lockspace) if the name is already in use;
 * -EINVAL for bad arguments; -ENOMEM or other negative errno on
 * failure.  On any failure everything acquired so far is unwound via
 * the goto chain at the bottom, in reverse order of acquisition.
 */
static int new_lockspace(char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error = -ENOMEM;

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	/* lvb length must be a non-zero multiple of 8 */
	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	/* each lockspace pins the module; dropped in release_lockspace */
	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	ls = dlm_find_lockspace_name(name, namelen);
	if (ls) {
		*lockspace = ls;
		module_put(THIS_MODULE);
		return -EEXIST;
	}

	/* name is stored in the trailing bytes after the struct */
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_exflags = flags;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;

	/* resource (rsb) hash table */
	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		rwlock_init(&ls->ls_rsbtbl[i].lock);
	}

	/* lock (lkb) id hash table */
	size = dlm_config.ci_lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	if (!ls->ls_lkbtbl)
		goto out_rsbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	/* resource directory hash table */
	size = dlm_config.ci_dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		rwlock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);

	/* membership state, filled in during recovery */
	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;

	/* recovery state */
	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	/* held for write until the first recovery completes */
	down_write(&ls->ls_in_recovery);

	spin_lock(&lslist_lock);
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	/* needs to find ls in lslist */
	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_rcomfree;
	}

	dlm_create_debug_file(ls);

	error = kobject_setup(ls);
	if (error)
		goto out_del;

	error = kobject_register(&ls->ls_kobj);
	if (error)
		goto out_del;

	/* wait for userspace to acknowledge the lockspace coming online */
	error = do_uevent(ls, 1);
	if (error)
		goto out_unreg;

	*lockspace = ls;
	return 0;

 out_unreg:
	kobject_unregister(&ls->ls_kobj);
 out_del:
	dlm_delete_debug_file(ls);
	dlm_recoverd_stop(ls);
 out_rcomfree:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	kfree(ls->ls_recover_buf);
 out_dirfree:
	kfree(ls->ls_dirtbl);
 out_lkbfree:
	kfree(ls->ls_lkbtbl);
 out_rsbfree:
	kfree(ls->ls_rsbtbl);
 out_lsfree:
	kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
562
563 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
564                       uint32_t flags, int lvblen)
565 {
566         int error = 0;
567
568         mutex_lock(&ls_lock);
569         if (!ls_count)
570                 error = threads_start();
571         if (error)
572                 goto out;
573
574         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
575         if (!error)
576                 ls_count++;
577  out:
578         mutex_unlock(&ls_lock);
579         return error;
580 }
581
/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
	int i, lkb_found = 0;
	struct dlm_lkb *lkb;

	/* NOTE: We check the lockidtbl here rather than the resource table.
	   This is because there may be LKBs queued as ASTs that have been
	   unlinked from their RSBs and are pending deletion once the AST has
	   been delivered */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		read_lock(&ls->ls_lkbtbl[i].lock);
		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
			lkb_found = 1;
			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
					    lkb_idtbl_list) {
				/* nodeid 0 means the lock is held locally */
				if (!lkb->lkb_nodeid) {
					read_unlock(&ls->ls_lkbtbl[i].lock);
					return 2;
				}
			}
		}
		read_unlock(&ls->ls_lkbtbl[i].lock);
	}
	return lkb_found;
}
611
/*
 * Tear down a lockspace: notify userspace, stop recovery, unlink from
 * lslist, then free every lkb, rsb, directory entry and table in
 * reverse order of creation.  See dlm_release_lockspace for the
 * meaning of 'force'.
 *
 * Returns -EBUSY if the lockspace still holds locks that 'force' does
 * not permit destroying, otherwise 0.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i;
	int busy = lockspace_busy(ls);

	/* busy: 1 = remote locks, 2 = local locks (lockspace_busy) */
	if (busy > force)
		return -EBUSY;

	/* tell userspace we're going offline, except on forced shutdown */
	if (force < 3)
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* waits for all ls_count references to drain, then unlinks */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	/* keep astd from touching lkbs while we free them */
	dlm_astd_suspend();

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	kfree(ls->ls_dirtbl);

	/*
	 * Free all lkb's on lkbtbl[] lists.
	 */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			dlm_del_ast(lkb);

			/* master copies own their lvb allocation */
			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				free_lvb(lkb->lkb_lvbptr);

			free_lkb(lkb);
		}
	}
	dlm_astd_resume();

	kfree(ls->ls_lkbtbl);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}

		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}
	}

	kfree(ls->ls_rsbtbl);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	kobject_unregister(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	/* stop the global helper threads along with the last lockspace */
	mutex_lock(&ls_lock);
	ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	module_put(THIS_MODULE);
	return 0;
}
714
715 /*
716  * Called when a system has released all its locks and is not going to use the
717  * lockspace any longer.  We free everything we're managing for this lockspace.
718  * Remaining nodes will go through the recovery process as if we'd died.  The
719  * lockspace must continue to function as usual, participating in recoveries,
720  * until this returns.
721  *
722  * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
724  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
725  * 2 - destroy lockspace regardless of LKBs
726  * 3 - destroy lockspace as part of a forced shutdown
727  */
728
729 int dlm_release_lockspace(void *lockspace, int force)
730 {
731         struct dlm_ls *ls;
732
733         ls = dlm_find_lockspace_local(lockspace);
734         if (!ls)
735                 return -EINVAL;
736         dlm_put_lockspace(ls);
737         return release_lockspace(ls, force);
738 }
739