blk-throttle: Dynamically allocate root group
[linux-3.10.git] / block / blk-throttle.c
index a467002..68f2ac3 100644 (file)
@@ -20,6 +20,11 @@ static int throtl_quantum = 32;
 /* Throttling is performed over 100ms slice and after that slice is renewed */
 static unsigned long throtl_slice = HZ/10;     /* 100 ms */
 
+/* A workqueue to queue throttle related work */
+static struct workqueue_struct *kthrotld_workqueue;
+static void throtl_schedule_delayed_work(struct throtl_data *td,
+                               unsigned long delay);
+
 struct throtl_rb_root {
        struct rb_root rb;
        struct rb_node *left;
@@ -72,7 +77,7 @@ struct throtl_grp {
        unsigned long slice_end[2];
 
        /* Some throttle limits got updated for the group */
-       bool limits_changed;
+       int limits_changed;
 };
 
 struct throtl_data
@@ -83,7 +88,7 @@ struct throtl_data
        /* service tree for active throtl groups */
        struct throtl_rb_root tg_service_tree;
 
-       struct throtl_grp root_tg;
+       struct throtl_grp *root_tg;
        struct request_queue *queue;
 
        /* Total Number of queued bios on READ and WRITE lists */
@@ -97,7 +102,7 @@ struct throtl_data
        /* Work for dispatching throttled bios */
        struct delayed_work throtl_work;
 
-       atomic_t limits_changed;
+       int limits_changed;
 };
 
 enum tg_state_flags {
@@ -154,40 +159,17 @@ static void throtl_put_tg(struct throtl_grp *tg)
        kfree(tg);
 }
 
