dlm: convert rsb list to rb_tree
Bob Peterson [Wed, 26 Oct 2011 20:24:55 +0000 (15:24 -0500)]
Change the linked lists to rb_tree's in the rsb
hash table to speed up searches.  Slow rsb searches
were having a large impact on gfs2 performance due
to the large number of dlm locks gfs2 uses.

Signed-off-by: Bob Peterson <rpeterso@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>

fs/dlm/debug_fs.c
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/lockspace.c
fs/dlm/recover.c

index 5977923..3dca2b3 100644 (file)
@@ -393,6 +393,7 @@ static const struct seq_operations format3_seq_ops;
 
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
+       struct rb_node *node;
        struct dlm_ls *ls = seq->private;
        struct rsbtbl_iter *ri;
        struct dlm_rsb *r;
@@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
                ri->format = 3;
 
        spin_lock(&ls->ls_rsbtbl[bucket].lock);
-       if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-               list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list,
-                                   res_hashchain) {
+       if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+               for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
+                    node = rb_next(node)) {
+                       r = rb_entry(node, struct dlm_rsb, res_hashnode);
                        if (!entry--) {
                                dlm_hold_rsb(r);
                                ri->rsb = r;
@@ -449,9 +451,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
                }
 
                spin_lock(&ls->ls_rsbtbl[bucket].lock);
-               if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-                       r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-                                            struct dlm_rsb, res_hashchain);
+               if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+                       node = rb_first(&ls->ls_rsbtbl[bucket].keep);
+                       r = rb_entry(node, struct dlm_rsb, res_hashnode);
                        dlm_hold_rsb(r);
                        ri->rsb = r;
                        ri->bucket = bucket;
