dlm: add node slots and generation
David Teigland [Thu, 20 Oct 2011 18:26:28 +0000 (13:26 -0500)]
Slot numbers are assigned to nodes when they join the lockspace.
The slot number chosen is the minimum unused value starting at 1.
Once a node is assigned a slot, that slot number will not change
while the node remains a lockspace member.  If the node leaves
and rejoins it can be assigned a new slot number.

A new generation number is also added to a lockspace.  It is
set and incremented during each recovery along with the slot
collection/assignment.

The slot numbers will be passed to gfs2 which will use them as
journal id's.

Signed-off-by: David Teigland <teigland@redhat.com>

fs/dlm/dlm_internal.h
fs/dlm/lockspace.c
fs/dlm/member.c
fs/dlm/member.h
fs/dlm/rcom.c
fs/dlm/rcom.h
fs/dlm/recover.c

index 5685a9a..f4d132c 100644 (file)
@@ -117,6 +117,18 @@ struct dlm_member {
        struct list_head        list;
        int                     nodeid;
        int                     weight;
+       int                     slot;
+       int                     slot_prev;
+       uint32_t                generation;
+};
+
+/*
+ * low nodeid saves array of these in ls_slots
+ */
+
+struct dlm_slot {
+       int                     nodeid;
+       int                     slot;
 };
 
 /*
@@ -337,7 +349,9 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag)
 /* dlm_header is first element of all structs sent between nodes */
 
 #define DLM_HEADER_MAJOR       0x00030000
-#define DLM_HEADER_MINOR       0x00000000
+#define DLM_HEADER_MINOR       0x00000001
+
+#define DLM_HEADER_SLOTS       0x00000001
 
 #define DLM_MSG                        1
 #define DLM_RCOM               2
@@ -425,10 +439,34 @@ union dlm_packet {
        struct dlm_rcom         rcom;
 };
 