-static struct throtl_grp * throtl_find_alloc_tg(struct throtl_data *td,
-                       struct cgroup *cgroup)
+static void throtl_init_group(struct throtl_grp *tg)
 {
-       struct blkio_cgroup *blkcg = cgroup_to_blkio_cgroup(cgroup);
-       struct throtl_grp *tg = NULL;
-       void *key = td;
-       struct backing_dev_info *bdi = &td->queue->backing_dev_info;
-       unsigned int major, minor;
-
-       /*
-        * TODO: Speed up blkiocg_lookup_group() by maintaining a radix
-        * tree of blkg (instead of traversing through hash list all
-        * the time.
-        */
-       tg = tg_of_blkg(blkiocg_lookup_group(blkcg, key));
-
-       /* Fill in device details for root group */
-       if (tg && !tg->blkg.dev && bdi->dev && dev_name(bdi->dev)) {
-               sscanf(dev_name(bdi->dev), "%u:%u", &major, &minor);
-               tg->blkg.dev = MKDEV(major, minor);
-               goto done;
-       }
-
-       if (tg)
-               goto done;
-
-       tg = kzalloc_node(sizeof(*tg), GFP_ATOMIC, td->queue->node);
-       if (!tg)
-               goto done;
-
        INIT_HLIST_NODE(&tg->tg_node);
        RB_CLEAR_NODE(&tg->rb_node);
        bio_list_init(&tg->bio_lists[0]);
        bio_list_init(&tg->bio_lists[1]);
+       tg->limits_changed = false;
+
+       /* Practically unlimited BW */
+       tg->bps[0] = tg->bps[1] = -1;
+       tg->iops[0] = tg->iops[1] = -1;
 
        /*
         * Take the initial reference that will be released on destroy
@@ -196,6 +178,21 @@ static struct throtl_grp * throtl_find_alloc_tg(struct throtl_data *td,
         * exit or cgroup deletion path depending on who is exiting first.
         */
        atomic_set(&tg->ref, 1);
+}
+
+/* Should be called with rcu read lock held (needed for blkcg) */
+static void
+throtl_add_group_to_td_list(struct throtl_data *td, struct throtl_grp *tg)
+{
+       hlist_add_head(&tg->tg_node, &td->tg_list);
+       td->nr_undestroyed_grps++;
+}
+
+static void throtl_init_add_tg_lists(struct throtl_data *td,
+                       struct throtl_grp *tg, struct blkio_cgroup *blkcg)
+{
+       struct backing_dev_info *bdi = &td->queue->backing_dev_info;
+       unsigned int major, minor;
 
        /* Add group onto cgroup list */
        sscanf(dev_name(bdi->dev), "%u:%u", &major, &minor);
@@ -207,22 +204,120 @@ static struct throtl_grp * throtl_find_alloc_tg(struct throtl_data *td,
        tg->iops[READ] = blkcg_get_read_iops(blkcg, tg->blkg.dev);
        tg->iops[WRITE] = blkcg_get_write_iops(blkcg, tg->blkg.dev);
 
-       hlist_add_head(&tg->tg_node, &td->tg_list);
-       td->nr_undestroyed_grps++;
-done:
+       throtl_add_group_to_td_list(td, tg);
+}
+
+/* Should be called without queue lock and outside of rcu period */
+static struct throtl_grp *throtl_alloc_tg(struct throtl_data *td)
+{
+       struct throtl_grp *tg = NULL;
+
+       tg = kzalloc_node(sizeof(*tg), GFP_ATOMIC, td->queue->node);
+       if (!tg)
+               return NULL;
+
+       throtl_init_group(tg);
        return tg;
 }
 
-static struct throtl_grp * throtl_get_tg(struct throtl_data *td)
+static struct
+throtl_grp *throtl_find_tg(struct throtl_data *td, struct blkio_cgroup *blkcg)
 {
-       struct cgroup *cgroup;
        struct throtl_grp *tg = NULL;
+       void *key = td;
+       struct backing_dev_info *bdi = &td->queue->backing_dev_info;
+       unsigned int major, minor;
+
+       /*
+        * This is the common case when there are no blkio cgroups.
+        * Avoid lookup in this case
+        */
+       if (blkcg == &blkio_root_cgroup)
+               tg = td->root_tg;
+       else
+               tg = tg_of_blkg(blkiocg_lookup_group(blkcg, key));
+
+       /* Fill in device details for root group */
+       if (tg && !tg->blkg.dev && bdi->dev && dev_name(bdi->dev)) {
+               sscanf(dev_name(bdi->dev), "%u:%u", &major, &minor);
+               tg->blkg.dev = MKDEV(major, minor);
+       }
+
+       return tg;
+}
+
+/*
+ * This function returns with queue lock unlocked in case of error, like
+ * request queue is no more
+ */
+static struct throtl_grp * throtl_get_tg(struct throtl_data *td)
+{
+       struct throtl_grp *tg = NULL, *__tg = NULL;
+       struct blkio_cgroup *blkcg;
+       struct request_queue *q = td->queue;
 
        rcu_read_lock();
-       cgroup = task_cgroup(current, blkio_subsys_id);
-       tg = throtl_find_alloc_tg(td, cgroup);
-       if (!tg)
-               tg = &td->root_tg;
+       blkcg = task_blkio_cgroup(current);
+       tg = throtl_find_tg(td, blkcg);
+       if (tg) {
+               rcu_read_unlock();
+               return tg;
+       }
+
+       /*
+        * Need to allocate a group. Allocation of group also needs allocation
+        * of per cpu stats which in-turn takes a mutex() and can block. Hence
+        * we need to drop rcu lock and queue_lock before we call alloc
+        *
+        * Take the request queue reference to make sure queue does not
+        * go away once we return from allocation.
+        */
+       blk_get_queue(q);
+       rcu_read_unlock();
+       spin_unlock_irq(q->queue_lock);
+
+       tg = throtl_alloc_tg(td);
+       /*
+        * We might have slept in group allocation. Make sure queue is not
+        * dead
+        */
+       if (unlikely(test_bit(QUEUE_FLAG_DEAD, &q->queue_flags))) {
+               blk_put_queue(q);
+               if (tg)
+                       kfree(tg);
+
+               return ERR_PTR(-ENODEV);
+       }
+       blk_put_queue(q);
+
+       /* Group allocated and queue is still alive. take the lock */
+       spin_lock_irq(q->queue_lock);
+
+       /*
+        * Initialize the new group. After sleeping, read the blkcg again.
+        */
+       rcu_read_lock();
+       blkcg = task_blkio_cgroup(current);
+
+       /*
+        * If some other thread already allocated the group while we were
+        * not holding queue lock, free up the group
+        */
+       __tg = throtl_find_tg(td, blkcg);
+
+       if (__tg) {
+               kfree(tg);
+               rcu_read_unlock();
+               return __tg;
+       }
+
+       /* Group allocation failed. Account the IO to root group */
+       if (!tg) {
+               tg = td->root_tg;
+               return tg;
+       }
+
+       throtl_init_add_tg_lists(td, tg, blkcg);
        rcu_read_unlock();
        return tg;
 }
@@ -337,10 +432,9 @@ static void throtl_schedule_next_dispatch(struct throtl_data *td)
        update_min_dispatch_time(st);
 
        if (time_before_eq(st->min_disptime, jiffies))
-               throtl_schedule_delayed_work(td->queue, 0);
+               throtl_schedule_delayed_work(td, 0);
        else
-               throtl_schedule_delayed_work(td->queue,
-                               (st->min_disptime - jiffies));
+               throtl_schedule_delayed_work(td, (st->min_disptime - jiffies));
 }
 
 static inline void
