[DLM] The core of the DLM for GFS2/CLVM
[linux-3.10.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24
25 #ifdef CONFIG_DLM_DEBUG
26 int dlm_create_debug_file(struct dlm_ls *ls);
27 void dlm_delete_debug_file(struct dlm_ls *ls);
28 #else
29 static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
30 static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
31 #endif
32
33 static int                      ls_count;
34 static struct semaphore         ls_lock;
35 static struct list_head         lslist;
36 static spinlock_t               lslist_lock;
37 static struct task_struct *     scand_task;
38
39
40 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
41 {
42         ssize_t ret = len;
43         int n = simple_strtol(buf, NULL, 0);
44
45         switch (n) {
46         case 0:
47                 dlm_ls_stop(ls);
48                 break;
49         case 1:
50                 dlm_ls_start(ls);
51                 break;
52         default:
53                 ret = -EINVAL;
54         }
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return sprintf(buf, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 struct dlm_attr {
78         struct attribute attr;
79         ssize_t (*show)(struct dlm_ls *, char *);
80         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
81 };
82
83 static struct dlm_attr dlm_attr_control = {
84         .attr  = {.name = "control", .mode = S_IWUSR},
85         .store = dlm_control_store
86 };
87
88 static struct dlm_attr dlm_attr_event = {
89         .attr  = {.name = "event_done", .mode = S_IWUSR},
90         .store = dlm_event_store
91 };
92
93 static struct dlm_attr dlm_attr_id = {
94         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
95         .show  = dlm_id_show,
96         .store = dlm_id_store
97 };
98
99 static struct attribute *dlm_attrs[] = {
100         &dlm_attr_control.attr,
101         &dlm_attr_event.attr,
102         &dlm_attr_id.attr,
103         NULL,
104 };
105
106 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
107                              char *buf)
108 {
109         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
110         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
111         return a->show ? a->show(ls, buf) : 0;
112 }
113
114 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
115                               const char *buf, size_t len)
116 {
117         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
118         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
119         return a->store ? a->store(ls, buf, len) : len;
120 }
121
122 static struct sysfs_ops dlm_attr_ops = {
123         .show  = dlm_attr_show,
124         .store = dlm_attr_store,
125 };
126
127 static struct kobj_type dlm_ktype = {
128         .default_attrs = dlm_attrs,
129         .sysfs_ops     = &dlm_attr_ops,
130 };
131
132 static struct kset dlm_kset = {
133         .subsys = &kernel_subsys,
134         .kobj   = {.name = "dlm",},
135         .ktype  = &dlm_ktype,
136 };
137
138 static int kobject_setup(struct dlm_ls *ls)
139 {
140         char lsname[DLM_LOCKSPACE_LEN];
141         int error;
142
143         memset(lsname, 0, DLM_LOCKSPACE_LEN);
144         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
145
146         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
147         if (error)
148                 return error;
149
150         ls->ls_kobj.kset = &dlm_kset;
151         ls->ls_kobj.ktype = &dlm_ktype;
152         return 0;
153 }
154
155 static int do_uevent(struct dlm_ls *ls, int in)
156 {
157         int error;
158
159         if (in)
160                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
161         else
162                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
163
164         error = wait_event_interruptible(ls->ls_uevent_wait,
165                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
166         if (error)
167                 goto out;
168
169         error = ls->ls_uevent_result;
170  out:
171         return error;
172 }
173
174
175 int dlm_lockspace_init(void)
176 {
177         int error;
178
179         ls_count = 0;
180         init_MUTEX(&ls_lock);
181         INIT_LIST_HEAD(&lslist);
182         spin_lock_init(&lslist_lock);
183
184         error = kset_register(&dlm_kset);
185         if (error)
186                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
187         return error;
188 }
189
190 void dlm_lockspace_exit(void)
191 {
192         kset_unregister(&dlm_kset);
193 }
194
195 static int dlm_scand(void *data)
196 {
197         struct dlm_ls *ls;
198
199         while (!kthread_should_stop()) {
200                 list_for_each_entry(ls, &lslist, ls_list)
201                         dlm_scan_rsbs(ls);
202                 schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
203         }
204         return 0;
205 }
206
207 static int dlm_scand_start(void)
208 {
209         struct task_struct *p;
210         int error = 0;
211
212         p = kthread_run(dlm_scand, NULL, "dlm_scand");
213         if (IS_ERR(p))
214                 error = PTR_ERR(p);
215         else
216                 scand_task = p;
217         return error;
218 }
219
220 static void dlm_scand_stop(void)
221 {
222         kthread_stop(scand_task);
223 }
224
225 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
226 {
227         struct dlm_ls *ls;
228
229         spin_lock(&lslist_lock);
230
231         list_for_each_entry(ls, &lslist, ls_list) {
232                 if (ls->ls_namelen == namelen &&
233                     memcmp(ls->ls_name, name, namelen) == 0)
234                         goto out;
235         }
236         ls = NULL;
237  out:
238         spin_unlock(&lslist_lock);
239         return ls;
240 }
241
242 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
243 {
244         struct dlm_ls *ls;
245
246         spin_lock(&lslist_lock);
247
248         list_for_each_entry(ls, &lslist, ls_list) {
249                 if (ls->ls_global_id == id) {
250                         ls->ls_count++;
251                         goto out;
252                 }
253         }
254         ls = NULL;
255  out:
256         spin_unlock(&lslist_lock);
257         return ls;
258 }
259
260 struct dlm_ls *dlm_find_lockspace_local(void *id)
261 {
262         struct dlm_ls *ls = id;
263
264         spin_lock(&lslist_lock);
265         ls->ls_count++;
266         spin_unlock(&lslist_lock);
267         return ls;
268 }
269
270 void dlm_put_lockspace(struct dlm_ls *ls)
271 {
272         spin_lock(&lslist_lock);
273         ls->ls_count--;
274         spin_unlock(&lslist_lock);
275 }
276
277 static void remove_lockspace(struct dlm_ls *ls)
278 {
279         for (;;) {
280                 spin_lock(&lslist_lock);
281                 if (ls->ls_count == 0) {
282                         list_del(&ls->ls_list);
283                         spin_unlock(&lslist_lock);
284                         return;
285                 }
286                 spin_unlock(&lslist_lock);
287                 ssleep(1);
288         }
289 }
290
291 static int threads_start(void)
292 {
293         int error;
294
295         /* Thread which process lock requests for all lockspace's */
296         error = dlm_astd_start();
297         if (error) {
298                 log_print("cannot start dlm_astd thread %d", error);
299                 goto fail;
300         }
301
302         error = dlm_scand_start();
303         if (error) {
304                 log_print("cannot start dlm_scand thread %d", error);
305                 goto astd_fail;
306         }
307
308         /* Thread for sending/receiving messages for all lockspace's */
309         error = dlm_lowcomms_start();
310         if (error) {
311                 log_print("cannot start dlm lowcomms %d", error);
312                 goto scand_fail;
313         }
314
315         return 0;
316
317  scand_fail:
318         dlm_scand_stop();
319  astd_fail:
320         dlm_astd_stop();
321  fail:
322         return error;
323 }
324
325 static void threads_stop(void)
326 {
327         dlm_scand_stop();
328         dlm_lowcomms_stop();
329         dlm_astd_stop();
330 }
331
332 static int new_lockspace(char *name, int namelen, void **lockspace,
333                          uint32_t flags, int lvblen)
334 {
335         struct dlm_ls *ls;
336         int i, size, error = -ENOMEM;
337
338         if (namelen > DLM_LOCKSPACE_LEN)
339                 return -EINVAL;
340
341         if (!lvblen || (lvblen % 8))
342                 return -EINVAL;
343
344         if (!try_module_get(THIS_MODULE))
345                 return -EINVAL;
346
347         ls = dlm_find_lockspace_name(name, namelen);
348         if (ls) {
349                 *lockspace = ls;
350                 module_put(THIS_MODULE);
351                 return -EEXIST;
352         }
353
354         ls = kmalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
355         if (!ls)
356                 goto out;
357         memset(ls, 0, sizeof(struct dlm_ls) + namelen);
358         memcpy(ls->ls_name, name, namelen);
359         ls->ls_namelen = namelen;
360         ls->ls_exflags = flags;
361         ls->ls_lvblen = lvblen;
362         ls->ls_count = 0;
363         ls->ls_flags = 0;
364
365         size = dlm_config.rsbtbl_size;
366         ls->ls_rsbtbl_size = size;
367
368         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
369         if (!ls->ls_rsbtbl)
370                 goto out_lsfree;
371         for (i = 0; i < size; i++) {
372                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
373                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
374                 rwlock_init(&ls->ls_rsbtbl[i].lock);
375         }
376
377         size = dlm_config.lkbtbl_size;
378         ls->ls_lkbtbl_size = size;
379
380         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
381         if (!ls->ls_lkbtbl)
382                 goto out_rsbfree;
383         for (i = 0; i < size; i++) {
384                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
385                 rwlock_init(&ls->ls_lkbtbl[i].lock);
386                 ls->ls_lkbtbl[i].counter = 1;
387         }
388
389         size = dlm_config.dirtbl_size;
390         ls->ls_dirtbl_size = size;
391
392         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
393         if (!ls->ls_dirtbl)
394                 goto out_lkbfree;
395         for (i = 0; i < size; i++) {
396                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
397                 rwlock_init(&ls->ls_dirtbl[i].lock);
398         }
399
400         INIT_LIST_HEAD(&ls->ls_waiters);
401         init_MUTEX(&ls->ls_waiters_sem);
402
403         INIT_LIST_HEAD(&ls->ls_nodes);
404         INIT_LIST_HEAD(&ls->ls_nodes_gone);
405         ls->ls_num_nodes = 0;
406         ls->ls_low_nodeid = 0;
407         ls->ls_total_weight = 0;
408         ls->ls_node_array = NULL;
409
410         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
411         ls->ls_stub_rsb.res_ls = ls;
412
413         ls->ls_debug_dentry = NULL;
414
415         init_waitqueue_head(&ls->ls_uevent_wait);
416         ls->ls_uevent_result = 0;
417
418         ls->ls_recoverd_task = NULL;
419         init_MUTEX(&ls->ls_recoverd_active);
420         spin_lock_init(&ls->ls_recover_lock);
421         ls->ls_recover_status = 0;
422         ls->ls_recover_seq = 0;
423         ls->ls_recover_args = NULL;
424         init_rwsem(&ls->ls_in_recovery);
425         INIT_LIST_HEAD(&ls->ls_requestqueue);
426         init_MUTEX(&ls->ls_requestqueue_lock);
427
428         ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
429         if (!ls->ls_recover_buf)
430                 goto out_dirfree;
431
432         INIT_LIST_HEAD(&ls->ls_recover_list);
433         spin_lock_init(&ls->ls_recover_list_lock);
434         ls->ls_recover_list_count = 0;
435         init_waitqueue_head(&ls->ls_wait_general);
436         INIT_LIST_HEAD(&ls->ls_root_list);
437         init_rwsem(&ls->ls_root_sem);
438
439         down_write(&ls->ls_in_recovery);
440
441         error = dlm_recoverd_start(ls);
442         if (error) {
443                 log_error(ls, "can't start dlm_recoverd %d", error);
444                 goto out_rcomfree;
445         }
446
447         spin_lock(&lslist_lock);
448         list_add(&ls->ls_list, &lslist);
449         spin_unlock(&lslist_lock);
450
451         dlm_create_debug_file(ls);
452
453         error = kobject_setup(ls);
454         if (error)
455                 goto out_del;
456
457         error = kobject_register(&ls->ls_kobj);
458         if (error)
459                 goto out_del;
460
461         error = do_uevent(ls, 1);
462         if (error)
463                 goto out_unreg;
464
465         *lockspace = ls;
466         return 0;
467
468  out_unreg:
469         kobject_unregister(&ls->ls_kobj);
470  out_del:
471         dlm_delete_debug_file(ls);
472         spin_lock(&lslist_lock);
473         list_del(&ls->ls_list);
474         spin_unlock(&lslist_lock);
475         dlm_recoverd_stop(ls);
476  out_rcomfree:
477         kfree(ls->ls_recover_buf);
478  out_dirfree:
479         kfree(ls->ls_dirtbl);
480  out_lkbfree:
481         kfree(ls->ls_lkbtbl);
482  out_rsbfree:
483         kfree(ls->ls_rsbtbl);
484  out_lsfree:
485         kfree(ls);
486  out:
487         module_put(THIS_MODULE);
488         return error;
489 }
490
491 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
492                       uint32_t flags, int lvblen)
493 {
494         int error = 0;
495
496         down(&ls_lock);
497         if (!ls_count)
498                 error = threads_start();
499         if (error)
500                 goto out;
501
502         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
503         if (!error)
504                 ls_count++;
505  out:
506         up(&ls_lock);
507         return error;
508 }
509
510 /* Return 1 if the lockspace still has active remote locks,
511  *        2 if the lockspace still has active local locks.
512  */
513 static int lockspace_busy(struct dlm_ls *ls)
514 {
515         int i, lkb_found = 0;
516         struct dlm_lkb *lkb;
517
518         /* NOTE: We check the lockidtbl here rather than the resource table.
519            This is because there may be LKBs queued as ASTs that have been
520            unlinked from their RSBs and are pending deletion once the AST has
521            been delivered */
522
523         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
524                 read_lock(&ls->ls_lkbtbl[i].lock);
525                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
526                         lkb_found = 1;
527                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
528                                             lkb_idtbl_list) {
529                                 if (!lkb->lkb_nodeid) {
530                                         read_unlock(&ls->ls_lkbtbl[i].lock);
531                                         return 2;
532                                 }
533                         }
534                 }
535                 read_unlock(&ls->ls_lkbtbl[i].lock);
536         }
537         return lkb_found;
538 }
539
540 static int release_lockspace(struct dlm_ls *ls, int force)
541 {
542         struct dlm_lkb *lkb;
543         struct dlm_rsb *rsb;
544         struct list_head *head;
545         int i;
546         int busy = lockspace_busy(ls);
547
548         if (busy > force)
549                 return -EBUSY;
550
551         if (force < 3)
552                 do_uevent(ls, 0);
553
554         dlm_recoverd_stop(ls);
555
556         remove_lockspace(ls);
557
558         dlm_delete_debug_file(ls);
559
560         dlm_astd_suspend();
561
562         kfree(ls->ls_recover_buf);
563
564         /*
565          * Free direntry structs.
566          */
567
568         dlm_dir_clear(ls);
569         kfree(ls->ls_dirtbl);
570
571         /*
572          * Free all lkb's on lkbtbl[] lists.
573          */
574
575         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
576                 head = &ls->ls_lkbtbl[i].list;
577                 while (!list_empty(head)) {
578                         lkb = list_entry(head->next, struct dlm_lkb,
579                                          lkb_idtbl_list);
580
581                         list_del(&lkb->lkb_idtbl_list);
582
583                         dlm_del_ast(lkb);
584
585                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
586                                 free_lvb(lkb->lkb_lvbptr);
587
588                         free_lkb(lkb);
589                 }
590         }
591         dlm_astd_resume();
592
593         kfree(ls->ls_lkbtbl);
594
595         /*
596          * Free all rsb's on rsbtbl[] lists
597          */
598
599         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
600                 head = &ls->ls_rsbtbl[i].list;
601                 while (!list_empty(head)) {
602                         rsb = list_entry(head->next, struct dlm_rsb,
603                                          res_hashchain);
604
605                         list_del(&rsb->res_hashchain);
606                         free_rsb(rsb);
607                 }
608
609                 head = &ls->ls_rsbtbl[i].toss;
610                 while (!list_empty(head)) {
611                         rsb = list_entry(head->next, struct dlm_rsb,
612                                          res_hashchain);
613                         list_del(&rsb->res_hashchain);
614                         free_rsb(rsb);
615                 }
616         }
617
618         kfree(ls->ls_rsbtbl);
619
620         /*
621          * Free structures on any other lists
622          */
623
624         kfree(ls->ls_recover_args);
625         dlm_clear_free_entries(ls);
626         dlm_clear_members(ls);
627         dlm_clear_members_gone(ls);
628         kfree(ls->ls_node_array);
629         kobject_unregister(&ls->ls_kobj);
630         kfree(ls);
631
632         down(&ls_lock);
633         ls_count--;
634         if (!ls_count)
635                 threads_stop();
636         up(&ls_lock);
637
638         module_put(THIS_MODULE);
639         return 0;
640 }
641
642 /*
643  * Called when a system has released all its locks and is not going to use the
644  * lockspace any longer.  We free everything we're managing for this lockspace.
645  * Remaining nodes will go through the recovery process as if we'd died.  The
646  * lockspace must continue to function as usual, participating in recoveries,
647  * until this returns.
648  *
649  * Force has 4 possible values:
650  * 0 - don't destroy locksapce if it has any LKBs
651  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
652  * 2 - destroy lockspace regardless of LKBs
653  * 3 - destroy lockspace as part of a forced shutdown
654  */
655
656 int dlm_release_lockspace(void *lockspace, int force)
657 {
658         struct dlm_ls *ls;
659
660         ls = dlm_find_lockspace_local(lockspace);
661         if (!ls)
662                 return -EINVAL;
663         dlm_put_lockspace(ls);
664         return release_lockspace(ls, force);
665 }
666