[PATCH] OCFS2: The Second Oracle Cluster Filesystem
[linux-2.6.git] / fs / ocfs2 / dlm / dlmdomain.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmdomain.c
5  *
6  * defines domain join / leave apis
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/utsname.h>
32 #include <linux/init.h>
33 #include <linux/spinlock.h>
34 #include <linux/delay.h>
35 #include <linux/err.h>
36
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43
44 #include "dlmdebug.h"
45 #include "dlmdomain.h"
46
47 #include "dlmver.h"
48
49 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
50 #include "cluster/masklog.h"
51
52 /*
53  *
54  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
55  *    dlm_domain_lock
56  *    struct dlm_ctxt->spinlock
57  *    struct dlm_lock_resource->spinlock
58  *    struct dlm_ctxt->master_lock
59  *    struct dlm_ctxt->ast_lock
60  *    dlm_master_list_entry->spinlock
61  *    dlm_lock->spinlock
62  *
63  */
64
/*
 * dlm_domain_lock guards the dlm_domains list as well as the dlm_state
 * and num_joins fields of every dlm_ctxt on it.  It is the outermost
 * lock in the ordering described above.
 *
 * DEFINE_SPINLOCK() is used instead of the bare SPIN_LOCK_UNLOCKED
 * initialiser so that the lock gets its own name/identity for the lock
 * debugging code.
 */
DEFINE_SPINLOCK(dlm_domain_lock);

/* every dlm_ctxt known on this node, linked through dlm_ctxt->list */
LIST_HEAD(dlm_domains);

/* woken when a domain is taken off dlm_domains or its memory is freed */
static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);