@@ -467,7 +469,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 {
        struct dlm_ls *ls = seq->private;
        struct rsbtbl_iter *ri = iter_ptr;
-       struct list_head *next;
+       struct rb_node *next;
        struct dlm_rsb *r, *rp;
        loff_t n = *pos;
        unsigned bucket;
@@ -480,10 +482,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 
        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        rp = ri->rsb;
-       next = rp->res_hashchain.next;
+       next = rb_next(&rp->res_hashnode);
 
-       if (next != &ls->ls_rsbtbl[bucket].list) {
-               r = list_entry(next, struct dlm_rsb, res_hashchain);
+       if (next) {
+               r = rb_entry(next, struct dlm_rsb, res_hashnode);
                dlm_hold_rsb(r);
                ri->rsb = r;
                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -511,9 +513,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
                }
 
                spin_lock(&ls->ls_rsbtbl[bucket].lock);
-               if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-                       r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-                                            struct dlm_rsb, res_hashchain);
+               if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+                       next = rb_first(&ls->ls_rsbtbl[bucket].keep);
+                       r = rb_entry(next, struct dlm_rsb, res_hashnode);
                        dlm_hold_rsb(r);
                        ri->rsb = r;
                        ri->bucket = bucket;
index fe2860c..5685a9a 100644 (file)
@@ -103,8 +103,8 @@ struct dlm_dirtable {
 };
 
 struct dlm_rsbtable {
-       struct list_head        list;
-       struct list_head        toss;
+       struct rb_root          keep;
+       struct rb_root          toss;
        spinlock_t              lock;
 };
 
@@ -285,7 +285,10 @@ struct dlm_rsb {
        unsigned long           res_toss_time;
        uint32_t                res_first_lkid;
        struct list_head        res_lookup;     /* lkbs waiting on first */
-       struct list_head        res_hashchain;  /* rsbtbl */
+       union {
+               struct list_head        res_hashchain;
+               struct rb_node          res_hashnode;   /* rsbtbl */
+       };
        struct list_head        res_grantqueue;
        struct list_head        res_convertqueue;
        struct list_head        res_waitqueue;
index 83b5e32..d471830 100644 (file)
@@ -56,6 +56,7 @@
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
 #include <linux/types.h>
+#include <linux/rbtree.h>
 #include <linux/slab.h>
 #include "dlm_internal.h"
 #include <linux/dlm_device.h>
@@ -380,6 +381,8 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 
        r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
        list_del(&r->res_hashchain);
+       /* Convert the empty list_head to a NULL rb_node for tree usage: */
+       memset(&r->res_hashnode, 0, sizeof(struct rb_node));
        ls->ls_new_rsb_count--;
        spin_unlock(&ls->ls_new_rsb_spin);
 
@@ -388,7 +391,6 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
        memcpy(r->res_name, name, len);
        mutex_init(&r->res_mutex);
 
-       INIT_LIST_HEAD(&r->res_hashchain);
        INIT_LIST_HEAD(&r->res_lookup);
        INIT_LIST_HEAD(&r->res_grantqueue);
        INIT_LIST_HEAD(&r->res_convertqueue);
@@ -400,14 +402,31 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
        return 0;
 }
 
-static int search_rsb_list(struct list_head *head, char *name, int len,
+static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
+{
+       char maxname[DLM_RESNAME_MAXLEN];
+
+       memset(maxname, 0, DLM_RESNAME_MAXLEN);
+       memcpy(maxname, name, nlen);
+       return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
+}
+
+static int search_rsb_tree(struct rb_root *tree, char *name, int len,
                           unsigned int flags, struct dlm_rsb **r_ret)
 {
+       struct rb_node *node = tree->rb_node;
        struct dlm_rsb *r;
        int error = 0;
-
-       list_for_each_entry(r, head, res_hashchain) {
-               if (len == r->res_length && !memcmp(name, r->res_name, len))
+       int rc;
+
+       while (node) {
+               r = rb_entry(node, struct dlm_rsb, res_hashnode);
+               rc = rsb_cmp(r, name, len);
+               if (rc < 0)
+                       node = node->rb_left;
+               else if (rc > 0)
+                       node = node->rb_right;
+               else
                        goto found;
        }
        *r_ret = NULL;
@@ -420,22 +439,54 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
        return error;
 }
 
+static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+{
+       struct rb_node **newn = &tree->rb_node;
+       struct rb_node *parent = NULL;
+       int rc;
+
+       while (*newn) {
+               struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
+                                              res_hashnode);
+
+               parent = *newn;
+               rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
+               if (rc < 0)
+                       newn = &parent->rb_left;
+               else if (rc > 0)
+                       newn = &parent->rb_right;
+               else {
+                       log_print("rsb_insert match");
+                       dlm_dump_rsb(rsb);
+                       dlm_dump_rsb(cur);
+                       return -EEXIST;
+               }
+       }
+
+       rb_link_node(&rsb->res_hashnode, parent, newn);
+       rb_insert_color(&rsb->res_hashnode, tree);
+       return 0;
+}
+
 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                       unsigned int flags, struct dlm_rsb **r_ret)
 {
        struct dlm_rsb *r;
        int error;
 
-       error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
+       error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
        if (!error) {
                kref_get(&r->res_ref);
                goto out;
        }
-       error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
+       error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
        if (error)
                goto out;
 
-       list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
+       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+       error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+       if (error)
+               return error;
 
        if (dlm_no_directory(ls))
                goto out;
@@ -527,8 +578,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                        nodeid = 0;
                r->res_nodeid = nodeid;
        }
-       list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
-       error = 0;
+       error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
  out_unlock:
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
  out:
@@ -556,7 +606,8 @@ static void toss_rsb(struct kref *kref)
 
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
        kref_init(&r->res_ref);
-       list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
+       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
+       rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
        r->res_toss_time = jiffies;
        if (r->res_lvbptr) {
                dlm_free_lvb(r->res_lvbptr);
@@ -1082,19 +1133,19 @@ static void dir_remove(struct dlm_rsb *r)
                                     r->res_name, r->res_length);
 }
 