+#define DLM_RSF_NEED_SLOTS     0x00000001
+
+/* RCOM_STATUS data */
+struct rcom_status {
+       __le32                  rs_flags;
+       __le32                  rs_unused1;
+       __le64                  rs_unused2;
+};
+
+/* RCOM_STATUS_REPLY data */
 struct rcom_config {
        __le32                  rf_lvblen;
        __le32                  rf_lsflags;
-       __le64                  rf_unused;
+
+       /* DLM_HEADER_SLOTS adds: */
+       __le32                  rf_flags;
+       __le16                  rf_our_slot;
+       __le16                  rf_num_slots;
+       __le32                  rf_generation;
+       __le32                  rf_unused1;
+       __le64                  rf_unused2;
+};
+
+struct rcom_slot {
+       __le32                  ro_nodeid;
+       __le16                  ro_slot;
+       __le16                  ro_unused1;
+       __le64                  ro_unused2;
 };
 
 struct rcom_lock {
@@ -455,6 +493,7 @@ struct dlm_ls {
        struct list_head        ls_list;        /* list of lockspaces */
        dlm_lockspace_t         *ls_local_handle;
        uint32_t                ls_global_id;   /* global unique lockspace ID */
+       uint32_t                ls_generation;
        uint32_t                ls_exflags;
        int                     ls_lvblen;
        int                     ls_count;       /* refcount of processes in
@@ -493,6 +532,11 @@ struct dlm_ls {
        int                     ls_total_weight;
        int                     *ls_node_array;
 
+       int                     ls_slot;
+       int                     ls_num_slots;
+       int                     ls_slots_size;
+       struct dlm_slot         *ls_slots;
+
        struct dlm_rsb          ls_stub_rsb;    /* for returning errors */
        struct dlm_lkb          ls_stub_lkb;    /* for returning errors */
        struct dlm_message      ls_stub_ms;     /* for faking a reply */
index 1d16a23..1441f04 100644 (file)
@@ -525,6 +525,11 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
        if (!ls->ls_recover_buf)
                goto out_dirfree;
 
+       ls->ls_slot = 0;
+       ls->ls_num_slots = 0;
+       ls->ls_slots_size = 0;
+       ls->ls_slots = NULL;
+
        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
index 5ebd1df..eebc52a 100644 (file)
 #include "config.h"
 #include "lowcomms.h"
 
+int dlm_slots_version(struct dlm_header *h)
+{
+       if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
+               return 0;
+       return 1;
+}
+
+void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
+                 struct dlm_member *memb)
+{
+       struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
+
+       if (!dlm_slots_version(&rc->rc_header))
+               return;
+
+       memb->slot = le16_to_cpu(rf->rf_our_slot);
+       memb->generation = le32_to_cpu(rf->rf_generation);
+}
+
+void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
+{
+       struct dlm_slot *slot;
+       struct rcom_slot *ro;
+       int i;
+
+       ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
+
+       /* ls_slots array is sparse, but not rcom_slots */
+
+       for (i = 0; i < ls->ls_slots_size; i++) {
+               slot = &ls->ls_slots[i];
+               if (!slot->nodeid)
+                       continue;
+               ro->ro_nodeid = cpu_to_le32(slot->nodeid);
+               ro->ro_slot = cpu_to_le16(slot->slot);
+               ro++;
+       }
+}
+
+#define SLOT_DEBUG_LINE 128
+
+static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
+                           struct rcom_slot *ro0, struct dlm_slot *array,
+                           int array_size)
+{
+       char line[SLOT_DEBUG_LINE];
+       int len = SLOT_DEBUG_LINE - 1;
+       int pos = 0;
+       int ret, i;
+
+       if (!dlm_config.ci_log_debug)
+               return;
+
+       memset(line, 0, sizeof(line));
+
+       if (array) {
+               for (i = 0; i < array_size; i++) {
+                       if (!array[i].nodeid)
+                               continue;
+
+                       ret = snprintf(line + pos, len - pos, " %d:%d",
+                                      array[i].slot, array[i].nodeid);
+                       if (ret >= len - pos)
+                               break;
+                       pos += ret;
+               }
+       } else if (ro0) {
+               for (i = 0; i < num_slots; i++) {
+                       ret = snprintf(line + pos, len - pos, " %d:%d",
+                                      ro0[i].ro_slot, ro0[i].ro_nodeid);
+                       if (ret >= len - pos)
+                               break;
+                       pos += ret;
+               }
+       }
+
+       log_debug(ls, "generation %u slots %d%s", gen, num_slots, line);
+}
+
+int dlm_slots_copy_in(struct dlm_ls *ls)
+{
+       struct dlm_member *memb;
+       struct dlm_rcom *rc = ls->ls_recover_buf;
+       struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
+       struct rcom_slot *ro0, *ro;
+       int our_nodeid = dlm_our_nodeid();
+       int i, num_slots;
+       uint32_t gen;
+
+       if (!dlm_slots_version(&rc->rc_header))
+               return -1;
+
+       gen = le32_to_cpu(rf->rf_generation);
+       if (gen <= ls->ls_generation) {
+               log_error(ls, "dlm_slots_copy_in gen %u old %u",
+                         gen, ls->ls_generation);
+       }
+       ls->ls_generation = gen;
+
+       num_slots = le16_to_cpu(rf->rf_num_slots);
+       if (!num_slots)
+               return -1;
+
+       ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
+
+       for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
+               ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
+               ro->ro_slot = le16_to_cpu(ro->ro_slot);
+       }
+
+       log_debug_slots(ls, gen, num_slots, ro0, NULL, 0);
+
+       list_for_each_entry(memb, &ls->ls_nodes, list) {
+               for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
+                       if (ro->ro_nodeid != memb->nodeid)
+                               continue;
+                       memb->slot = ro->ro_slot;
+                       memb->slot_prev = memb->slot;
+                       break;
+               }
+
+               if (memb->nodeid == our_nodeid) {
+                       if (ls->ls_slot && ls->ls_slot != memb->slot) {
+                               log_error(ls, "dlm_slots_copy_in our slot "
+                                         "changed %d %d", ls->ls_slot,
+                                         memb->slot);
+                               return -1;
+                       }
+
+                       if (!ls->ls_slot)
+                               ls->ls_slot = memb->slot;
+               }
+
+               if (!memb->slot) {
+                       log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
+                                  memb->nodeid);
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+/* for any nodes that do not support slots, we will not have set memb->slot
+   in wait_status_all(), so memb->slot will remain -1, and we will not
+   assign slots or set ls_num_slots here */
+
+int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
+                    struct dlm_slot **slots_out, uint32_t *gen_out)
+{
+       struct dlm_member *memb;
+       struct dlm_slot *array;
+       int our_nodeid = dlm_our_nodeid();
+       int array_size, max_slots, i;
+       int need = 0;
+       int max = 0;
+       int num = 0;
+       uint32_t gen = 0;
+
+       /* our own memb struct will have slot -1 gen 0 */
+
+       list_for_each_entry(memb, &ls->ls_nodes, list) {
+               if (memb->nodeid == our_nodeid) {
+                       memb->slot = ls->ls_slot;
+                       memb->generation = ls->ls_generation;
+                       break;
+               }
+       }
+
+       list_for_each_entry(memb, &ls->ls_nodes, list) {
+               if (memb->generation > gen)
+                       gen = memb->generation;
+
+               /* node doesn't support slots */
+
+               if (memb->slot == -1)
+                       return -1;
+
+               /* node needs a slot assigned */
+
+               if (!memb->slot)
+                       need++;
+
+               /* node has a slot assigned */
+
+               num++;
+
+               if (!max || max < memb->slot)
+                       max = memb->slot;
+
+               /* sanity check, once slot is assigned it shouldn't change */
+
+               if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
+                       log_error(ls, "nodeid %d slot changed %d %d",
+                                 memb->nodeid, memb->slot_prev, memb->slot);
+                       return -1;
+               }
+               memb->slot_prev = memb->slot;
+       }
+
+       array_size = max + need;
+
+       array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
+       if (!array)
+               return -ENOMEM;
+
+       num = 0;
+
+       /* fill in slots (offsets) that are used */
+
+       list_for_each_entry(memb, &ls->ls_nodes, list) {
+               if (!memb->slot)
+                       continue;
+
+               if (memb->slot > array_size) {
+                       log_error(ls, "invalid slot number %d", memb->slot);
+                       kfree(array);
+                       return -1;
+               }
+
+               array[memb->slot - 1].nodeid = memb->nodeid;
+               array[memb->slot - 1].slot = memb->slot;
+               num++;
+       }
+
+       /* assign new slots from unused offsets */
+
+       list_for_each_entry(memb, &ls->ls_nodes, list) {
+               if (memb->slot)
+                       continue;
+
+               for (i = 0; i < array_size; i++) {
+                       if (array[i].nodeid)
+                               continue;
+
+                       memb->slot = i + 1;
+                       memb->slot_prev = memb->slot;
+                       array[i].nodeid = memb->nodeid;
+                       array[i].slot = memb->slot;
+                       num++;
+
+                       if (!ls->ls_slot && memb->nodeid == our_nodeid)
+                               ls->ls_slot = memb->slot;
+                       break;
+               }
+
+               if (!memb->slot) {
+                       log_error(ls, "no free slot found");
+                       kfree(array);
+                       return -1;
+               }
+       }
+
+       gen++;
+
+       log_debug_slots(ls, gen, num, NULL, array, array_size);
+
+       max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
+                    sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
+
+       if (num > max_slots) {
+               log_error(ls, "num_slots %d exceeds max_slots %d",
+                         num, max_slots);
+               kfree(array);
+               return -1;
+       }
+
+       *gen_out = gen;
+       *slots_out = array;
+       *slots_size = array_size;
+       *num_slots = num;
+       return 0;
+}
+
 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 {
        struct dlm_member *memb = NULL;
@@ -176,7 +450,7 @@ static int ping_members(struct dlm_ls *ls)
                error = dlm_recovery_stopped(ls);
                if (error)
                        break;
-               error = dlm_rcom_status(ls, memb->nodeid);
+               error = dlm_rcom_status(ls, memb->nodeid, 0);
                if (error)
                        break;
        }