/* sleep time, in milliseconds, before retrying a failed domain message */
#define DLM_DOMAIN_BACKOFF_MS 200
70
71 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data);
72 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data);
73 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data);
74 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data);
75
76 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
77
78 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
79 {
80         list_del_init(&lockres->list);
81         dlm_lockres_put(lockres);
82 }
83
84 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
85                        struct dlm_lock_resource *res)
86 {
87         struct list_head *bucket;
88         struct qstr *q;
89
90         assert_spin_locked(&dlm->spinlock);
91
92         q = &res->lockname;
93         q->hash = full_name_hash(q->name, q->len);
94         bucket = &(dlm->resources[q->hash & DLM_HASH_MASK]);
95
96         /* get a reference for our hashtable */
97         dlm_lockres_get(res);
98
99         list_add_tail(&res->list, bucket);
100 }
101
102 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
103                                          const char *name,
104                                          unsigned int len)
105 {
106         unsigned int hash;
107         struct list_head *iter;
108         struct dlm_lock_resource *tmpres=NULL;
109         struct list_head *bucket;
110
111         mlog_entry("%.*s\n", len, name);
112
113         assert_spin_locked(&dlm->spinlock);
114
115         hash = full_name_hash(name, len);
116
117         bucket = &(dlm->resources[hash & DLM_HASH_MASK]);
118
119         /* check for pre-existing lock */
120         list_for_each(iter, bucket) {
121                 tmpres = list_entry(iter, struct dlm_lock_resource, list);
122                 if (tmpres->lockname.len == len &&
123                     memcmp(tmpres->lockname.name, name, len) == 0) {
124                         dlm_lockres_get(tmpres);
125                         break;
126                 }
127
128                 tmpres = NULL;
129         }
130         return tmpres;
131 }
132
133 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
134                                     const char *name,
135                                     unsigned int len)
136 {
137         struct dlm_lock_resource *res;
138
139         spin_lock(&dlm->spinlock);
140         res = __dlm_lookup_lockres(dlm, name, len);
141         spin_unlock(&dlm->spinlock);
142         return res;
143 }
144
145 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
146 {
147         struct dlm_ctxt *tmp = NULL;
148         struct list_head *iter;
149
150         assert_spin_locked(&dlm_domain_lock);
151
152         /* tmp->name here is always NULL terminated,
153          * but domain may not be! */
154         list_for_each(iter, &dlm_domains) {
155                 tmp = list_entry (iter, struct dlm_ctxt, list);
156                 if (strlen(tmp->name) == len &&
157                     memcmp(tmp->name, domain, len)==0)
158                         break;
159                 tmp = NULL;
160         }
161
162         return tmp;
163 }
164
165 /* For null terminated domain strings ONLY */
166 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
167 {
168         assert_spin_locked(&dlm_domain_lock);
169
170         return __dlm_lookup_domain_full(domain, strlen(domain));
171 }
172
173
174 /* returns true on one of two conditions:
175  * 1) the domain does not exist
176  * 2) the domain exists and it's state is "joined" */
177 static int dlm_wait_on_domain_helper(const char *domain)
178 {
179         int ret = 0;
180         struct dlm_ctxt *tmp = NULL;
181
182         spin_lock(&dlm_domain_lock);
183
184         tmp = __dlm_lookup_domain(domain);
185         if (!tmp)
186                 ret = 1;
187         else if (tmp->dlm_state == DLM_CTXT_JOINED)
188                 ret = 1;
189
190         spin_unlock(&dlm_domain_lock);
191         return ret;
192 }
193
194 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
195 {
196         if (dlm->resources)
197                 free_page((unsigned long) dlm->resources);
198
199         if (dlm->name)
200                 kfree(dlm->name);
201
202         kfree(dlm);
203 }
204
205 /* A little strange - this function will be called while holding
206  * dlm_domain_lock and is expected to be holding it on the way out. We
207  * will however drop and reacquire it multiple times */
208 static void dlm_ctxt_release(struct kref *kref)
209 {
210         struct dlm_ctxt *dlm;
211
212         dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
213
214         BUG_ON(dlm->num_joins);
215         BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
216
217         /* we may still be in the list if we hit an error during join. */
218         list_del_init(&dlm->list);
219
220         spin_unlock(&dlm_domain_lock);
221
222         mlog(0, "freeing memory from domain %s\n", dlm->name);
223
224         wake_up(&dlm_domain_events);
225
226         dlm_free_ctxt_mem(dlm);
227
228         spin_lock(&dlm_domain_lock);
229 }
230
231 void dlm_put(struct dlm_ctxt *dlm)
232 {
233         spin_lock(&dlm_domain_lock);
234         kref_put(&dlm->dlm_refs, dlm_ctxt_release);
235         spin_unlock(&dlm_domain_lock);
236 }
237
238 static void __dlm_get(struct dlm_ctxt *dlm)
239 {
240         kref_get(&dlm->dlm_refs);
241 }
242
243 /* given a questionable reference to a dlm object, gets a reference if
244  * it can find it in the list, otherwise returns NULL in which case
245  * you shouldn't trust your pointer. */
246 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
247 {
248         struct list_head *iter;
249         struct dlm_ctxt *target = NULL;
250
251         spin_lock(&dlm_domain_lock);
252
253         list_for_each(iter, &dlm_domains) {
254                 target = list_entry (iter, struct dlm_ctxt, list);
255
256                 if (target == dlm) {
257                         __dlm_get(target);
258                         break;
259                 }
260
261                 target = NULL;
262         }
263
264         spin_unlock(&dlm_domain_lock);
265
266         return target;
267 }
268
269 int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
270 {
271         int ret;
272
273         spin_lock(&dlm_domain_lock);
274         ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
275                 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
276         spin_unlock(&dlm_domain_lock);
277
278         return ret;
279 }
280
281 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
282 {
283         dlm_unregister_domain_handlers(dlm);
284         dlm_complete_thread(dlm);
285         dlm_complete_recovery_thread(dlm);
286
287         /* We've left the domain. Now we can take ourselves out of the
288          * list and allow the kref stuff to help us free the
289          * memory. */
290         spin_lock(&dlm_domain_lock);
291         list_del_init(&dlm->list);
292         spin_unlock(&dlm_domain_lock);
293
294         /* Wake up anyone waiting for us to remove this domain */
295         wake_up(&dlm_domain_events);
296 }
297
298 static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
299 {
300         int i;
301         struct dlm_lock_resource *res;
302
303         mlog(0, "Migrating locks from domain %s\n", dlm->name);
304 restart:
305         spin_lock(&dlm->spinlock);
306         for (i=0; i<DLM_HASH_SIZE; i++) {
307                 while (!list_empty(&dlm->resources[i])) {
308                         res = list_entry(dlm->resources[i].next,
309                                      struct dlm_lock_resource, list);
310                         /* need reference when manually grabbing lockres */
311                         dlm_lockres_get(res);
312                         /* this should unhash the lockres
313                          * and exit with dlm->spinlock */
314                         mlog(0, "purging res=%p\n", res);
315                         if (dlm_lockres_is_dirty(dlm, res)) {
316                                 /* HACK!  this should absolutely go.
317                                  * need to figure out why some empty
318                                  * lockreses are still marked dirty */
319                                 mlog(ML_ERROR, "lockres %.*s dirty!\n",
320                                      res->lockname.len, res->lockname.name);
321
322                                 spin_unlock(&dlm->spinlock);
323                                 dlm_kick_thread(dlm, res);
324                                 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
325                                 dlm_lockres_put(res);
326                                 goto restart;
327                         }
328                         dlm_purge_lockres(dlm, res);
329                         dlm_lockres_put(res);
330                 }
331         }
332         spin_unlock(&dlm->spinlock);
333
334         mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
335 }
336
337 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
338 {
339         int ret;
340
341         spin_lock(&dlm->spinlock);
342         ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
343         spin_unlock(&dlm->spinlock);
344
345         return ret;
346 }
347
348 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
349 {
350         /* Yikes, a double spinlock! I need domain_lock for the dlm
351          * state and the dlm spinlock for join state... Sorry! */
352 again:
353         spin_lock(&dlm_domain_lock);
354         spin_lock(&dlm->spinlock);
355
356         if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
357                 mlog(0, "Node %d is joining, we wait on it.\n",
358                           dlm->joining_node);
359                 spin_unlock(&dlm->spinlock);
360                 spin_unlock(&dlm_domain_lock);
361
362                 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
363                 goto again;
364         }
365
366         dlm->dlm_state = DLM_CTXT_LEAVING;
367         spin_unlock(&dlm->spinlock);
368         spin_unlock(&dlm_domain_lock);
369 }
370
371 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
372 {
373         int node = -1;
374
375         assert_spin_locked(&dlm->spinlock);
376
377         mlog(ML_NOTICE, "Nodes in my domain (\"%s\"):\n", dlm->name);
378
379         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
380                                      node + 1)) < O2NM_MAX_NODES) {
381                 mlog(ML_NOTICE, " node %d\n", node);
382         }
383 }
384
385 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data)
386 {
387         struct dlm_ctxt *dlm = data;
388         unsigned int node;
389         struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
390
391         mlog_entry("%p %u %p", msg, len, data);
392
393         if (!dlm_grab(dlm))
394                 return 0;
395
396         node = exit_msg->node_idx;
397
398         mlog(0, "Node %u leaves domain %s\n", node, dlm->name);
399
400         spin_lock(&dlm->spinlock);
401         clear_bit(node, dlm->domain_map);
402         __dlm_print_nodes(dlm);
403
404         /* notify anything attached to the heartbeat events */
405         dlm_hb_event_notify_attached(dlm, node, 0);
406
407         spin_unlock(&dlm->spinlock);
408
409         dlm_put(dlm);
410
411         return 0;
412 }
413
414 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
415                                     unsigned int node)
416 {
417         int status;
418         struct dlm_exit_domain leave_msg;
419
420         mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
421                   node, dlm->name, dlm->node_num);
422
423         memset(&leave_msg, 0, sizeof(leave_msg));
424         leave_msg.node_idx = dlm->node_num;
425
426         status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
427                                     &leave_msg, sizeof(leave_msg), node,
428                                     NULL);
429
430         mlog(0, "status return %d from o2net_send_message\n", status);
431
432         return status;
433 }
434
435
436 static void dlm_leave_domain(struct dlm_ctxt *dlm)
437 {
438         int node, clear_node, status;
439
440         /* At this point we've migrated away all our locks and won't
441          * accept mastership of new ones. The dlm is responsible for
442          * almost nothing now. We make sure not to confuse any joining
443          * nodes and then commence shutdown procedure. */
444
445         spin_lock(&dlm->spinlock);
446         /* Clear ourselves from the domain map */
447         clear_bit(dlm->node_num, dlm->domain_map);
448         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
449                                      0)) < O2NM_MAX_NODES) {
450                 /* Drop the dlm spinlock. This is safe wrt the domain_map.
451                  * -nodes cannot be added now as the
452                  *   query_join_handlers knows to respond with OK_NO_MAP
453                  * -we catch the right network errors if a node is
454                  *   removed from the map while we're sending him the
455                  *   exit message. */
456                 spin_unlock(&dlm->spinlock);
457
458                 clear_node = 1;
459
460                 status = dlm_send_one_domain_exit(dlm, node);
461                 if (status < 0 &&
462                     status != -ENOPROTOOPT &&
463                     status != -ENOTCONN) {
464                         mlog(ML_NOTICE, "Error %d sending domain exit message "
465                              "to node %d\n", status, node);
466
467                         /* Not sure what to do here but lets sleep for
468                          * a bit in case this was a transient
469                          * error... */
470                         msleep(DLM_DOMAIN_BACKOFF_MS);
471                         clear_node = 0;
472                 }
473
474                 spin_lock(&dlm->spinlock);
475                 /* If we're not clearing the node bit then we intend
476                  * to loop back around to try again. */
477                 if (clear_node)
478                         clear_bit(node, dlm->domain_map);
479         }
480         spin_unlock(&dlm->spinlock);
481 }
482
483 int dlm_joined(struct dlm_ctxt *dlm)
484 {
485         int ret = 0;
486
487         spin_lock(&dlm_domain_lock);
488
489         if (dlm->dlm_state == DLM_CTXT_JOINED)
490                 ret = 1;
491
492         spin_unlock(&dlm_domain_lock);
493
494         return ret;
495 }
496
497 int dlm_shutting_down(struct dlm_ctxt *dlm)
498 {
499         int ret = 0;
500
501         spin_lock(&dlm_domain_lock);
502
503         if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
504                 ret = 1;
505
506         spin_unlock(&dlm_domain_lock);
507
508         return ret;
509 }
510
511 void dlm_unregister_domain(struct dlm_ctxt *dlm)
512 {
513         int leave = 0;
514
515         spin_lock(&dlm_domain_lock);
516         BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
517         BUG_ON(!dlm->num_joins);
518
519         dlm->num_joins--;
520         if (!dlm->num_joins) {
521                 /* We mark it "in shutdown" now so new register
522                  * requests wait until we've completely left the
523                  * domain. Don't use DLM_CTXT_LEAVING yet as we still
524                  * want new domain joins to communicate with us at
525                  * least until we've completed migration of our
526                  * resources. */
527                 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
528                 leave = 1;
529         }
530         spin_unlock(&dlm_domain_lock);
531
532         if (leave) {
533                 mlog(0, "shutting down domain %s\n", dlm->name);
534
535                 /* We changed dlm state, notify the thread */
536                 dlm_kick_thread(dlm, NULL);
537
538                 dlm_migrate_all_locks(dlm);
539                 dlm_mark_domain_leaving(dlm);
540                 dlm_leave_domain(dlm);
541                 dlm_complete_dlm_shutdown(dlm);
542         }
543         dlm_put(dlm);
544 }
545 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
546
547 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
548 {
549         struct dlm_query_join_request *query;
550         enum dlm_query_join_response response;
551         struct dlm_ctxt *dlm = NULL;
552
553         query = (struct dlm_query_join_request *) msg->buf;
554
555         mlog(0, "node %u wants to join domain %s\n", query->node_idx,
556                   query->domain);
557
558         /*
559          * If heartbeat doesn't consider the node live, tell it
560          * to back off and try again.  This gives heartbeat a chance
561          * to catch up.
562          */
563         if (!o2hb_check_node_heartbeating(query->node_idx)) {
564                 mlog(0, "node %u is not in our live map yet\n",
565                      query->node_idx);
566
567                 response = JOIN_DISALLOW;
568                 goto respond;
569         }
570
571         response = JOIN_OK_NO_MAP;
572
573         spin_lock(&dlm_domain_lock);
574         dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
575         /* Once the dlm ctxt is marked as leaving then we don't want
576          * to be put in someone's domain map. */
577         if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
578                 spin_lock(&dlm->spinlock);
579
580                 if (dlm->dlm_state == DLM_CTXT_NEW &&
581                     dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
582                         /*If this is a brand new context and we
583                          * haven't started our join process yet, then
584                          * the other node won the race. */
585                         response = JOIN_OK_NO_MAP;
586                 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
587                         /* Disallow parallel joins. */
588                         response = JOIN_DISALLOW;
589                 } else {
590                         /* Alright we're fully a part of this domain
591                          * so we keep some state as to who's joining
592                          * and indicate to him that needs to be fixed
593                          * up. */
594                         response = JOIN_OK;
595                         __dlm_set_joining_node(dlm, query->node_idx);
596                 }
597
598                 spin_unlock(&dlm->spinlock);
599         }
600         spin_unlock(&dlm_domain_lock);
601
602 respond:
603         mlog(0, "We respond with %u\n", response);
604
605         return response;
606 }
607
608 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
609 {
610         struct dlm_assert_joined *assert;
611         struct dlm_ctxt *dlm = NULL;
612
613         assert = (struct dlm_assert_joined *) msg->buf;
614
615         mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
616                   assert->domain);
617
618         spin_lock(&dlm_domain_lock);
619         dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
620         /* XXX should we consider no dlm ctxt an error? */
621         if (dlm) {
622                 spin_lock(&dlm->spinlock);
623
624                 /* Alright, this node has officially joined our
625                  * domain. Set him in the map and clean up our
626                  * leftover join state. */
627                 BUG_ON(dlm->joining_node != assert->node_idx);
628                 set_bit(assert->node_idx, dlm->domain_map);
629                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
630
631                 __dlm_print_nodes(dlm);
632
633                 /* notify anything attached to the heartbeat events */
634                 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
635
636                 spin_unlock(&dlm->spinlock);
637         }
638         spin_unlock(&dlm_domain_lock);
639
640         return 0;
641 }
642
643 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data)
644 {
645         struct dlm_cancel_join *cancel;
646         struct dlm_ctxt *dlm = NULL;
647
648         cancel = (struct dlm_cancel_join *) msg->buf;
649
650         mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
651                   cancel->domain);
652
653         spin_lock(&dlm_domain_lock);
654         dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
655
656         if (dlm) {
657                 spin_lock(&dlm->spinlock);
658
659                 /* Yikes, this guy wants to cancel his join. No
660                  * problem, we simply cleanup our join state. */
661                 BUG_ON(dlm->joining_node != cancel->node_idx);
662                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
663
664                 spin_unlock(&dlm->spinlock);
665         }
666         spin_unlock(&dlm_domain_lock);
667
668         return 0;
669 }
670
671 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
672                                     unsigned int node)
673 {
674         int status;
675         struct dlm_cancel_join cancel_msg;
676
677         memset(&cancel_msg, 0, sizeof(cancel_msg));
678         cancel_msg.node_idx = dlm->node_num;
679         cancel_msg.name_len = strlen(dlm->name);
680         memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
681
682         status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
683                                     &cancel_msg, sizeof(cancel_msg), node,
684                                     NULL);
685         if (status < 0) {
686                 mlog_errno(status);
687                 goto bail;
688         }
689
690 bail:
691         return status;
692 }
693
694 /* map_size should be in bytes. */
695 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
696                                  unsigned long *node_map,
697                                  unsigned int map_size)
698 {
699         int status, tmpstat;
700         unsigned int node;
701
702         if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
703                          sizeof(unsigned long))) {
704                 mlog(ML_ERROR,
705                      "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
706                      map_size, BITS_TO_LONGS(O2NM_MAX_NODES));
707                 return -EINVAL;
708         }
709
710         status = 0;
711         node = -1;
712         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
713                                      node + 1)) < O2NM_MAX_NODES) {
714                 if (node == dlm->node_num)
715                         continue;
716
717                 tmpstat = dlm_send_one_join_cancel(dlm, node);
718                 if (tmpstat) {
719                         mlog(ML_ERROR, "Error return %d cancelling join on "
720                              "node %d\n", tmpstat, node);
721                         if (!status)
722                                 status = tmpstat;
723                 }
724         }
725
726         if (status)
727                 mlog_errno(status);
728         return status;
729 }
730
731 static int dlm_request_join(struct dlm_ctxt *dlm,
732                             int node,
733                             enum dlm_query_join_response *response)
734 {
735         int status, retval;
736         struct dlm_query_join_request join_msg;
737
738         mlog(0, "querying node %d\n", node);
739
740         memset(&join_msg, 0, sizeof(join_msg));
741         join_msg.node_idx = dlm->node_num;
742         join_msg.name_len = strlen(dlm->name);
743         memcpy(join_msg.domain, dlm->name, join_msg.name_len);
744
745         status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
746                                     sizeof(join_msg), node, &retval);
747         if (status < 0 && status != -ENOPROTOOPT) {
748                 mlog_errno(status);
749                 goto bail;
750         }
751
752         /* -ENOPROTOOPT from the net code means the other side isn't
753             listening for our message type -- that's fine, it means
754             his dlm isn't up, so we can consider him a 'yes' but not
755             joined into the domain.  */
756         if (status == -ENOPROTOOPT) {
757                 status = 0;
758                 *response = JOIN_OK_NO_MAP;
759         } else if (retval == JOIN_DISALLOW ||
760                    retval == JOIN_OK ||
761                    retval == JOIN_OK_NO_MAP) {
762                 *response = retval;
763         } else {
764                 status = -EINVAL;
765                 mlog(ML_ERROR, "invalid response %d from node %u\n", retval,
766                      node);
767         }
768
769         mlog(0, "status %d, node %d response is %d\n", status, node,
770                   *response);
771
772 bail:
773         return status;
774 }
775
776 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
777                                     unsigned int node)
778 {
779         int status;
780         struct dlm_assert_joined assert_msg;
781
782         mlog(0, "Sending join assert to node %u\n", node);
783
784         memset(&assert_msg, 0, sizeof(assert_msg));
785         assert_msg.node_idx = dlm->node_num;
786         assert_msg.name_len = strlen(dlm->name);
787         memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
788
789         status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
790                                     &assert_msg, sizeof(assert_msg), node,
791                                     NULL);
792         if (status < 0)
793                 mlog_errno(status);
794
795         return status;
796 }
797
798 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
799                                   unsigned long *node_map)
800 {
801         int status, node, live;
802
803         status = 0;
804         node = -1;
805         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
806                                      node + 1)) < O2NM_MAX_NODES) {
807                 if (node == dlm->node_num)
808                         continue;
809
810                 do {
811                         /* It is very important that this message be
812                          * received so we spin until either the node
813                          * has died or it gets the message. */
814                         status = dlm_send_one_join_assert(dlm, node);
815
816                         spin_lock(&dlm->spinlock);
817                         live = test_bit(node, dlm->live_nodes_map);
818                         spin_unlock(&dlm->spinlock);
819
820                         if (status) {
821                                 mlog(ML_ERROR, "Error return %d asserting "
822                                      "join on node %d\n", status, node);
823
824                                 /* give us some time between errors... */
825                                 if (live)
826                                         msleep(DLM_DOMAIN_BACKOFF_MS);
827                         }
828                 } while (status && live);
829         }
830 }
831
832 struct domain_join_ctxt {
833         unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
834         unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
835 };
836
837 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
838                                    struct domain_join_ctxt *ctxt,
839                                    enum dlm_query_join_response response)
840 {
841         int ret;
842
843         if (response == JOIN_DISALLOW) {
844                 mlog(0, "Latest response of disallow -- should restart\n");
845                 return 1;
846         }
847
848         spin_lock(&dlm->spinlock);
849         /* For now, we restart the process if the node maps have
850          * changed at all */
851         ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
852                      sizeof(dlm->live_nodes_map));
853         spin_unlock(&dlm->spinlock);
854
855         if (ret)
856                 mlog(0, "Node maps changed -- should restart\n");
857
858         return ret;
859 }
860
861 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
862 {
863         int status = 0, tmpstat, node;
864         struct domain_join_ctxt *ctxt;
865         enum dlm_query_join_response response;
866
867         mlog_entry("%p", dlm);
868
869         ctxt = kcalloc(1, sizeof(*ctxt), GFP_KERNEL);
870         if (!ctxt) {
871                 status = -ENOMEM;
872                 mlog_errno(status);
873                 goto bail;
874         }
875
876         /* group sem locking should work for us here -- we're already
877          * registered for heartbeat events so filling this should be
878          * atomic wrt getting those handlers called. */
879         o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
880
881         spin_lock(&dlm->spinlock);
882         memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
883
884         __dlm_set_joining_node(dlm, dlm->node_num);
885
886         spin_unlock(&dlm->spinlock);
887
888         node = -1;
889         while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
890                                      node + 1)) < O2NM_MAX_NODES) {
891                 if (node == dlm->node_num)
892                         continue;
893
894                 status = dlm_request_join(dlm, node, &response);
895                 if (status < 0) {
896                         mlog_errno(status);
897                         goto bail;
898                 }
899
900                 /* Ok, either we got a response or the node doesn't have a
901                  * dlm up. */
902                 if (response == JOIN_OK)
903                         set_bit(node, ctxt->yes_resp_map);
904
905                 if (dlm_should_restart_join(dlm, ctxt, response)) {
906                         status = -EAGAIN;
907                         goto bail;
908                 }
909         }
910
911         mlog(0, "Yay, done querying nodes!\n");
912
913         /* Yay, everyone agree's we can join the domain. My domain is
914          * comprised of all nodes who were put in the
915          * yes_resp_map. Copy that into our domain map and send a join
916          * assert message to clean up everyone elses state. */
917         spin_lock(&dlm->spinlock);
918         memcpy(dlm->domain_map, ctxt->yes_resp_map,
919                sizeof(ctxt->yes_resp_map));
920         set_bit(dlm->node_num, dlm->domain_map);
921         spin_unlock(&dlm->spinlock);
922
923         dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
924
925         /* Joined state *must* be set before the joining node
926          * information, otherwise the query_join handler may read no
927          * current joiner but a state of NEW and tell joining nodes
928          * we're not in the domain. */
929         spin_lock(&dlm_domain_lock);
930         dlm->dlm_state = DLM_CTXT_JOINED;
931         dlm->num_joins++;
932         spin_unlock(&dlm_domain_lock);
933
934 bail:
935         spin_lock(&dlm->spinlock);
936         __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
937         if (!status)
938                 __dlm_print_nodes(dlm);
939         spin_unlock(&dlm->spinlock);
940
941         if (ctxt) {
942                 /* Do we need to send a cancel message to any nodes? */
943                 if (status < 0) {
944                         tmpstat = dlm_send_join_cancels(dlm,
945                                                         ctxt->yes_resp_map,
946                                                         sizeof(ctxt->yes_resp_map));
947                         if (tmpstat < 0)
948                                 mlog_errno(tmpstat);
949                 }
950                 kfree(ctxt);
951         }
952
953         mlog(0, "returning %d\n", status);
954         return status;
955 }
956
957 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
958 {
959         o2hb_unregister_callback(&dlm->dlm_hb_up);
960         o2hb_unregister_callback(&dlm->dlm_hb_down);
961         o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
962 }
963
964 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
965 {
966         int status;
967
968         mlog(0, "registering handlers.\n");
969
970         o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
971                             dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
972         status = o2hb_register_callback(&dlm->dlm_hb_down);
973         if (status)
974                 goto bail;
975
976         o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
977                             dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
978         status = o2hb_register_callback(&dlm->dlm_hb_up);
979         if (status)
980                 goto bail;
981
982         status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
983                                         sizeof(struct dlm_master_request),
984                                         dlm_master_request_handler,
985                                         dlm, &dlm->dlm_domain_handlers);
986         if (status)
987                 goto bail;
988
989         status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
990                                         sizeof(struct dlm_assert_master),
991                                         dlm_assert_master_handler,
992                                         dlm, &dlm->dlm_domain_handlers);
993         if (status)
994                 goto bail;
995
996         status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
997                                         sizeof(struct dlm_create_lock),
998                                         dlm_create_lock_handler,
999                                         dlm, &dlm->dlm_domain_handlers);
1000         if (status)
1001                 goto bail;
1002
1003         status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1004                                         DLM_CONVERT_LOCK_MAX_LEN,
1005                                         dlm_convert_lock_handler,
1006                                         dlm, &dlm->dlm_domain_handlers);
1007         if (status)
1008                 goto bail;
1009
1010         status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1011                                         DLM_UNLOCK_LOCK_MAX_LEN,
1012                                         dlm_unlock_lock_handler,
1013                                         dlm, &dlm->dlm_domain_handlers);
1014         if (status)
1015                 goto bail;
1016
1017         status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1018                                         DLM_PROXY_AST_MAX_LEN,
1019                                         dlm_proxy_ast_handler,
1020                                         dlm, &dlm->dlm_domain_handlers);
1021         if (status)
1022                 goto bail;
1023
1024         status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1025                                         sizeof(struct dlm_exit_domain),
1026                                         dlm_exit_domain_handler,
1027                                         dlm, &dlm->dlm_domain_handlers);
1028         if (status)
1029                 goto bail;
1030
1031         status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1032                                         sizeof(struct dlm_migrate_request),
1033                                         dlm_migrate_request_handler,
1034                                         dlm, &dlm->dlm_domain_handlers);
1035         if (status)
1036                 goto bail;
1037
1038         status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1039                                         DLM_MIG_LOCKRES_MAX_LEN,
1040                                         dlm_mig_lockres_handler,
1041                                         dlm, &dlm->dlm_domain_handlers);
1042         if (status)
1043                 goto bail;
1044
1045         status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1046                                         sizeof(struct dlm_master_requery),
1047                                         dlm_master_requery_handler,
1048                                         dlm, &dlm->dlm_domain_handlers);
1049         if (status)
1050                 goto bail;
1051
1052         status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1053                                         sizeof(struct dlm_lock_request),
1054                                         dlm_request_all_locks_handler,
1055                                         dlm, &dlm->dlm_domain_handlers);
1056         if (status)
1057                 goto bail;
1058
1059         status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1060                                         sizeof(struct dlm_reco_data_done),
1061                                         dlm_reco_data_done_handler,
1062                                         dlm, &dlm->dlm_domain_handlers);
1063         if (status)
1064                 goto bail;
1065
1066         status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1067                                         sizeof(struct dlm_begin_reco),
1068                                         dlm_begin_reco_handler,
1069                                         dlm, &dlm->dlm_domain_handlers);
1070         if (status)
1071                 goto bail;
1072
1073         status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1074                                         sizeof(struct dlm_finalize_reco),
1075                                         dlm_finalize_reco_handler,
1076                                         dlm, &dlm->dlm_domain_handlers);
1077         if (status)
1078                 goto bail;
1079
1080 bail:
1081         if (status)
1082                 dlm_unregister_domain_handlers(dlm);
1083
1084         return status;
1085 }
1086
1087 static int dlm_join_domain(struct dlm_ctxt *dlm)
1088 {
1089         int status;
1090
1091         BUG_ON(!dlm);
1092
1093         mlog(0, "Join domain %s\n", dlm->name);
1094
1095         status = dlm_register_domain_handlers(dlm);
1096         if (status) {
1097                 mlog_errno(status);
1098                 goto bail;
1099         }
1100
1101         status = dlm_launch_thread(dlm);
1102         if (status < 0) {
1103                 mlog_errno(status);
1104                 goto bail;
1105         }
1106
1107         status = dlm_launch_recovery_thread(dlm);
1108         if (status < 0) {
1109                 mlog_errno(status);
1110                 goto bail;
1111         }
1112
1113         do {
1114                 unsigned int backoff;
1115                 status = dlm_try_to_join_domain(dlm);
1116
1117                 /* If we're racing another node to the join, then we
1118                  * need to back off temporarily and let them
1119                  * complete. */
1120                 if (status == -EAGAIN) {
1121                         if (signal_pending(current)) {
1122                                 status = -ERESTARTSYS;
1123                                 goto bail;
1124                         }
1125
1126                         /*
1127                          * <chip> After you!
1128                          * <dale> No, after you!
1129                          * <chip> I insist!
1130                          * <dale> But you first!
1131                          * ...
1132                          */
1133                         backoff = (unsigned int)(jiffies & 0x3);
1134                         backoff *= DLM_DOMAIN_BACKOFF_MS;
1135                         mlog(0, "backoff %d\n", backoff);
1136                         msleep(backoff);
1137                 }
1138         } while (status == -EAGAIN);
1139
1140         if (status < 0) {
1141                 mlog_errno(status);
1142                 goto bail;
1143         }
1144
1145         status = 0;
1146 bail:
1147         wake_up(&dlm_domain_events);
1148
1149         if (status) {
1150                 dlm_unregister_domain_handlers(dlm);
1151                 dlm_complete_thread(dlm);
1152                 dlm_complete_recovery_thread(dlm);
1153         }
1154
1155         return status;
1156 }
1157
1158 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1159                                 u32 key)
1160 {
1161         int i;
1162         struct dlm_ctxt *dlm = NULL;
1163
1164         dlm = kcalloc(1, sizeof(*dlm), GFP_KERNEL);
1165         if (!dlm) {
1166                 mlog_errno(-ENOMEM);
1167                 goto leave;
1168         }
1169
1170         dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
1171         if (dlm->name == NULL) {
1172                 mlog_errno(-ENOMEM);
1173                 kfree(dlm);
1174                 dlm = NULL;
1175                 goto leave;
1176         }
1177
1178         dlm->resources = (struct list_head *) __get_free_page(GFP_KERNEL);
1179         if (!dlm->resources) {
1180                 mlog_errno(-ENOMEM);
1181                 kfree(dlm->name);
1182                 kfree(dlm);
1183                 dlm = NULL;
1184                 goto leave;
1185         }
1186         memset(dlm->resources, 0, PAGE_SIZE);
1187
1188         for (i=0; i<DLM_HASH_SIZE; i++)
1189                 INIT_LIST_HEAD(&dlm->resources[i]);
1190
1191         strcpy(dlm->name, domain);
1192         dlm->key = key;
1193         dlm->node_num = o2nm_this_node();
1194
1195         spin_lock_init(&dlm->spinlock);
1196         spin_lock_init(&dlm->master_lock);
1197         spin_lock_init(&dlm->ast_lock);
1198         INIT_LIST_HEAD(&dlm->list);
1199         INIT_LIST_HEAD(&dlm->dirty_list);
1200         INIT_LIST_HEAD(&dlm->reco.resources);
1201         INIT_LIST_HEAD(&dlm->reco.received);
1202         INIT_LIST_HEAD(&dlm->reco.node_data);
1203         INIT_LIST_HEAD(&dlm->purge_list);
1204         INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1205         dlm->reco.state = 0;
1206
1207         INIT_LIST_HEAD(&dlm->pending_asts);
1208         INIT_LIST_HEAD(&dlm->pending_basts);
1209
1210         mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1211                   dlm->recovery_map, &(dlm->recovery_map[0]));
1212
1213         memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1214         memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1215         memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1216
1217         dlm->dlm_thread_task = NULL;
1218         dlm->dlm_reco_thread_task = NULL;
1219         init_waitqueue_head(&dlm->dlm_thread_wq);
1220         init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1221         init_waitqueue_head(&dlm->reco.event);
1222         init_waitqueue_head(&dlm->ast_wq);
1223         init_waitqueue_head(&dlm->migration_wq);
1224         INIT_LIST_HEAD(&dlm->master_list);
1225         INIT_LIST_HEAD(&dlm->mle_hb_events);
1226
1227         dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1228         init_waitqueue_head(&dlm->dlm_join_events);
1229
1230         dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1231         dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1232         atomic_set(&dlm->local_resources, 0);
1233         atomic_set(&dlm->remote_resources, 0);
1234         atomic_set(&dlm->unknown_resources, 0);
1235
1236         spin_lock_init(&dlm->work_lock);
1237         INIT_LIST_HEAD(&dlm->work_list);
1238         INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm);
1239
1240         kref_init(&dlm->dlm_refs);
1241         dlm->dlm_state = DLM_CTXT_NEW;
1242
1243         INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
1244
1245         mlog(0, "context init: refcount %u\n",
1246                   atomic_read(&dlm->dlm_refs.refcount));
1247
1248 leave:
1249         return dlm;
1250 }
1251
1252 /*
1253  * dlm_register_domain: one-time setup per "domain"
1254  */
1255 struct dlm_ctxt * dlm_register_domain(const char *domain,
1256                                u32 key)
1257 {
1258         int ret;
1259         struct dlm_ctxt *dlm = NULL;
1260         struct dlm_ctxt *new_ctxt = NULL;
1261
1262         if (strlen(domain) > O2NM_MAX_NAME_LEN) {
1263                 ret = -ENAMETOOLONG;
1264                 mlog(ML_ERROR, "domain name length too long\n");
1265                 goto leave;
1266         }
1267
1268         if (!o2hb_check_local_node_heartbeating()) {
1269                 mlog(ML_ERROR, "the local node has not been configured, or is "
1270                      "not heartbeating\n");
1271                 ret = -EPROTO;
1272                 goto leave;
1273         }
1274
1275         mlog(0, "register called for domain \"%s\"\n", domain);
1276
1277 retry:
1278         dlm = NULL;
1279         if (signal_pending(current)) {
1280                 ret = -ERESTARTSYS;
1281                 mlog_errno(ret);
1282                 goto leave;
1283         }
1284
1285         spin_lock(&dlm_domain_lock);
1286
1287         dlm = __dlm_lookup_domain(domain);
1288         if (dlm) {
1289                 if (dlm->dlm_state != DLM_CTXT_JOINED) {
1290                         spin_unlock(&dlm_domain_lock);
1291
1292                         mlog(0, "This ctxt is not joined yet!\n");
1293                         wait_event_interruptible(dlm_domain_events,
1294                                                  dlm_wait_on_domain_helper(
1295                                                          domain));
1296                         goto retry;
1297                 }
1298
1299                 __dlm_get(dlm);
1300                 dlm->num_joins++;
1301
1302                 spin_unlock(&dlm_domain_lock);
1303
1304                 ret = 0;
1305                 goto leave;
1306         }
1307
1308         /* doesn't exist */
1309         if (!new_ctxt) {
1310                 spin_unlock(&dlm_domain_lock);
1311
1312                 new_ctxt = dlm_alloc_ctxt(domain, key);
1313                 if (new_ctxt)
1314                         goto retry;
1315
1316                 ret = -ENOMEM;
1317                 mlog_errno(ret);
1318                 goto leave;
1319         }
1320
1321         /* a little variable switch-a-roo here... */
1322         dlm = new_ctxt;
1323         new_ctxt = NULL;
1324
1325         /* add the new domain */
1326         list_add_tail(&dlm->list, &dlm_domains);
1327         spin_unlock(&dlm_domain_lock);
1328
1329         ret = dlm_join_domain(dlm);
1330         if (ret) {
1331                 mlog_errno(ret);
1332                 dlm_put(dlm);
1333                 goto leave;
1334         }
1335
1336         ret = 0;
1337 leave:
1338         if (new_ctxt)
1339                 dlm_free_ctxt_mem(new_ctxt);
1340
1341         if (ret < 0)
1342                 dlm = ERR_PTR(ret);
1343
1344         return dlm;
1345 }
1346 EXPORT_SYMBOL_GPL(dlm_register_domain);
1347
1348 static LIST_HEAD(dlm_join_handlers);
1349
1350 static void dlm_unregister_net_handlers(void)
1351 {
1352         o2net_unregister_handler_list(&dlm_join_handlers);
1353 }
1354
1355 static int dlm_register_net_handlers(void)
1356 {
1357         int status = 0;
1358
1359         status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1360                                         sizeof(struct dlm_query_join_request),
1361                                         dlm_query_join_handler,
1362                                         NULL, &dlm_join_handlers);
1363         if (status)
1364                 goto bail;
1365
1366         status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1367                                         sizeof(struct dlm_assert_joined),
1368                                         dlm_assert_joined_handler,
1369                                         NULL, &dlm_join_handlers);
1370         if (status)
1371                 goto bail;
1372
1373         status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1374                                         sizeof(struct dlm_cancel_join),
1375                                         dlm_cancel_join_handler,
1376                                         NULL, &dlm_join_handlers);
1377
1378 bail:
1379         if (status < 0)
1380                 dlm_unregister_net_handlers();
1381
1382         return status;
1383 }
1384
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
1392
/* A single rw-semaphore protects the eviction callback lists of all
 * domains: eviction is not expected to happen often, so a per-domain
 * lock is not necessary. Eviction callbacks are allowed to sleep for
 * short periods of time. */