-/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
-   found since they are in order of newest to oldest? */
+/* FIXME: make this more efficient */
 
 static int shrink_bucket(struct dlm_ls *ls, int b)
 {
+       struct rb_node *n;
        struct dlm_rsb *r;
        int count = 0, found;
 
        for (;;) {
                found = 0;
                spin_lock(&ls->ls_rsbtbl[b].lock);
-               list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
-                                           res_hashchain) {
+               for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
+                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        if (!time_after_eq(jiffies, r->res_toss_time +
                                           dlm_config.ci_toss_secs * HZ))
                                continue;
@@ -1108,7 +1159,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
                }
 
                if (kref_put(&r->res_ref, kill_rsb)) {
-                       list_del(&r->res_hashchain);
+                       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
 
                        if (is_master(r))
@@ -4441,10 +4492,12 @@ int dlm_purge_locks(struct dlm_ls *ls)
 
 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
 {
+       struct rb_node *n;
        struct dlm_rsb *r, *r_ret = NULL;
 
        spin_lock(&ls->ls_rsbtbl[bucket].lock);
-       list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
+       for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
+               r = rb_entry(n, struct dlm_rsb, res_hashnode);
                if (!rsb_flag(r, RSB_LOCKS_PURGED))
                        continue;
                hold_rsb(r);
index a1d8f1a..1d16a23 100644 (file)
@@ -457,8 +457,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
-               INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
-               INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
+               ls->ls_rsbtbl[i].keep.rb_node = NULL;
+               ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }
 
@@ -685,7 +685,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
        struct dlm_rsb *rsb;
-       struct list_head *head;
+       struct rb_node *n;
        int i, busy, rv;
 
        busy = lockspace_busy(ls, force);
@@ -746,20 +746,15 @@ static int release_lockspace(struct dlm_ls *ls, int force)
         */
 
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-               head = &ls->ls_rsbtbl[i].list;
-               while (!list_empty(head)) {
-                       rsb = list_entry(head->next, struct dlm_rsb,
-                                        res_hashchain);
-
-                       list_del(&rsb->res_hashchain);
+               while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+                       rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }
 
-               head = &ls->ls_rsbtbl[i].toss;
-               while (!list_empty(head)) {
-                       rsb = list_entry(head->next, struct dlm_rsb,
-                                        res_hashchain);
-                       list_del(&rsb->res_hashchain);
+               while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
+                       rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }
index 1463823..50467ce 100644 (file)
@@ -715,6 +715,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 
 int dlm_create_root_list(struct dlm_ls *ls)
 {
+       struct rb_node *n;
        struct dlm_rsb *r;
        int i, error = 0;
 
@@ -727,7 +728,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
 
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
-               list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
+               for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        list_add(&r->res_root_list, &ls->ls_root_list);
                        dlm_hold_rsb(r);
                }
@@ -741,7 +743,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
                        continue;
                }
 
-               list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
+               for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
+                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        list_add(&r->res_root_list, &ls->ls_root_list);
                        dlm_hold_rsb(r);
                }
@@ -771,16 +774,18 @@ void dlm_release_root_list(struct dlm_ls *ls)
 
 void dlm_clear_toss_list(struct dlm_ls *ls)
 {
-       struct dlm_rsb *r, *safe;
+       struct rb_node *n, *next;
+       struct dlm_rsb *rsb;
        int i;
 
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
-               list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
-                                        res_hashchain) {
-                       if (dlm_no_directory(ls) || !is_master(r)) {
-                               list_del(&r->res_hashchain);
-                               dlm_free_rsb(r);
+               for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
+                       next = rb_next(n);;
+                       rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       if (dlm_no_directory(ls) || !is_master(rsb)) {
+                               rb_erase(n, &ls->ls_rsbtbl[i].toss);
+                               dlm_free_rsb(rsb);
                        }
                }
                spin_unlock(&ls->ls_rsbtbl[i].lock);