workqueue: Clarify that schedule_on_each_cpu is synchronous
[linux-2.6.git] / kernel / workqueue.c
1 /*
2  * kernel/workqueue.c - generic async execution with shared worker pool
3  *
4  * Copyright (C) 2002           Ingo Molnar
5  *
6  *   Derived from the taskqueue/keventd code by:
7  *     David Woodhouse <dwmw2@infradead.org>
8  *     Andrew Morton
9  *     Kai Petzke <wpp@marie.physik.tu-berlin.de>
10  *     Theodore Ts'o <tytso@mit.edu>
11  *
12  * Made to use alloc_percpu by Christoph Lameter.
13  *
14  * Copyright (C) 2010           SUSE Linux Products GmbH
15  * Copyright (C) 2010           Tejun Heo <tj@kernel.org>
16  *
17  * This is the generic async execution mechanism.  Work items as are
18  * executed in process context.  The worker pool is shared and
19  * automatically managed.  There is one worker pool for each CPU and
20  * one extra for works which are better served by workers which are
21  * not bound to any specific CPU.
22  *
23  * Please read Documentation/workqueue.txt for details.
24  */
25
26 #include <linux/module.h>
27 #include <linux/kernel.h>
28 #include <linux/sched.h>
29 #include <linux/init.h>
30 #include <linux/signal.h>
31 #include <linux/completion.h>
32 #include <linux/workqueue.h>
33 #include <linux/slab.h>
34 #include <linux/cpu.h>
35 #include <linux/notifier.h>
36 #include <linux/kthread.h>
37 #include <linux/hardirq.h>
38 #include <linux/mempolicy.h>
39 #include <linux/freezer.h>
40 #include <linux/kallsyms.h>
41 #include <linux/debug_locks.h>
42 #include <linux/lockdep.h>
43 #include <linux/idr.h>
44
45 #include "workqueue_sched.h"
46
47 enum {
48         /* global_cwq flags */
49         GCWQ_MANAGE_WORKERS     = 1 << 0,       /* need to manage workers */
50         GCWQ_MANAGING_WORKERS   = 1 << 1,       /* managing workers */
51         GCWQ_DISASSOCIATED      = 1 << 2,       /* cpu can't serve workers */
52         GCWQ_FREEZING           = 1 << 3,       /* freeze in progress */
53         GCWQ_HIGHPRI_PENDING    = 1 << 4,       /* highpri works on queue */
54
55         /* worker flags */
56         WORKER_STARTED          = 1 << 0,       /* started */
57         WORKER_DIE              = 1 << 1,       /* die die die */
58         WORKER_IDLE             = 1 << 2,       /* is idle */
59         WORKER_PREP             = 1 << 3,       /* preparing to run works */
60         WORKER_ROGUE            = 1 << 4,       /* not bound to any cpu */
61         WORKER_REBIND           = 1 << 5,       /* mom is home, come back */
62         WORKER_CPU_INTENSIVE    = 1 << 6,       /* cpu intensive */
63         WORKER_UNBOUND          = 1 << 7,       /* worker is unbound */
64
65         WORKER_NOT_RUNNING      = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND |
66                                   WORKER_CPU_INTENSIVE | WORKER_UNBOUND,
67
68         /* gcwq->trustee_state */
69         TRUSTEE_START           = 0,            /* start */
70         TRUSTEE_IN_CHARGE       = 1,            /* trustee in charge of gcwq */
71         TRUSTEE_BUTCHER         = 2,            /* butcher workers */
72         TRUSTEE_RELEASE         = 3,            /* release workers */
73         TRUSTEE_DONE            = 4,            /* trustee is done */
74
75         BUSY_WORKER_HASH_ORDER  = 6,            /* 64 pointers */
76         BUSY_WORKER_HASH_SIZE   = 1 << BUSY_WORKER_HASH_ORDER,
77         BUSY_WORKER_HASH_MASK   = BUSY_WORKER_HASH_SIZE - 1,
78
79         MAX_IDLE_WORKERS_RATIO  = 4,            /* 1/4 of busy can be idle */
80         IDLE_WORKER_TIMEOUT     = 300 * HZ,     /* keep idle ones for 5 mins */
81
82         MAYDAY_INITIAL_TIMEOUT  = HZ / 100,     /* call for help after 10ms */
83         MAYDAY_INTERVAL         = HZ / 10,      /* and then every 100ms */
84         CREATE_COOLDOWN         = HZ,           /* time to breath after fail */
85         TRUSTEE_COOLDOWN        = HZ / 10,      /* for trustee draining */
86
87         /*
88          * Rescue workers are used only on emergencies and shared by
89          * all cpus.  Give -20.
90          */
91         RESCUER_NICE_LEVEL      = -20,
92 };
93
94 /*
95  * Structure fields follow one of the following exclusion rules.
96  *
97  * I: Modifiable by initialization/destruction paths and read-only for
98  *    everyone else.
99  *
100  * P: Preemption protected.  Disabling preemption is enough and should
101  *    only be modified and accessed from the local cpu.
102  *
103  * L: gcwq->lock protected.  Access with gcwq->lock held.
104  *
105  * X: During normal operation, modification requires gcwq->lock and
106  *    should be done only from local cpu.  Either disabling preemption
107  *    on local cpu or grabbing gcwq->lock is enough for read access.
108  *    If GCWQ_DISASSOCIATED is set, it's identical to L.
109  *
110  * F: wq->flush_mutex protected.
111  *
112  * W: workqueue_lock protected.
113  */
114
115 struct global_cwq;
116
117 /*
118  * The poor guys doing the actual heavy lifting.  All on-duty workers
119  * are either serving the manager role, on idle list or on busy hash.
120  */
121 struct worker {
122         /* on idle list while idle, on busy hash table while busy */
123         union {
124                 struct list_head        entry;  /* L: while idle */
125                 struct hlist_node       hentry; /* L: while busy */
126         };
127
128         struct work_struct      *current_work;  /* L: work being processed */
129         struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */
130         struct list_head        scheduled;      /* L: scheduled works */
131         struct task_struct      *task;          /* I: worker task */
132         struct global_cwq       *gcwq;          /* I: the associated gcwq */
133         /* 64 bytes boundary on 64bit, 32 on 32bit */
134         unsigned long           last_active;    /* L: last active timestamp */
135         unsigned int            flags;          /* X: flags */
136         int                     id;             /* I: worker id */
137         struct work_struct      rebind_work;    /* L: rebind worker to cpu */
138 };
139
140 /*
141  * Global per-cpu workqueue.  There's one and only one for each cpu
142  * and all works are queued and processed here regardless of their
143  * target workqueues.
144  */
145 struct global_cwq {
146         spinlock_t              lock;           /* the gcwq lock */
147         struct list_head        worklist;       /* L: list of pending works */
148         unsigned int            cpu;            /* I: the associated cpu */
149         unsigned int            flags;          /* L: GCWQ_* flags */
150
151         int                     nr_workers;     /* L: total number of workers */
152         int                     nr_idle;        /* L: currently idle ones */
153
154         /* workers are chained either in the idle_list or busy_hash */
155         struct list_head        idle_list;      /* X: list of idle workers */
156         struct hlist_head       busy_hash[BUSY_WORKER_HASH_SIZE];
157                                                 /* L: hash of busy workers */
158
159         struct timer_list       idle_timer;     /* L: worker idle timeout */
160         struct timer_list       mayday_timer;   /* L: SOS timer for dworkers */
161
162         struct ida              worker_ida;     /* L: for worker IDs */
163
164         struct task_struct      *trustee;       /* L: for gcwq shutdown */
165         unsigned int            trustee_state;  /* L: trustee state */
166         wait_queue_head_t       trustee_wait;   /* trustee wait */
167         struct worker           *first_idle;    /* L: first idle worker */
168 } ____cacheline_aligned_in_smp;
169
170 /*
171  * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
172  * work_struct->data are used for flags and thus cwqs need to be
173  * aligned at two's power of the number of flag bits.
174  */
175 struct cpu_workqueue_struct {
176         struct global_cwq       *gcwq;          /* I: the associated gcwq */
177         struct workqueue_struct *wq;            /* I: the owning workqueue */
178         int                     work_color;     /* L: current color */
179         int                     flush_color;    /* L: flushing color */
180         int                     nr_in_flight[WORK_NR_COLORS];
181                                                 /* L: nr of in_flight works */
182         int                     nr_active;      /* L: nr of active works */
183         int                     max_active;     /* L: max active works */
184         struct list_head        delayed_works;  /* L: delayed works */
185 };
186
187 /*
188  * Structure used to wait for workqueue flush.
189  */
190 struct wq_flusher {
191         struct list_head        list;           /* F: list of flushers */
192         int                     flush_color;    /* F: flush color waiting for */
193         struct completion       done;           /* flush completion */
194 };
195
196 /*
197  * All cpumasks are assumed to be always set on UP and thus can't be
198  * used to determine whether there's something to be done.
199  */
200 #ifdef CONFIG_SMP
201 typedef cpumask_var_t mayday_mask_t;
202 #define mayday_test_and_set_cpu(cpu, mask)      \
203         cpumask_test_and_set_cpu((cpu), (mask))
204 #define mayday_clear_cpu(cpu, mask)             cpumask_clear_cpu((cpu), (mask))
205 #define for_each_mayday_cpu(cpu, mask)          for_each_cpu((cpu), (mask))
206 #define alloc_mayday_mask(maskp, gfp)           zalloc_cpumask_var((maskp), (gfp))
207 #define free_mayday_mask(mask)                  free_cpumask_var((mask))
208 #else
209 typedef unsigned long mayday_mask_t;
210 #define mayday_test_and_set_cpu(cpu, mask)      test_and_set_bit(0, &(mask))
211 #define mayday_clear_cpu(cpu, mask)             clear_bit(0, &(mask))
212 #define for_each_mayday_cpu(cpu, mask)          if ((cpu) = 0, (mask))
213 #define alloc_mayday_mask(maskp, gfp)           true
214 #define free_mayday_mask(mask)                  do { } while (0)
215 #endif
216
217 /*
218  * The externally visible workqueue abstraction is an array of
219  * per-CPU workqueues:
220  */
221 struct workqueue_struct {
222         unsigned int            flags;          /* I: WQ_* flags */
223         union {
224                 struct cpu_workqueue_struct __percpu    *pcpu;
225                 struct cpu_workqueue_struct             *single;
226                 unsigned long                           v;
227         } cpu_wq;                               /* I: cwq's */
228         struct list_head        list;           /* W: list of all workqueues */
229
230         struct mutex            flush_mutex;    /* protects wq flushing */
231         int                     work_color;     /* F: current work color */
232         int                     flush_color;    /* F: current flush color */
233         atomic_t                nr_cwqs_to_flush; /* flush in progress */
234         struct wq_flusher       *first_flusher; /* F: first flusher */
235         struct list_head        flusher_queue;  /* F: flush waiters */
236         struct list_head        flusher_overflow; /* F: flush overflow list */
237
238         mayday_mask_t           mayday_mask;    /* cpus requesting rescue */
239         struct worker           *rescuer;       /* I: rescue worker */
240
241         int                     saved_max_active; /* W: saved cwq max_active */
242         const char              *name;          /* I: workqueue name */
243 #ifdef CONFIG_LOCKDEP
244         struct lockdep_map      lockdep_map;
245 #endif
246 };
247
248 struct workqueue_struct *system_wq __read_mostly;
249 struct workqueue_struct *system_long_wq __read_mostly;
250 struct workqueue_struct *system_nrt_wq __read_mostly;
251 struct workqueue_struct *system_unbound_wq __read_mostly;
252 EXPORT_SYMBOL_GPL(system_wq);
253 EXPORT_SYMBOL_GPL(system_long_wq);
254 EXPORT_SYMBOL_GPL(system_nrt_wq);
255 EXPORT_SYMBOL_GPL(system_unbound_wq);
256
257 #define CREATE_TRACE_POINTS
258 #include <trace/events/workqueue.h>
259
260 #define for_each_busy_worker(worker, i, pos, gcwq)                      \
261         for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)                     \
262                 hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
263
264 static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask,
265                                   unsigned int sw)
266 {
267         if (cpu < nr_cpu_ids) {
268                 if (sw & 1) {
269                         cpu = cpumask_next(cpu, mask);
270                         if (cpu < nr_cpu_ids)
271                                 return cpu;
272                 }
273                 if (sw & 2)
274                         return WORK_CPU_UNBOUND;
275         }
276         return WORK_CPU_NONE;
277 }
278
279 static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
280                                 struct workqueue_struct *wq)
281 {
282         return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
283 }
284
285 /*
286  * CPU iterators
287  *
288  * An extra gcwq is defined for an invalid cpu number
289  * (WORK_CPU_UNBOUND) to host workqueues which are not bound to any
290  * specific CPU.  The following iterators are similar to
291  * for_each_*_cpu() iterators but also considers the unbound gcwq.
292  *
293  * for_each_gcwq_cpu()          : possible CPUs + WORK_CPU_UNBOUND
294  * for_each_online_gcwq_cpu()   : online CPUs + WORK_CPU_UNBOUND
295  * for_each_cwq_cpu()           : possible CPUs for bound workqueues,
296  *                                WORK_CPU_UNBOUND for unbound workqueues
297  */
298 #define for_each_gcwq_cpu(cpu)                                          \
299         for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3);         \
300              (cpu) < WORK_CPU_NONE;                                     \
301              (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 3))
302
303 #define for_each_online_gcwq_cpu(cpu)                                   \
304         for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 3);           \
305              (cpu) < WORK_CPU_NONE;                                     \
306              (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 3))
307
308 #define for_each_cwq_cpu(cpu, wq)                                       \
309         for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, (wq));        \
310              (cpu) < WORK_CPU_NONE;                                     \
311              (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))
312
313 #ifdef CONFIG_LOCKDEP
314 /**
315  * in_workqueue_context() - in context of specified workqueue?
316  * @wq: the workqueue of interest
317  *
318  * Checks lockdep state to see if the current task is executing from
319  * within a workqueue item.  This function exists only if lockdep is
320  * enabled.
321  */
322 int in_workqueue_context(struct workqueue_struct *wq)
323 {
324         return lock_is_held(&wq->lockdep_map);
325 }
326 #endif
327
328 #ifdef CONFIG_DEBUG_OBJECTS_WORK
329
330 static struct debug_obj_descr work_debug_descr;
331
332 /*
333  * fixup_init is called when:
334  * - an active object is initialized
335  */
336 static int work_fixup_init(void *addr, enum debug_obj_state state)
337 {
338         struct work_struct *work = addr;
339
340         switch (state) {
341         case ODEBUG_STATE_ACTIVE:
342                 cancel_work_sync(work);
343                 debug_object_init(work, &work_debug_descr);
344                 return 1;
345         default:
346                 return 0;
347         }
348 }
349
350 /*
351  * fixup_activate is called when:
352  * - an active object is activated
353  * - an unknown object is activated (might be a statically initialized object)
354  */
355 static int work_fixup_activate(void *addr, enum debug_obj_state state)
356 {
357         struct work_struct *work = addr;
358
359         switch (state) {
360
361         case ODEBUG_STATE_NOTAVAILABLE:
362                 /*
363                  * This is not really a fixup. The work struct was
364                  * statically initialized. We just make sure that it
365                  * is tracked in the object tracker.
366                  */
367                 if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
368                         debug_object_init(work, &work_debug_descr);
369                         debug_object_activate(work, &work_debug_descr);
370                         return 0;
371                 }
372                 WARN_ON_ONCE(1);
373                 return 0;
374
375         case ODEBUG_STATE_ACTIVE:
376                 WARN_ON(1);
377
378         default:
379                 return 0;
380         }
381 }
382
383 /*
384  * fixup_free is called when:
385  * - an active object is freed
386  */
387 static int work_fixup_free(void *addr, enum debug_obj_state state)
388 {
389         struct work_struct *work = addr;
390
391         switch (state) {
392         case ODEBUG_STATE_ACTIVE:
393                 cancel_work_sync(work);
394                 debug_object_free(work, &work_debug_descr);
395                 return 1;
396         default:
397                 return 0;
398         }
399 }
400
401 static struct debug_obj_descr work_debug_descr = {
402         .name           = "work_struct",
403         .fixup_init     = work_fixup_init,
404         .fixup_activate = work_fixup_activate,
405         .fixup_free     = work_fixup_free,
406 };
407
408 static inline void debug_work_activate(struct work_struct *work)
409 {
410         debug_object_activate(work, &work_debug_descr);
411 }
412
413 static inline void debug_work_deactivate(struct work_struct *work)
414 {
415         debug_object_deactivate(work, &work_debug_descr);
416 }
417
418 void __init_work(struct work_struct *work, int onstack)
419 {
420         if (onstack)
421                 debug_object_init_on_stack(work, &work_debug_descr);
422         else
423                 debug_object_init(work, &work_debug_descr);
424 }
425 EXPORT_SYMBOL_GPL(__init_work);
426
427 void destroy_work_on_stack(struct work_struct *work)
428 {
429         debug_object_free(work, &work_debug_descr);
430 }
431 EXPORT_SYMBOL_GPL(destroy_work_on_stack);
432
433 #else
434 static inline void debug_work_activate(struct work_struct *work) { }
435 static inline void debug_work_deactivate(struct work_struct *work) { }
436 #endif
437
438 /* Serializes the accesses to the list of workqueues. */
439 static DEFINE_SPINLOCK(workqueue_lock);
440 static LIST_HEAD(workqueues);
441 static bool workqueue_freezing;         /* W: have wqs started freezing? */
442
443 /*
444  * The almighty global cpu workqueues.  nr_running is the only field
445  * which is expected to be used frequently by other cpus via
446  * try_to_wake_up().  Put it in a separate cacheline.
447  */
448 static DEFINE_PER_CPU(struct global_cwq, global_cwq);
449 static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
450
451 /*
452  * Global cpu workqueue and nr_running counter for unbound gcwq.  The
453  * gcwq is always online, has GCWQ_DISASSOCIATED set, and all its
454  * workers have WORKER_UNBOUND set.
455  */
456 static struct global_cwq unbound_global_cwq;
457 static atomic_t unbound_gcwq_nr_running = ATOMIC_INIT(0);       /* always 0 */
458
459 static int worker_thread(void *__worker);
460
461 static struct global_cwq *get_gcwq(unsigned int cpu)
462 {
463         if (cpu != WORK_CPU_UNBOUND)
464                 return &per_cpu(global_cwq, cpu);
465         else
466                 return &unbound_global_cwq;
467 }
468
469 static atomic_t *get_gcwq_nr_running(unsigned int cpu)
470 {
471         if (cpu != WORK_CPU_UNBOUND)
472                 return &per_cpu(gcwq_nr_running, cpu);
473         else
474                 return &unbound_gcwq_nr_running;
475 }
476
477 static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
478                                             struct workqueue_struct *wq)
479 {
480         if (!(wq->flags & WQ_UNBOUND)) {
481                 if (likely(cpu < nr_cpu_ids)) {
482 #ifdef CONFIG_SMP
483                         return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
484 #else
485                         return wq->cpu_wq.single;
486 #endif
487                 }
488         } else if (likely(cpu == WORK_CPU_UNBOUND))
489                 return wq->cpu_wq.single;
490         return NULL;
491 }
492
493 static unsigned int work_color_to_flags(int color)
494 {
495         return color << WORK_STRUCT_COLOR_SHIFT;
496 }
497
498 static int get_work_color(struct work_struct *work)
499 {
500         return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
501                 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
502 }
503
504 static int work_next_color(int color)
505 {
506         return (color + 1) % WORK_NR_COLORS;
507 }
508
509 /*
510  * A work's data points to the cwq with WORK_STRUCT_CWQ set while the
511  * work is on queue.  Once execution starts, WORK_STRUCT_CWQ is
512  * cleared and the work data contains the cpu number it was last on.
513  *
514  * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
515  * cwq, cpu or clear work->data.  These functions should only be
516  * called while the work is owned - ie. while the PENDING bit is set.
517  *
518  * get_work_[g]cwq() can be used to obtain the gcwq or cwq
519  * corresponding to a work.  gcwq is available once the work has been
520  * queued anywhere after initialization.  cwq is available only from
521  * queueing until execution starts.
522  */
523 static inline void set_work_data(struct work_struct *work, unsigned long data,
524                                  unsigned long flags)
525 {
526         BUG_ON(!work_pending(work));
527         atomic_long_set(&work->data, data | flags | work_static(work));
528 }
529
530 static void set_work_cwq(struct work_struct *work,
531                          struct cpu_workqueue_struct *cwq,
532                          unsigned long extra_flags)
533 {
534         set_work_data(work, (unsigned long)cwq,
535                       WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
536 }
537
538 static void set_work_cpu(struct work_struct *work, unsigned int cpu)
539 {
540         set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
541 }
542
543 static void clear_work_data(struct work_struct *work)
544 {
545         set_work_data(work, WORK_STRUCT_NO_CPU, 0);
546 }
547
548 static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
549 {
550         unsigned long data = atomic_long_read(&work->data);
551
552         if (data & WORK_STRUCT_CWQ)
553                 return (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
554         else
555                 return NULL;
556 }
557
558 static struct global_cwq *get_work_gcwq(struct work_struct *work)
559 {
560         unsigned long data = atomic_long_read(&work->data);
561         unsigned int cpu;
562
563         if (data & WORK_STRUCT_CWQ)
564                 return ((struct cpu_workqueue_struct *)
565                         (data & WORK_STRUCT_WQ_DATA_MASK))->gcwq;
566
567         cpu = data >> WORK_STRUCT_FLAG_BITS;
568         if (cpu == WORK_CPU_NONE)
569                 return NULL;
570
571         BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND);
572         return get_gcwq(cpu);
573 }
574
575 /*
576  * Policy functions.  These define the policies on how the global
577  * worker pool is managed.  Unless noted otherwise, these functions
578  * assume that they're being called with gcwq->lock held.
579  */
580
581 static bool __need_more_worker(struct global_cwq *gcwq)
582 {
583         return !atomic_read(get_gcwq_nr_running(gcwq->cpu)) ||
584                 gcwq->flags & GCWQ_HIGHPRI_PENDING;
585 }
586
587 /*
588  * Need to wake up a worker?  Called from anything but currently
589  * running workers.
590  */
591 static bool need_more_worker(struct global_cwq *gcwq)
592 {
593         return !list_empty(&gcwq->worklist) && __need_more_worker(gcwq);
594 }
595
596 /* Can I start working?  Called from busy but !running workers. */
597 static bool may_start_working(struct global_cwq *gcwq)
598 {
599         return gcwq->nr_idle;
600 }
601
602 /* Do I need to keep working?  Called from currently running workers. */
603 static bool keep_working(struct global_cwq *gcwq)
604 {
605         atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
606
607         return !list_empty(&gcwq->worklist) &&
608                 (atomic_read(nr_running) <= 1 ||
609                  gcwq->flags & GCWQ_HIGHPRI_PENDING);
610 }
611
612 /* Do we need a new worker?  Called from manager. */
613 static bool need_to_create_worker(struct global_cwq *gcwq)
614 {
615         return need_more_worker(gcwq) && !may_start_working(gcwq);
616 }
617
618 /* Do I need to be the manager? */
619 static bool need_to_manage_workers(struct global_cwq *gcwq)
620 {
621         return need_to_create_worker(gcwq) || gcwq->flags & GCWQ_MANAGE_WORKERS;
622 }
623
624 /* Do we have too many workers and should some go away? */
625 static bool too_many_workers(struct global_cwq *gcwq)
626 {
627         bool managing = gcwq->flags & GCWQ_MANAGING_WORKERS;
628         int nr_idle = gcwq->nr_idle + managing; /* manager is considered idle */
629         int nr_busy = gcwq->nr_workers - nr_idle;
630
631         return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
632 }
633
634 /*
635  * Wake up functions.
636  */
637
638 /* Return the first worker.  Safe with preemption disabled */
639 static struct worker *first_worker(struct global_cwq *gcwq)
640 {
641         if (unlikely(list_empty(&gcwq->idle_list)))
642                 return NULL;
643
644         return list_first_entry(&gcwq->idle_list, struct worker, entry);
645 }
646
647 /**
648  * wake_up_worker - wake up an idle worker
649  * @gcwq: gcwq to wake worker for
650  *
651  * Wake up the first idle worker of @gcwq.
652  *
653  * CONTEXT:
654  * spin_lock_irq(gcwq->lock).
655  */
656 static void wake_up_worker(struct global_cwq *gcwq)
657 {
658         struct worker *worker = first_worker(gcwq);
659
660         if (likely(worker))
661                 wake_up_process(worker->task);
662 }
663
664 /**
665  * wq_worker_waking_up - a worker is waking up
666  * @task: task waking up
667  * @cpu: CPU @task is waking up to
668  *
669  * This function is called during try_to_wake_up() when a worker is
670  * being awoken.
671  *
672  * CONTEXT:
673  * spin_lock_irq(rq->lock)
674  */
675 void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
676 {
677         struct worker *worker = kthread_data(task);
678
679         if (likely(!(worker->flags & WORKER_NOT_RUNNING)))
680                 atomic_inc(get_gcwq_nr_running(cpu));
681 }
682
683 /**
684  * wq_worker_sleeping - a worker is going to sleep
685  * @task: task going to sleep
686  * @cpu: CPU in question, must be the current CPU number
687  *
688  * This function is called during schedule() when a busy worker is
689  * going to sleep.  Worker on the same cpu can be woken up by
690  * returning pointer to its task.
691  *
692  * CONTEXT:
693  * spin_lock_irq(rq->lock)
694  *
695  * RETURNS:
696  * Worker task on @cpu to wake up, %NULL if none.
697  */
698 struct task_struct *wq_worker_sleeping(struct task_struct *task,
699                                        unsigned int cpu)
700 {
701         struct worker *worker = kthread_data(task), *to_wakeup = NULL;
702         struct global_cwq *gcwq = get_gcwq(cpu);
703         atomic_t *nr_running = get_gcwq_nr_running(cpu);
704
705         if (unlikely(worker->flags & WORKER_NOT_RUNNING))
706                 return NULL;
707
708         /* this can only happen on the local cpu */
709         BUG_ON(cpu != raw_smp_processor_id());
710
711         /*
712          * The counterpart of the following dec_and_test, implied mb,
713          * worklist not empty test sequence is in insert_work().
714          * Please read comment there.
715          *
716          * NOT_RUNNING is clear.  This means that trustee is not in
717          * charge and we're running on the local cpu w/ rq lock held
718          * and preemption disabled, which in turn means that none else
719          * could be manipulating idle_list, so dereferencing idle_list
720          * without gcwq lock is safe.
721          */
722         if (atomic_dec_and_test(nr_running) && !list_empty(&gcwq->worklist))
723                 to_wakeup = first_worker(gcwq);
724         return to_wakeup ? to_wakeup->task : NULL;
725 }
726
727 /**
728  * worker_set_flags - set worker flags and adjust nr_running accordingly
729  * @worker: self
730  * @flags: flags to set
731  * @wakeup: wakeup an idle worker if necessary
732  *
733  * Set @flags in @worker->flags and adjust nr_running accordingly.  If
734  * nr_running becomes zero and @wakeup is %true, an idle worker is
735  * woken up.
736  *
737  * CONTEXT:
738  * spin_lock_irq(gcwq->lock)
739  */
740 static inline void worker_set_flags(struct worker *worker, unsigned int flags,
741                                     bool wakeup)
742 {
743         struct global_cwq *gcwq = worker->gcwq;
744
745         WARN_ON_ONCE(worker->task != current);
746
747         /*
748          * If transitioning into NOT_RUNNING, adjust nr_running and
749          * wake up an idle worker as necessary if requested by
750          * @wakeup.
751          */
752         if ((flags & WORKER_NOT_RUNNING) &&
753             !(worker->flags & WORKER_NOT_RUNNING)) {
754                 atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
755
756                 if (wakeup) {
757                         if (atomic_dec_and_test(nr_running) &&
758                             !list_empty(&gcwq->worklist))
759                                 wake_up_worker(gcwq);
760                 } else
761                         atomic_dec(nr_running);
762         }
763
764         worker->flags |= flags;
765 }
766
767 /**
768  * worker_clr_flags - clear worker flags and adjust nr_running accordingly
769  * @worker: self
770  * @flags: flags to clear
771  *
772  * Clear @flags in @worker->flags and adjust nr_running accordingly.
773  *
774  * CONTEXT:
775  * spin_lock_irq(gcwq->lock)
776  */
777 static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
778 {
779         struct global_cwq *gcwq = worker->gcwq;
780         unsigned int oflags = worker->flags;
781
782         WARN_ON_ONCE(worker->task != current);
783
784         worker->flags &= ~flags;
785
786         /* if transitioning out of NOT_RUNNING, increment nr_running */
787         if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
788                 if (!(worker->flags & WORKER_NOT_RUNNING))
789                         atomic_inc(get_gcwq_nr_running(gcwq->cpu));
790 }
791
792 /**
793  * busy_worker_head - return the busy hash head for a work
794  * @gcwq: gcwq of interest
795  * @work: work to be hashed
796  *
797  * Return hash head of @gcwq for @work.
798  *
799  * CONTEXT:
800  * spin_lock_irq(gcwq->lock).
801  *
802  * RETURNS:
803  * Pointer to the hash head.
804  */
805 static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
806                                            struct work_struct *work)
807 {
808         const int base_shift = ilog2(sizeof(struct work_struct));
809         unsigned long v = (unsigned long)work;
810
811         /* simple shift and fold hash, do we need something better? */
812         v >>= base_shift;
813         v += v >> BUSY_WORKER_HASH_ORDER;
814         v &= BUSY_WORKER_HASH_MASK;
815
816         return &gcwq->busy_hash[v];
817 }
818
819 /**
820  * __find_worker_executing_work - find worker which is executing a work
821  * @gcwq: gcwq of interest
822  * @bwh: hash head as returned by busy_worker_head()
823  * @work: work to find worker for
824  *
825  * Find a worker which is executing @work on @gcwq.  @bwh should be
826  * the hash head obtained by calling busy_worker_head() with the same
827  * work.
828  *
829  * CONTEXT:
830  * spin_lock_irq(gcwq->lock).
831  *
832  * RETURNS:
833  * Pointer to worker which is executing @work if found, NULL
834  * otherwise.
835  */
836 static struct worker *__find_worker_executing_work(struct global_cwq *gcwq,
837                                                    struct hlist_head *bwh,
838                                                    struct work_struct *work)
839 {
840         struct worker *worker;
841         struct hlist_node *tmp;
842
843         hlist_for_each_entry(worker, tmp, bwh, hentry)
844                 if (worker->current_work == work)
845                         return worker;
846         return NULL;
847 }
848
849 /**
850  * find_worker_executing_work - find worker which is executing a work
851  * @gcwq: gcwq of interest
852  * @work: work to find worker for
853  *
854  * Find a worker which is executing @work on @gcwq.  This function is
855  * identical to __find_worker_executing_work() except that this
856  * function calculates @bwh itself.
857  *
858  * CONTEXT:
859  * spin_lock_irq(gcwq->lock).
860  *
861  * RETURNS:
862  * Pointer to worker which is executing @work if found, NULL
863  * otherwise.
864  */
865 static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
866                                                  struct work_struct *work)
867 {
868         return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work),
869                                             work);
870 }
871
872 /**
873  * gcwq_determine_ins_pos - find insertion position
874  * @gcwq: gcwq of interest
875  * @cwq: cwq a work is being queued for
876  *
877  * A work for @cwq is about to be queued on @gcwq, determine insertion
878  * position for the work.  If @cwq is for HIGHPRI wq, the work is
879  * queued at the head of the queue but in FIFO order with respect to
880  * other HIGHPRI works; otherwise, at the end of the queue.  This
881  * function also sets GCWQ_HIGHPRI_PENDING flag to hint @gcwq that
882  * there are HIGHPRI works pending.
883  *
884  * CONTEXT:
885  * spin_lock_irq(gcwq->lock).
886  *
887  * RETURNS:
888  * Pointer to inserstion position.
889  */
890 static inline struct list_head *gcwq_determine_ins_pos(struct global_cwq *gcwq,
891                                                struct cpu_workqueue_struct *cwq)
892 {
893         struct work_struct *twork;
894
895         if (likely(!(cwq->wq->flags & WQ_HIGHPRI)))
896                 return &gcwq->worklist;
897
898         list_for_each_entry(twork, &gcwq->worklist, entry) {
899                 struct cpu_workqueue_struct *tcwq = get_work_cwq(twork);
900
901                 if (!(tcwq->wq->flags & WQ_HIGHPRI))
902                         break;
903         }
904
905         gcwq->flags |= GCWQ_HIGHPRI_PENDING;
906         return &twork->entry;
907 }
908
909 /**
910  * insert_work - insert a work into gcwq
911  * @cwq: cwq @work belongs to
912  * @work: work to insert
913  * @head: insertion point
914  * @extra_flags: extra WORK_STRUCT_* flags to set
915  *
916  * Insert @work which belongs to @cwq into @gcwq after @head.
917  * @extra_flags is or'd to work_struct flags.
918  *
919  * CONTEXT:
920  * spin_lock_irq(gcwq->lock).
921  */
922 static void insert_work(struct cpu_workqueue_struct *cwq,
923                         struct work_struct *work, struct list_head *head,
924                         unsigned int extra_flags)
925 {
926         struct global_cwq *gcwq = cwq->gcwq;
927
928         /* we own @work, set data and link */
929         set_work_cwq(work, cwq, extra_flags);
930
931         /*
932          * Ensure that we get the right work->data if we see the
933          * result of list_add() below, see try_to_grab_pending().
934          */
935         smp_wmb();
936
937         list_add_tail(&work->entry, head);
938
939         /*
940          * Ensure either worker_sched_deactivated() sees the above
941          * list_add_tail() or we see zero nr_running to avoid workers
942          * lying around lazily while there are works to be processed.
943          */
944         smp_mb();
945
946         if (__need_more_worker(gcwq))
947                 wake_up_worker(gcwq);
948 }
949
950 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
951                          struct work_struct *work)
952 {
953         struct global_cwq *gcwq;
954         struct cpu_workqueue_struct *cwq;
955         struct list_head *worklist;
956         unsigned int work_flags;
957         unsigned long flags;
958
959         debug_work_activate(work);
960
961         if (WARN_ON_ONCE(wq->flags & WQ_DYING))
962                 return;
963
964         /* determine gcwq to use */
965         if (!(wq->flags & WQ_UNBOUND)) {
966                 struct global_cwq *last_gcwq;
967
968                 if (unlikely(cpu == WORK_CPU_UNBOUND))
969                         cpu = raw_smp_processor_id();
970
971                 /*
972                  * It's multi cpu.  If @wq is non-reentrant and @work
973                  * was previously on a different cpu, it might still
974                  * be running there, in which case the work needs to
975                  * be queued on that cpu to guarantee non-reentrance.
976                  */
977                 gcwq = get_gcwq(cpu);
978                 if (wq->flags & WQ_NON_REENTRANT &&
979                     (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
980                         struct worker *worker;
981
982                         spin_lock_irqsave(&last_gcwq->lock, flags);
983
984                         worker = find_worker_executing_work(last_gcwq, work);
985
986                         if (worker && worker->current_cwq->wq == wq)
987                                 gcwq = last_gcwq;
988                         else {
989                                 /* meh... not running there, queue here */
990                                 spin_unlock_irqrestore(&last_gcwq->lock, flags);
991                                 spin_lock_irqsave(&gcwq->lock, flags);
992                         }
993                 } else
994                         spin_lock_irqsave(&gcwq->lock, flags);
995         } else {
996                 gcwq = get_gcwq(WORK_CPU_UNBOUND);
997                 spin_lock_irqsave(&gcwq->lock, flags);
998         }
999
1000         /* gcwq determined, get cwq and queue */
1001         cwq = get_cwq(gcwq->cpu, wq);
1002         trace_workqueue_queue_work(cpu, cwq, work);
1003
1004         BUG_ON(!list_empty(&work->entry));
1005
1006         cwq->nr_in_flight[cwq->work_color]++;
1007         work_flags = work_color_to_flags(cwq->work_color);
1008
1009         if (likely(cwq->nr_active < cwq->max_active)) {
1010                 trace_workqueue_activate_work(work);
1011                 cwq->nr_active++;
1012                 worklist = gcwq_determine_ins_pos(gcwq, cwq);
1013         } else {
1014                 work_flags |= WORK_STRUCT_DELAYED;
1015                 worklist = &cwq->delayed_works;
1016         }
1017
1018         insert_work(cwq, work, worklist, work_flags);
1019
1020         spin_unlock_irqrestore(&gcwq->lock, flags);
1021 }
1022
1023 /**
1024  * queue_work - queue work on a workqueue
1025  * @wq: workqueue to use
1026  * @work: work to queue
1027  *
1028  * Returns 0 if @work was already on a queue, non-zero otherwise.
1029  *
1030  * We queue the work to the CPU on which it was submitted, but if the CPU dies
1031  * it can be processed by another CPU.
1032  */
1033 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
1034 {
1035         int ret;
1036
1037         ret = queue_work_on(get_cpu(), wq, work);
1038         put_cpu();
1039
1040         return ret;
1041 }
1042 EXPORT_SYMBOL_GPL(queue_work);
1043
1044 /**
1045  * queue_work_on - queue work on specific cpu
1046  * @cpu: CPU number to execute work on
1047  * @wq: workqueue to use
1048  * @work: work to queue
1049  *
1050  * Returns 0 if @work was already on a queue, non-zero otherwise.
1051  *
1052  * We queue the work to a specific CPU, the caller must ensure it
1053  * can't go away.
1054  */
1055 int
1056 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
1057 {
1058         int ret = 0;
1059
1060         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1061                 __queue_work(cpu, wq, work);
1062                 ret = 1;
1063         }
1064         return ret;
1065 }
1066 EXPORT_SYMBOL_GPL(queue_work_on);
1067
1068 static void delayed_work_timer_fn(unsigned long __data)
1069 {
1070         struct delayed_work *dwork = (struct delayed_work *)__data;
1071         struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
1072
1073         __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
1074 }
1075
1076 /**
1077  * queue_delayed_work - queue work on a workqueue after delay
1078  * @wq: workqueue to use
1079  * @dwork: delayable work to queue
1080  * @delay: number of jiffies to wait before queueing
1081  *
1082  * Returns 0 if @work was already on a queue, non-zero otherwise.
1083  */
1084 int queue_delayed_work(struct workqueue_struct *wq,
1085                         struct delayed_work *dwork, unsigned long delay)
1086 {
1087         if (delay == 0)
1088                 return queue_work(wq, &dwork->work);
1089
1090         return queue_delayed_work_on(-1, wq, dwork, delay);
1091 }
1092 EXPORT_SYMBOL_GPL(queue_delayed_work);
1093
1094 /**
1095  * queue_delayed_work_on - queue work on specific CPU after delay
1096  * @cpu: CPU number to execute work on
1097  * @wq: workqueue to use
1098  * @dwork: work to queue
1099  * @delay: number of jiffies to wait before queueing
1100  *
1101  * Returns 0 if @work was already on a queue, non-zero otherwise.
1102  */
1103 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
1104                         struct delayed_work *dwork, unsigned long delay)
1105 {
1106         int ret = 0;
1107         struct timer_list *timer = &dwork->timer;
1108         struct work_struct *work = &dwork->work;
1109
1110         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1111                 unsigned int lcpu;
1112
1113                 BUG_ON(timer_pending(timer));
1114                 BUG_ON(!list_empty(&work->entry));
1115
1116                 timer_stats_timer_set_start_info(&dwork->timer);
1117
1118                 /*
1119                  * This stores cwq for the moment, for the timer_fn.
1120                  * Note that the work's gcwq is preserved to allow
1121                  * reentrance detection for delayed works.
1122                  */
1123                 if (!(wq->flags & WQ_UNBOUND)) {
1124                         struct global_cwq *gcwq = get_work_gcwq(work);
1125
1126                         if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND)
1127                                 lcpu = gcwq->cpu;
1128                         else
1129                                 lcpu = raw_smp_processor_id();
1130                 } else
1131                         lcpu = WORK_CPU_UNBOUND;
1132
1133                 set_work_cwq(work, get_cwq(lcpu, wq), 0);
1134
1135                 timer->expires = jiffies + delay;
1136                 timer->data = (unsigned long)dwork;
1137                 timer->function = delayed_work_timer_fn;
1138
1139                 if (unlikely(cpu >= 0))
1140                         add_timer_on(timer, cpu);
1141                 else
1142                         add_timer(timer);
1143                 ret = 1;
1144         }
1145         return ret;
1146 }
1147 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
1148
1149 /**
1150  * worker_enter_idle - enter idle state
1151  * @worker: worker which is entering idle state
1152  *
1153  * @worker is entering idle state.  Update stats and idle timer if
1154  * necessary.
1155  *
1156  * LOCKING:
1157  * spin_lock_irq(gcwq->lock).
1158  */
1159 static void worker_enter_idle(struct worker *worker)
1160 {
1161         struct global_cwq *gcwq = worker->gcwq;
1162
1163         BUG_ON(worker->flags & WORKER_IDLE);
1164         BUG_ON(!list_empty(&worker->entry) &&
1165                (worker->hentry.next || worker->hentry.pprev));
1166
1167         /* can't use worker_set_flags(), also called from start_worker() */
1168         worker->flags |= WORKER_IDLE;
1169         gcwq->nr_idle++;
1170         worker->last_active = jiffies;
1171
1172         /* idle_list is LIFO */
1173         list_add(&worker->entry, &gcwq->idle_list);
1174
1175         if (likely(!(worker->flags & WORKER_ROGUE))) {
1176                 if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
1177                         mod_timer(&gcwq->idle_timer,
1178                                   jiffies + IDLE_WORKER_TIMEOUT);
1179         } else
1180                 wake_up_all(&gcwq->trustee_wait);
1181
1182         /* sanity check nr_running */
1183         WARN_ON_ONCE(gcwq->nr_workers == gcwq->nr_idle &&
1184                      atomic_read(get_gcwq_nr_running(gcwq->cpu)));
1185 }
1186
1187 /**
1188  * worker_leave_idle - leave idle state
1189  * @worker: worker which is leaving idle state
1190  *
1191  * @worker is leaving idle state.  Update stats.
1192  *
1193  * LOCKING:
1194  * spin_lock_irq(gcwq->lock).
1195  */
1196 static void worker_leave_idle(struct worker *worker)
1197 {
1198         struct global_cwq *gcwq = worker->gcwq;
1199
1200         BUG_ON(!(worker->flags & WORKER_IDLE));
1201         worker_clr_flags(worker, WORKER_IDLE);
1202         gcwq->nr_idle--;
1203         list_del_init(&worker->entry);
1204 }
1205
1206 /**
1207  * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
1208  * @worker: self
1209  *
1210  * Works which are scheduled while the cpu is online must at least be
1211  * scheduled to a worker which is bound to the cpu so that if they are
1212  * flushed from cpu callbacks while cpu is going down, they are
1213  * guaranteed to execute on the cpu.
1214  *
1215  * This function is to be used by rogue workers and rescuers to bind
1216  * themselves to the target cpu and may race with cpu going down or
1217  * coming online.  kthread_bind() can't be used because it may put the
1218  * worker to already dead cpu and set_cpus_allowed_ptr() can't be used
1219  * verbatim as it's best effort and blocking and gcwq may be
1220  * [dis]associated in the meantime.
1221  *
1222  * This function tries set_cpus_allowed() and locks gcwq and verifies
1223  * the binding against GCWQ_DISASSOCIATED which is set during
1224  * CPU_DYING and cleared during CPU_ONLINE, so if the worker enters
1225  * idle state or fetches works without dropping lock, it can guarantee
1226  * the scheduling requirement described in the first paragraph.
1227  *
1228  * CONTEXT:
1229  * Might sleep.  Called without any lock but returns with gcwq->lock
1230  * held.
1231  *
1232  * RETURNS:
1233  * %true if the associated gcwq is online (@worker is successfully
1234  * bound), %false if offline.
1235  */
1236 static bool worker_maybe_bind_and_lock(struct worker *worker)
1237 __acquires(&gcwq->lock)
1238 {
1239         struct global_cwq *gcwq = worker->gcwq;
1240         struct task_struct *task = worker->task;
1241
1242         while (true) {
1243                 /*
1244                  * The following call may fail, succeed or succeed
1245                  * without actually migrating the task to the cpu if
1246                  * it races with cpu hotunplug operation.  Verify
1247                  * against GCWQ_DISASSOCIATED.
1248                  */
1249                 if (!(gcwq->flags & GCWQ_DISASSOCIATED))
1250                         set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
1251
1252                 spin_lock_irq(&gcwq->lock);
1253                 if (gcwq->flags & GCWQ_DISASSOCIATED)
1254                         return false;
1255                 if (task_cpu(task) == gcwq->cpu &&
1256                     cpumask_equal(&current->cpus_allowed,
1257                                   get_cpu_mask(gcwq->cpu)))
1258                         return true;
1259                 spin_unlock_irq(&gcwq->lock);
1260
1261                 /* CPU has come up inbetween, retry migration */
1262                 cpu_relax();
1263         }
1264 }
1265
1266 /*
1267  * Function for worker->rebind_work used to rebind rogue busy workers
1268  * to the associated cpu which is coming back online.  This is
1269  * scheduled by cpu up but can race with other cpu hotplug operations
1270  * and may be executed twice without intervening cpu down.
1271  */
1272 static void worker_rebind_fn(struct work_struct *work)
1273 {
1274         struct worker *worker = container_of(work, struct worker, rebind_work);
1275         struct global_cwq *gcwq = worker->gcwq;
1276
1277         if (worker_maybe_bind_and_lock(worker))
1278                 worker_clr_flags(worker, WORKER_REBIND);
1279
1280         spin_unlock_irq(&gcwq->lock);
1281 }
1282
1283 static struct worker *alloc_worker(void)
1284 {
1285         struct worker *worker;
1286
1287         worker = kzalloc(sizeof(*worker), GFP_KERNEL);
1288         if (worker) {
1289                 INIT_LIST_HEAD(&worker->entry);
1290                 INIT_LIST_HEAD(&worker->scheduled);
1291                 INIT_WORK(&worker->rebind_work, worker_rebind_fn);
1292                 /* on creation a worker is in !idle && prep state */
1293                 worker->flags = WORKER_PREP;
1294         }
1295         return worker;
1296 }
1297
1298 /**
1299  * create_worker - create a new workqueue worker
1300  * @gcwq: gcwq the new worker will belong to
1301  * @bind: whether to set affinity to @cpu or not
1302  *
1303  * Create a new worker which is bound to @gcwq.  The returned worker
1304  * can be started by calling start_worker() or destroyed using
1305  * destroy_worker().
1306  *
1307  * CONTEXT:
1308  * Might sleep.  Does GFP_KERNEL allocations.
1309  *
1310  * RETURNS:
1311  * Pointer to the newly created worker.
1312  */
1313 static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
1314 {
1315         bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND;
1316         struct worker *worker = NULL;
1317         int id = -1;
1318
1319         spin_lock_irq(&gcwq->lock);
1320         while (ida_get_new(&gcwq->worker_ida, &id)) {
1321                 spin_unlock_irq(&gcwq->lock);
1322                 if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))
1323                         goto fail;
1324                 spin_lock_irq(&gcwq->lock);
1325         }
1326         spin_unlock_irq(&gcwq->lock);
1327
1328         worker = alloc_worker();
1329         if (!worker)
1330                 goto fail;
1331
1332         worker->gcwq = gcwq;
1333         worker->id = id;
1334
1335         if (!on_unbound_cpu)
1336                 worker->task = kthread_create(worker_thread, worker,
1337                                               "kworker/%u:%d", gcwq->cpu, id);
1338         else
1339                 worker->task = kthread_create(worker_thread, worker,
1340                                               "kworker/u:%d", id);
1341         if (IS_ERR(worker->task))
1342                 goto fail;
1343
1344         /*
1345          * A rogue worker will become a regular one if CPU comes
1346          * online later on.  Make sure every worker has
1347          * PF_THREAD_BOUND set.
1348          */
1349         if (bind && !on_unbound_cpu)
1350                 kthread_bind(worker->task, gcwq->cpu);
1351         else {
1352                 worker->task->flags |= PF_THREAD_BOUND;
1353                 if (on_unbound_cpu)
1354                         worker->flags |= WORKER_UNBOUND;
1355         }
1356
1357         return worker;
1358 fail:
1359         if (id >= 0) {
1360                 spin_lock_irq(&gcwq->lock);
1361                 ida_remove(&gcwq->worker_ida, id);
1362                 spin_unlock_irq(&gcwq->lock);
1363         }
1364         kfree(worker);
1365         return NULL;
1366 }
1367
1368 /**
1369  * start_worker - start a newly created worker
1370  * @worker: worker to start
1371  *
1372  * Make the gcwq aware of @worker and start it.
1373  *
1374  * CONTEXT:
1375  * spin_lock_irq(gcwq->lock).
1376  */
1377 static void start_worker(struct worker *worker)
1378 {
1379         worker->flags |= WORKER_STARTED;
1380         worker->gcwq->nr_workers++;
1381         worker_enter_idle(worker);
1382         wake_up_process(worker->task);
1383 }
1384
1385 /**
1386  * destroy_worker - destroy a workqueue worker
1387  * @worker: worker to be destroyed
1388  *
1389  * Destroy @worker and adjust @gcwq stats accordingly.
1390  *
1391  * CONTEXT:
1392  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1393  */
1394 static void destroy_worker(struct worker *worker)
1395 {
1396         struct global_cwq *gcwq = worker->gcwq;
1397         int id = worker->id;
1398
1399         /* sanity check frenzy */
1400         BUG_ON(worker->current_work);
1401         BUG_ON(!list_empty(&worker->scheduled));
1402
1403         if (worker->flags & WORKER_STARTED)
1404                 gcwq->nr_workers--;
1405         if (worker->flags & WORKER_IDLE)
1406                 gcwq->nr_idle--;
1407
1408         list_del_init(&worker->entry);
1409         worker->flags |= WORKER_DIE;
1410
1411         spin_unlock_irq(&gcwq->lock);
1412
1413         kthread_stop(worker->task);
1414         kfree(worker);
1415
1416         spin_lock_irq(&gcwq->lock);
1417         ida_remove(&gcwq->worker_ida, id);
1418 }
1419
1420 static void idle_worker_timeout(unsigned long __gcwq)
1421 {
1422         struct global_cwq *gcwq = (void *)__gcwq;
1423
1424         spin_lock_irq(&gcwq->lock);
1425
1426         if (too_many_workers(gcwq)) {
1427                 struct worker *worker;
1428                 unsigned long expires;
1429
1430                 /* idle_list is kept in LIFO order, check the last one */
1431                 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1432                 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1433
1434                 if (time_before(jiffies, expires))
1435                         mod_timer(&gcwq->idle_timer, expires);
1436                 else {
1437                         /* it's been idle for too long, wake up manager */
1438                         gcwq->flags |= GCWQ_MANAGE_WORKERS;
1439                         wake_up_worker(gcwq);
1440                 }
1441         }
1442
1443         spin_unlock_irq(&gcwq->lock);
1444 }
1445
1446 static bool send_mayday(struct work_struct *work)
1447 {
1448         struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1449         struct workqueue_struct *wq = cwq->wq;
1450         unsigned int cpu;
1451
1452         if (!(wq->flags & WQ_RESCUER))
1453                 return false;
1454
1455         /* mayday mayday mayday */
1456         cpu = cwq->gcwq->cpu;
1457         /* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
1458         if (cpu == WORK_CPU_UNBOUND)
1459                 cpu = 0;
1460         if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask))
1461                 wake_up_process(wq->rescuer->task);
1462         return true;
1463 }
1464
1465 static void gcwq_mayday_timeout(unsigned long __gcwq)
1466 {
1467         struct global_cwq *gcwq = (void *)__gcwq;
1468         struct work_struct *work;
1469
1470         spin_lock_irq(&gcwq->lock);
1471
1472         if (need_to_create_worker(gcwq)) {
1473                 /*
1474                  * We've been trying to create a new worker but
1475                  * haven't been successful.  We might be hitting an
1476                  * allocation deadlock.  Send distress signals to
1477                  * rescuers.
1478                  */
1479                 list_for_each_entry(work, &gcwq->worklist, entry)
1480                         send_mayday(work);
1481         }
1482
1483         spin_unlock_irq(&gcwq->lock);
1484
1485         mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL);
1486 }
1487
1488 /**
1489  * maybe_create_worker - create a new worker if necessary
1490  * @gcwq: gcwq to create a new worker for
1491  *
1492  * Create a new worker for @gcwq if necessary.  @gcwq is guaranteed to
1493  * have at least one idle worker on return from this function.  If
1494  * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
1495  * sent to all rescuers with works scheduled on @gcwq to resolve
1496  * possible allocation deadlock.
1497  *
1498  * On return, need_to_create_worker() is guaranteed to be false and
1499  * may_start_working() true.
1500  *
1501  * LOCKING:
1502  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1503  * multiple times.  Does GFP_KERNEL allocations.  Called only from
1504  * manager.
1505  *
1506  * RETURNS:
1507  * false if no action was taken and gcwq->lock stayed locked, true
1508  * otherwise.
1509  */
1510 static bool maybe_create_worker(struct global_cwq *gcwq)
1511 __releases(&gcwq->lock)
1512 __acquires(&gcwq->lock)
1513 {
1514         if (!need_to_create_worker(gcwq))
1515                 return false;
1516 restart:
1517         spin_unlock_irq(&gcwq->lock);
1518
1519         /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
1520         mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
1521
1522         while (true) {
1523                 struct worker *worker;
1524
1525                 worker = create_worker(gcwq, true);
1526                 if (worker) {
1527                         del_timer_sync(&gcwq->mayday_timer);
1528                         spin_lock_irq(&gcwq->lock);
1529                         start_worker(worker);
1530                         BUG_ON(need_to_create_worker(gcwq));
1531                         return true;
1532                 }
1533
1534                 if (!need_to_create_worker(gcwq))
1535                         break;
1536
1537                 __set_current_state(TASK_INTERRUPTIBLE);
1538                 schedule_timeout(CREATE_COOLDOWN);
1539
1540                 if (!need_to_create_worker(gcwq))
1541                         break;
1542         }
1543
1544         del_timer_sync(&gcwq->mayday_timer);
1545         spin_lock_irq(&gcwq->lock);
1546         if (need_to_create_worker(gcwq))
1547                 goto restart;
1548         return true;
1549 }
1550
1551 /**
1552  * maybe_destroy_worker - destroy workers which have been idle for a while
1553  * @gcwq: gcwq to destroy workers for
1554  *
1555  * Destroy @gcwq workers which have been idle for longer than
1556  * IDLE_WORKER_TIMEOUT.
1557  *
1558  * LOCKING:
1559  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1560  * multiple times.  Called only from manager.
1561  *
1562  * RETURNS:
1563  * false if no action was taken and gcwq->lock stayed locked, true
1564  * otherwise.
1565  */
1566 static bool maybe_destroy_workers(struct global_cwq *gcwq)
1567 {
1568         bool ret = false;
1569
1570         while (too_many_workers(gcwq)) {
1571                 struct worker *worker;
1572                 unsigned long expires;
1573
1574                 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1575                 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1576
1577                 if (time_before(jiffies, expires)) {
1578                         mod_timer(&gcwq->idle_timer, expires);
1579                         break;
1580                 }
1581
1582                 destroy_worker(worker);
1583                 ret = true;
1584         }
1585
1586         return ret;
1587 }
1588
1589 /**
1590  * manage_workers - manage worker pool
1591  * @worker: self
1592  *
1593  * Assume the manager role and manage gcwq worker pool @worker belongs
1594  * to.  At any given time, there can be only zero or one manager per
1595  * gcwq.  The exclusion is handled automatically by this function.
1596  *
1597  * The caller can safely start processing works on false return.  On
1598  * true return, it's guaranteed that need_to_create_worker() is false
1599  * and may_start_working() is true.
1600  *
1601  * CONTEXT:
1602  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1603  * multiple times.  Does GFP_KERNEL allocations.
1604  *
1605  * RETURNS:
1606  * false if no action was taken and gcwq->lock stayed locked, true if
1607  * some action was taken.
1608  */
1609 static bool manage_workers(struct worker *worker)
1610 {
1611         struct global_cwq *gcwq = worker->gcwq;
1612         bool ret = false;
1613
1614         if (gcwq->flags & GCWQ_MANAGING_WORKERS)
1615                 return ret;
1616
1617         gcwq->flags &= ~GCWQ_MANAGE_WORKERS;
1618         gcwq->flags |= GCWQ_MANAGING_WORKERS;
1619
1620         /*
1621          * Destroy and then create so that may_start_working() is true
1622          * on return.
1623          */
1624         ret |= maybe_destroy_workers(gcwq);
1625         ret |= maybe_create_worker(gcwq);
1626
1627         gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
1628
1629         /*
1630          * The trustee might be waiting to take over the manager
1631          * position, tell it we're done.
1632          */
1633         if (unlikely(gcwq->trustee))
1634                 wake_up_all(&gcwq->trustee_wait);
1635
1636         return ret;
1637 }
1638
1639 /**
1640  * move_linked_works - move linked works to a list
1641  * @work: start of series of works to be scheduled
1642  * @head: target list to append @work to
1643  * @nextp: out paramter for nested worklist walking
1644  *
1645  * Schedule linked works starting from @work to @head.  Work series to
1646  * be scheduled starts at @work and includes any consecutive work with
1647  * WORK_STRUCT_LINKED set in its predecessor.
1648  *
1649  * If @nextp is not NULL, it's updated to point to the next work of
1650  * the last scheduled work.  This allows move_linked_works() to be
1651  * nested inside outer list_for_each_entry_safe().
1652  *
1653  * CONTEXT:
1654  * spin_lock_irq(gcwq->lock).
1655  */
1656 static void move_linked_works(struct work_struct *work, struct list_head *head,
1657                               struct work_struct **nextp)
1658 {
1659         struct work_struct *n;
1660
1661         /*
1662          * Linked worklist will always end before the end of the list,
1663          * use NULL for list head.
1664          */
1665         list_for_each_entry_safe_from(work, n, NULL, entry) {
1666                 list_move_tail(&work->entry, head);
1667                 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
1668                         break;
1669         }
1670
1671         /*
1672          * If we're already inside safe list traversal and have moved
1673          * multiple works to the scheduled queue, the next position
1674          * needs to be updated.
1675          */
1676         if (nextp)
1677                 *nextp = n;
1678 }
1679
1680 static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
1681 {
1682         struct work_struct *work = list_first_entry(&cwq->delayed_works,
1683                                                     struct work_struct, entry);
1684         struct list_head *pos = gcwq_determine_ins_pos(cwq->gcwq, cwq);
1685
1686         trace_workqueue_activate_work(work);
1687         move_linked_works(work, pos, NULL);
1688         __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
1689         cwq->nr_active++;
1690 }
1691
1692 /**
1693  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
1694  * @cwq: cwq of interest
1695  * @color: color of work which left the queue
1696  * @delayed: for a delayed work
1697  *
1698  * A work either has completed or is removed from pending queue,
1699  * decrement nr_in_flight of its cwq and handle workqueue flushing.
1700  *
1701  * CONTEXT:
1702  * spin_lock_irq(gcwq->lock).
1703  */
1704 static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color,
1705                                  bool delayed)
1706 {
1707         /* ignore uncolored works */
1708         if (color == WORK_NO_COLOR)
1709                 return;
1710
1711         cwq->nr_in_flight[color]--;
1712
1713         if (!delayed) {
1714                 cwq->nr_active--;
1715                 if (!list_empty(&cwq->delayed_works)) {
1716                         /* one down, submit a delayed one */
1717                         if (cwq->nr_active < cwq->max_active)
1718                                 cwq_activate_first_delayed(cwq);
1719                 }
1720         }
1721
1722         /* is flush in progress and are we at the flushing tip? */
1723         if (likely(cwq->flush_color != color))
1724                 return;
1725
1726         /* are there still in-flight works? */
1727         if (cwq->nr_in_flight[color])
1728                 return;
1729
1730         /* this cwq is done, clear flush_color */
1731         cwq->flush_color = -1;
1732
1733         /*
1734          * If this was the last cwq, wake up the first flusher.  It
1735          * will handle the rest.
1736          */
1737         if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
1738                 complete(&cwq->wq->first_flusher->done);
1739 }
1740
1741 /**
1742  * process_one_work - process single work
1743  * @worker: self
1744  * @work: work to process
1745  *
1746  * Process @work.  This function contains all the logics necessary to
1747  * process a single work including synchronization against and
1748  * interaction with other workers on the same cpu, queueing and
1749  * flushing.  As long as context requirement is met, any worker can
1750  * call this function to process a work.
1751  *
1752  * CONTEXT:
1753  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1754  */
1755 static void process_one_work(struct worker *worker, struct work_struct *work)
1756 __releases(&gcwq->lock)
1757 __acquires(&gcwq->lock)
1758 {
1759         struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1760         struct global_cwq *gcwq = cwq->gcwq;
1761         struct hlist_head *bwh = busy_worker_head(gcwq, work);
1762         bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE;
1763         work_func_t f = work->func;
1764         int work_color;
1765         struct worker *collision;
1766 #ifdef CONFIG_LOCKDEP
1767         /*
1768          * It is permissible to free the struct work_struct from
1769          * inside the function that is called from it, this we need to
1770          * take into account for lockdep too.  To avoid bogus "held
1771          * lock freed" warnings as well as problems when looking into
1772          * work->lockdep_map, make a copy and use that here.
1773          */
1774         struct lockdep_map lockdep_map = work->lockdep_map;
1775 #endif
1776         /*
1777          * A single work shouldn't be executed concurrently by
1778          * multiple workers on a single cpu.  Check whether anyone is
1779          * already processing the work.  If so, defer the work to the
1780          * currently executing one.
1781          */
1782         collision = __find_worker_executing_work(gcwq, bwh, work);
1783         if (unlikely(collision)) {
1784                 move_linked_works(work, &collision->scheduled, NULL);
1785                 return;
1786         }
1787
1788         /* claim and process */
1789         debug_work_deactivate(work);
1790         hlist_add_head(&worker->hentry, bwh);
1791         worker->current_work = work;
1792         worker->current_cwq = cwq;
1793         work_color = get_work_color(work);
1794
1795         /* record the current cpu number in the work data and dequeue */
1796         set_work_cpu(work, gcwq->cpu);
1797         list_del_init(&work->entry);
1798
1799         /*
1800          * If HIGHPRI_PENDING, check the next work, and, if HIGHPRI,
1801          * wake up another worker; otherwise, clear HIGHPRI_PENDING.
1802          */
1803         if (unlikely(gcwq->flags & GCWQ_HIGHPRI_PENDING)) {
1804                 struct work_struct *nwork = list_first_entry(&gcwq->worklist,
1805                                                 struct work_struct, entry);
1806
1807                 if (!list_empty(&gcwq->worklist) &&
1808                     get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI)
1809                         wake_up_worker(gcwq);
1810                 else
1811                         gcwq->flags &= ~GCWQ_HIGHPRI_PENDING;
1812         }
1813
1814         /*
1815          * CPU intensive works don't participate in concurrency
1816          * management.  They're the scheduler's responsibility.
1817          */
1818         if (unlikely(cpu_intensive))
1819                 worker_set_flags(worker, WORKER_CPU_INTENSIVE, true);
1820
1821         spin_unlock_irq(&gcwq->lock);
1822
1823         work_clear_pending(work);
1824         lock_map_acquire(&cwq->wq->lockdep_map);
1825         lock_map_acquire(&lockdep_map);
1826         trace_workqueue_execute_start(work);
1827         f(work);
1828         /*
1829          * While we must be careful to not use "work" after this, the trace
1830          * point will only record its address.
1831          */
1832         trace_workqueue_execute_end(work);
1833         lock_map_release(&lockdep_map);
1834         lock_map_release(&cwq->wq->lockdep_map);
1835
1836         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
1837                 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
1838                        "%s/0x%08x/%d\n",
1839                        current->comm, preempt_count(), task_pid_nr(current));
1840                 printk(KERN_ERR "    last function: ");
1841                 print_symbol("%s\n", (unsigned long)f);
1842                 debug_show_held_locks(current);
1843                 dump_stack();
1844         }
1845
1846         spin_lock_irq(&gcwq->lock);
1847
1848         /* clear cpu intensive status */
1849         if (unlikely(cpu_intensive))
1850                 worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
1851
1852         /* we're done with it, release */
1853         hlist_del_init(&worker->hentry);
1854         worker->current_work = NULL;
1855         worker->current_cwq = NULL;
1856         cwq_dec_nr_in_flight(cwq, work_color, false);
1857 }
1858
1859 /**
1860  * process_scheduled_works - process scheduled works
1861  * @worker: self
1862  *
1863  * Process all scheduled works.  Please note that the scheduled list
1864  * may change while processing a work, so this function repeatedly
1865  * fetches a work from the top and executes it.
1866  *
1867  * CONTEXT:
1868  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1869  * multiple times.
1870  */
1871 static void process_scheduled_works(struct worker *worker)
1872 {
1873         while (!list_empty(&worker->scheduled)) {
1874                 struct work_struct *work = list_first_entry(&worker->scheduled,
1875                                                 struct work_struct, entry);
1876                 process_one_work(worker, work);
1877         }
1878 }
1879
1880 /**
1881  * worker_thread - the worker thread function
1882  * @__worker: self
1883  *
1884  * The gcwq worker thread function.  There's a single dynamic pool of
1885  * these per each cpu.  These workers process all works regardless of
1886  * their specific target workqueue.  The only exception is works which
1887  * belong to workqueues with a rescuer which will be explained in
1888  * rescuer_thread().
1889  */
1890 static int worker_thread(void *__worker)
1891 {
1892         struct worker *worker = __worker;
1893         struct global_cwq *gcwq = worker->gcwq;
1894
1895         /* tell the scheduler that this is a workqueue worker */
1896         worker->task->flags |= PF_WQ_WORKER;
1897 woke_up:
1898         spin_lock_irq(&gcwq->lock);
1899
1900         /* DIE can be set only while we're idle, checking here is enough */
1901         if (worker->flags & WORKER_DIE) {
1902                 spin_unlock_irq(&gcwq->lock);
1903                 worker->task->flags &= ~PF_WQ_WORKER;
1904                 return 0;
1905         }
1906
1907         worker_leave_idle(worker);
1908 recheck:
1909         /* no more worker necessary? */
1910         if (!need_more_worker(gcwq))
1911                 goto sleep;
1912
1913         /* do we need to manage? */
1914         if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
1915                 goto recheck;
1916
1917         /*
1918          * ->scheduled list can only be filled while a worker is
1919          * preparing to process a work or actually processing it.
1920          * Make sure nobody diddled with it while I was sleeping.
1921          */
1922         BUG_ON(!list_empty(&worker->scheduled));
1923
1924         /*
1925          * When control reaches this point, we're guaranteed to have
1926          * at least one idle worker or that someone else has already
1927          * assumed the manager role.
1928          */
1929         worker_clr_flags(worker, WORKER_PREP);
1930
1931         do {
1932                 struct work_struct *work =
1933                         list_first_entry(&gcwq->worklist,
1934                                          struct work_struct, entry);
1935
1936                 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
1937                         /* optimization path, not strictly necessary */
1938                         process_one_work(worker, work);
1939                         if (unlikely(!list_empty(&worker->scheduled)))
1940                                 process_scheduled_works(worker);
1941                 } else {
1942                         move_linked_works(work, &worker->scheduled, NULL);
1943                         process_scheduled_works(worker);
1944                 }
1945         } while (keep_working(gcwq));
1946
1947         worker_set_flags(worker, WORKER_PREP, false);
1948 sleep:
1949         if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))
1950                 goto recheck;
1951
1952         /*
1953          * gcwq->lock is held and there's no work to process and no
1954          * need to manage, sleep.  Workers are woken up only while
1955          * holding gcwq->lock or from local cpu, so setting the
1956          * current state before releasing gcwq->lock is enough to
1957          * prevent losing any event.
1958          */
1959         worker_enter_idle(worker);
1960         __set_current_state(TASK_INTERRUPTIBLE);
1961         spin_unlock_irq(&gcwq->lock);
1962         schedule();
1963         goto woke_up;
1964 }
1965
1966 /**
1967  * rescuer_thread - the rescuer thread function
1968  * @__wq: the associated workqueue
1969  *
1970  * Workqueue rescuer thread function.  There's one rescuer for each
1971  * workqueue which has WQ_RESCUER set.
1972  *
1973  * Regular work processing on a gcwq may block trying to create a new
1974  * worker which uses GFP_KERNEL allocation which has slight chance of
1975  * developing into deadlock if some works currently on the same queue
1976  * need to be processed to satisfy the GFP_KERNEL allocation.  This is
1977  * the problem rescuer solves.
1978  *
1979  * When such condition is possible, the gcwq summons rescuers of all
1980  * workqueues which have works queued on the gcwq and let them process
1981  * those works so that forward progress can be guaranteed.
1982  *
1983  * This should happen rarely.
1984  */
1985 static int rescuer_thread(void *__wq)
1986 {
1987         struct workqueue_struct *wq = __wq;
1988         struct worker *rescuer = wq->rescuer;
1989         struct list_head *scheduled = &rescuer->scheduled;
1990         bool is_unbound = wq->flags & WQ_UNBOUND;
1991         unsigned int cpu;
1992
1993         set_user_nice(current, RESCUER_NICE_LEVEL);
1994 repeat:
1995         set_current_state(TASK_INTERRUPTIBLE);
1996
1997         if (kthread_should_stop())
1998                 return 0;
1999
2000         /*
2001          * See whether any cpu is asking for help.  Unbounded
2002          * workqueues use cpu 0 in mayday_mask for CPU_UNBOUND.
2003          */
2004         for_each_mayday_cpu(cpu, wq->mayday_mask) {
2005                 unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu;
2006                 struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq);
2007                 struct global_cwq *gcwq = cwq->gcwq;
2008                 struct work_struct *work, *n;
2009
2010                 __set_current_state(TASK_RUNNING);
2011                 mayday_clear_cpu(cpu, wq->mayday_mask);
2012
2013                 /* migrate to the target cpu if possible */
2014                 rescuer->gcwq = gcwq;
2015                 worker_maybe_bind_and_lock(rescuer);
2016
2017                 /*
2018                  * Slurp in all works issued via this workqueue and
2019                  * process'em.
2020                  */
2021                 BUG_ON(!list_empty(&rescuer->scheduled));
2022                 list_for_each_entry_safe(work, n, &gcwq->worklist, entry)
2023                         if (get_work_cwq(work) == cwq)
2024                                 move_linked_works(work, scheduled, &n);
2025
2026                 process_scheduled_works(rescuer);
2027                 spin_unlock_irq(&gcwq->lock);
2028         }
2029
2030         schedule();
2031         goto repeat;
2032 }
2033
2034 struct wq_barrier {
2035         struct work_struct      work;
2036         struct completion       done;
2037 };
2038
2039 static void wq_barrier_func(struct work_struct *work)
2040 {
2041         struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
2042         complete(&barr->done);
2043 }
2044
2045 /**
2046  * insert_wq_barrier - insert a barrier work
2047  * @cwq: cwq to insert barrier into
2048  * @barr: wq_barrier to insert
2049  * @target: target work to attach @barr to
2050  * @worker: worker currently executing @target, NULL if @target is not executing
2051  *
2052  * @barr is linked to @target such that @barr is completed only after
2053  * @target finishes execution.  Please note that the ordering
2054  * guarantee is observed only with respect to @target and on the local
2055  * cpu.
2056  *
2057  * Currently, a queued barrier can't be canceled.  This is because
2058  * try_to_grab_pending() can't determine whether the work to be
2059  * grabbed is at the head of the queue and thus can't clear LINKED
2060  * flag of the previous work while there must be a valid next work
2061  * after a work with LINKED flag set.
2062  *
2063  * Note that when @worker is non-NULL, @target may be modified
2064  * underneath us, so we can't reliably determine cwq from @target.
2065  *
2066  * CONTEXT:
2067  * spin_lock_irq(gcwq->lock).
2068  */
2069 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
2070                               struct wq_barrier *barr,
2071                               struct work_struct *target, struct worker *worker)
2072 {
2073         struct list_head *head;
2074         unsigned int linked = 0;
2075
2076         /*
2077          * debugobject calls are safe here even with gcwq->lock locked
2078          * as we know for sure that this will not trigger any of the
2079          * checks and call back into the fixup functions where we
2080          * might deadlock.
2081          */
2082         INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
2083         __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
2084         init_completion(&barr->done);
2085
2086         /*
2087          * If @target is currently being executed, schedule the
2088          * barrier to the worker; otherwise, put it after @target.
2089          */
2090         if (worker)
2091                 head = worker->scheduled.next;
2092         else {
2093                 unsigned long *bits = work_data_bits(target);
2094
2095                 head = target->entry.next;
2096                 /* there can already be other linked works, inherit and set */
2097                 linked = *bits & WORK_STRUCT_LINKED;
2098                 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
2099         }
2100
2101         debug_work_activate(&barr->work);
2102         insert_work(cwq, &barr->work, head,
2103                     work_color_to_flags(WORK_NO_COLOR) | linked);
2104 }
2105
2106 /**
2107  * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
2108  * @wq: workqueue being flushed
2109  * @flush_color: new flush color, < 0 for no-op
2110  * @work_color: new work color, < 0 for no-op
2111  *
2112  * Prepare cwqs for workqueue flushing.
2113  *
2114  * If @flush_color is non-negative, flush_color on all cwqs should be
2115  * -1.  If no cwq has in-flight commands at the specified color, all
2116  * cwq->flush_color's stay at -1 and %false is returned.  If any cwq
2117  * has in flight commands, its cwq->flush_color is set to
2118  * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
2119  * wakeup logic is armed and %true is returned.
2120  *
2121  * The caller should have initialized @wq->first_flusher prior to
2122  * calling this function with non-negative @flush_color.  If
2123  * @flush_color is negative, no flush color update is done and %false
2124  * is returned.
2125  *
2126  * If @work_color is non-negative, all cwqs should have the same
2127  * work_color which is previous to @work_color and all will be
2128  * advanced to @work_color.
2129  *
2130  * CONTEXT:
2131  * mutex_lock(wq->flush_mutex).
2132  *
2133  * RETURNS:
2134  * %true if @flush_color >= 0 and there's something to flush.  %false
2135  * otherwise.
2136  */
2137 static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
2138                                       int flush_color, int work_color)
2139 {
2140         bool wait = false;
2141         unsigned int cpu;
2142
2143         if (flush_color >= 0) {
2144                 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
2145                 atomic_set(&wq->nr_cwqs_to_flush, 1);
2146         }
2147
2148         for_each_cwq_cpu(cpu, wq) {
2149                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2150                 struct global_cwq *gcwq = cwq->gcwq;
2151
2152                 spin_lock_irq(&gcwq->lock);
2153
2154                 if (flush_color >= 0) {
2155                         BUG_ON(cwq->flush_color != -1);
2156
2157                         if (cwq->nr_in_flight[flush_color]) {
2158                                 cwq->flush_color = flush_color;
2159                                 atomic_inc(&wq->nr_cwqs_to_flush);
2160                                 wait = true;
2161                         }
2162                 }
2163
2164                 if (work_color >= 0) {
2165                         BUG_ON(work_color != work_next_color(cwq->work_color));
2166                         cwq->work_color = work_color;
2167                 }
2168
2169                 spin_unlock_irq(&gcwq->lock);
2170         }
2171
2172         if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
2173                 complete(&wq->first_flusher->done);
2174
2175         return wait;
2176 }
2177
2178 /**
2179  * flush_workqueue - ensure that any scheduled work has run to completion.
2180  * @wq: workqueue to flush
2181  *
2182  * Forces execution of the workqueue and blocks until its completion.
2183  * This is typically used in driver shutdown handlers.
2184  *
2185  * We sleep until all works which were queued on entry have been handled,
2186  * but we are not livelocked by new incoming ones.
2187  */
2188 void flush_workqueue(struct workqueue_struct *wq)
2189 {
2190         struct wq_flusher this_flusher = {
2191                 .list = LIST_HEAD_INIT(this_flusher.list),
2192                 .flush_color = -1,
2193                 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
2194         };
2195         int next_color;
2196
2197         lock_map_acquire(&wq->lockdep_map);
2198         lock_map_release(&wq->lockdep_map);
2199
2200         mutex_lock(&wq->flush_mutex);
2201
2202         /*
2203          * Start-to-wait phase
2204          */
2205         next_color = work_next_color(wq->work_color);
2206
2207         if (next_color != wq->flush_color) {
2208                 /*
2209                  * Color space is not full.  The current work_color
2210                  * becomes our flush_color and work_color is advanced
2211                  * by one.
2212                  */
2213                 BUG_ON(!list_empty(&wq->flusher_overflow));
2214                 this_flusher.flush_color = wq->work_color;
2215                 wq->work_color = next_color;
2216
2217                 if (!wq->first_flusher) {
2218                         /* no flush in progress, become the first flusher */
2219                         BUG_ON(wq->flush_color != this_flusher.flush_color);
2220
2221                         wq->first_flusher = &this_flusher;
2222
2223                         if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
2224                                                        wq->work_color)) {
2225                                 /* nothing to flush, done */
2226                                 wq->flush_color = next_color;
2227                                 wq->first_flusher = NULL;
2228                                 goto out_unlock;
2229                         }
2230                 } else {
2231                         /* wait in queue */
2232                         BUG_ON(wq->flush_color == this_flusher.flush_color);
2233                         list_add_tail(&this_flusher.list, &wq->flusher_queue);
2234                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2235                 }
2236         } else {
2237                 /*
2238                  * Oops, color space is full, wait on overflow queue.
2239                  * The next flush completion will assign us
2240                  * flush_color and transfer to flusher_queue.
2241                  */
2242                 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
2243         }
2244
2245         mutex_unlock(&wq->flush_mutex);
2246
2247         wait_for_completion(&this_flusher.done);
2248
2249         /*
2250          * Wake-up-and-cascade phase
2251          *
2252          * First flushers are responsible for cascading flushes and
2253          * handling overflow.  Non-first flushers can simply return.
2254          */
2255         if (wq->first_flusher != &this_flusher)
2256                 return;
2257
2258         mutex_lock(&wq->flush_mutex);
2259
2260         /* we might have raced, check again with mutex held */
2261         if (wq->first_flusher != &this_flusher)
2262                 goto out_unlock;
2263
2264         wq->first_flusher = NULL;
2265
2266         BUG_ON(!list_empty(&this_flusher.list));
2267         BUG_ON(wq->flush_color != this_flusher.flush_color);
2268
2269         while (true) {
2270                 struct wq_flusher *next, *tmp;
2271
2272                 /* complete all the flushers sharing the current flush color */
2273                 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
2274                         if (next->flush_color != wq->flush_color)
2275                                 break;
2276                         list_del_init(&next->list);
2277                         complete(&next->done);
2278                 }
2279
2280                 BUG_ON(!list_empty(&wq->flusher_overflow) &&
2281                        wq->flush_color != work_next_color(wq->work_color));
2282
2283                 /* this flush_color is finished, advance by one */
2284                 wq->flush_color = work_next_color(wq->flush_color);
2285
2286                 /* one color has been freed, handle overflow queue */
2287                 if (!list_empty(&wq->flusher_overflow)) {
2288                         /*
2289                          * Assign the same color to all overflowed
2290                          * flushers, advance work_color and append to
2291                          * flusher_queue.  This is the start-to-wait
2292                          * phase for these overflowed flushers.
2293                          */
2294                         list_for_each_entry(tmp, &wq->flusher_overflow, list)
2295                                 tmp->flush_color = wq->work_color;
2296
2297                         wq->work_color = work_next_color(wq->work_color);
2298
2299                         list_splice_tail_init(&wq->flusher_overflow,
2300                                               &wq->flusher_queue);
2301                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2302                 }
2303
2304                 if (list_empty(&wq->flusher_queue)) {
2305                         BUG_ON(wq->flush_color != wq->work_color);
2306                         break;
2307                 }
2308
2309                 /*
2310                  * Need to flush more colors.  Make the next flusher
2311                  * the new first flusher and arm cwqs.
2312                  */
2313                 BUG_ON(wq->flush_color == wq->work_color);
2314                 BUG_ON(wq->flush_color != next->flush_color);
2315
2316                 list_del_init(&next->list);
2317                 wq->first_flusher = next;
2318
2319                 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
2320                         break;
2321
2322                 /*
2323                  * Meh... this color is already done, clear first
2324                  * flusher and repeat cascading.
2325                  */
2326                 wq->first_flusher = NULL;
2327         }
2328
2329 out_unlock:
2330         mutex_unlock(&wq->flush_mutex);
2331 }
2332 EXPORT_SYMBOL_GPL(flush_workqueue);
2333
2334 static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
2335                              bool wait_executing)
2336 {
2337         struct worker *worker = NULL;
2338         struct global_cwq *gcwq;
2339         struct cpu_workqueue_struct *cwq;
2340
2341         might_sleep();
2342         gcwq = get_work_gcwq(work);
2343         if (!gcwq)
2344                 return false;
2345
2346         spin_lock_irq(&gcwq->lock);
2347         if (!list_empty(&work->entry)) {
2348                 /*
2349                  * See the comment near try_to_grab_pending()->smp_rmb().
2350                  * If it was re-queued to a different gcwq under us, we
2351                  * are not going to wait.
2352                  */
2353                 smp_rmb();
2354                 cwq = get_work_cwq(work);
2355                 if (unlikely(!cwq || gcwq != cwq->gcwq))
2356                         goto already_gone;
2357         } else if (wait_executing) {
2358                 worker = find_worker_executing_work(gcwq, work);
2359                 if (!worker)
2360                         goto already_gone;
2361                 cwq = worker->current_cwq;
2362         } else
2363                 goto already_gone;
2364
2365         insert_wq_barrier(cwq, barr, work, worker);
2366         spin_unlock_irq(&gcwq->lock);
2367
2368         lock_map_acquire(&cwq->wq->lockdep_map);
2369         lock_map_release(&cwq->wq->lockdep_map);
2370         return true;
2371 already_gone:
2372         spin_unlock_irq(&gcwq->lock);
2373         return false;
2374 }
2375
2376 /**
2377  * flush_work - wait for a work to finish executing the last queueing instance
2378  * @work: the work to flush
2379  *
2380  * Wait until @work has finished execution.  This function considers
2381  * only the last queueing instance of @work.  If @work has been
2382  * enqueued across different CPUs on a non-reentrant workqueue or on
2383  * multiple workqueues, @work might still be executing on return on
2384  * some of the CPUs from earlier queueing.
2385  *
2386  * If @work was queued only on a non-reentrant, ordered or unbound
2387  * workqueue, @work is guaranteed to be idle on return if it hasn't
2388  * been requeued since flush started.
2389  *
2390  * RETURNS:
2391  * %true if flush_work() waited for the work to finish execution,
2392  * %false if it was already idle.
2393  */
2394 bool flush_work(struct work_struct *work)
2395 {
2396         struct wq_barrier barr;
2397
2398         if (start_flush_work(work, &barr, true)) {
2399                 wait_for_completion(&barr.done);
2400                 destroy_work_on_stack(&barr.work);
2401                 return true;
2402         } else
2403                 return false;
2404 }
2405 EXPORT_SYMBOL_GPL(flush_work);
2406
2407 static bool wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
2408 {
2409         struct wq_barrier barr;
2410         struct worker *worker;
2411
2412         spin_lock_irq(&gcwq->lock);
2413
2414         worker = find_worker_executing_work(gcwq, work);
2415         if (unlikely(worker))
2416                 insert_wq_barrier(worker->current_cwq, &barr, work, worker);
2417
2418         spin_unlock_irq(&gcwq->lock);
2419
2420         if (unlikely(worker)) {
2421                 wait_for_completion(&barr.done);
2422                 destroy_work_on_stack(&barr.work);
2423                 return true;
2424         } else
2425                 return false;
2426 }
2427
2428 static bool wait_on_work(struct work_struct *work)
2429 {
2430         bool ret = false;
2431         int cpu;
2432
2433         might_sleep();
2434
2435         lock_map_acquire(&work->lockdep_map);
2436         lock_map_release(&work->lockdep_map);
2437
2438         for_each_gcwq_cpu(cpu)
2439                 ret |= wait_on_cpu_work(get_gcwq(cpu), work);
2440         return ret;
2441 }
2442
2443 /**
2444  * flush_work_sync - wait until a work has finished execution
2445  * @work: the work to flush
2446  *
2447  * Wait until @work has finished execution.  On return, it's
2448  * guaranteed that all queueing instances of @work which happened
2449  * before this function is called are finished.  In other words, if
2450  * @work hasn't been requeued since this function was called, @work is
2451  * guaranteed to be idle on return.
2452  *
2453  * RETURNS:
2454  * %true if flush_work_sync() waited for the work to finish execution,
2455  * %false if it was already idle.
2456  */
2457 bool flush_work_sync(struct work_struct *work)
2458 {
2459         struct wq_barrier barr;
2460         bool pending, waited;
2461
2462         /* we'll wait for executions separately, queue barr only if pending */
2463         pending = start_flush_work(work, &barr, false);
2464
2465         /* wait for executions to finish */
2466         waited = wait_on_work(work);
2467
2468         /* wait for the pending one */
2469         if (pending) {
2470                 wait_for_completion(&barr.done);
2471                 destroy_work_on_stack(&barr.work);
2472         }
2473
2474         return pending || waited;
2475 }
2476 EXPORT_SYMBOL_GPL(flush_work_sync);
2477
2478 /*
2479  * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
2480  * so this work can't be re-armed in any way.
2481  */
2482 static int try_to_grab_pending(struct work_struct *work)
2483 {
2484         struct global_cwq *gcwq;
2485         int ret = -1;
2486
2487         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
2488                 return 0;
2489
2490         /*
2491          * The queueing is in progress, or it is already queued. Try to
2492          * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
2493          */
2494         gcwq = get_work_gcwq(work);
2495         if (!gcwq)
2496                 return ret;
2497
2498         spin_lock_irq(&gcwq->lock);
2499         if (!list_empty(&work->entry)) {
2500                 /*
2501                  * This work is queued, but perhaps we locked the wrong gcwq.
2502                  * In that case we must see the new value after rmb(), see
2503                  * insert_work()->wmb().
2504                  */
2505                 smp_rmb();
2506                 if (gcwq == get_work_gcwq(work)) {
2507                         debug_work_deactivate(work);
2508                         list_del_init(&work->entry);
2509                         cwq_dec_nr_in_flight(get_work_cwq(work),
2510                                 get_work_color(work),
2511                                 *work_data_bits(work) & WORK_STRUCT_DELAYED);
2512                         ret = 1;
2513                 }
2514         }
2515         spin_unlock_irq(&gcwq->lock);
2516
2517         return ret;
2518 }
2519
2520 static bool __cancel_work_timer(struct work_struct *work,
2521                                 struct timer_list* timer)
2522 {
2523         int ret;
2524
2525         do {
2526                 ret = (timer && likely(del_timer(timer)));
2527                 if (!ret)
2528                         ret = try_to_grab_pending(work);
2529                 wait_on_work(work);
2530         } while (unlikely(ret < 0));
2531
2532         clear_work_data(work);
2533         return ret;
2534 }
2535
2536 /**
2537  * cancel_work_sync - cancel a work and wait for it to finish
2538  * @work: the work to cancel
2539  *
2540  * Cancel @work and wait for its execution to finish.  This function
2541  * can be used even if the work re-queues itself or migrates to
2542  * another workqueue.  On return from this function, @work is
2543  * guaranteed to be not pending or executing on any CPU.
2544  *
2545  * cancel_work_sync(&delayed_work->work) must not be used for
2546  * delayed_work's.  Use cancel_delayed_work_sync() instead.
2547  *
2548  * The caller must ensure that the workqueue on which @work was last
2549  * queued can't be destroyed before this function returns.
2550  *
2551  * RETURNS:
2552  * %true if @work was pending, %false otherwise.
2553  */
2554 bool cancel_work_sync(struct work_struct *work)
2555 {
2556         return __cancel_work_timer(work, NULL);
2557 }
2558 EXPORT_SYMBOL_GPL(cancel_work_sync);
2559
2560 /**
2561  * flush_delayed_work - wait for a dwork to finish executing the last queueing
2562  * @dwork: the delayed work to flush
2563  *
2564  * Delayed timer is cancelled and the pending work is queued for
2565  * immediate execution.  Like flush_work(), this function only
2566  * considers the last queueing instance of @dwork.
2567  *
2568  * RETURNS:
2569  * %true if flush_work() waited for the work to finish execution,
2570  * %false if it was already idle.
2571  */
2572 bool flush_delayed_work(struct delayed_work *dwork)
2573 {
2574         if (del_timer_sync(&dwork->timer))
2575                 __queue_work(raw_smp_processor_id(),
2576                              get_work_cwq(&dwork->work)->wq, &dwork->work);
2577         return flush_work(&dwork->work);
2578 }
2579 EXPORT_SYMBOL(flush_delayed_work);
2580
2581 /**
2582  * flush_delayed_work_sync - wait for a dwork to finish
2583  * @dwork: the delayed work to flush
2584  *
2585  * Delayed timer is cancelled and the pending work is queued for
2586  * execution immediately.  Other than timer handling, its behavior
2587  * is identical to flush_work_sync().
2588  *
2589  * RETURNS:
2590  * %true if flush_work_sync() waited for the work to finish execution,
2591  * %false if it was already idle.
2592  */
2593 bool flush_delayed_work_sync(struct delayed_work *dwork)
2594 {
2595         if (del_timer_sync(&dwork->timer))
2596                 __queue_work(raw_smp_processor_id(),
2597                              get_work_cwq(&dwork->work)->wq, &dwork->work);
2598         return flush_work_sync(&dwork->work);
2599 }
2600 EXPORT_SYMBOL(flush_delayed_work_sync);
2601
2602 /**
2603  * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish
2604  * @dwork: the delayed work cancel
2605  *
2606  * This is cancel_work_sync() for delayed works.
2607  *
2608  * RETURNS:
2609  * %true if @dwork was pending, %false otherwise.
2610  */
2611 bool cancel_delayed_work_sync(struct delayed_work *dwork)
2612 {
2613         return __cancel_work_timer(&dwork->work, &dwork->timer);
2614 }
2615 EXPORT_SYMBOL(cancel_delayed_work_sync);
2616
2617 /**
2618  * schedule_work - put work task in global workqueue
2619  * @work: job to be done
2620  *
2621  * Returns zero if @work was already on the kernel-global workqueue and
2622  * non-zero otherwise.
2623  *
2624  * This puts a job in the kernel-global workqueue if it was not already
2625  * queued and leaves it in the same position on the kernel-global
2626  * workqueue otherwise.
2627  */
2628 int schedule_work(struct work_struct *work)
2629 {
2630         return queue_work(system_wq, work);
2631 }
2632 EXPORT_SYMBOL(schedule_work);
2633
2634 /*
2635  * schedule_work_on - put work task on a specific cpu
2636  * @cpu: cpu to put the work task on
2637  * @work: job to be done
2638  *
2639  * This puts a job on a specific cpu
2640  */
2641 int schedule_work_on(int cpu, struct work_struct *work)
2642 {
2643         return queue_work_on(cpu, system_wq, work);
2644 }
2645 EXPORT_SYMBOL(schedule_work_on);
2646
2647 /**
2648  * schedule_delayed_work - put work task in global workqueue after delay
2649  * @dwork: job to be done
2650  * @delay: number of jiffies to wait or 0 for immediate execution
2651  *
2652  * After waiting for a given time this puts a job in the kernel-global
2653  * workqueue.
2654  */
2655 int schedule_delayed_work(struct delayed_work *dwork,
2656                                         unsigned long delay)
2657 {
2658         return queue_delayed_work(system_wq, dwork, delay);
2659 }
2660 EXPORT_SYMBOL(schedule_delayed_work);
2661
2662 /**
2663  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
2664  * @cpu: cpu to use
2665  * @dwork: job to be done
2666  * @delay: number of jiffies to wait
2667  *
2668  * After waiting for a given time this puts a job in the kernel-global
2669  * workqueue on the specified CPU.
2670  */
2671 int schedule_delayed_work_on(int cpu,
2672                         struct delayed_work *dwork, unsigned long delay)
2673 {
2674         return queue_delayed_work_on(cpu, system_wq, dwork, delay);
2675 }
2676 EXPORT_SYMBOL(schedule_delayed_work_on);
2677
2678 /**
2679  * schedule_on_each_cpu - execute a function synchronously on each online CPU
2680  * @func: the function to call
2681  *
2682  * schedule_on_each_cpu() executes @func on each online CPU using the
2683  * system workqueue and blocks until all CPUs have completed.
2684  * schedule_on_each_cpu() is very slow.
2685  *
2686  * RETURNS:
2687  * 0 on success, -errno on failure.
2688  */
2689 int schedule_on_each_cpu(work_func_t func)
2690 {
2691         int cpu;
2692         struct work_struct __percpu *works;
2693
2694         works = alloc_percpu(struct work_struct);
2695         if (!works)
2696                 return -ENOMEM;
2697
2698         get_online_cpus();
2699
2700         for_each_online_cpu(cpu) {
2701                 struct work_struct *work = per_cpu_ptr(works, cpu);
2702
2703                 INIT_WORK(work, func);
2704                 schedule_work_on(cpu, work);
2705         }
2706
2707         for_each_online_cpu(cpu)
2708                 flush_work(per_cpu_ptr(works, cpu));
2709
2710         put_online_cpus();
2711         free_percpu(works);
2712         return 0;
2713 }
2714
2715 /**
2716  * flush_scheduled_work - ensure that any scheduled work has run to completion.
2717  *
2718  * Forces execution of the kernel-global workqueue and blocks until its
2719  * completion.
2720  *
2721  * Think twice before calling this function!  It's very easy to get into
2722  * trouble if you don't take great care.  Either of the following situations
2723  * will lead to deadlock:
2724  *
2725  *      One of the work items currently on the workqueue needs to acquire
2726  *      a lock held by your code or its caller.
2727  *
2728  *      Your code is running in the context of a work routine.
2729  *
2730  * They will be detected by lockdep when they occur, but the first might not
2731  * occur very often.  It depends on what work items are on the workqueue and
2732  * what locks they need, which you have no control over.
2733  *
2734  * In most situations flushing the entire workqueue is overkill; you merely
2735  * need to know that a particular work item isn't queued and isn't running.
2736  * In such cases you should use cancel_delayed_work_sync() or
2737  * cancel_work_sync() instead.
2738  */
2739 void flush_scheduled_work(void)
2740 {
2741         flush_workqueue(system_wq);
2742 }
2743 EXPORT_SYMBOL(flush_scheduled_work);
2744
2745 /**
2746  * execute_in_process_context - reliably execute the routine with user context
2747  * @fn:         the function to execute
2748  * @ew:         guaranteed storage for the execute work structure (must
2749  *              be available when the work executes)
2750  *
2751  * Executes the function immediately if process context is available,
2752  * otherwise schedules the function for delayed execution.
2753  *
2754  * Returns:     0 - function was executed
2755  *              1 - function was scheduled for execution
2756  */
2757 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
2758 {
2759         if (!in_interrupt()) {
2760                 fn(&ew->work);
2761                 return 0;
2762         }
2763
2764         INIT_WORK(&ew->work, fn);
2765         schedule_work(&ew->work);
2766
2767         return 1;
2768 }
2769 EXPORT_SYMBOL_GPL(execute_in_process_context);
2770
2771 int keventd_up(void)
2772 {
2773         return system_wq != NULL;
2774 }
2775
2776 static int alloc_cwqs(struct workqueue_struct *wq)
2777 {
2778         /*
2779          * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
2780          * Make sure that the alignment isn't lower than that of
2781          * unsigned long long.
2782          */
2783         const size_t size = sizeof(struct cpu_workqueue_struct);
2784         const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
2785                                    __alignof__(unsigned long long));
2786 #ifdef CONFIG_SMP
2787         bool percpu = !(wq->flags & WQ_UNBOUND);
2788 #else
2789         bool percpu = false;
2790 #endif
2791
2792         if (percpu)
2793                 wq->cpu_wq.pcpu = __alloc_percpu(size, align);
2794         else {
2795                 void *ptr;
2796
2797                 /*
2798                  * Allocate enough room to align cwq and put an extra
2799                  * pointer at the end pointing back to the originally
2800                  * allocated pointer which will be used for free.
2801                  */
2802                 ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL);
2803                 if (ptr) {
2804                         wq->cpu_wq.single = PTR_ALIGN(ptr, align);
2805                         *(void **)(wq->cpu_wq.single + 1) = ptr;
2806                 }
2807         }
2808
2809         /* just in case, make sure it's actually aligned */
2810         BUG_ON(!IS_ALIGNED(wq->cpu_wq.v, align));
2811         return wq->cpu_wq.v ? 0 : -ENOMEM;
2812 }
2813
2814 static void free_cwqs(struct workqueue_struct *wq)
2815 {
2816 #ifdef CONFIG_SMP
2817         bool percpu = !(wq->flags & WQ_UNBOUND);
2818 #else
2819         bool percpu = false;
2820 #endif
2821
2822         if (percpu)
2823                 free_percpu(wq->cpu_wq.pcpu);
2824         else if (wq->cpu_wq.single) {
2825                 /* the pointer to free is stored right after the cwq */
2826                 kfree(*(void **)(wq->cpu_wq.single + 1));
2827         }
2828 }
2829
2830 static int wq_clamp_max_active(int max_active, unsigned int flags,
2831                                const char *name)
2832 {
2833         int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
2834
2835         if (max_active < 1 || max_active > lim)
2836                 printk(KERN_WARNING "workqueue: max_active %d requested for %s "
2837                        "is out of range, clamping between %d and %d\n",
2838                        max_active, name, 1, lim);
2839
2840         return clamp_val(max_active, 1, lim);
2841 }
2842
2843 struct workqueue_struct *__alloc_workqueue_key(const char *name,
2844                                                unsigned int flags,
2845                                                int max_active,
2846                                                struct lock_class_key *key,
2847                                                const char *lock_name)
2848 {
2849         struct workqueue_struct *wq;
2850         unsigned int cpu;
2851
2852         /*
2853          * Workqueues which may be used during memory reclaim should
2854          * have a rescuer to guarantee forward progress.
2855          */
2856         if (flags & WQ_MEM_RECLAIM)
2857                 flags |= WQ_RESCUER;
2858
2859         /*
2860          * Unbound workqueues aren't concurrency managed and should be
2861          * dispatched to workers immediately.
2862          */
2863         if (flags & WQ_UNBOUND)
2864                 flags |= WQ_HIGHPRI;
2865
2866         max_active = max_active ?: WQ_DFL_ACTIVE;
2867         max_active = wq_clamp_max_active(max_active, flags, name);
2868
2869         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
2870         if (!wq)
2871                 goto err;
2872
2873         wq->flags = flags;
2874         wq->saved_max_active = max_active;
2875         mutex_init(&wq->flush_mutex);
2876         atomic_set(&wq->nr_cwqs_to_flush, 0);
2877         INIT_LIST_HEAD(&wq->flusher_queue);
2878         INIT_LIST_HEAD(&wq->flusher_overflow);
2879
2880         wq->name = name;
2881         lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
2882         INIT_LIST_HEAD(&wq->list);
2883
2884         if (alloc_cwqs(wq) < 0)
2885                 goto err;
2886
2887         for_each_cwq_cpu(cpu, wq) {
2888                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2889                 struct global_cwq *gcwq = get_gcwq(cpu);
2890
2891                 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
2892                 cwq->gcwq = gcwq;
2893                 cwq->wq = wq;
2894                 cwq->flush_color = -1;
2895                 cwq->max_active = max_active;
2896                 INIT_LIST_HEAD(&cwq->delayed_works);
2897         }
2898
2899         if (flags & WQ_RESCUER) {
2900                 struct worker *rescuer;
2901
2902                 if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))
2903                         goto err;
2904
2905                 wq->rescuer = rescuer = alloc_worker();
2906                 if (!rescuer)
2907                         goto err;
2908
2909                 rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);
2910                 if (IS_ERR(rescuer->task))
2911                         goto err;
2912
2913                 rescuer->task->flags |= PF_THREAD_BOUND;
2914                 wake_up_process(rescuer->task);
2915         }
2916
2917         /*
2918          * workqueue_lock protects global freeze state and workqueues
2919          * list.  Grab it, set max_active accordingly and add the new
2920          * workqueue to workqueues list.
2921          */
2922         spin_lock(&workqueue_lock);
2923
2924         if (workqueue_freezing && wq->flags & WQ_FREEZEABLE)
2925                 for_each_cwq_cpu(cpu, wq)
2926                         get_cwq(cpu, wq)->max_active = 0;
2927
2928         list_add(&wq->list, &workqueues);
2929
2930         spin_unlock(&workqueue_lock);
2931
2932         return wq;
2933 err:
2934         if (wq) {
2935                 free_cwqs(wq);
2936                 free_mayday_mask(wq->mayday_mask);
2937                 kfree(wq->rescuer);
2938                 kfree(wq);
2939         }
2940         return NULL;
2941 }
2942 EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
2943
2944 /**
2945  * destroy_workqueue - safely terminate a workqueue
2946  * @wq: target workqueue
2947  *
2948  * Safely destroy a workqueue. All work currently pending will be done first.
2949  */
2950 void destroy_workqueue(struct workqueue_struct *wq)
2951 {
2952         unsigned int cpu;
2953
2954         wq->flags |= WQ_DYING;
2955         flush_workqueue(wq);
2956
2957         /*
2958          * wq list is used to freeze wq, remove from list after
2959          * flushing is complete in case freeze races us.
2960          */
2961         spin_lock(&workqueue_lock);
2962         list_del(&wq->list);
2963         spin_unlock(&workqueue_lock);
2964
2965         /* sanity check */
2966         for_each_cwq_cpu(cpu, wq) {
2967                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2968                 int i;
2969
2970                 for (i = 0; i < WORK_NR_COLORS; i++)
2971                         BUG_ON(cwq->nr_in_flight[i]);
2972                 BUG_ON(cwq->nr_active);
2973                 BUG_ON(!list_empty(&cwq->delayed_works));
2974         }
2975
2976         if (wq->flags & WQ_RESCUER) {
2977                 kthread_stop(wq->rescuer->task);
2978                 free_mayday_mask(wq->mayday_mask);
2979                 kfree(wq->rescuer);
2980         }
2981
2982         free_cwqs(wq);
2983         kfree(wq);
2984 }
2985 EXPORT_SYMBOL_GPL(destroy_workqueue);
2986
2987 /**
2988  * workqueue_set_max_active - adjust max_active of a workqueue
2989  * @wq: target workqueue
2990  * @max_active: new max_active value.
2991  *
2992  * Set max_active of @wq to @max_active.
2993  *
2994  * CONTEXT:
2995  * Don't call from IRQ context.
2996  */
2997 void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
2998 {
2999         unsigned int cpu;
3000
3001         max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
3002
3003         spin_lock(&workqueue_lock);
3004
3005         wq->saved_max_active = max_active;
3006
3007         for_each_cwq_cpu(cpu, wq) {
3008                 struct global_cwq *gcwq = get_gcwq(cpu);
3009
3010                 spin_lock_irq(&gcwq->lock);
3011
3012                 if (!(wq->flags & WQ_FREEZEABLE) ||
3013                     !(gcwq->flags & GCWQ_FREEZING))
3014                         get_cwq(gcwq->cpu, wq)->max_active = max_active;
3015
3016                 spin_unlock_irq(&gcwq->lock);
3017         }
3018
3019         spin_unlock(&workqueue_lock);
3020 }
3021 EXPORT_SYMBOL_GPL(workqueue_set_max_active);
3022
3023 /**
3024  * workqueue_congested - test whether a workqueue is congested
3025  * @cpu: CPU in question
3026  * @wq: target workqueue
3027  *
3028  * Test whether @wq's cpu workqueue for @cpu is congested.  There is
3029  * no synchronization around this function and the test result is
3030  * unreliable and only useful as advisory hints or for debugging.
3031  *
3032  * RETURNS:
3033  * %true if congested, %false otherwise.
3034  */
3035 bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq)
3036 {
3037         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3038
3039         return !list_empty(&cwq->delayed_works);
3040 }
3041 EXPORT_SYMBOL_GPL(workqueue_congested);
3042
3043 /**
3044  * work_cpu - return the last known associated cpu for @work
3045  * @work: the work of interest
3046  *
3047  * RETURNS:
3048  * CPU number if @work was ever queued.  WORK_CPU_NONE otherwise.
3049  */
3050 unsigned int work_cpu(struct work_struct *work)
3051 {
3052         struct global_cwq *gcwq = get_work_gcwq(work);
3053
3054         return gcwq ? gcwq->cpu : WORK_CPU_NONE;
3055 }
3056 EXPORT_SYMBOL_GPL(work_cpu);
3057
3058 /**
3059  * work_busy - test whether a work is currently pending or running
3060  * @work: the work to be tested
3061  *
3062  * Test whether @work is currently pending or running.  There is no
3063  * synchronization around this function and the test result is
3064  * unreliable and only useful as advisory hints or for debugging.
3065  * Especially for reentrant wqs, the pending state might hide the
3066  * running state.
3067  *
3068  * RETURNS:
3069  * OR'd bitmask of WORK_BUSY_* bits.
3070  */
3071 unsigned int work_busy(struct work_struct *work)
3072 {
3073         struct global_cwq *gcwq = get_work_gcwq(work);
3074         unsigned long flags;
3075         unsigned int ret = 0;
3076
3077         if (!gcwq)
3078                 return false;
3079
3080         spin_lock_irqsave(&gcwq->lock, flags);
3081
3082         if (work_pending(work))
3083                 ret |= WORK_BUSY_PENDING;
3084         if (find_worker_executing_work(gcwq, work))
3085                 ret |= WORK_BUSY_RUNNING;
3086
3087         spin_unlock_irqrestore(&gcwq->lock, flags);
3088
3089         return ret;
3090 }
3091 EXPORT_SYMBOL_GPL(work_busy);
3092
3093 /*
3094  * CPU hotplug.
3095  *
3096  * There are two challenges in supporting CPU hotplug.  Firstly, there
3097  * are a lot of assumptions on strong associations among work, cwq and
3098  * gcwq which make migrating pending and scheduled works very
3099  * difficult to implement without impacting hot paths.  Secondly,
3100  * gcwqs serve mix of short, long and very long running works making
3101  * blocked draining impractical.
3102  *
3103  * This is solved by allowing a gcwq to be detached from CPU, running
3104  * it with unbound (rogue) workers and allowing it to be reattached
3105  * later if the cpu comes back online.  A separate thread is created
3106  * to govern a gcwq in such state and is called the trustee of the
3107  * gcwq.
3108  *
3109  * Trustee states and their descriptions.
3110  *
3111  * START        Command state used on startup.  On CPU_DOWN_PREPARE, a
3112  *              new trustee is started with this state.
3113  *
3114  * IN_CHARGE    Once started, trustee will enter this state after
3115  *              assuming the manager role and making all existing
3116  *              workers rogue.  DOWN_PREPARE waits for trustee to
3117  *              enter this state.  After reaching IN_CHARGE, trustee
3118  *              tries to execute the pending worklist until it's empty
3119  *              and the state is set to BUTCHER, or the state is set
3120  *              to RELEASE.
3121  *
3122  * BUTCHER      Command state which is set by the cpu callback after
3123  *              the cpu has went down.  Once this state is set trustee
3124  *              knows that there will be no new works on the worklist
3125  *              and once the worklist is empty it can proceed to
3126  *              killing idle workers.
3127  *
3128  * RELEASE      Command state which is set by the cpu callback if the
3129  *              cpu down has been canceled or it has come online
3130  *              again.  After recognizing this state, trustee stops
3131  *              trying to drain or butcher and clears ROGUE, rebinds
3132  *              all remaining workers back to the cpu and releases
3133  *              manager role.
3134  *
3135  * DONE         Trustee will enter this state after BUTCHER or RELEASE
3136  *              is complete.
3137  *
3138  *          trustee                 CPU                draining
3139  *         took over                down               complete
3140  * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
3141  *                        |                     |                  ^
3142  *                        | CPU is back online  v   return workers |
3143  *                         ----------------> RELEASE --------------
3144  */
3145
3146 /**
3147  * trustee_wait_event_timeout - timed event wait for trustee
3148  * @cond: condition to wait for
3149  * @timeout: timeout in jiffies
3150  *
3151  * wait_event_timeout() for trustee to use.  Handles locking and
3152  * checks for RELEASE request.
3153  *
3154  * CONTEXT:
3155  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3156  * multiple times.  To be used by trustee.
3157  *
3158  * RETURNS:
3159  * Positive indicating left time if @cond is satisfied, 0 if timed
3160  * out, -1 if canceled.
3161  */
3162 #define trustee_wait_event_timeout(cond, timeout) ({                    \
3163         long __ret = (timeout);                                         \
3164         while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \
3165                __ret) {                                                 \
3166                 spin_unlock_irq(&gcwq->lock);                           \
3167                 __wait_event_timeout(gcwq->trustee_wait, (cond) ||      \
3168                         (gcwq->trustee_state == TRUSTEE_RELEASE),       \
3169                         __ret);                                         \
3170                 spin_lock_irq(&gcwq->lock);                             \
3171         }                                                               \
3172         gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret);          \
3173 })
3174
3175 /**
3176  * trustee_wait_event - event wait for trustee
3177  * @cond: condition to wait for
3178  *
3179  * wait_event() for trustee to use.  Automatically handles locking and
3180  * checks for CANCEL request.
3181  *
3182  * CONTEXT:
3183  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3184  * multiple times.  To be used by trustee.
3185  *
3186  * RETURNS:
3187  * 0 if @cond is satisfied, -1 if canceled.
3188  */
3189 #define trustee_wait_event(cond) ({                                     \
3190         long __ret1;                                                    \
3191         __ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
3192         __ret1 < 0 ? -1 : 0;                                            \
3193 })
3194
3195 static int __cpuinit trustee_thread(void *__gcwq)
3196 {
3197         struct global_cwq *gcwq = __gcwq;
3198         struct worker *worker;
3199         struct work_struct *work;
3200         struct hlist_node *pos;
3201         long rc;
3202         int i;
3203
3204         BUG_ON(gcwq->cpu != smp_processor_id());
3205
3206         spin_lock_irq(&gcwq->lock);
3207         /*
3208          * Claim the manager position and make all workers rogue.
3209          * Trustee must be bound to the target cpu and can't be
3210          * cancelled.
3211          */
3212         BUG_ON(gcwq->cpu != smp_processor_id());
3213         rc = trustee_wait_event(!(gcwq->flags & GCWQ_MANAGING_WORKERS));
3214         BUG_ON(rc < 0);
3215
3216         gcwq->flags |= GCWQ_MANAGING_WORKERS;
3217
3218         list_for_each_entry(worker, &gcwq->idle_list, entry)
3219                 worker->flags |= WORKER_ROGUE;
3220
3221         for_each_busy_worker(worker, i, pos, gcwq)
3222                 worker->flags |= WORKER_ROGUE;
3223
3224         /*
3225          * Call schedule() so that we cross rq->lock and thus can
3226          * guarantee sched callbacks see the rogue flag.  This is
3227          * necessary as scheduler callbacks may be invoked from other
3228          * cpus.
3229          */
3230         spin_unlock_irq(&gcwq->lock);
3231         schedule();
3232         spin_lock_irq(&gcwq->lock);
3233
3234         /*
3235          * Sched callbacks are disabled now.  Zap nr_running.  After
3236          * this, nr_running stays zero and need_more_worker() and
3237          * keep_working() are always true as long as the worklist is
3238          * not empty.
3239          */
3240         atomic_set(get_gcwq_nr_running(gcwq->cpu), 0);
3241
3242         spin_unlock_irq(&gcwq->lock);
3243         del_timer_sync(&gcwq->idle_timer);
3244         spin_lock_irq(&gcwq->lock);
3245
3246         /*
3247          * We're now in charge.  Notify and proceed to drain.  We need
3248          * to keep the gcwq running during the whole CPU down
3249          * procedure as other cpu hotunplug callbacks may need to
3250          * flush currently running tasks.
3251          */
3252         gcwq->trustee_state = TRUSTEE_IN_CHARGE;
3253         wake_up_all(&gcwq->trustee_wait);
3254
3255         /*
3256          * The original cpu is in the process of dying and may go away
3257          * anytime now.  When that happens, we and all workers would
3258          * be migrated to other cpus.  Try draining any left work.  We
3259          * want to get it over with ASAP - spam rescuers, wake up as
3260          * many idlers as necessary and create new ones till the
3261          * worklist is empty.  Note that if the gcwq is frozen, there
3262          * may be frozen works in freezeable cwqs.  Don't declare
3263          * completion while frozen.
3264          */
3265         while (gcwq->nr_workers != gcwq->nr_idle ||
3266                gcwq->flags & GCWQ_FREEZING ||
3267                gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
3268                 int nr_works = 0;
3269
3270                 list_for_each_entry(work, &gcwq->worklist, entry) {
3271                         send_mayday(work);
3272                         nr_works++;
3273                 }
3274
3275                 list_for_each_entry(worker, &gcwq->idle_list, entry) {
3276                         if (!nr_works--)
3277                                 break;
3278                         wake_up_process(worker->task);
3279                 }
3280
3281                 if (need_to_create_worker(gcwq)) {
3282                         spin_unlock_irq(&gcwq->lock);
3283                         worker = create_worker(gcwq, false);
3284                         spin_lock_irq(&gcwq->lock);
3285                         if (worker) {
3286                                 worker->flags |= WORKER_ROGUE;
3287                                 start_worker(worker);
3288                         }
3289                 }
3290
3291                 /* give a breather */
3292                 if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
3293                         break;
3294         }
3295
3296         /*
3297          * Either all works have been scheduled and cpu is down, or
3298          * cpu down has already been canceled.  Wait for and butcher
3299          * all workers till we're canceled.
3300          */
3301         do {
3302                 rc = trustee_wait_event(!list_empty(&gcwq->idle_list));
3303                 while (!list_empty(&gcwq->idle_list))
3304                         destroy_worker(list_first_entry(&gcwq->idle_list,
3305                                                         struct worker, entry));
3306         } while (gcwq->nr_workers && rc >= 0);
3307
3308         /*
3309          * At this point, either draining has completed and no worker
3310          * is left, or cpu down has been canceled or the cpu is being
3311          * brought back up.  There shouldn't be any idle one left.
3312          * Tell the remaining busy ones to rebind once it finishes the
3313          * currently scheduled works by scheduling the rebind_work.
3314          */
3315         WARN_ON(!list_empty(&gcwq->idle_list));
3316
3317         for_each_busy_worker(worker, i, pos, gcwq) {
3318                 struct work_struct *rebind_work = &worker->rebind_work;
3319
3320                 /*
3321                  * Rebind_work may race with future cpu hotplug
3322                  * operations.  Use a separate flag to mark that
3323                  * rebinding is scheduled.
3324                  */
3325                 worker->flags |= WORKER_REBIND;
3326                 worker->flags &= ~WORKER_ROGUE;
3327
3328                 /* queue rebind_work, wq doesn't matter, use the default one */
3329                 if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
3330                                      work_data_bits(rebind_work)))
3331                         continue;
3332
3333                 debug_work_activate(rebind_work);
3334                 insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
3335                             worker->scheduled.next,
3336                             work_color_to_flags(WORK_NO_COLOR));
3337         }
3338
3339         /* relinquish manager role */
3340         gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
3341
3342         /* notify completion */
3343         gcwq->trustee = NULL;
3344         gcwq->trustee_state = TRUSTEE_DONE;
3345         wake_up_all(&gcwq->trustee_wait);
3346         spin_unlock_irq(&gcwq->lock);
3347         return 0;
3348 }
3349
3350 /**
3351  * wait_trustee_state - wait for trustee to enter the specified state
3352  * @gcwq: gcwq the trustee of interest belongs to
3353  * @state: target state to wait for
3354  *
3355  * Wait for the trustee to reach @state.  DONE is already matched.
3356  *
3357  * CONTEXT:
3358  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3359  * multiple times.  To be used by cpu_callback.
3360  */
3361 static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
3362 __releases(&gcwq->lock)
3363 __acquires(&gcwq->lock)
3364 {
3365         if (!(gcwq->trustee_state == state ||
3366               gcwq->trustee_state == TRUSTEE_DONE)) {
3367                 spin_unlock_irq(&gcwq->lock);
3368                 __wait_event(gcwq->trustee_wait,
3369                              gcwq->trustee_state == state ||
3370                              gcwq->trustee_state == TRUSTEE_DONE);
3371                 spin_lock_irq(&gcwq->lock);
3372         }
3373 }
3374
3375 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
3376                                                 unsigned long action,
3377                                                 void *hcpu)
3378 {
3379         unsigned int cpu = (unsigned long)hcpu;
3380         struct global_cwq *gcwq = get_gcwq(cpu);
3381         struct task_struct *new_trustee = NULL;
3382         struct worker *uninitialized_var(new_worker);
3383         unsigned long flags;
3384
3385         action &= ~CPU_TASKS_FROZEN;
3386
3387         switch (action) {
3388         case CPU_DOWN_PREPARE:
3389                 new_trustee = kthread_create(trustee_thread, gcwq,
3390                                              "workqueue_trustee/%d\n", cpu);
3391                 if (IS_ERR(new_trustee))
3392                         return notifier_from_errno(PTR_ERR(new_trustee));
3393                 kthread_bind(new_trustee, cpu);
3394                 /* fall through */
3395         case CPU_UP_PREPARE:
3396                 BUG_ON(gcwq->first_idle);
3397                 new_worker = create_worker(gcwq, false);
3398                 if (!new_worker) {
3399                         if (new_trustee)
3400                                 kthread_stop(new_trustee);
3401                         return NOTIFY_BAD;
3402                 }
3403         }
3404
3405         /* some are called w/ irq disabled, don't disturb irq status */
3406         spin_lock_irqsave(&gcwq->lock, flags);
3407
3408         switch (action) {
3409         case CPU_DOWN_PREPARE:
3410                 /* initialize trustee and tell it to acquire the gcwq */
3411                 BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
3412                 gcwq->trustee = new_trustee;
3413                 gcwq->trustee_state = TRUSTEE_START;
3414                 wake_up_process(gcwq->trustee);
3415                 wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
3416                 /* fall through */
3417         case CPU_UP_PREPARE:
3418                 BUG_ON(gcwq->first_idle);
3419                 gcwq->first_idle = new_worker;
3420                 break;
3421
3422         case CPU_DYING:
3423                 /*
3424                  * Before this, the trustee and all workers except for
3425                  * the ones which are still executing works from
3426                  * before the last CPU down must be on the cpu.  After
3427                  * this, they'll all be diasporas.
3428                  */
3429                 gcwq->flags |= GCWQ_DISASSOCIATED;
3430                 break;
3431
3432         case CPU_POST_DEAD:
3433                 gcwq->trustee_state = TRUSTEE_BUTCHER;
3434                 /* fall through */
3435         case CPU_UP_CANCELED:
3436                 destroy_worker(gcwq->first_idle);
3437                 gcwq->first_idle = NULL;
3438                 break;
3439
3440         case CPU_DOWN_FAILED:
3441         case CPU_ONLINE:
3442                 gcwq->flags &= ~GCWQ_DISASSOCIATED;
3443                 if (gcwq->trustee_state != TRUSTEE_DONE) {
3444                         gcwq->trustee_state = TRUSTEE_RELEASE;
3445                         wake_up_process(gcwq->trustee);
3446                         wait_trustee_state(gcwq, TRUSTEE_DONE);
3447                 }
3448
3449                 /*
3450                  * Trustee is done and there might be no worker left.
3451                  * Put the first_idle in and request a real manager to
3452                  * take a look.
3453                  */
3454                 spin_unlock_irq(&gcwq->lock);
3455                 kthread_bind(gcwq->first_idle->task, cpu);
3456                 spin_lock_irq(&gcwq->lock);
3457                 gcwq->flags |= GCWQ_MANAGE_WORKERS;
3458                 start_worker(gcwq->first_idle);
3459                 gcwq->first_idle = NULL;
3460                 break;
3461         }
3462
3463         spin_unlock_irqrestore(&gcwq->lock, flags);
3464
3465         return notifier_from_errno(0);
3466 }
3467
3468 #ifdef CONFIG_SMP
3469
3470 struct work_for_cpu {
3471         struct completion completion;
3472         long (*fn)(void *);
3473         void *arg;
3474         long ret;
3475 };
3476
3477 static int do_work_for_cpu(void *_wfc)
3478 {
3479         struct work_for_cpu *wfc = _wfc;
3480         wfc->ret = wfc->fn(wfc->arg);
3481         complete(&wfc->completion);
3482         return 0;
3483 }
3484
3485 /**
3486  * work_on_cpu - run a function in user context on a particular cpu
3487  * @cpu: the cpu to run on
3488  * @fn: the function to run
3489  * @arg: the function arg
3490  *
3491  * This will return the value @fn returns.
3492  * It is up to the caller to ensure that the cpu doesn't go offline.
3493  * The caller must not hold any locks which would prevent @fn from completing.
3494  */
3495 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
3496 {
3497         struct task_struct *sub_thread;
3498         struct work_for_cpu wfc = {
3499                 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
3500                 .fn = fn,
3501                 .arg = arg,
3502         };
3503
3504         sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
3505         if (IS_ERR(sub_thread))
3506                 return PTR_ERR(sub_thread);
3507         kthread_bind(sub_thread, cpu);
3508         wake_up_process(sub_thread);
3509         wait_for_completion(&wfc.completion);
3510         return wfc.ret;
3511 }
3512 EXPORT_SYMBOL_GPL(work_on_cpu);
3513 #endif /* CONFIG_SMP */
3514
3515 #ifdef CONFIG_FREEZER
3516
3517 /**
3518  * freeze_workqueues_begin - begin freezing workqueues
3519  *
3520  * Start freezing workqueues.  After this function returns, all
3521  * freezeable workqueues will queue new works to their frozen_works
3522  * list instead of gcwq->worklist.
3523  *
3524  * CONTEXT:
3525  * Grabs and releases workqueue_lock and gcwq->lock's.
3526  */
3527 void freeze_workqueues_begin(void)
3528 {
3529         unsigned int cpu;
3530
3531         spin_lock(&workqueue_lock);
3532
3533         BUG_ON(workqueue_freezing);
3534         workqueue_freezing = true;
3535
3536         for_each_gcwq_cpu(cpu) {
3537                 struct global_cwq *gcwq = get_gcwq(cpu);
3538                 struct workqueue_struct *wq;
3539
3540                 spin_lock_irq(&gcwq->lock);
3541
3542                 BUG_ON(gcwq->flags & GCWQ_FREEZING);
3543                 gcwq->flags |= GCWQ_FREEZING;
3544
3545                 list_for_each_entry(wq, &workqueues, list) {
3546                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3547
3548                         if (cwq && wq->flags & WQ_FREEZEABLE)
3549                                 cwq->max_active = 0;
3550                 }
3551
3552                 spin_unlock_irq(&gcwq->lock);
3553         }
3554
3555         spin_unlock(&workqueue_lock);
3556 }
3557
3558 /**
3559  * freeze_workqueues_busy - are freezeable workqueues still busy?
3560  *
3561  * Check whether freezing is complete.  This function must be called
3562  * between freeze_workqueues_begin() and thaw_workqueues().
3563  *
3564  * CONTEXT:
3565  * Grabs and releases workqueue_lock.
3566  *
3567  * RETURNS:
3568  * %true if some freezeable workqueues are still busy.  %false if
3569  * freezing is complete.
3570  */
3571 bool freeze_workqueues_busy(void)
3572 {
3573         unsigned int cpu;
3574         bool busy = false;
3575
3576         spin_lock(&workqueue_lock);
3577
3578         BUG_ON(!workqueue_freezing);
3579
3580         for_each_gcwq_cpu(cpu) {
3581                 struct workqueue_struct *wq;
3582                 /*
3583                  * nr_active is monotonically decreasing.  It's safe
3584                  * to peek without lock.
3585                  */
3586                 list_for_each_entry(wq, &workqueues, list) {
3587                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3588
3589                         if (!cwq || !(wq->flags & WQ_FREEZEABLE))
3590                                 continue;
3591
3592                         BUG_ON(cwq->nr_active < 0);
3593                         if (cwq->nr_active) {
3594                                 busy = true;
3595                                 goto out_unlock;
3596                         }
3597                 }
3598         }
3599 out_unlock:
3600         spin_unlock(&workqueue_lock);
3601         return busy;
3602 }
3603
3604 /**
3605  * thaw_workqueues - thaw workqueues
3606  *
3607  * Thaw workqueues.  Normal queueing is restored and all collected
3608  * frozen works are transferred to their respective gcwq worklists.
3609  *
3610  * CONTEXT:
3611  * Grabs and releases workqueue_lock and gcwq->lock's.
3612  */
3613 void thaw_workqueues(void)
3614 {
3615         unsigned int cpu;
3616
3617         spin_lock(&workqueue_lock);
3618
3619         if (!workqueue_freezing)
3620                 goto out_unlock;
3621
3622         for_each_gcwq_cpu(cpu) {
3623                 struct global_cwq *gcwq = get_gcwq(cpu);
3624                 struct workqueue_struct *wq;
3625
3626                 spin_lock_irq(&gcwq->lock);
3627
3628                 BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
3629                 gcwq->flags &= ~GCWQ_FREEZING;
3630
3631                 list_for_each_entry(wq, &workqueues, list) {
3632                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3633
3634                         if (!cwq || !(wq->flags & WQ_FREEZEABLE))
3635                                 continue;
3636
3637                         /* restore max_active and repopulate worklist */
3638                         cwq->max_active = wq->saved_max_active;
3639
3640                         while (!list_empty(&cwq->delayed_works) &&
3641                                cwq->nr_active < cwq->max_active)
3642                                 cwq_activate_first_delayed(cwq);
3643                 }
3644
3645                 wake_up_worker(gcwq);
3646
3647                 spin_unlock_irq(&gcwq->lock);
3648         }
3649
3650         workqueue_freezing = false;
3651 out_unlock:
3652         spin_unlock(&workqueue_lock);
3653 }
3654 #endif /* CONFIG_FREEZER */
3655
3656 static int __init init_workqueues(void)
3657 {
3658         unsigned int cpu;
3659         int i;
3660
3661         cpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
3662
3663         /* initialize gcwqs */
3664         for_each_gcwq_cpu(cpu) {
3665                 struct global_cwq *gcwq = get_gcwq(cpu);
3666
3667                 spin_lock_init(&gcwq->lock);
3668                 INIT_LIST_HEAD(&gcwq->worklist);
3669                 gcwq->cpu = cpu;
3670                 gcwq->flags |= GCWQ_DISASSOCIATED;
3671
3672                 INIT_LIST_HEAD(&gcwq->idle_list);
3673                 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
3674                         INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
3675
3676                 init_timer_deferrable(&gcwq->idle_timer);
3677                 gcwq->idle_timer.function = idle_worker_timeout;
3678                 gcwq->idle_timer.data = (unsigned long)gcwq;
3679
3680                 setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
3681                             (unsigned long)gcwq);
3682
3683                 ida_init(&gcwq->worker_ida);
3684
3685                 gcwq->trustee_state = TRUSTEE_DONE;
3686                 init_waitqueue_head(&gcwq->trustee_wait);
3687         }
3688
3689         /* create the initial worker */
3690         for_each_online_gcwq_cpu(cpu) {
3691                 struct global_cwq *gcwq = get_gcwq(cpu);
3692                 struct worker *worker;
3693
3694                 if (cpu != WORK_CPU_UNBOUND)
3695                         gcwq->flags &= ~GCWQ_DISASSOCIATED;
3696                 worker = create_worker(gcwq, true);
3697                 BUG_ON(!worker);
3698                 spin_lock_irq(&gcwq->lock);
3699                 start_worker(worker);
3700                 spin_unlock_irq(&gcwq->lock);
3701         }
3702
3703         system_wq = alloc_workqueue("events", 0, 0);
3704         system_long_wq = alloc_workqueue("events_long", 0, 0);
3705         system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
3706         system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
3707                                             WQ_UNBOUND_MAX_ACTIVE);
3708         BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq);
3709         return 0;
3710 }
3711 early_initcall(init_workqueues);