@@ -322,7 +596,15 @@ int dlm_ls_stop(struct dlm_ls *ls)
         */
 
        dlm_recoverd_suspend(ls);
+
+       spin_lock(&ls->ls_recover_lock);
+       kfree(ls->ls_slots);
+       ls->ls_slots = NULL;
+       ls->ls_num_slots = 0;
+       ls->ls_slots_size = 0;
        ls->ls_recover_status = 0;
+       spin_unlock(&ls->ls_recover_lock);
+
        dlm_recoverd_resume(ls);
 
        if (!ls->ls_recover_begin)
index 7a26fca..7e87e8a 100644 (file)
@@ -20,6 +20,13 @@ void dlm_clear_members_gone(struct dlm_ls *ls);
 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
 int dlm_is_removed(struct dlm_ls *ls, int nodeid);
 int dlm_is_member(struct dlm_ls *ls, int nodeid);
+int dlm_slots_version(struct dlm_header *h);
+void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
+                  struct dlm_member *memb);
+void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc);
+int dlm_slots_copy_in(struct dlm_ls *ls);
+int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
+                    struct dlm_slot **slots_out, uint32_t *gen_out);
 
 #endif                          /* __MEMBER_DOT_H__ */
 
index f10a50f..ac5c616 100644 (file)
@@ -23,6 +23,7 @@
 #include "memory.h"
 #include "lock.h"
 #include "util.h"
