]> nv-tegra.nvidia Code Review - linux-2.6.git/blob - kernel/slow-work.c
padata: Remove padata_get_cpumask
[linux-2.6.git] / kernel / slow-work.c
1 /* Worker thread pool for slow items, such as filesystem lookups or mkdirs
2  *
3  * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public Licence
8  * as published by the Free Software Foundation; either version
9  * 2 of the Licence, or (at your option) any later version.
10  *
11  * See Documentation/slow-work.txt
12  */
13
14 #include <linux/module.h>
15 #include <linux/slow-work.h>
16 #include <linux/kthread.h>
17 #include <linux/freezer.h>
18 #include <linux/wait.h>
19 #include <linux/debugfs.h>
20 #include "slow-work.h"
21
22 static void slow_work_cull_timeout(unsigned long);
23 static void slow_work_oom_timeout(unsigned long);
24
25 #ifdef CONFIG_SYSCTL
26 static int slow_work_min_threads_sysctl(struct ctl_table *, int,
27                                         void __user *, size_t *, loff_t *);
28
29 static int slow_work_max_threads_sysctl(struct ctl_table *, int ,
30                                         void __user *, size_t *, loff_t *);
31 #endif
32
33 /*
34  * The pool of threads has at least min threads in it as long as someone is
35  * using the facility, and may have as many as max.
36  *
37  * A portion of the pool may be processing very slow operations.
38  */
39 static unsigned slow_work_min_threads = 2;
40 static unsigned slow_work_max_threads = 4;
41 static unsigned vslow_work_proportion = 50; /* % of threads that may process
42                                              * very slow work */
43
44 #ifdef CONFIG_SYSCTL
45 static const int slow_work_min_min_threads = 2;
46 static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
47 static const int slow_work_min_vslow = 1;
48 static const int slow_work_max_vslow = 99;
49
50 ctl_table slow_work_sysctls[] = {
51         {
52                 .procname       = "min-threads",
53                 .data           = &slow_work_min_threads,
54                 .maxlen         = sizeof(unsigned),
55                 .mode           = 0644,
56                 .proc_handler   = slow_work_min_threads_sysctl,
57                 .extra1         = (void *) &slow_work_min_min_threads,
58                 .extra2         = &slow_work_max_threads,
59         },
60         {
61                 .procname       = "max-threads",
62                 .data           = &slow_work_max_threads,
63                 .maxlen         = sizeof(unsigned),
64                 .mode           = 0644,
65                 .proc_handler   = slow_work_max_threads_sysctl,
66                 .extra1         = &slow_work_min_threads,
67                 .extra2         = (void *) &slow_work_max_max_threads,
68         },
69         {
70                 .procname       = "vslow-percentage",
71                 .data           = &vslow_work_proportion,
72                 .maxlen         = sizeof(unsigned),
73                 .mode           = 0644,
74                 .proc_handler   = proc_dointvec_minmax,
75                 .extra1         = (void *) &slow_work_min_vslow,
76                 .extra2         = (void *) &slow_work_max_vslow,
77         },
78         {}
79 };
80 #endif
81
82 /*
83  * The active state of the thread pool
84  */
85 static atomic_t slow_work_thread_count;
86 static atomic_t vslow_work_executing_count;
87
88 static bool slow_work_may_not_start_new_thread;
89 static bool slow_work_cull; /* cull a thread due to lack of activity */
90 static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
91 static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
92 static struct slow_work slow_work_new_thread; /* new thread starter */
93
94 /*
95  * slow work ID allocation (use slow_work_queue_lock)
96  */
97 static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
98
99 /*
100  * Unregistration tracking to prevent put_ref() from disappearing during module
101  * unload
102  */
103 #ifdef CONFIG_MODULES
104 static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
105 static struct module *slow_work_unreg_module;
106 static struct slow_work *slow_work_unreg_work_item;
107 static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
108 static DEFINE_MUTEX(slow_work_unreg_sync_lock);
109
110 static void slow_work_set_thread_processing(int id, struct slow_work *work)
111 {
112         if (work)
113                 slow_work_thread_processing[id] = work->owner;
114 }
115 static void slow_work_done_thread_processing(int id, struct slow_work *work)
116 {
117         struct module *module = slow_work_thread_processing[id];
118
119         slow_work_thread_processing[id] = NULL;
120         smp_mb();
121         if (slow_work_unreg_work_item == work ||
122             slow_work_unreg_module == module)
123                 wake_up_all(&slow_work_unreg_wq);
124 }
125 static void slow_work_clear_thread_processing(int id)
126 {
127         slow_work_thread_processing[id] = NULL;
128 }
129 #else
130 static void slow_work_set_thread_processing(int id, struct slow_work *work) {}
131 static void slow_work_done_thread_processing(int id, struct slow_work *work) {}
132 static void slow_work_clear_thread_processing(int id) {}
133 #endif
134
135 /*
136  * Data for tracking currently executing items for indication through /proc
137  */
138 #ifdef CONFIG_SLOW_WORK_DEBUG
139 struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
140 pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
141 DEFINE_RWLOCK(slow_work_execs_lock);
142 #endif
143
144 /*
145  * The queues of work items and the lock governing access to them.  These are
146  * shared between all the CPUs.  It doesn't make sense to have per-CPU queues
147  * as the number of threads bears no relation to the number of CPUs.
148  *
149  * There are two queues of work items: one for slow work items, and one for
150  * very slow work items.
151  */
152 LIST_HEAD(slow_work_queue);
153 LIST_HEAD(vslow_work_queue);
154 DEFINE_SPINLOCK(slow_work_queue_lock);
155
156 /*
157  * The following are two wait queues that get pinged when a work item is placed
158  * on an empty queue.  These allow work items that are hogging a thread by
159  * sleeping in a way that could be deferred to yield their thread and enqueue
160  * themselves.
161  */
162 static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
163 static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);
164
165 /*
166  * The thread controls.  A variable used to signal to the threads that they
167  * should exit when the queue is empty, a waitqueue used by the threads to wait
168  * for signals, and a completion set by the last thread to exit.
169  */
170 static bool slow_work_threads_should_exit;
171 static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
172 static DECLARE_COMPLETION(slow_work_last_thread_exited);
173
174 /*
175  * The number of users of the thread pool and its lock.  Whilst this is zero we
176  * have no threads hanging around, and when this reaches zero, we wait for all
177  * active or queued work items to complete and kill all the threads we do have.
178  */
179 static int slow_work_user_count;
180 static DEFINE_MUTEX(slow_work_user_lock);
181
182 static inline int slow_work_get_ref(struct slow_work *work)
183 {
184         if (work->ops->get_ref)
185                 return work->ops->get_ref(work);
186
187         return 0;
188 }
189
190 static inline void slow_work_put_ref(struct slow_work *work)
191 {
192         if (work->ops->put_ref)
193                 work->ops->put_ref(work);
194 }
195
196 /*
197  * Calculate the maximum number of active threads in the pool that are
198  * permitted to process very slow work items.
199  *
200  * The answer is rounded up to at least 1, but may not equal or exceed the
201  * maximum number of the threads in the pool.  This means we always have at
202  * least one thread that can process slow work items, and we always have at
203  * least one thread that won't get tied up doing so.
204  */
205 static unsigned slow_work_calc_vsmax(void)
206 {
207         unsigned vsmax;
208
209         vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
210         vsmax /= 100;
211         vsmax = max(vsmax, 1U);
212         return min(vsmax, slow_work_max_threads - 1);
213 }
214
215 /*
216  * Attempt to execute stuff queued on a slow thread.  Return true if we managed
217  * it, false if there was nothing to do.
218  */
219 static noinline bool slow_work_execute(int id)
220 {
221         struct slow_work *work = NULL;
222         unsigned vsmax;
223         bool very_slow;
224
225         vsmax = slow_work_calc_vsmax();
226
227         /* see if we can schedule a new thread to be started if we're not
228          * keeping up with the work */
229         if (!waitqueue_active(&slow_work_thread_wq) &&
230             (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
231             atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
232             !slow_work_may_not_start_new_thread)
233                 slow_work_enqueue(&slow_work_new_thread);
234
235         /* find something to execute */
236         spin_lock_irq(&slow_work_queue_lock);
237         if (!list_empty(&vslow_work_queue) &&
238             atomic_read(&vslow_work_executing_count) < vsmax) {
239                 work = list_entry(vslow_work_queue.next,
240                                   struct slow_work, link);
241                 if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
242                         BUG();
243                 list_del_init(&work->link);
244                 atomic_inc(&vslow_work_executing_count);
245                 very_slow = true;
246         } else if (!list_empty(&slow_work_queue)) {
247                 work = list_entry(slow_work_queue.next,
248                                   struct slow_work, link);
249                 if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
250                         BUG();
251                 list_del_init(&work->link);
252                 very_slow = false;
253         } else {
254                 very_slow = false; /* avoid the compiler warning */
255         }
256
257         slow_work_set_thread_processing(id, work);
258         if (work) {
259                 slow_work_mark_time(work);
260                 slow_work_begin_exec(id, work);
261         }
262
263         spin_unlock_irq(&slow_work_queue_lock);
264
265         if (!work)
266                 return false;
267
268         if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
269                 BUG();
270
271         /* don't execute if the work is in the process of being cancelled */
272         if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
273                 work->ops->execute(work);
274
275         if (very_slow)
276                 atomic_dec(&vslow_work_executing_count);
277         clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
278
279         /* wake up anyone waiting for this work to be complete */
280         wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
281
282         slow_work_end_exec(id, work);
283
284         /* if someone tried to enqueue the item whilst we were executing it,
285          * then it'll be left unenqueued to avoid multiple threads trying to
286          * execute it simultaneously
287          *
288          * there is, however, a race between us testing the pending flag and
289          * getting the spinlock, and between the enqueuer setting the pending
290          * flag and getting the spinlock, so we use a deferral bit to tell us
291          * if the enqueuer got there first
292          */
293         if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
294                 spin_lock_irq(&slow_work_queue_lock);
295
296                 if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
297                     test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
298                         goto auto_requeue;
299
300                 spin_unlock_irq(&slow_work_queue_lock);
301         }
302
303         /* sort out the race between module unloading and put_ref() */
304         slow_work_put_ref(work);
305         slow_work_done_thread_processing(id, work);
306
307         return true;
308
309 auto_requeue:
310         /* we must complete the enqueue operation
311          * - we transfer our ref on the item back to the appropriate queue
312          * - don't wake another thread up as we're awake already
313          */
314         slow_work_mark_time(work);
315         if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
316                 list_add_tail(&work->link, &vslow_work_queue);
317         else
318                 list_add_tail(&work->link, &slow_work_queue);
319         spin_unlock_irq(&slow_work_queue_lock);
320         slow_work_clear_thread_processing(id);
321         return true;
322 }
323
324 /**
325  * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
326  * work: The work item under execution that wants to sleep
327  * _timeout: Scheduler sleep timeout
328  *
329  * Allow a requeueable work item to sleep on a slow-work processor thread until
330  * that thread is needed to do some other work or the sleep is interrupted by
331  * some other event.
332  *
333  * The caller must set up a wake up event before calling this and must have set
334  * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
335  * condition before calling this function as no test is made here.
336  *
337  * False is returned if there is nothing on the queue; true is returned if the
338  * work item should be requeued
339  */
340 bool slow_work_sleep_till_thread_needed(struct slow_work *work,
341                                         signed long *_timeout)
342 {
343         wait_queue_head_t *wfo_wq;
344         struct list_head *queue;
345
346         DEFINE_WAIT(wait);
347
348         if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
349                 wfo_wq = &vslow_work_queue_waits_for_occupation;
350                 queue = &vslow_work_queue;
351         } else {
352                 wfo_wq = &slow_work_queue_waits_for_occupation;
353                 queue = &slow_work_queue;
354         }
355
356         if (!list_empty(queue))
357                 return true;
358
359         add_wait_queue_exclusive(wfo_wq, &wait);
360         if (list_empty(queue))
361                 *_timeout = schedule_timeout(*_timeout);
362         finish_wait(wfo_wq, &wait);
363
364         return !list_empty(queue);
365 }
366 EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
367
368 /**
369  * slow_work_enqueue - Schedule a slow work item for processing
370  * @work: The work item to queue
371  *
372  * Schedule a slow work item for processing.  If the item is already undergoing
373  * execution, this guarantees not to re-enter the execution routine until the
374  * first execution finishes.
375  *
376  * The item is pinned by this function as it retains a reference to it, managed
377  * through the item operations.  The item is unpinned once it has been
378  * executed.
379  *
380  * An item may hog the thread that is running it for a relatively large amount
381  * of time, sufficient, for example, to perform several lookup, mkdir, create
382  * and setxattr operations.  It may sleep on I/O and may sleep to obtain locks.
383  *
384  * Conversely, if a number of items are awaiting processing, it may take some
385  * time before any given item is given attention.  The number of threads in the
386  * pool may be increased to deal with demand, but only up to a limit.
387  *
388  * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
389  * the very slow queue, from which only a portion of the threads will be
390  * allowed to pick items to execute.  This ensures that very slow items won't
391  * overly block ones that are just ordinarily slow.
392  *
393  * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if cancelled work is
394  * attempted queued)
395  */
396 int slow_work_enqueue(struct slow_work *work)
397 {
398         wait_queue_head_t *wfo_wq;
399         struct list_head *queue;
400         unsigned long flags;
401         int ret;
402
403         if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
404                 return -ECANCELED;
405
406         BUG_ON(slow_work_user_count <= 0);
407         BUG_ON(!work);
408         BUG_ON(!work->ops);
409
410         /* when honouring an enqueue request, we only promise that we will run
411          * the work function in the future; we do not promise to run it once
412          * per enqueue request
413          *
414          * we use the PENDING bit to merge together repeat requests without
415          * having to disable IRQs and take the spinlock, whilst still
416          * maintaining our promise
417          */
418         if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
419                 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
420                         wfo_wq = &vslow_work_queue_waits_for_occupation;
421                         queue = &vslow_work_queue;
422                 } else {
423                         wfo_wq = &slow_work_queue_waits_for_occupation;
424                         queue = &slow_work_queue;
425                 }
426
427                 spin_lock_irqsave(&slow_work_queue_lock, flags);
428
429                 if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
430                         goto cancelled;
431
432                 /* we promise that we will not attempt to execute the work
433                  * function in more than one thread simultaneously
434                  *
435                  * this, however, leaves us with a problem if we're asked to
436                  * enqueue the work whilst someone is executing the work
437                  * function as simply queueing the work immediately means that
438                  * another thread may try executing it whilst it is already
439                  * under execution
440                  *
441                  * to deal with this, we set the ENQ_DEFERRED bit instead of
442                  * enqueueing, and the thread currently executing the work
443                  * function will enqueue the work item when the work function
444                  * returns and it has cleared the EXECUTING bit
445                  */
446                 if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
447                         set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
448                 } else {
449                         ret = slow_work_get_ref(work);
450                         if (ret < 0)
451                                 goto failed;
452                         slow_work_mark_time(work);
453                         list_add_tail(&work->link, queue);
454                         wake_up(&slow_work_thread_wq);
455
456                         /* if someone who could be requeued is sleeping on a
457                          * thread, then ask them to yield their thread */
458                         if (work->link.prev == queue)
459                                 wake_up(wfo_wq);
460                 }
461
462                 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
463         }
464         return 0;
465
466 cancelled:
467         ret = -ECANCELED;
468 failed:
469         spin_unlock_irqrestore(&slow_work_queue_lock, flags);
470         return ret;
471 }
472 EXPORT_SYMBOL(slow_work_enqueue);
473
474 static int slow_work_wait(void *word)
475 {
476         schedule();
477         return 0;
478 }
479
480 /**
481  * slow_work_cancel - Cancel a slow work item
482  * @work: The work item to cancel
483  *
484  * This function will cancel a previously enqueued work item. If we cannot
485  * cancel the work item, it is guarenteed to have run when this function
486  * returns.
487  */
488 void slow_work_cancel(struct slow_work *work)
489 {
490         bool wait = true, put = false;
491
492         set_bit(SLOW_WORK_CANCELLING, &work->flags);
493         smp_mb();
494
495         /* if the work item is a delayed work item with an active timer, we
496          * need to wait for the timer to finish _before_ getting the spinlock,
497          * lest we deadlock against the timer routine
498          *
499          * the timer routine will leave DELAYED set if it notices the
500          * CANCELLING flag in time
501          */
502         if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
503                 struct delayed_slow_work *dwork =
504                         container_of(work, struct delayed_slow_work, work);
505                 del_timer_sync(&dwork->timer);
506         }
507
508         spin_lock_irq(&slow_work_queue_lock);
509
510         if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
511                 /* the timer routine aborted or never happened, so we are left
512                  * holding the timer's reference on the item and should just
513                  * drop the pending flag and wait for any ongoing execution to
514                  * finish */
515                 struct delayed_slow_work *dwork =
516                         container_of(work, struct delayed_slow_work, work);
517
518                 BUG_ON(timer_pending(&dwork->timer));
519                 BUG_ON(!list_empty(&work->link));
520
521                 clear_bit(SLOW_WORK_DELAYED, &work->flags);
522                 put = true;
523                 clear_bit(SLOW_WORK_PENDING, &work->flags);
524
525         } else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
526                    !list_empty(&work->link)) {
527                 /* the link in the pending queue holds a reference on the item
528                  * that we will need to release */
529                 list_del_init(&work->link);
530                 wait = false;
531                 put = true;
532                 clear_bit(SLOW_WORK_PENDING, &work->flags);
533
534         } else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
535                 /* the executor is holding our only reference on the item, so
536                  * we merely need to wait for it to finish executing */
537                 clear_bit(SLOW_WORK_PENDING, &work->flags);
538         }
539
540         spin_unlock_irq(&slow_work_queue_lock);
541
542         /* the EXECUTING flag is set by the executor whilst the spinlock is set
543          * and before the item is dequeued - so assuming the above doesn't
544          * actually dequeue it, simply waiting for the EXECUTING flag to be
545          * released here should be sufficient */
546         if (wait)
547                 wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
548                             TASK_UNINTERRUPTIBLE);
549
550         clear_bit(SLOW_WORK_CANCELLING, &work->flags);
551         if (put)
552                 slow_work_put_ref(work);
553 }
554 EXPORT_SYMBOL(slow_work_cancel);
555
556 /*
557  * Handle expiry of the delay timer, indicating that a delayed slow work item
558  * should now be queued if not cancelled
559  */
560 static void delayed_slow_work_timer(unsigned long data)
561 {
562         wait_queue_head_t *wfo_wq;
563         struct list_head *queue;
564         struct slow_work *work = (struct slow_work *) data;
565         unsigned long flags;
566         bool queued = false, put = false, first = false;
567
568         if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
569                 wfo_wq = &vslow_work_queue_waits_for_occupation;
570                 queue = &vslow_work_queue;
571         } else {
572                 wfo_wq = &slow_work_queue_waits_for_occupation;
573                 queue = &slow_work_queue;
574         }
575
576         spin_lock_irqsave(&slow_work_queue_lock, flags);
577         if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
578                 clear_bit(SLOW_WORK_DELAYED, &work->flags);
579
580                 if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
581                         /* we discard the reference the timer was holding in
582                          * favour of the one the executor holds */
583                         set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
584                         put = true;
585                 } else {
586                         slow_work_mark_time(work);
587                         list_add_tail(&work->link, queue);
588                         queued = true;
589                         if (work->link.prev == queue)
590                                 first = true;
591                 }
592         }
593
594         spin_unlock_irqrestore(&slow_work_queue_lock, flags);
595         if (put)
596                 slow_work_put_ref(work);
597         if (first)
598                 wake_up(wfo_wq);
599         if (queued)
600                 wake_up(&slow_work_thread_wq);
601 }
602
603 /**
604  * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
605  * @dwork: The delayed work item to queue
606  * @delay: When to start executing the work, in jiffies from now
607  *
608  * This is similar to slow_work_enqueue(), but it adds a delay before the work
609  * is actually queued for processing.
610  *
611  * The item can have delayed processing requested on it whilst it is being
612  * executed.  The delay will begin immediately, and if it expires before the
613  * item finishes executing, the item will be placed back on the queue when it
614  * has done executing.
615  */
616 int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
617                               unsigned long delay)
618 {
619         struct slow_work *work = &dwork->work;
620         unsigned long flags;
621         int ret;
622
623         if (delay == 0)
624                 return slow_work_enqueue(&dwork->work);
625
626         BUG_ON(slow_work_user_count <= 0);
627         BUG_ON(!work);
628         BUG_ON(!work->ops);
629
630         if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
631                 return -ECANCELED;
632
633         if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
634                 spin_lock_irqsave(&slow_work_queue_lock, flags);
635
636                 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
637                         goto cancelled;
638
639                 /* the timer holds a reference whilst it is pending */
640                 ret = slow_work_get_ref(work);
641                 if (ret < 0)
642                         goto cant_get_ref;
643
644                 if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
645                         BUG();
646                 dwork->timer.expires = jiffies + delay;
647                 dwork->timer.data = (unsigned long) work;
648                 dwork->timer.function = delayed_slow_work_timer;
649                 add_timer(&dwork->timer);
650
651                 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
652         }
653
654         return 0;
655
656 cancelled:
657         ret = -ECANCELED;
658 cant_get_ref:
659         spin_unlock_irqrestore(&slow_work_queue_lock, flags);
660         return ret;
661 }
662 EXPORT_SYMBOL(delayed_slow_work_enqueue);
663
664 /*
665  * Schedule a cull of the thread pool at some time in the near future
666  */
667 static void slow_work_schedule_cull(void)
668 {
669         mod_timer(&slow_work_cull_timer,
670                   round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT));
671 }
672
673 /*
674  * Worker thread culling algorithm
675  */
676 static bool slow_work_cull_thread(void)
677 {
678         unsigned long flags;
679         bool do_cull = false;
680
681         spin_lock_irqsave(&slow_work_queue_lock, flags);
682
683         if (slow_work_cull) {
684                 slow_work_cull = false;
685
686                 if (list_empty(&slow_work_queue) &&
687                     list_empty(&vslow_work_queue) &&
688                     atomic_read(&slow_work_thread_count) >
689                     slow_work_min_threads) {
690                         slow_work_schedule_cull();
691                         do_cull = true;
692                 }
693         }
694
695         spin_unlock_irqrestore(&slow_work_queue_lock, flags);
696         return do_cull;
697 }
698
699 /*
700  * Determine if there is slow work available for dispatch
701  */
702 static inline bool slow_work_available(int vsmax)
703 {
704         return !list_empty(&slow_work_queue) ||
705                 (!list_empty(&vslow_work_queue) &&
706                  atomic_read(&vslow_work_executing_count) < vsmax);
707 }
708
709 /*
710  * Worker thread dispatcher
711  */
712 static int slow_work_thread(void *_data)
713 {
714         int vsmax, id;
715
716         DEFINE_WAIT(wait);
717
718         set_freezable();
719         set_user_nice(current, -5);
720
721         /* allocate ourselves an ID */
722         spin_lock_irq(&slow_work_queue_lock);
723         id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
724         BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
725         __set_bit(id, slow_work_ids);
726         slow_work_set_thread_pid(id, current->pid);
727         spin_unlock_irq(&slow_work_queue_lock);
728
729         sprintf(current->comm, "kslowd%03u", id);
730
731         for (;;) {
732                 vsmax = vslow_work_proportion;
733                 vsmax *= atomic_read(&slow_work_thread_count);
734                 vsmax /= 100;
735
736                 prepare_to_wait_exclusive(&slow_work_thread_wq, &wait,
737                                           TASK_INTERRUPTIBLE);
738                 if (!freezing(current) &&
739                     !slow_work_threads_should_exit &&
740                     !slow_work_available(vsmax) &&
741                     !slow_work_cull)
742                         schedule();
743                 finish_wait(&slow_work_thread_wq, &wait);
744
745                 try_to_freeze();
746
747                 vsmax = vslow_work_proportion;
748                 vsmax *= atomic_read(&slow_work_thread_count);
749                 vsmax /= 100;
750
751                 if (slow_work_available(vsmax) && slow_work_execute(id)) {
752                         cond_resched();
753                         if (list_empty(&slow_work_queue) &&
754                             list_empty(&vslow_work_queue) &&
755                             atomic_read(&slow_work_thread_count) >
756                             slow_work_min_threads)
757                                 slow_work_schedule_cull();
758                         continue;
759                 }
760
761                 if (slow_work_threads_should_exit)
762                         break;
763
764                 if (slow_work_cull && slow_work_cull_thread())
765                         break;
766         }
767
768         spin_lock_irq(&slow_work_queue_lock);
769         slow_work_set_thread_pid(id, 0);
770         __clear_bit(id, slow_work_ids);
771         spin_unlock_irq(&slow_work_queue_lock);
772
773         if (atomic_dec_and_test(&slow_work_thread_count))
774                 complete_and_exit(&slow_work_last_thread_exited, 0);
775         return 0;
776 }
777
778 /*
779  * Handle thread cull timer expiration
780  */
781 static void slow_work_cull_timeout(unsigned long data)
782 {
783         slow_work_cull = true;
784         wake_up(&slow_work_thread_wq);
785 }
786
787 /*
788  * Start a new slow work thread
789  */
790 static void slow_work_new_thread_execute(struct slow_work *work)
791 {
792         struct task_struct *p;
793
794         if (slow_work_threads_should_exit)
795                 return;
796
797         if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
798                 return;
799
800         if (!mutex_trylock(&slow_work_user_lock))
801                 return;
802
803         slow_work_may_not_start_new_thread = true;
804         atomic_inc(&slow_work_thread_count);
805         p = kthread_run(slow_work_thread, NULL, "kslowd");
806         if (IS_ERR(p)) {
807                 printk(KERN_DEBUG "Slow work thread pool: OOM\n");
808                 if (atomic_dec_and_test(&slow_work_thread_count))
809                         BUG(); /* we're running on a slow work thread... */
810                 mod_timer(&slow_work_oom_timer,
811                           round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT));
812         } else {
813                 /* ratelimit the starting of new threads */
814                 mod_timer(&slow_work_oom_timer, jiffies + 1);
815         }
816
817         mutex_unlock(&slow_work_user_lock);
818 }
819
820 static const struct slow_work_ops slow_work_new_thread_ops = {
821         .owner          = THIS_MODULE,
822         .execute        = slow_work_new_thread_execute,
823 #ifdef CONFIG_SLOW_WORK_DEBUG
824         .desc           = slow_work_new_thread_desc,
825 #endif
826 };
827
828 /*
829  * post-OOM new thread start suppression expiration
830  */
831 static void slow_work_oom_timeout(unsigned long data)
832 {
833         slow_work_may_not_start_new_thread = false;
834 }
835
836 #ifdef CONFIG_SYSCTL
837 /*
838  * Handle adjustment of the minimum number of threads
839  */
840 static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
841                                         void __user *buffer,
842                                         size_t *lenp, loff_t *ppos)
843 {
844         int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
845         int n;
846
847         if (ret == 0) {
848                 mutex_lock(&slow_work_user_lock);
849                 if (slow_work_user_count > 0) {
850                         /* see if we need to start or stop threads */
851                         n = atomic_read(&slow_work_thread_count) -
852                                 slow_work_min_threads;
853
854                         if (n < 0 && !slow_work_may_not_start_new_thread)
855                                 slow_work_enqueue(&slow_work_new_thread);
856                         else if (n > 0)
857                                 slow_work_schedule_cull();
858                 }
859                 mutex_unlock(&slow_work_user_lock);
860         }
861
862         return ret;
863 }
864
865 /*
866  * Handle adjustment of the maximum number of threads
867  */
868 static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
869                                         void __user *buffer,
870                                         size_t *lenp, loff_t *ppos)
871 {
872         int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
873         int n;
874
875         if (ret == 0) {
876                 mutex_lock(&slow_work_user_lock);
877                 if (slow_work_user_count > 0) {
878                         /* see if we need to stop threads */
879                         n = slow_work_max_threads -
880                                 atomic_read(&slow_work_thread_count);
881
882                         if (n < 0)
883                                 slow_work_schedule_cull();
884                 }
885                 mutex_unlock(&slow_work_user_lock);
886         }
887
888         return ret;
889 }
890 #endif /* CONFIG_SYSCTL */
891
892 /**
893  * slow_work_register_user - Register a user of the facility
894  * @module: The module about to make use of the facility
895  *
896  * Register a user of the facility, starting up the initial threads if there
897  * aren't any other users at this point.  This will return 0 if successful, or
898  * an error if not.
899  */
900 int slow_work_register_user(struct module *module)
901 {
902         struct task_struct *p;
903         int loop;
904
905         mutex_lock(&slow_work_user_lock);
906
907         if (slow_work_user_count == 0) {
908                 printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
909                 init_completion(&slow_work_last_thread_exited);
910
911                 slow_work_threads_should_exit = false;
912                 slow_work_init(&slow_work_new_thread,
913                                &slow_work_new_thread_ops);
914                 slow_work_may_not_start_new_thread = false;
915                 slow_work_cull = false;
916
917                 /* start the minimum number of threads */
918                 for (loop = 0; loop < slow_work_min_threads; loop++) {
919                         atomic_inc(&slow_work_thread_count);
920                         p = kthread_run(slow_work_thread, NULL, "kslowd");
921                         if (IS_ERR(p))
922                                 goto error;
923                 }
924                 printk(KERN_NOTICE "Slow work thread pool: Ready\n");
925         }
926
927         slow_work_user_count++;
928         mutex_unlock(&slow_work_user_lock);
929         return 0;
930
931 error:
932         if (atomic_dec_and_test(&slow_work_thread_count))
933                 complete(&slow_work_last_thread_exited);
934         if (loop > 0) {
935                 printk(KERN_ERR "Slow work thread pool:"
936                        " Aborting startup on ENOMEM\n");
937                 slow_work_threads_should_exit = true;
938                 wake_up_all(&slow_work_thread_wq);
939                 wait_for_completion(&slow_work_last_thread_exited);
940                 printk(KERN_ERR "Slow work thread pool: Aborted\n");
941         }
942         mutex_unlock(&slow_work_user_lock);
943         return PTR_ERR(p);
944 }
945 EXPORT_SYMBOL(slow_work_register_user);
946
947 /*
948  * wait for all outstanding items from the calling module to complete
949  * - note that more items may be queued whilst we're waiting
950  */
951 static void slow_work_wait_for_items(struct module *module)
952 {
953 #ifdef CONFIG_MODULES
954         DECLARE_WAITQUEUE(myself, current);
955         struct slow_work *work;
956         int loop;
957
958         mutex_lock(&slow_work_unreg_sync_lock);
959         add_wait_queue(&slow_work_unreg_wq, &myself);
960
961         for (;;) {
962                 spin_lock_irq(&slow_work_queue_lock);
963
964                 /* first of all, we wait for the last queued item in each list
965                  * to be processed */
966                 list_for_each_entry_reverse(work, &vslow_work_queue, link) {
967                         if (work->owner == module) {
968                                 set_current_state(TASK_UNINTERRUPTIBLE);
969                                 slow_work_unreg_work_item = work;
970                                 goto do_wait;
971                         }
972                 }
973                 list_for_each_entry_reverse(work, &slow_work_queue, link) {
974                         if (work->owner == module) {
975                                 set_current_state(TASK_UNINTERRUPTIBLE);
976                                 slow_work_unreg_work_item = work;
977                                 goto do_wait;
978                         }
979                 }
980
981                 /* then we wait for the items being processed to finish */
982                 slow_work_unreg_module = module;
983                 smp_mb();
984                 for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
985                         if (slow_work_thread_processing[loop] == module)
986                                 goto do_wait;
987                 }
988                 spin_unlock_irq(&slow_work_queue_lock);
989                 break; /* okay, we're done */
990
991         do_wait:
992                 spin_unlock_irq(&slow_work_queue_lock);
993                 schedule();
994                 slow_work_unreg_work_item = NULL;
995                 slow_work_unreg_module = NULL;
996         }
997
998         remove_wait_queue(&slow_work_unreg_wq, &myself);
999         mutex_unlock(&slow_work_unreg_sync_lock);
1000 #endif /* CONFIG_MODULES */
1001 }
1002
1003 /**
1004  * slow_work_unregister_user - Unregister a user of the facility
1005  * @module: The module whose items should be cleared
1006  *
1007  * Unregister a user of the facility, killing all the threads if this was the
1008  * last one.
1009  *
1010  * This waits for all the work items belonging to the nominated module to go
1011  * away before proceeding.
1012  */
1013 void slow_work_unregister_user(struct module *module)
1014 {
1015         /* first of all, wait for all outstanding items from the calling module
1016          * to complete */
1017         if (module)
1018                 slow_work_wait_for_items(module);
1019
1020         /* then we can actually go about shutting down the facility if need
1021          * be */
1022         mutex_lock(&slow_work_user_lock);
1023
1024         BUG_ON(slow_work_user_count <= 0);
1025
1026         slow_work_user_count--;
1027         if (slow_work_user_count == 0) {
1028                 printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
1029                 slow_work_threads_should_exit = true;
1030                 del_timer_sync(&slow_work_cull_timer);
1031                 del_timer_sync(&slow_work_oom_timer);
1032                 wake_up_all(&slow_work_thread_wq);
1033                 wait_for_completion(&slow_work_last_thread_exited);
1034                 printk(KERN_NOTICE "Slow work thread pool:"
1035                        " Shut down complete\n");
1036         }
1037
1038         mutex_unlock(&slow_work_user_lock);
1039 }
1040 EXPORT_SYMBOL(slow_work_unregister_user);
1041
1042 /*
1043  * Initialise the slow work facility
1044  */
1045 static int __init init_slow_work(void)
1046 {
1047         unsigned nr_cpus = num_possible_cpus();
1048
1049         if (slow_work_max_threads < nr_cpus)
1050                 slow_work_max_threads = nr_cpus;
1051 #ifdef CONFIG_SYSCTL
1052         if (slow_work_max_max_threads < nr_cpus * 2)
1053                 slow_work_max_max_threads = nr_cpus * 2;
1054 #endif
1055 #ifdef CONFIG_SLOW_WORK_DEBUG
1056         {
1057                 struct dentry *dbdir;
1058
1059                 dbdir = debugfs_create_dir("slow_work", NULL);
1060                 if (dbdir && !IS_ERR(dbdir))
1061                         debugfs_create_file("runqueue", S_IFREG | 0400, dbdir,
1062                                             NULL, &slow_work_runqueue_fops);
1063         }
1064 #endif
1065         return 0;
1066 }
1067
1068 subsys_initcall(init_slow_work);