@@ -355,6 +449,12 @@ throtl_start_new_slice(struct throtl_data *td, struct throtl_grp *tg, bool rw)
                        tg->slice_end[rw], jiffies);
 }
 
+static inline void throtl_set_slice_end(struct throtl_data *td,
+               struct throtl_grp *tg, bool rw, unsigned long jiffy_end)
+{
+       tg->slice_end[rw] = roundup(jiffy_end, throtl_slice);
+}
+
 static inline void throtl_extend_slice(struct throtl_data *td,
                struct throtl_grp *tg, bool rw, unsigned long jiffy_end)
 {
@@ -391,6 +491,16 @@ throtl_trim_slice(struct throtl_data *td, struct throtl_grp *tg, bool rw)
        if (throtl_slice_used(td, tg, rw))
                return;
 
+       /*
+        * A bio has been dispatched. Also adjust slice_end. It might happen
+        * that initially cgroup limit was very low resulting in high
+        * slice_end, but later limit was bumped up and bio was dispached
+        * sooner, then we need to reduce slice_end. A high bogus slice_end
+        * is bad because it does not allow new slice to start.
+        */
+
+       throtl_set_slice_end(td, tg, rw, jiffies + throtl_slice);
+
        time_elapsed = jiffies - tg->slice_start[rw];
 
        nr_slices = time_elapsed / throtl_slice;
@@ -430,6 +540,7 @@ static bool tg_with_in_iops_limit(struct throtl_data *td, struct throtl_grp *tg,
        bool rw = bio_data_dir(bio);
        unsigned int io_allowed;
        unsigned long jiffy_elapsed, jiffy_wait, jiffy_elapsed_rnd;
+       u64 tmp;
 
        jiffy_elapsed = jiffy_elapsed_rnd = jiffies - tg->slice_start[rw];
 
@@ -439,8 +550,20 @@ static bool tg_with_in_iops_limit(struct throtl_data *td, struct throtl_grp *tg,
 
        jiffy_elapsed_rnd = roundup(jiffy_elapsed_rnd, throtl_slice);
 
-       io_allowed = (tg->iops[rw] * jiffies_to_msecs(jiffy_elapsed_rnd))
-                               / MSEC_PER_SEC;
+       /*
+        * jiffy_elapsed_rnd should not be a big value as minimum iops can be
+        * 1 then at max jiffy elapsed should be equivalent of 1 second as we
+        * will allow dispatch after 1 second and after that slice should
+        * have been trimmed.
+        */
+
+       tmp = (u64)tg->iops[rw] * jiffy_elapsed_rnd;
+       do_div(tmp, HZ);
+
+       if (tmp > UINT_MAX)
+               io_allowed = UINT_MAX;
+       else
+               io_allowed = tmp;
 
        if (tg->io_disp[rw] + 1 <= io_allowed) {
                if (wait)
@@ -476,8 +599,8 @@ static bool tg_with_in_bps_limit(struct throtl_data *td, struct throtl_grp *tg,
 
        jiffy_elapsed_rnd = roundup(jiffy_elapsed_rnd, throtl_slice);
 
-       tmp = tg->bps[rw] * jiffies_to_msecs(jiffy_elapsed_rnd);
-       do_div(tmp, MSEC_PER_SEC);
+       tmp = tg->bps[rw] * jiffy_elapsed_rnd;
+       do_div(tmp, HZ);
        bytes_allowed = tmp;
 
        if (tg->bytes_disp[rw] + bio->bi_size <= bytes_allowed) {
@@ -632,7 +755,7 @@ static int throtl_dispatch_tg(struct throtl_data *td, struct throtl_grp *tg,
 {
        unsigned int nr_reads = 0, nr_writes = 0;
        unsigned int max_nr_reads = throtl_grp_quantum*3/4;
-       unsigned int max_nr_writes = throtl_grp_quantum - nr_reads;
+       unsigned int max_nr_writes = throtl_grp_quantum - max_nr_reads;
        struct bio *bio;
 
        /* Try to dispatch 75% READS and 25% WRITES */
@@ -696,39 +819,36 @@ static void throtl_process_limit_change(struct throtl_data *td)
        struct throtl_grp *tg;
        struct hlist_node *pos, *n;
 
-       /*
-        * Make sure atomic_inc() effects from
-        * throtl_update_blkio_group_read_bps(), group of functions are
-        * visible.
-        * Is this required or smp_mb__after_atomic_inc() was suffcient
-        * after the atomic_inc().
-        */
-       smp_rmb();
-       if (!atomic_read(&td->limits_changed))
+       if (!td->limits_changed)
                return;
 
-       throtl_log(td, "limit changed =%d", atomic_read(&td->limits_changed));
+       xchg(&td->limits_changed, false);
+
+       throtl_log(td, "limits changed");
 
        hlist_for_each_entry_safe(tg, pos, n, &td->tg_list, tg_node) {
+               if (!tg->limits_changed)
+                       continue;
+
+               if (!xchg(&tg->limits_changed, false))
+                       continue;
+
+               throtl_log_tg(td, tg, "limit change rbps=%llu wbps=%llu"
+                       " riops=%u wiops=%u", tg->bps[READ], tg->bps[WRITE],
+                       tg->iops[READ], tg->iops[WRITE]);
+
                /*
-                * Do I need an smp_rmb() here to make sure tg->limits_changed
-                * update is visible. I am relying on smp_rmb() at the
-                * beginning of function and not putting a new one here.
+                * Restart the slices for both READ and WRITES. It
+                * might happen that a group's limit are dropped
+                * suddenly and we don't want to account recently
+                * dispatched IO with new low rate
                 */
+               throtl_start_new_slice(td, tg, 0);
+               throtl_start_new_slice(td, tg, 1);
 
-               if (throtl_tg_on_rr(tg) && tg->limits_changed) {
-                       throtl_log_tg(td, tg, "limit change rbps=%llu wbps=%llu"
-                               " riops=%u wiops=%u", tg->bps[READ],
-                               tg->bps[WRITE], tg->iops[READ],
-                               tg->iops[WRITE]);
+               if (throtl_tg_on_rr(tg))
                        tg_update_disptime(td, tg);
-                       tg->limits_changed = false;
-               }
        }
-
-       smp_mb__before_atomic_dec();
-       atomic_dec(&td->limits_changed);
-       smp_mb__after_atomic_dec();
 }
 
 /* Dispatch throttled bios. Should be called without queue lock held. */
@@ -738,6 +858,7 @@ static int throtl_dispatch(struct request_queue *q)
        unsigned int nr_disp = 0;
        struct bio_list bio_list_on_stack;
        struct bio *bio;
+       struct blk_plug plug;
 
        spin_lock_irq(q->queue_lock);
 
@@ -766,9 +887,10 @@ out:
         * immediate dispatch
         */
        if (nr_disp) {
+               blk_start_plug(&plug);
                while((bio = bio_list_pop(&bio_list_on_stack)))
                        generic_make_request(bio);
-               blk_unplug(q);
+               blk_finish_plug(&plug);
        }
        return nr_disp;
 }
@@ -783,24 +905,24 @@ void blk_throtl_work(struct work_struct *work)
 }
 
 /* Call with queue lock held */
-void throtl_schedule_delayed_work(struct request_queue *q, unsigned long delay)
+static void
+throtl_schedule_delayed_work(struct throtl_data *td, unsigned long delay)
 {
 
-       struct throtl_data *td = q->td;
        struct delayed_work *dwork = &td->throtl_work;
 
-       if (total_nr_queued(td) > 0) {
+       /* schedule work if limits changed even if no bio is queued */
+       if (total_nr_queued(td) > 0 || td->limits_changed) {
                /*
                 * We might have a work scheduled to be executed in future.
                 * Cancel that and schedule a new one.
                 */
                __cancel_delayed_work(dwork);
-               kblockd_schedule_delayed_work(q, dwork, delay);
+               queue_delayed_work(kthrotld_workqueue, dwork, delay);
                throtl_log(td, "schedule work. delay=%lu jiffies=%lu",
                                delay, jiffies);
        }
 }
-EXPORT_SYMBOL(throtl_schedule_delayed_work);
 
 static void
 throtl_destroy_tg(struct throtl_data *td, struct throtl_grp *tg)
@@ -863,10 +985,19 @@ void throtl_unlink_blkio_group(void *key, struct blkio_group *blkg)
        spin_unlock_irqrestore(td->queue->queue_lock, flags);
 }
 
+static void throtl_update_blkio_group_common(struct throtl_data *td,
+                               struct throtl_grp *tg)
+{
+       xchg(&tg->limits_changed, true);
+       xchg(&td->limits_changed, true);
+       /* Schedule a work now to process the limit change */
+       throtl_schedule_delayed_work(td, 0);
+}
+
 /*
  * For all update functions, key should be a valid pointer because these
  * update functions are called under blkcg_lock, that means, blkg is
- * valid and in turn key is valid. queue exit path can not race becuase
+ * valid and in turn key is valid. queue exit path can not race because
  * of blkcg_lock
  *
  * Can not take queue lock in update functions as queue lock under blkcg_lock
@@ -876,64 +1007,43 @@ static void throtl_update_blkio_group_read_bps(void *key,
                                struct blkio_group *blkg, u64 read_bps)
 {
        struct throtl_data *td = key;
+       struct throtl_grp *tg = tg_of_blkg(blkg);
 
-       tg_of_blkg(blkg)->bps[READ] = read_bps;
-       /* Make sure read_bps is updated before setting limits_changed */
-       smp_wmb();
-       tg_of_blkg(blkg)->limits_changed = true;
-
-       /* Make sure tg->limits_changed is updated before td->limits_changed */
-       smp_mb__before_atomic_inc();
-       atomic_inc(&td->limits_changed);
-       smp_mb__after_atomic_inc();
-
-       /* Schedule a work now to process the limit change */
-       throtl_schedule_delayed_work(td->queue, 0);
+       tg->bps[READ] = read_bps;
+       throtl_update_blkio_group_common(td, tg);
 }
 
 static void throtl_update_blkio_group_write_bps(void *key,
                                struct blkio_group *blkg, u64 write_bps)
 {
        struct throtl_data *td = key;
+       struct throtl_grp *tg = tg_of_blkg(blkg);
 
-       tg_of_blkg(blkg)->bps[WRITE] = write_bps;
-       smp_wmb();
-       tg_of_blkg(blkg)->limits_changed = true;
-       smp_mb__before_atomic_inc();
-       atomic_inc(&td->limits_changed);
-       smp_mb__after_atomic_inc();
-       throtl_schedule_delayed_work(td->queue, 0);
+       tg->bps[WRITE] = write_bps;
+       throtl_update_blkio_group_common(td, tg);
 }
 
 static void throtl_update_blkio_group_read_iops(void *key,
                        struct blkio_group *blkg, unsigned int read_iops)
 {
        struct throtl_data *td = key;
+       struct throtl_grp *tg = tg_of_blkg(blkg);
 
-       tg_of_blkg(blkg)->iops[READ] = read_iops;
-       smp_wmb();
-       tg_of_blkg(blkg)->limits_changed = true;
-       smp_mb__before_atomic_inc();
-       atomic_inc(&td->limits_changed);
-       smp_mb__after_atomic_inc();
-       throtl_schedule_delayed_work(td->queue, 0);
+       tg->iops[READ] = read_iops;
+       throtl_update_blkio_group_common(td, tg);
 }
 
 static void throtl_update_blkio_group_write_iops(void *key,
                        struct blkio_group *blkg, unsigned int write_iops)
 {
        struct throtl_data *td = key;
+       struct throtl_grp *tg = tg_of_blkg(blkg);
 
-       tg_of_blkg(blkg)->iops[WRITE] = write_iops;
-       smp_wmb();
-       tg_of_blkg(blkg)->limits_changed = true;
-       smp_mb__before_atomic_inc();
-       atomic_inc(&td->limits_changed);
-       smp_mb__after_atomic_inc();
-       throtl_schedule_delayed_work(td->queue, 0);
+       tg->iops[WRITE] = write_iops;
+       throtl_update_blkio_group_common(td, tg);
 }
 
-void throtl_shutdown_timer_wq(struct request_queue *q)
+static void throtl_shutdown_wq(struct request_queue *q)
 {
        struct throtl_data *td = q->td;
 
@@ -970,24 +1080,41 @@ int blk_throtl_bio(struct request_queue *q, struct bio **biop)
        spin_lock_irq(q->queue_lock);
        tg = throtl_get_tg(td);
 
+       if (IS_ERR(tg)) {
+               if (PTR_ERR(tg) == -ENODEV) {
+                       /*
+                        * Queue is gone. No queue lock held here.
+                        */
+                       return -ENODEV;
+               }
+       }
+
        if (tg->nr_queued[rw]) {
                /*
                 * There is already another bio queued in same dir. No
                 * need to update dispatch time.
-                * Still update the disptime if rate limits on this group
-                * were changed.
                 */
-               if (!tg->limits_changed)
-                       update_disptime = false;
-               else
-                       tg->limits_changed = false;
-
+               update_disptime = false;
                goto queue_bio;
+
        }
 
        /* Bio is with-in rate limit of group */
        if (tg_may_dispatch(td, tg, bio, NULL)) {
                throtl_charge_bio(tg, bio);
+
+               /*
+                * We need to trim slice even when bios are not being queued
+                * otherwise it might happen that a bio is not queued for
+                * a long time and slice keeps on extending and trim is not
+                * called for a long time. Now if limits are reduced suddenly
+                * we take into account all the IO dispatched so far at new
+                * low rate and * newly queued IO gets a really long dispatch
+                * time.
+                *
+                * So keep on trimming slice even if bio is not queued.
+                */
+               throtl_trim_slice(td, tg, rw);
                goto out;
        }
 
@@ -1023,39 +1150,27 @@ int blk_throtl_init(struct request_queue *q)
 
        INIT_HLIST_HEAD(&td->tg_list);
        td->tg_service_tree = THROTL_RB_ROOT;
-       atomic_set(&td->limits_changed, 0);
-
-       /* Init root group */
-       tg = &td->root_tg;
-       INIT_HLIST_NODE(&tg->tg_node);
-       RB_CLEAR_NODE(&tg->rb_node);
-       bio_list_init(&tg->bio_lists[0]);
-       bio_list_init(&tg->bio_lists[1]);
+       td->limits_changed = false;
+       INIT_DELAYED_WORK(&td->throtl_work, blk_throtl_work);
 
-       /* Practically unlimited BW */
-       tg->bps[0] = tg->bps[1] = -1;
-       tg->iops[0] = tg->iops[1] = -1;
+       /* alloc and Init root group. */
+       td->queue = q;
+       tg = throtl_alloc_tg(td);
 
-       /*
-        * Set root group reference to 2. One reference will be dropped when
-        * all groups on tg_list are being deleted during queue exit. Other
-        * reference will remain there as we don't want to delete this group
-        * as it is statically allocated and gets destroyed when throtl_data
-        * goes away.
-        */
-       atomic_set(&tg->ref, 2);
-       hlist_add_head(&tg->tg_node, &td->tg_list);
-       td->nr_undestroyed_grps++;
+       if (!tg) {
+               kfree(td);
+               return -ENOMEM;
+       }
 
-       INIT_DELAYED_WORK(&td->throtl_work, blk_throtl_work);
+       td->root_tg = tg;
 
        rcu_read_lock();
        blkiocg_add_blkio_group(&blkio_root_cgroup, &tg->blkg, (void *)td,
                                        0, BLKIO_POLICY_THROTL);
        rcu_read_unlock();
+       throtl_add_group_to_td_list(td, tg);
 
        /* Attach throtl data to request queue */
-       td->queue = q;
        q->td = td;
        return 0;
 }
@@ -1067,7 +1182,7 @@ void blk_throtl_exit(struct request_queue *q)
 
        BUG_ON(!td);
 
-       throtl_shutdown_timer_wq(q);
+       throtl_shutdown_wq(q);
 
        spin_lock_irq(q->queue_lock);
        throtl_release_tgs(td);
@@ -1097,12 +1212,16 @@ void blk_throtl_exit(struct request_queue *q)
         * update limits through cgroup and another work got queued, cancel
         * it.
         */
-       throtl_shutdown_timer_wq(q);
+       throtl_shutdown_wq(q);
        throtl_td_free(td);
 }
 
 static int __init throtl_init(void)
 {
+       kthrotld_workqueue = alloc_workqueue("kthrotld", WQ_MEM_RECLAIM, 0);
+       if (!kthrotld_workqueue)
+               panic("Failed to create kthrotld\n");
+
        blkio_policy_register(&blkio_policy_throtl);
        return 0;
 }