static DECLARE_RWSEM(dlm_callback_sem);
1397
1398 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
1399                                         int node_num)
1400 {
1401         struct list_head *iter;
1402         struct dlm_eviction_cb *cb;
1403
1404         down_read(&dlm_callback_sem);
1405         list_for_each(iter, &dlm->dlm_eviction_callbacks) {
1406                 cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
1407
1408                 cb->ec_func(node_num, cb->ec_data);
1409         }
1410         up_read(&dlm_callback_sem);
1411 }
1412
1413 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
1414                            dlm_eviction_func *f,
1415                            void *data)
1416 {
1417         INIT_LIST_HEAD(&cb->ec_item);
1418         cb->ec_func = f;
1419         cb->ec_data = data;
1420 }
1421 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
1422
1423 void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
1424                               struct dlm_eviction_cb *cb)
1425 {
1426         down_write(&dlm_callback_sem);
1427         list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
1428         up_write(&dlm_callback_sem);
1429 }
1430 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
1431
1432 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
1433 {
1434         down_write(&dlm_callback_sem);
1435         list_del_init(&cb->ec_item);
1436         up_write(&dlm_callback_sem);
1437 }
1438 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
1439
1440 static int __init dlm_init(void)
1441 {
1442         int status;
1443
1444         dlm_print_version();
1445
1446         status = dlm_init_mle_cache();
1447         if (status)
1448                 return -1;
1449
1450         status = dlm_register_net_handlers();
1451         if (status) {
1452                 dlm_destroy_mle_cache();
1453                 return -1;
1454         }
1455
1456         return 0;
1457 }
1458
1459 static void __exit dlm_exit (void)
1460 {
1461         dlm_unregister_net_handlers();
1462         dlm_destroy_mle_cache();
1463 }
1464
1465 MODULE_AUTHOR("Oracle");
1466 MODULE_LICENSE("GPL");
1467
1468 module_init(dlm_init);
1469 module_exit(dlm_exit);