Merge branch 'core-rcu-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git...
[linux-2.6.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79         uint32_t status = dlm_recover_status(ls);
80         return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87
88 struct dlm_attr {
89         struct attribute attr;
90         ssize_t (*show)(struct dlm_ls *, char *);
91         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93
94 static struct dlm_attr dlm_attr_control = {
95         .attr  = {.name = "control", .mode = S_IWUSR},
96         .store = dlm_control_store
97 };
98
99 static struct dlm_attr dlm_attr_event = {
100         .attr  = {.name = "event_done", .mode = S_IWUSR},
101         .store = dlm_event_store
102 };
103
104 static struct dlm_attr dlm_attr_id = {
105         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106         .show  = dlm_id_show,
107         .store = dlm_id_store
108 };
109
110 static struct dlm_attr dlm_attr_recover_status = {
111         .attr  = {.name = "recover_status", .mode = S_IRUGO},
112         .show  = dlm_recover_status_show
113 };
114
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117         .show  = dlm_recover_nodeid_show
118 };
119
120 static struct attribute *dlm_attrs[] = {
121         &dlm_attr_control.attr,
122         &dlm_attr_event.attr,
123         &dlm_attr_id.attr,
124         &dlm_attr_recover_status.attr,
125         &dlm_attr_recover_nodeid.attr,
126         NULL,
127 };
128
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130                              char *buf)
131 {
132         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134         return a->show ? a->show(ls, buf) : 0;
135 }
136
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138                               const char *buf, size_t len)
139 {
140         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142         return a->store ? a->store(ls, buf, len) : len;
143 }
144
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148         kfree(ls);
149 }
150
151 static const struct sysfs_ops dlm_attr_ops = {
152         .show  = dlm_attr_show,
153         .store = dlm_attr_store,
154 };
155
156 static struct kobj_type dlm_ktype = {
157         .default_attrs = dlm_attrs,
158         .sysfs_ops     = &dlm_attr_ops,
159         .release       = lockspace_kobj_release,
160 };
161
/* The "dlm" kset; created in dlm_lockspace_init(), parent of every lockspace kobject. */
static struct kset *dlm_kset;
163
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166         int error;
167
168         if (in)
169                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170         else
171                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172
173         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174
175         /* dlm_controld will see the uevent, do the necessary group management
176            and then write to sysfs to wake us */
177
178         error = wait_event_interruptible(ls->ls_uevent_wait,
179                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180
181         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182
183         if (error)
184                 goto out;
185
186         error = ls->ls_uevent_result;
187  out:
188         if (error)
189                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190                           error, ls->ls_uevent_result);
191         return error;
192 }
193
194 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
195                       struct kobj_uevent_env *env)
196 {
197         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
198
199         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
200         return 0;
201 }
202
203 static struct kset_uevent_ops dlm_uevent_ops = {
204         .uevent = dlm_uevent,
205 };
206
207 int __init dlm_lockspace_init(void)
208 {
209         ls_count = 0;
210         mutex_init(&ls_lock);
211         INIT_LIST_HEAD(&lslist);
212         spin_lock_init(&lslist_lock);
213
214         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
215         if (!dlm_kset) {
216                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
217                 return -ENOMEM;
218         }
219         return 0;
220 }
221
222 void dlm_lockspace_exit(void)
223 {
224         kset_unregister(dlm_kset);
225 }
226
227 static struct dlm_ls *find_ls_to_scan(void)
228 {
229         struct dlm_ls *ls;
230
231         spin_lock(&lslist_lock);
232         list_for_each_entry(ls, &lslist, ls_list) {
233                 if (time_after_eq(jiffies, ls->ls_scan_time +
234                                             dlm_config.ci_scan_secs * HZ)) {
235                         spin_unlock(&lslist_lock);
236                         return ls;
237                 }
238         }
239         spin_unlock(&lslist_lock);
240         return NULL;
241 }
242
243 static int dlm_scand(void *data)
244 {
245         struct dlm_ls *ls;
246
247         while (!kthread_should_stop()) {
248                 ls = find_ls_to_scan();
249                 if (ls) {
250                         if (dlm_lock_recovery_try(ls)) {
251                                 ls->ls_scan_time = jiffies;
252                                 dlm_scan_rsbs(ls);
253                                 dlm_scan_timeout(ls);
254                                 dlm_scan_waiters(ls);
255                                 dlm_unlock_recovery(ls);
256                         } else {
257                                 ls->ls_scan_time += HZ;
258                         }
259                         continue;
260                 }
261                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
262         }
263         return 0;
264 }
265
266 static int dlm_scand_start(void)
267 {
268         struct task_struct *p;
269         int error = 0;
270
271         p = kthread_run(dlm_scand, NULL, "dlm_scand");
272         if (IS_ERR(p))
273                 error = PTR_ERR(p);
274         else
275                 scand_task = p;
276         return error;
277 }
278
279 static void dlm_scand_stop(void)
280 {
281         kthread_stop(scand_task);
282 }
283
284 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
285 {
286         struct dlm_ls *ls;
287
288         spin_lock(&lslist_lock);
289
290         list_for_each_entry(ls, &lslist, ls_list) {
291                 if (ls->ls_global_id == id) {
292                         ls->ls_count++;
293                         goto out;
294                 }
295         }
296         ls = NULL;
297  out:
298         spin_unlock(&lslist_lock);
299         return ls;
300 }
301
302 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
303 {
304         struct dlm_ls *ls;
305
306         spin_lock(&lslist_lock);
307         list_for_each_entry(ls, &lslist, ls_list) {
308                 if (ls->ls_local_handle == lockspace) {
309                         ls->ls_count++;
310                         goto out;
311                 }
312         }
313         ls = NULL;
314  out:
315         spin_unlock(&lslist_lock);
316         return ls;
317 }
318
319 struct dlm_ls *dlm_find_lockspace_device(int minor)
320 {
321         struct dlm_ls *ls;
322
323         spin_lock(&lslist_lock);
324         list_for_each_entry(ls, &lslist, ls_list) {
325                 if (ls->ls_device.minor == minor) {
326                         ls->ls_count++;
327                         goto out;
328                 }
329         }
330         ls = NULL;
331  out:
332         spin_unlock(&lslist_lock);
333         return ls;
334 }
335
336 void dlm_put_lockspace(struct dlm_ls *ls)
337 {
338         spin_lock(&lslist_lock);
339         ls->ls_count--;
340         spin_unlock(&lslist_lock);
341 }
342
343 static void remove_lockspace(struct dlm_ls *ls)
344 {
345         for (;;) {
346                 spin_lock(&lslist_lock);
347                 if (ls->ls_count == 0) {
348                         WARN_ON(ls->ls_create_count != 0);
349                         list_del(&ls->ls_list);
350                         spin_unlock(&lslist_lock);
351                         return;
352                 }
353                 spin_unlock(&lslist_lock);
354                 ssleep(1);
355         }
356 }
357
/*
 * Start the threads shared by all lockspaces: dlm_scand first, then
 * lowcomms.  If lowcomms fails, dlm_scand is stopped again.
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
382
/* Stop the shared threads: dlm_scand first, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
388
389 static int new_lockspace(const char *name, int namelen, void **lockspace,
390                          uint32_t flags, int lvblen)
391 {
392         struct dlm_ls *ls;
393         int i, size, error;
394         int do_unreg = 0;
395
396         if (namelen > DLM_LOCKSPACE_LEN)
397                 return -EINVAL;
398
399         if (!lvblen || (lvblen % 8))
400                 return -EINVAL;
401
402         if (!try_module_get(THIS_MODULE))
403                 return -EINVAL;
404
405         if (!dlm_user_daemon_available()) {
406                 module_put(THIS_MODULE);
407                 return -EUNATCH;
408         }
409
410         error = 0;
411
412         spin_lock(&lslist_lock);
413         list_for_each_entry(ls, &lslist, ls_list) {
414                 WARN_ON(ls->ls_create_count <= 0);
415                 if (ls->ls_namelen != namelen)
416                         continue;
417                 if (memcmp(ls->ls_name, name, namelen))
418                         continue;
419                 if (flags & DLM_LSFL_NEWEXCL) {
420                         error = -EEXIST;
421                         break;
422                 }
423                 ls->ls_create_count++;
424                 *lockspace = ls;
425                 error = 1;
426                 break;
427         }
428         spin_unlock(&lslist_lock);
429
430         if (error)
431                 goto out;
432
433         error = -ENOMEM;
434
435         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
436         if (!ls)
437                 goto out;
438         memcpy(ls->ls_name, name, namelen);
439         ls->ls_namelen = namelen;
440         ls->ls_lvblen = lvblen;
441         ls->ls_count = 0;
442         ls->ls_flags = 0;
443         ls->ls_scan_time = jiffies;
444
445         if (flags & DLM_LSFL_TIMEWARN)
446                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
447
448         /* ls_exflags are forced to match among nodes, and we don't
449            need to require all nodes to have some flags set */
450         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
451                                     DLM_LSFL_NEWEXCL));
452
453         size = dlm_config.ci_rsbtbl_size;
454         ls->ls_rsbtbl_size = size;
455
456         ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
457         if (!ls->ls_rsbtbl)
458                 goto out_lsfree;
459         for (i = 0; i < size; i++) {
460                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
461                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
462                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
463         }
464
465         idr_init(&ls->ls_lkbidr);
466         spin_lock_init(&ls->ls_lkbidr_spin);
467
468         size = dlm_config.ci_dirtbl_size;
469         ls->ls_dirtbl_size = size;
470
471         ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
472         if (!ls->ls_dirtbl)
473                 goto out_lkbfree;
474         for (i = 0; i < size; i++) {
475                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
476                 spin_lock_init(&ls->ls_dirtbl[i].lock);
477         }
478
479         INIT_LIST_HEAD(&ls->ls_waiters);
480         mutex_init(&ls->ls_waiters_mutex);
481         INIT_LIST_HEAD(&ls->ls_orphans);
482         mutex_init(&ls->ls_orphans_mutex);
483         INIT_LIST_HEAD(&ls->ls_timeout);
484         mutex_init(&ls->ls_timeout_mutex);
485
486         INIT_LIST_HEAD(&ls->ls_new_rsb);
487         spin_lock_init(&ls->ls_new_rsb_spin);
488
489         INIT_LIST_HEAD(&ls->ls_nodes);
490         INIT_LIST_HEAD(&ls->ls_nodes_gone);
491         ls->ls_num_nodes = 0;
492         ls->ls_low_nodeid = 0;
493         ls->ls_total_weight = 0;
494         ls->ls_node_array = NULL;
495
496         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
497         ls->ls_stub_rsb.res_ls = ls;
498
499         ls->ls_debug_rsb_dentry = NULL;
500         ls->ls_debug_waiters_dentry = NULL;
501
502         init_waitqueue_head(&ls->ls_uevent_wait);
503         ls->ls_uevent_result = 0;
504         init_completion(&ls->ls_members_done);
505         ls->ls_members_result = -1;
506
507         mutex_init(&ls->ls_cb_mutex);
508         INIT_LIST_HEAD(&ls->ls_cb_delay);
509
510         ls->ls_recoverd_task = NULL;
511         mutex_init(&ls->ls_recoverd_active);
512         spin_lock_init(&ls->ls_recover_lock);
513         spin_lock_init(&ls->ls_rcom_spin);
514         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
515         ls->ls_recover_status = 0;
516         ls->ls_recover_seq = 0;
517         ls->ls_recover_args = NULL;
518         init_rwsem(&ls->ls_in_recovery);
519         init_rwsem(&ls->ls_recv_active);
520         INIT_LIST_HEAD(&ls->ls_requestqueue);
521         mutex_init(&ls->ls_requestqueue_mutex);
522         mutex_init(&ls->ls_clear_proc_locks);
523
524         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
525         if (!ls->ls_recover_buf)
526                 goto out_dirfree;
527
528         INIT_LIST_HEAD(&ls->ls_recover_list);
529         spin_lock_init(&ls->ls_recover_list_lock);
530         ls->ls_recover_list_count = 0;
531         ls->ls_local_handle = ls;
532         init_waitqueue_head(&ls->ls_wait_general);
533         INIT_LIST_HEAD(&ls->ls_root_list);
534         init_rwsem(&ls->ls_root_sem);
535
536         down_write(&ls->ls_in_recovery);
537
538         spin_lock(&lslist_lock);
539         ls->ls_create_count = 1;
540         list_add(&ls->ls_list, &lslist);
541         spin_unlock(&lslist_lock);
542
543         if (flags & DLM_LSFL_FS) {
544                 error = dlm_callback_start(ls);
545                 if (error) {
546                         log_error(ls, "can't start dlm_callback %d", error);
547                         goto out_delist;
548                 }
549         }
550
551         /* needs to find ls in lslist */
552         error = dlm_recoverd_start(ls);
553         if (error) {
554                 log_error(ls, "can't start dlm_recoverd %d", error);
555                 goto out_callback;
556         }
557
558         ls->ls_kobj.kset = dlm_kset;
559         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
560                                      "%s", ls->ls_name);
561         if (error)
562                 goto out_recoverd;
563         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
564
565         /* let kobject handle freeing of ls if there's an error */
566         do_unreg = 1;
567
568         /* This uevent triggers dlm_controld in userspace to add us to the
569            group of nodes that are members of this lockspace (managed by the
570            cluster infrastructure.)  Once it's done that, it tells us who the
571            current lockspace members are (via configfs) and then tells the
572            lockspace to start running (via sysfs) in dlm_ls_start(). */
573
574         error = do_uevent(ls, 1);
575         if (error)
576                 goto out_recoverd;
577
578         wait_for_completion(&ls->ls_members_done);
579         error = ls->ls_members_result;
580         if (error)
581                 goto out_members;
582
583         dlm_create_debug_file(ls);
584
585         log_debug(ls, "join complete");
586         *lockspace = ls;
587         return 0;
588
589  out_members:
590         do_uevent(ls, 0);
591         dlm_clear_members(ls);
592         kfree(ls->ls_node_array);
593  out_recoverd:
594         dlm_recoverd_stop(ls);
595  out_callback:
596         dlm_callback_stop(ls);
597  out_delist:
598         spin_lock(&lslist_lock);
599         list_del(&ls->ls_list);
600         spin_unlock(&lslist_lock);
601         kfree(ls->ls_recover_buf);
602  out_dirfree:
603         vfree(ls->ls_dirtbl);
604  out_lkbfree:
605         idr_destroy(&ls->ls_lkbidr);
606         vfree(ls->ls_rsbtbl);
607  out_lsfree:
608         if (do_unreg)
609                 kobject_put(&ls->ls_kobj);
610         else
611                 kfree(ls);
612  out:
613         module_put(THIS_MODULE);
614         return error;
615 }
616
617 int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
618                       uint32_t flags, int lvblen)
619 {
620         int error = 0;
621
622         mutex_lock(&ls_lock);
623         if (!ls_count)
624                 error = threads_start();
625         if (error)
626                 goto out;
627
628         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
629         if (!error)
630                 ls_count++;
631         if (error > 0)
632                 error = 0;
633         if (!ls_count)
634                 threads_stop();
635  out:
636         mutex_unlock(&ls_lock);
637         return error;
638 }
639
640 static int lkb_idr_is_local(int id, void *p, void *data)
641 {
642         struct dlm_lkb *lkb = p;
643
644         if (!lkb->lkb_nodeid)
645                 return 1;
646         return 0;
647 }
648
/* idr_for_each callback: returns 1 for every entry it is called on. */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
653
654 static int lkb_idr_free(int id, void *p, void *data)
655 {
656         struct dlm_lkb *lkb = p;
657
658         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
659                 dlm_free_lvb(lkb->lkb_lvbptr);
660
661         dlm_free_lkb(lkb);
662         return 0;
663 }
664
665 /* NOTE: We check the lkbidr here rather than the resource table.
666    This is because there may be LKBs queued as ASTs that have been unlinked
667    from their RSBs and are pending deletion once the AST has been delivered */
668
669 static int lockspace_busy(struct dlm_ls *ls, int force)
670 {
671         int rv;
672
673         spin_lock(&ls->ls_lkbidr_spin);
674         if (force == 0) {
675                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
676         } else if (force == 1) {
677                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
678         } else {
679                 rv = 0;
680         }
681         spin_unlock(&ls->ls_lkbidr_spin);
682         return rv;
683 }
684
685 static int release_lockspace(struct dlm_ls *ls, int force)
686 {
687         struct dlm_rsb *rsb;
688         struct list_head *head;
689         int i, busy, rv;
690
691         busy = lockspace_busy(ls, force);
692
693         spin_lock(&lslist_lock);
694         if (ls->ls_create_count == 1) {
695                 if (busy) {
696                         rv = -EBUSY;
697                 } else {
698                         /* remove_lockspace takes ls off lslist */
699                         ls->ls_create_count = 0;
700                         rv = 0;
701                 }
702         } else if (ls->ls_create_count > 1) {
703                 rv = --ls->ls_create_count;
704         } else {
705                 rv = -EINVAL;
706         }
707         spin_unlock(&lslist_lock);
708
709         if (rv) {
710                 log_debug(ls, "release_lockspace no remove %d", rv);
711                 return rv;
712         }
713
714         dlm_device_deregister(ls);
715
716         if (force < 3 && dlm_user_daemon_available())
717                 do_uevent(ls, 0);
718
719         dlm_recoverd_stop(ls);
720
721         dlm_callback_stop(ls);
722
723         remove_lockspace(ls);
724
725         dlm_delete_debug_file(ls);
726
727         kfree(ls->ls_recover_buf);
728
729         /*
730          * Free direntry structs.
731          */
732
733         dlm_dir_clear(ls);
734         vfree(ls->ls_dirtbl);
735
736         /*
737          * Free all lkb's in idr
738          */
739
740         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
741         idr_remove_all(&ls->ls_lkbidr);
742         idr_destroy(&ls->ls_lkbidr);
743
744         /*
745          * Free all rsb's on rsbtbl[] lists
746          */
747
748         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
749                 head = &ls->ls_rsbtbl[i].list;
750                 while (!list_empty(head)) {
751                         rsb = list_entry(head->next, struct dlm_rsb,
752                                          res_hashchain);
753
754                         list_del(&rsb->res_hashchain);
755                         dlm_free_rsb(rsb);
756                 }
757
758                 head = &ls->ls_rsbtbl[i].toss;
759                 while (!list_empty(head)) {
760                         rsb = list_entry(head->next, struct dlm_rsb,
761                                          res_hashchain);
762                         list_del(&rsb->res_hashchain);
763                         dlm_free_rsb(rsb);
764                 }
765         }
766
767         vfree(ls->ls_rsbtbl);
768
769         while (!list_empty(&ls->ls_new_rsb)) {
770                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
771                                        res_hashchain);
772                 list_del(&rsb->res_hashchain);
773                 dlm_free_rsb(rsb);
774         }
775
776         /*
777          * Free structures on any other lists
778          */
779
780         dlm_purge_requestqueue(ls);
781         kfree(ls->ls_recover_args);
782         dlm_clear_free_entries(ls);
783         dlm_clear_members(ls);
784         dlm_clear_members_gone(ls);
785         kfree(ls->ls_node_array);
786         log_debug(ls, "release_lockspace final free");
787         kobject_put(&ls->ls_kobj);
788         /* The ls structure will be freed when the kobject is done with */
789
790         module_put(THIS_MODULE);
791         return 0;
792 }
793
794 /*
795  * Called when a system has released all its locks and is not going to use the
796  * lockspace any longer.  We free everything we're managing for this lockspace.
797  * Remaining nodes will go through the recovery process as if we'd died.  The
798  * lockspace must continue to function as usual, participating in recoveries,
799  * until this returns.
800  *
801  * Force has 4 possible values:
802  * 0 - don't destroy lockspace if it has any LKBs
803  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
804  * 2 - destroy lockspace regardless of LKBs
805  * 3 - destroy lockspace as part of a forced shutdown
806  */
807
808 int dlm_release_lockspace(void *lockspace, int force)
809 {
810         struct dlm_ls *ls;
811         int error;
812
813         ls = dlm_find_lockspace_local(lockspace);
814         if (!ls)
815                 return -EINVAL;
816         dlm_put_lockspace(ls);
817
818         mutex_lock(&ls_lock);
819         error = release_lockspace(ls, force);
820         if (!error)
821                 ls_count--;
822         if (!ls_count)
823                 threads_stop();
824         mutex_unlock(&ls_lock);
825
826         return error;
827 }
828
829 void dlm_stop_lockspaces(void)
830 {
831         struct dlm_ls *ls;
832
833  restart:
834         spin_lock(&lslist_lock);
835         list_for_each_entry(ls, &lslist, ls_list) {
836                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
837                         continue;
838                 spin_unlock(&lslist_lock);
839                 log_error(ls, "no userland control daemon, stopping lockspace");
840                 dlm_ls_stop(ls);
841                 goto restart;
842         }
843         spin_unlock(&lslist_lock);
844 }
845