ceph: put unused osd connections on lru
Yehuda Sadeh [Wed, 3 Feb 2010 19:00:26 +0000 (11:00 -0800)]
Instead of removing an osd connection immediately when its
requests list is empty, put the osd connection on an lru.
The osd is removed only once it has gone unused for longer
than a specified time.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>

fs/ceph/osd_client.c
fs/ceph/osd_client.h
fs/ceph/super.c
fs/ceph/super.h

index 35c8afe..7f8a26f 100644 (file)
@@ -389,6 +389,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        INIT_LIST_HEAD(&osd->o_requests);
+       INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;
 
        ceph_con_init(osdc->client->msgr, &osd->o_con);
@@ -422,25 +423,56 @@ static void put_osd(struct ceph_osd *osd)
 /*
  * remove an osd from our map
  */
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {      /* unlink osd from osdc->osds and from the idle lru, then close its connection */
-       dout("remove_osd %p\n", osd);
+       dout("__remove_osd %p\n", osd);
        BUG_ON(!list_empty(&osd->o_requests));  /* osd must have no requests linked */
        rb_erase(&osd->o_node, &osdc->osds);
+       list_del_init(&osd->o_osd_lru); /* o_osd_lru is initialised in create_osd, so this is fine when not queued */
        ceph_con_close(&osd->o_con);
        put_osd(osd);
 }
 
+static void __move_osd_to_lru(struct ceph_osd_client *osdc,
+                             struct ceph_osd *osd)
+{      /* queue osd on osdc->osd_lru and stamp the time at which it expires */
+       dout("__move_osd_to_lru %p\n", osd);
+       BUG_ON(!list_empty(&osd->o_osd_lru));   /* must not already be queued */
+       list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); /* tail insert; remove_old_osds scans from the head */
+       osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;   /* osd_idle_ttl is in seconds */
+}
+
+static void __remove_osd_from_lru(struct ceph_osd *osd)
+{      /* take osd off the idle lru if it is currently queued there */
+       dout("__remove_osd_from_lru %p\n", osd);
+       if (!list_empty(&osd->o_osd_lru))       /* nothing to do when not queued */
+               list_del_init(&osd->o_osd_lru); /* _init leaves the node empty again */
+}
+
+static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
+{      /* remove expired idle osds, or every idle osd when remove_all is set */
+       struct ceph_osd *osd, *nosd;
+
+       dout("remove_old_osds %p\n", osdc);
+       mutex_lock(&osdc->request_mutex);       /* taken here, hence no "__" prefix on this function */
+       list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {        /* _safe: __remove_osd unlinks osd */
+               if (!remove_all && time_before(jiffies, osd->lru_ttl))
+                       break;  /* first unexpired entry ends the scan */
+               __remove_osd(osdc, osd);
+       }
+       mutex_unlock(&osdc->request_mutex);
+}
+
 /*
  * reset osd connect
  */
-static int reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
        int ret = 0;
 
-       dout("reset_osd %p osd%d\n", osd, osd->o_osd);
+       dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests)) {
-               remove_osd(osdc, osd);
+               __remove_osd(osdc, osd);
        } else {
                ceph_con_close(&osd->o_con);
                ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
@@ -533,7 +565,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests))
-                       remove_osd(osdc, req->r_osd);
+                       __move_osd_to_lru(osdc, req->r_osd);
                req->r_osd = NULL;
        }
 
@@ -611,7 +643,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
                if (list_empty(&req->r_osd->o_requests)) {
                        /* try to re-use r_osd if possible */
                        newosd = get_osd(req->r_osd);
-                       remove_osd(osdc, newosd);
+                       __remove_osd(osdc, newosd);
                }
                req->r_osd = NULL;
        }
@@ -636,8 +668,10 @@ static int __map_osds(struct ceph_osd_client *osdc,
                ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
        }
 
-       if (req->r_osd)
+       if (req->r_osd) {
+               __remove_osd_from_lru(req->r_osd);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
+       }
        err = 1;   /* osd changed */
 
 out:
@@ -744,6 +778,23 @@ static void handle_timeout(struct work_struct *work)
        up_read(&osdc->map_sem);
 }
 