+#include "member.h"
 
 
 static int rcom_response(struct dlm_ls *ls)
@@ -72,20 +73,30 @@ static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
        dlm_lowcomms_commit_buffer(mh);
 }
 
+static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
+                           uint32_t flags)
+{
+       rs->rs_flags = cpu_to_le32(flags);
+}
+
 /* When replying to a status request, a node also sends back its
    configuration values.  The requesting node then checks that the remote
    node is configured the same way as itself. */
 
-static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
+static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
+                           uint32_t num_slots)
 {
        rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
        rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
+
+       rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
+       rf->rf_num_slots = cpu_to_le16(num_slots);
+       rf->rf_generation =  cpu_to_le32(ls->ls_generation);
 }
 
-static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
+static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
        struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
-       size_t conf_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 
        if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
                log_error(ls, "version mismatch: %x nodeid %d: %x",
@@ -94,12 +105,6 @@ static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
                return -EPROTO;
        }
 
-       if (rc->rc_header.h_length < conf_size) {
-               log_error(ls, "config too short: %d nodeid %d",
-                         rc->rc_header.h_length, nodeid);
-               return -EPROTO;
-       }
-
        if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
            le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
                log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@@ -127,7 +132,18 @@ static void disallow_sync_reply(struct dlm_ls *ls)
        spin_unlock(&ls->ls_rcom_spin);
 }
 
-int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
+/*
+ * low nodeid gathers one slot value at a time from each node.
+ * it sets need_slots=0, and saves rf_our_slot returned from each
+ * rcom_config.
+ *
+ * other nodes gather all slot values at once from the low nodeid.
+ * they set need_slots=1, and ignore the rf_our_slot returned from each
+ * rcom_config.  they use the rf_num_slots returned from the low
+ * node's rcom_config.
+ */
+
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 {
        struct dlm_rcom *rc;
        struct dlm_mhandle *mh;
@@ -141,10 +157,13 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
                goto out;
        }
 
-       error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
+       error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
+                           sizeof(struct rcom_status), &rc, &mh);
        if (error)
                goto out;
 
+       set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
+
        allow_sync_reply(ls, &rc->rc_id);
        memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 
@@ -161,8 +180,11 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
                /* we pretend the remote lockspace exists with 0 status */
                log_debug(ls, "remote node %d not ready", nodeid);
                rc->rc_result = 0;
-       } else
-               error = check_config(ls, rc, nodeid);
+               error = 0;
+       } else {
+               error = check_rcom_config(ls, rc, nodeid);
+       }
+
        /* the caller looks at rc_result for the remote recovery status */
  out:
        return error;
@@ -172,17 +194,60 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 {
        struct dlm_rcom *rc;
        struct dlm_mhandle *mh;
-       int error, nodeid = rc_in->rc_header.h_nodeid;
+       struct rcom_status *rs;
+       uint32_t status;
+       int nodeid = rc_in->rc_header.h_nodeid;
+       int len = sizeof(struct rcom_config);
+       int num_slots = 0;
+       int error;
+
+       if (!dlm_slots_version(&rc_in->rc_header)) {
+               status = dlm_recover_status(ls);
+               goto do_create;
+       }
+
+       rs = (struct rcom_status *)rc_in->rc_buf;
 
+       if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) {
+               status = dlm_recover_status(ls);
+               goto do_create;
+       }
+
+       spin_lock(&ls->ls_recover_lock);
+       status = ls->ls_recover_status;
+       num_slots = ls->ls_num_slots;
+       spin_unlock(&ls->ls_recover_lock);
+       len += num_slots * sizeof(struct rcom_slot);
+
+ do_create:
        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
-                           sizeof(struct rcom_config), &rc, &mh);
+                           len, &rc, &mh);
        if (error)
                return;
+
        rc->rc_id = rc_in->rc_id;
        rc->rc_seq_reply = rc_in->rc_seq;