+static void handle_osds_timeout(struct work_struct *work)
+{      /* delayed-work handler: remove expired idle osds, then re-arm itself */
+       struct ceph_osd_client *osdc =
+               container_of(work, struct ceph_osd_client,
+                            osds_timeout_work.work);
+       unsigned long delay =
+               osdc->client->mount_args->osd_idle_ttl * HZ >> 2;       /* parsed as (ttl * HZ) >> 2: a quarter of the idle ttl */
+
+       dout("osds timeout\n");
+       down_read(&osdc->map_sem);
+       remove_old_osds(osdc, 0);       /* remove_all == 0: only entries whose lru_ttl has passed */
+       up_read(&osdc->map_sem);
+
+       schedule_delayed_work(&osdc->osds_timeout_work,
+                             round_jiffies_relative(delay));
+}
+
 /*
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
@@ -881,7 +932,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
                                   ceph_osd_addr(osdc->osdmap,
                                                 osd->o_osd),
                                   sizeof(struct ceph_entity_addr)) != 0)
-                               reset_osd(osdc, osd);
+                               __reset_osd(osdc, osd);
                }
        }
 
@@ -1195,9 +1246,14 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        osdc->timeout_tid = 0;
        osdc->last_tid = 0;
        osdc->osds = RB_ROOT;
+       INIT_LIST_HEAD(&osdc->osd_lru);
        osdc->requests = RB_ROOT;
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
+       INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+
+       schedule_delayed_work(&osdc->osds_timeout_work,
+          round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));
 
        err = -ENOMEM;
        osdc->req_mempool = mempool_create_kmalloc_pool(10,
@@ -1219,10 +1275,12 @@ out:
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {      /* cancel the delayed work items, then release the osdmap, idle osds and pools */
        cancel_delayed_work_sync(&osdc->timeout_work);
+       cancel_delayed_work_sync(&osdc->osds_timeout_work);     /* handle_osds_timeout re-arms itself; cancel it before teardown */
        if (osdc->osdmap) {
                ceph_osdmap_destroy(osdc->osdmap);
                osdc->osdmap = NULL;
        }
+       remove_old_osds(osdc, 1);       /* remove_all: drop every osd still on the lru */
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
 }
index 8d533d9..70f31b6 100644 (file)
@@ -31,9 +31,11 @@ struct ceph_osd {
        struct rb_node o_node;
        struct ceph_connection o_con;
        struct list_head o_requests;
+       struct list_head o_osd_lru;
        struct ceph_authorizer *o_authorizer;
        void *o_authorizer_buf, *o_authorizer_reply_buf;
        size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
+       unsigned long lru_ttl;
 };
 
 /* an in-flight request */
@@ -90,11 +92,13 @@ struct ceph_osd_client {
 
        struct mutex           request_mutex;
        struct rb_root         osds;          /* osds */
+       struct list_head       osd_lru;       /* idle osds */
        u64                    timeout_tid;   /* tid of timeout triggering rq */
        u64                    last_tid;      /* tid of last request */
        struct rb_root         requests;      /* pending requests */
        int                    num_requests;
        struct delayed_work    timeout_work;
+       struct delayed_work    osds_timeout_work;
 #ifdef CONFIG_DEBUG_FS
        struct dentry          *debugfs_file;
 #endif
index 3a25489..39aaf29 100644 (file)
@@ -293,6 +293,7 @@ enum {
        Opt_rsize,
        Opt_osdtimeout,
        Opt_mount_timeout,
+       Opt_osd_idle_ttl,
        Opt_caps_wanted_delay_min,
        Opt_caps_wanted_delay_max,
        Opt_readdir_max_entries,
@@ -322,6 +323,7 @@ static match_table_t arg_tokens = {
        {Opt_rsize, "rsize=%d"},
        {Opt_osdtimeout, "osdtimeout=%d"},
        {Opt_mount_timeout, "mount_timeout=%d"},
+       {Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
        {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
        {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
        {Opt_readdir_max_entries, "readdir_max_entries=%d"},
@@ -367,6 +369,7 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
        args->flags = CEPH_OPT_DEFAULT;
        args->osd_timeout = 5;    /* seconds */
        args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
+       args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;   /* seconds */
        args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
        args->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
        args->rsize = CEPH_MOUNT_RSIZE_DEFAULT;
index 770f7b5..3930fb6 100644 (file)
@@ -53,6 +53,7 @@ struct ceph_mount_args {
        struct ceph_entity_addr *mon_addr;
        int flags;
        int mount_timeout;
+       int osd_idle_ttl;
        int caps_wanted_delay_min, caps_wanted_delay_max;
        struct ceph_fsid fsid;
        struct ceph_entity_addr my_addr;
@@ -71,6 +72,7 @@ struct ceph_mount_args {
  * defaults
  */
 #define CEPH_MOUNT_TIMEOUT_DEFAULT  60
+#define CEPH_OSD_IDLE_TTL_DEFAULT    60
 #define CEPH_MOUNT_RSIZE_DEFAULT    (512*1024) /* readahead */
 
 #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)