-       rc->rc_result = dlm_recover_status(ls);
-       make_config(ls, (struct rcom_config *) rc->rc_buf);
+       rc->rc_result = status;
+
+       set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
+
+       if (!num_slots)
+               goto do_send;
+
+       spin_lock(&ls->ls_recover_lock);
+       if (ls->ls_num_slots != num_slots) {
+               spin_unlock(&ls->ls_recover_lock);
+               log_debug(ls, "receive_rcom_status num_slots %d to %d",
+                         num_slots, ls->ls_num_slots);
+               rc->rc_result = 0;
+               set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
+               goto do_send;
+       }
+
+       dlm_slots_copy_out(ls, rc);
+       spin_unlock(&ls->ls_recover_lock);
 
+ do_send:
        send_rcom(ls, mh, rc);
 }
 
index b09abd2..206723a 100644 (file)
@@ -14,7 +14,7 @@
 #ifndef __RCOM_DOT_H__
 #define __RCOM_DOT_H__
 
-int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
index 81b2393..34d5adf 100644 (file)
@@ -85,14 +85,20 @@ uint32_t dlm_recover_status(struct dlm_ls *ls)
        return status;
 }
 
+static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
+{
+       ls->ls_recover_status |= status;
+}
+
 void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
 {
        spin_lock(&ls->ls_recover_lock);
-       ls->ls_recover_status |= status;
+       _set_recover_status(ls, status);
        spin_unlock(&ls->ls_recover_lock);
 }
 
-static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
+static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
+                          int save_slots)
 {
        struct dlm_rcom *rc = ls->ls_recover_buf;
        struct dlm_member *memb;
@@ -106,10 +112,13 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
                                goto out;
                        }
 
-                       error = dlm_rcom_status(ls, memb->nodeid);
+                       error = dlm_rcom_status(ls, memb->nodeid, 0);
                        if (error)
                                goto out;
 
+                       if (save_slots)
+                               dlm_slot_save(ls, rc, memb);
+
                        if (rc->rc_result & wait_status)
                                break;
                        if (delay < 1000)
@@ -121,7 +130,8 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
        return error;
 }
 
-static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
+static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
+                          uint32_t status_flags)
 {
        struct dlm_rcom *rc = ls->ls_recover_buf;
        int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@@ -132,7 +142,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
                        goto out;
                }
 
-               error = dlm_rcom_status(ls, nodeid);
+               error = dlm_rcom_status(ls, nodeid, status_flags);
                if (error)
                        break;
 
@@ -152,18 +162,56 @@ static int wait_status(struct dlm_ls *ls, uint32_t status)
        int error;
 
        if (ls->ls_low_nodeid == dlm_our_nodeid()) {
-               error = wait_status_all(ls, status);
+               error = wait_status_all(ls, status, 0);
                if (!error)
                        dlm_set_recover_status(ls, status_all);
        } else
-               error = wait_status_low(ls, status_all);
+               error = wait_status_low(ls, status_all, 0);
 
        return error;
 }
 
 int dlm_recover_members_wait(struct dlm_ls *ls)
 {
-       return wait_status(ls, DLM_RS_NODES);
+       struct dlm_member *memb;
+       struct dlm_slot *slots;
+       int num_slots, slots_size;
+       int error, rv;
+       uint32_t gen;
+
+       list_for_each_entry(memb, &ls->ls_nodes, list) {
+               memb->slot = -1;
+               memb->generation = 0;
+       }
+
+       if (ls->ls_low_nodeid == dlm_our_nodeid()) {
+               error = wait_status_all(ls, DLM_RS_NODES, 1);
+               if (error)
+                       goto out;
+
+               /* slots array is sparse, slots_size may be > num_slots */
+
+               rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
+               if (!rv) {
+                       spin_lock(&ls->ls_recover_lock);
+                       _set_recover_status(ls, DLM_RS_NODES_ALL);
+                       ls->ls_num_slots = num_slots;
+                       ls->ls_slots_size = slots_size;
+                       ls->ls_slots = slots;
+                       ls->ls_generation = gen;
+                       spin_unlock(&ls->ls_recover_lock);
+               } else {
+                       dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
+               }
+       } else {
+               error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
+               if (error)
+                       goto out;
+
+               dlm_slots_copy_in(ls);
+       }
+ out:
+       return error;
 }
 
 int dlm_recover_directory_wait(struct dlm_ls *ls)