]> nv-tegra.nvidia Code Review - linux-2.6.git/blob - kernel/workqueue.c
workqueue: mark init_workqueues() as early_initcall()
[linux-2.6.git] / kernel / workqueue.c
1 /*
2  * linux/kernel/workqueue.c
3  *
4  * Generic mechanism for defining kernel helper threads for running
5  * arbitrary tasks in process context.
6  *
7  * Started by Ingo Molnar, Copyright (C) 2002
8  *
9  * Derived from the taskqueue/keventd code by:
10  *
11  *   David Woodhouse <dwmw2@infradead.org>
12  *   Andrew Morton
13  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
14  *   Theodore Ts'o <tytso@mit.edu>
15  *
16  * Made to use alloc_percpu by Christoph Lameter.
17  */
18
19 #include <linux/module.h>
20 #include <linux/kernel.h>
21 #include <linux/sched.h>
22 #include <linux/init.h>
23 #include <linux/signal.h>
24 #include <linux/completion.h>
25 #include <linux/workqueue.h>
26 #include <linux/slab.h>
27 #include <linux/cpu.h>
28 #include <linux/notifier.h>
29 #include <linux/kthread.h>
30 #include <linux/hardirq.h>
31 #include <linux/mempolicy.h>
32 #include <linux/freezer.h>
33 #include <linux/kallsyms.h>
34 #include <linux/debug_locks.h>
35 #include <linux/lockdep.h>
36 #include <linux/idr.h>
37
38 #include "workqueue_sched.h"
39
40 enum {
41         /* global_cwq flags */
42         GCWQ_MANAGE_WORKERS     = 1 << 0,       /* need to manage workers */
43         GCWQ_MANAGING_WORKERS   = 1 << 1,       /* managing workers */
44         GCWQ_DISASSOCIATED      = 1 << 2,       /* cpu can't serve workers */
45         GCWQ_FREEZING           = 1 << 3,       /* freeze in progress */
46         GCWQ_HIGHPRI_PENDING    = 1 << 4,       /* highpri works on queue */
47
48         /* worker flags */
49         WORKER_STARTED          = 1 << 0,       /* started */
50         WORKER_DIE              = 1 << 1,       /* die die die */
51         WORKER_IDLE             = 1 << 2,       /* is idle */
52         WORKER_PREP             = 1 << 3,       /* preparing to run works */
53         WORKER_ROGUE            = 1 << 4,       /* not bound to any cpu */
54         WORKER_REBIND           = 1 << 5,       /* mom is home, come back */
55         WORKER_CPU_INTENSIVE    = 1 << 6,       /* cpu intensive */
56         WORKER_UNBOUND          = 1 << 7,       /* worker is unbound */
57
58         WORKER_NOT_RUNNING      = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND |
59                                   WORKER_CPU_INTENSIVE | WORKER_UNBOUND,
60
61         /* gcwq->trustee_state */
62         TRUSTEE_START           = 0,            /* start */
63         TRUSTEE_IN_CHARGE       = 1,            /* trustee in charge of gcwq */
64         TRUSTEE_BUTCHER         = 2,            /* butcher workers */
65         TRUSTEE_RELEASE         = 3,            /* release workers */
66         TRUSTEE_DONE            = 4,            /* trustee is done */
67
68         BUSY_WORKER_HASH_ORDER  = 6,            /* 64 pointers */
69         BUSY_WORKER_HASH_SIZE   = 1 << BUSY_WORKER_HASH_ORDER,
70         BUSY_WORKER_HASH_MASK   = BUSY_WORKER_HASH_SIZE - 1,
71
72         MAX_IDLE_WORKERS_RATIO  = 4,            /* 1/4 of busy can be idle */
73         IDLE_WORKER_TIMEOUT     = 300 * HZ,     /* keep idle ones for 5 mins */
74
75         MAYDAY_INITIAL_TIMEOUT  = HZ / 100,     /* call for help after 10ms */
76         MAYDAY_INTERVAL         = HZ / 10,      /* and then every 100ms */
77         CREATE_COOLDOWN         = HZ,           /* time to breath after fail */
78         TRUSTEE_COOLDOWN        = HZ / 10,      /* for trustee draining */
79
80         /*
81          * Rescue workers are used only on emergencies and shared by
82          * all cpus.  Give -20.
83          */
84         RESCUER_NICE_LEVEL      = -20,
85 };
86
87 /*
88  * Structure fields follow one of the following exclusion rules.
89  *
90  * I: Set during initialization and read-only afterwards.
91  *
92  * P: Preemption protected.  Disabling preemption is enough and should
93  *    only be modified and accessed from the local cpu.
94  *
95  * L: gcwq->lock protected.  Access with gcwq->lock held.
96  *
97  * X: During normal operation, modification requires gcwq->lock and
98  *    should be done only from local cpu.  Either disabling preemption
99  *    on local cpu or grabbing gcwq->lock is enough for read access.
100  *    If GCWQ_DISASSOCIATED is set, it's identical to L.
101  *
102  * F: wq->flush_mutex protected.
103  *
104  * W: workqueue_lock protected.
105  */
106
107 struct global_cwq;
108
109 /*
110  * The poor guys doing the actual heavy lifting.  All on-duty workers
111  * are either serving the manager role, on idle list or on busy hash.
112  */
113 struct worker {
114         /* on idle list while idle, on busy hash table while busy */
115         union {
116                 struct list_head        entry;  /* L: while idle */
117                 struct hlist_node       hentry; /* L: while busy */
118         };
119
120         struct work_struct      *current_work;  /* L: work being processed */
121         struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */
122         struct list_head        scheduled;      /* L: scheduled works */
123         struct task_struct      *task;          /* I: worker task */
124         struct global_cwq       *gcwq;          /* I: the associated gcwq */
125         /* 64 bytes boundary on 64bit, 32 on 32bit */
126         unsigned long           last_active;    /* L: last active timestamp */
127         unsigned int            flags;          /* X: flags */
128         int                     id;             /* I: worker id */
129         struct work_struct      rebind_work;    /* L: rebind worker to cpu */
130 };
131
132 /*
133  * Global per-cpu workqueue.  There's one and only one for each cpu
134  * and all works are queued and processed here regardless of their
135  * target workqueues.
136  */
137 struct global_cwq {
138         spinlock_t              lock;           /* the gcwq lock */
139         struct list_head        worklist;       /* L: list of pending works */
140         unsigned int            cpu;            /* I: the associated cpu */
141         unsigned int            flags;          /* L: GCWQ_* flags */
142
143         int                     nr_workers;     /* L: total number of workers */
144         int                     nr_idle;        /* L: currently idle ones */
145
146         /* workers are chained either in the idle_list or busy_hash */
147         struct list_head        idle_list;      /* X: list of idle workers */
148         struct hlist_head       busy_hash[BUSY_WORKER_HASH_SIZE];
149                                                 /* L: hash of busy workers */
150
151         struct timer_list       idle_timer;     /* L: worker idle timeout */
152         struct timer_list       mayday_timer;   /* L: SOS timer for dworkers */
153
154         struct ida              worker_ida;     /* L: for worker IDs */
155
156         struct task_struct      *trustee;       /* L: for gcwq shutdown */
157         unsigned int            trustee_state;  /* L: trustee state */
158         wait_queue_head_t       trustee_wait;   /* trustee wait */
159         struct worker           *first_idle;    /* L: first idle worker */
160 } ____cacheline_aligned_in_smp;
161
162 /*
163  * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
164  * work_struct->data are used for flags and thus cwqs need to be
165  * aligned at two's power of the number of flag bits.
166  */
167 struct cpu_workqueue_struct {
168         struct global_cwq       *gcwq;          /* I: the associated gcwq */
169         struct workqueue_struct *wq;            /* I: the owning workqueue */
170         int                     work_color;     /* L: current color */
171         int                     flush_color;    /* L: flushing color */
172         int                     nr_in_flight[WORK_NR_COLORS];
173                                                 /* L: nr of in_flight works */
174         int                     nr_active;      /* L: nr of active works */
175         int                     max_active;     /* L: max active works */
176         struct list_head        delayed_works;  /* L: delayed works */
177 };
178
179 /*
180  * Structure used to wait for workqueue flush.
181  */
182 struct wq_flusher {
183         struct list_head        list;           /* F: list of flushers */
184         int                     flush_color;    /* F: flush color waiting for */
185         struct completion       done;           /* flush completion */
186 };
187
188 /*
189  * All cpumasks are assumed to be always set on UP and thus can't be
190  * used to determine whether there's something to be done.
191  */
192 #ifdef CONFIG_SMP
193 typedef cpumask_var_t mayday_mask_t;
194 #define mayday_test_and_set_cpu(cpu, mask)      \
195         cpumask_test_and_set_cpu((cpu), (mask))
196 #define mayday_clear_cpu(cpu, mask)             cpumask_clear_cpu((cpu), (mask))
197 #define for_each_mayday_cpu(cpu, mask)          for_each_cpu((cpu), (mask))
198 #define alloc_mayday_mask(maskp, gfp)           alloc_cpumask_var((maskp), (gfp))
199 #define free_mayday_mask(mask)                  free_cpumask_var((mask))
200 #else
201 typedef unsigned long mayday_mask_t;
202 #define mayday_test_and_set_cpu(cpu, mask)      test_and_set_bit(0, &(mask))
203 #define mayday_clear_cpu(cpu, mask)             clear_bit(0, &(mask))
204 #define for_each_mayday_cpu(cpu, mask)          if ((cpu) = 0, (mask))
205 #define alloc_mayday_mask(maskp, gfp)           true
206 #define free_mayday_mask(mask)                  do { } while (0)
207 #endif
208
209 /*
210  * The externally visible workqueue abstraction is an array of
211  * per-CPU workqueues:
212  */
213 struct workqueue_struct {
214         unsigned int            flags;          /* I: WQ_* flags */
215         union {
216                 struct cpu_workqueue_struct __percpu    *pcpu;
217                 struct cpu_workqueue_struct             *single;
218                 unsigned long                           v;
219         } cpu_wq;                               /* I: cwq's */
220         struct list_head        list;           /* W: list of all workqueues */
221
222         struct mutex            flush_mutex;    /* protects wq flushing */
223         int                     work_color;     /* F: current work color */
224         int                     flush_color;    /* F: current flush color */
225         atomic_t                nr_cwqs_to_flush; /* flush in progress */
226         struct wq_flusher       *first_flusher; /* F: first flusher */
227         struct list_head        flusher_queue;  /* F: flush waiters */
228         struct list_head        flusher_overflow; /* F: flush overflow list */
229
230         mayday_mask_t           mayday_mask;    /* cpus requesting rescue */
231         struct worker           *rescuer;       /* I: rescue worker */
232
233         int                     saved_max_active; /* W: saved cwq max_active */
234         const char              *name;          /* I: workqueue name */
235 #ifdef CONFIG_LOCKDEP
236         struct lockdep_map      lockdep_map;
237 #endif
238 };
239
240 struct workqueue_struct *system_wq __read_mostly;
241 struct workqueue_struct *system_long_wq __read_mostly;
242 struct workqueue_struct *system_nrt_wq __read_mostly;
243 struct workqueue_struct *system_unbound_wq __read_mostly;
244 EXPORT_SYMBOL_GPL(system_wq);
245 EXPORT_SYMBOL_GPL(system_long_wq);
246 EXPORT_SYMBOL_GPL(system_nrt_wq);
247 EXPORT_SYMBOL_GPL(system_unbound_wq);
248
249 #define for_each_busy_worker(worker, i, pos, gcwq)                      \
250         for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)                     \
251                 hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
252
253 static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask,
254                                   unsigned int sw)
255 {
256         if (cpu < nr_cpu_ids) {
257                 if (sw & 1) {
258                         cpu = cpumask_next(cpu, mask);
259                         if (cpu < nr_cpu_ids)
260                                 return cpu;
261                 }
262                 if (sw & 2)
263                         return WORK_CPU_UNBOUND;
264         }
265         return WORK_CPU_NONE;
266 }
267
268 static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
269                                 struct workqueue_struct *wq)
270 {
271         return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
272 }
273
274 /*
275  * CPU iterators
276  *
277  * An extra gcwq is defined for an invalid cpu number
278  * (WORK_CPU_UNBOUND) to host workqueues which are not bound to any
279  * specific CPU.  The following iterators are similar to
280  * for_each_*_cpu() iterators but also considers the unbound gcwq.
281  *
282  * for_each_gcwq_cpu()          : possible CPUs + WORK_CPU_UNBOUND
283  * for_each_online_gcwq_cpu()   : online CPUs + WORK_CPU_UNBOUND
284  * for_each_cwq_cpu()           : possible CPUs for bound workqueues,
285  *                                WORK_CPU_UNBOUND for unbound workqueues
286  */
287 #define for_each_gcwq_cpu(cpu)                                          \
288         for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3);         \
289              (cpu) < WORK_CPU_NONE;                                     \
290              (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 3))
291
292 #define for_each_online_gcwq_cpu(cpu)                                   \
293         for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 3);           \
294              (cpu) < WORK_CPU_NONE;                                     \
295              (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 3))
296
297 #define for_each_cwq_cpu(cpu, wq)                                       \
298         for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, (wq));        \
299              (cpu) < WORK_CPU_NONE;                                     \
300              (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))
301
302 #ifdef CONFIG_DEBUG_OBJECTS_WORK
303
304 static struct debug_obj_descr work_debug_descr;
305
306 /*
307  * fixup_init is called when:
308  * - an active object is initialized
309  */
310 static int work_fixup_init(void *addr, enum debug_obj_state state)
311 {
312         struct work_struct *work = addr;
313
314         switch (state) {
315         case ODEBUG_STATE_ACTIVE:
316                 cancel_work_sync(work);
317                 debug_object_init(work, &work_debug_descr);
318                 return 1;
319         default:
320                 return 0;
321         }
322 }
323
324 /*
325  * fixup_activate is called when:
326  * - an active object is activated
327  * - an unknown object is activated (might be a statically initialized object)
328  */
329 static int work_fixup_activate(void *addr, enum debug_obj_state state)
330 {
331         struct work_struct *work = addr;
332
333         switch (state) {
334
335         case ODEBUG_STATE_NOTAVAILABLE:
336                 /*
337                  * This is not really a fixup. The work struct was
338                  * statically initialized. We just make sure that it
339                  * is tracked in the object tracker.
340                  */
341                 if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
342                         debug_object_init(work, &work_debug_descr);
343                         debug_object_activate(work, &work_debug_descr);
344                         return 0;
345                 }
346                 WARN_ON_ONCE(1);
347                 return 0;
348
349         case ODEBUG_STATE_ACTIVE:
350                 WARN_ON(1);
351
352         default:
353                 return 0;
354         }
355 }
356
357 /*
358  * fixup_free is called when:
359  * - an active object is freed
360  */
361 static int work_fixup_free(void *addr, enum debug_obj_state state)
362 {
363         struct work_struct *work = addr;
364
365         switch (state) {
366         case ODEBUG_STATE_ACTIVE:
367                 cancel_work_sync(work);
368                 debug_object_free(work, &work_debug_descr);
369                 return 1;
370         default:
371                 return 0;
372         }
373 }
374
375 static struct debug_obj_descr work_debug_descr = {
376         .name           = "work_struct",
377         .fixup_init     = work_fixup_init,
378         .fixup_activate = work_fixup_activate,
379         .fixup_free     = work_fixup_free,
380 };
381
382 static inline void debug_work_activate(struct work_struct *work)
383 {
384         debug_object_activate(work, &work_debug_descr);
385 }
386
387 static inline void debug_work_deactivate(struct work_struct *work)
388 {
389         debug_object_deactivate(work, &work_debug_descr);
390 }
391
392 void __init_work(struct work_struct *work, int onstack)
393 {
394         if (onstack)
395                 debug_object_init_on_stack(work, &work_debug_descr);
396         else
397                 debug_object_init(work, &work_debug_descr);
398 }
399 EXPORT_SYMBOL_GPL(__init_work);
400
401 void destroy_work_on_stack(struct work_struct *work)
402 {
403         debug_object_free(work, &work_debug_descr);
404 }
405 EXPORT_SYMBOL_GPL(destroy_work_on_stack);
406
407 #else
408 static inline void debug_work_activate(struct work_struct *work) { }
409 static inline void debug_work_deactivate(struct work_struct *work) { }
410 #endif
411
412 /* Serializes the accesses to the list of workqueues. */
413 static DEFINE_SPINLOCK(workqueue_lock);
414 static LIST_HEAD(workqueues);
415 static bool workqueue_freezing;         /* W: have wqs started freezing? */
416
417 /*
418  * The almighty global cpu workqueues.  nr_running is the only field
419  * which is expected to be used frequently by other cpus via
420  * try_to_wake_up().  Put it in a separate cacheline.
421  */
422 static DEFINE_PER_CPU(struct global_cwq, global_cwq);
423 static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
424
425 /*
426  * Global cpu workqueue and nr_running counter for unbound gcwq.  The
427  * gcwq is always online, has GCWQ_DISASSOCIATED set, and all its
428  * workers have WORKER_UNBOUND set.
429  */
430 static struct global_cwq unbound_global_cwq;
431 static atomic_t unbound_gcwq_nr_running = ATOMIC_INIT(0);       /* always 0 */
432
433 static int worker_thread(void *__worker);
434
435 static struct global_cwq *get_gcwq(unsigned int cpu)
436 {
437         if (cpu != WORK_CPU_UNBOUND)
438                 return &per_cpu(global_cwq, cpu);
439         else
440                 return &unbound_global_cwq;
441 }
442
443 static atomic_t *get_gcwq_nr_running(unsigned int cpu)
444 {
445         if (cpu != WORK_CPU_UNBOUND)
446                 return &per_cpu(gcwq_nr_running, cpu);
447         else
448                 return &unbound_gcwq_nr_running;
449 }
450
451 static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
452                                             struct workqueue_struct *wq)
453 {
454         if (!(wq->flags & WQ_UNBOUND)) {
455                 if (likely(cpu < nr_cpu_ids)) {
456 #ifdef CONFIG_SMP
457                         return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
458 #else
459                         return wq->cpu_wq.single;
460 #endif
461                 }
462         } else if (likely(cpu == WORK_CPU_UNBOUND))
463                 return wq->cpu_wq.single;
464         return NULL;
465 }
466
467 static unsigned int work_color_to_flags(int color)
468 {
469         return color << WORK_STRUCT_COLOR_SHIFT;
470 }
471
472 static int get_work_color(struct work_struct *work)
473 {
474         return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
475                 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
476 }
477
478 static int work_next_color(int color)
479 {
480         return (color + 1) % WORK_NR_COLORS;
481 }
482
483 /*
484  * A work's data points to the cwq with WORK_STRUCT_CWQ set while the
485  * work is on queue.  Once execution starts, WORK_STRUCT_CWQ is
486  * cleared and the work data contains the cpu number it was last on.
487  *
488  * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
489  * cwq, cpu or clear work->data.  These functions should only be
490  * called while the work is owned - ie. while the PENDING bit is set.
491  *
492  * get_work_[g]cwq() can be used to obtain the gcwq or cwq
493  * corresponding to a work.  gcwq is available once the work has been
494  * queued anywhere after initialization.  cwq is available only from
495  * queueing until execution starts.
496  */
497 static inline void set_work_data(struct work_struct *work, unsigned long data,
498                                  unsigned long flags)
499 {
500         BUG_ON(!work_pending(work));
501         atomic_long_set(&work->data, data | flags | work_static(work));
502 }
503
504 static void set_work_cwq(struct work_struct *work,
505                          struct cpu_workqueue_struct *cwq,
506                          unsigned long extra_flags)
507 {
508         set_work_data(work, (unsigned long)cwq,
509                       WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
510 }
511
512 static void set_work_cpu(struct work_struct *work, unsigned int cpu)
513 {
514         set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
515 }
516
517 static void clear_work_data(struct work_struct *work)
518 {
519         set_work_data(work, WORK_STRUCT_NO_CPU, 0);
520 }
521
522 static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
523 {
524         unsigned long data = atomic_long_read(&work->data);
525
526         if (data & WORK_STRUCT_CWQ)
527                 return (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
528         else
529                 return NULL;
530 }
531
532 static struct global_cwq *get_work_gcwq(struct work_struct *work)
533 {
534         unsigned long data = atomic_long_read(&work->data);
535         unsigned int cpu;
536
537         if (data & WORK_STRUCT_CWQ)
538                 return ((struct cpu_workqueue_struct *)
539                         (data & WORK_STRUCT_WQ_DATA_MASK))->gcwq;
540
541         cpu = data >> WORK_STRUCT_FLAG_BITS;
542         if (cpu == WORK_CPU_NONE)
543                 return NULL;
544
545         BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND);
546         return get_gcwq(cpu);
547 }
548
549 /*
550  * Policy functions.  These define the policies on how the global
551  * worker pool is managed.  Unless noted otherwise, these functions
552  * assume that they're being called with gcwq->lock held.
553  */
554
555 static bool __need_more_worker(struct global_cwq *gcwq)
556 {
557         return !atomic_read(get_gcwq_nr_running(gcwq->cpu)) ||
558                 gcwq->flags & GCWQ_HIGHPRI_PENDING;
559 }
560
561 /*
562  * Need to wake up a worker?  Called from anything but currently
563  * running workers.
564  */
565 static bool need_more_worker(struct global_cwq *gcwq)
566 {
567         return !list_empty(&gcwq->worklist) && __need_more_worker(gcwq);
568 }
569
570 /* Can I start working?  Called from busy but !running workers. */
571 static bool may_start_working(struct global_cwq *gcwq)
572 {
573         return gcwq->nr_idle;
574 }
575
576 /* Do I need to keep working?  Called from currently running workers. */
577 static bool keep_working(struct global_cwq *gcwq)
578 {
579         atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
580
581         return !list_empty(&gcwq->worklist) && atomic_read(nr_running) <= 1;
582 }
583
584 /* Do we need a new worker?  Called from manager. */
585 static bool need_to_create_worker(struct global_cwq *gcwq)
586 {
587         return need_more_worker(gcwq) && !may_start_working(gcwq);
588 }
589
590 /* Do I need to be the manager? */
591 static bool need_to_manage_workers(struct global_cwq *gcwq)
592 {
593         return need_to_create_worker(gcwq) || gcwq->flags & GCWQ_MANAGE_WORKERS;
594 }
595
596 /* Do we have too many workers and should some go away? */
597 static bool too_many_workers(struct global_cwq *gcwq)
598 {
599         bool managing = gcwq->flags & GCWQ_MANAGING_WORKERS;
600         int nr_idle = gcwq->nr_idle + managing; /* manager is considered idle */
601         int nr_busy = gcwq->nr_workers - nr_idle;
602
603         return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
604 }
605
606 /*
607  * Wake up functions.
608  */
609
610 /* Return the first worker.  Safe with preemption disabled */
611 static struct worker *first_worker(struct global_cwq *gcwq)
612 {
613         if (unlikely(list_empty(&gcwq->idle_list)))
614                 return NULL;
615
616         return list_first_entry(&gcwq->idle_list, struct worker, entry);
617 }
618
619 /**
620  * wake_up_worker - wake up an idle worker
621  * @gcwq: gcwq to wake worker for
622  *
623  * Wake up the first idle worker of @gcwq.
624  *
625  * CONTEXT:
626  * spin_lock_irq(gcwq->lock).
627  */
628 static void wake_up_worker(struct global_cwq *gcwq)
629 {
630         struct worker *worker = first_worker(gcwq);
631
632         if (likely(worker))
633                 wake_up_process(worker->task);
634 }
635
636 /**
637  * wq_worker_waking_up - a worker is waking up
638  * @task: task waking up
639  * @cpu: CPU @task is waking up to
640  *
641  * This function is called during try_to_wake_up() when a worker is
642  * being awoken.
643  *
644  * CONTEXT:
645  * spin_lock_irq(rq->lock)
646  */
647 void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
648 {
649         struct worker *worker = kthread_data(task);
650
651         if (likely(!(worker->flags & WORKER_NOT_RUNNING)))
652                 atomic_inc(get_gcwq_nr_running(cpu));
653 }
654
655 /**
656  * wq_worker_sleeping - a worker is going to sleep
657  * @task: task going to sleep
658  * @cpu: CPU in question, must be the current CPU number
659  *
660  * This function is called during schedule() when a busy worker is
661  * going to sleep.  Worker on the same cpu can be woken up by
662  * returning pointer to its task.
663  *
664  * CONTEXT:
665  * spin_lock_irq(rq->lock)
666  *
667  * RETURNS:
668  * Worker task on @cpu to wake up, %NULL if none.
669  */
670 struct task_struct *wq_worker_sleeping(struct task_struct *task,
671                                        unsigned int cpu)
672 {
673         struct worker *worker = kthread_data(task), *to_wakeup = NULL;
674         struct global_cwq *gcwq = get_gcwq(cpu);
675         atomic_t *nr_running = get_gcwq_nr_running(cpu);
676
677         if (unlikely(worker->flags & WORKER_NOT_RUNNING))
678                 return NULL;
679
680         /* this can only happen on the local cpu */
681         BUG_ON(cpu != raw_smp_processor_id());
682
683         /*
684          * The counterpart of the following dec_and_test, implied mb,
685          * worklist not empty test sequence is in insert_work().
686          * Please read comment there.
687          *
688          * NOT_RUNNING is clear.  This means that trustee is not in
689          * charge and we're running on the local cpu w/ rq lock held
690          * and preemption disabled, which in turn means that none else
691          * could be manipulating idle_list, so dereferencing idle_list
692          * without gcwq lock is safe.
693          */
694         if (atomic_dec_and_test(nr_running) && !list_empty(&gcwq->worklist))
695                 to_wakeup = first_worker(gcwq);
696         return to_wakeup ? to_wakeup->task : NULL;
697 }
698
699 /**
700  * worker_set_flags - set worker flags and adjust nr_running accordingly
701  * @worker: self
702  * @flags: flags to set
703  * @wakeup: wakeup an idle worker if necessary
704  *
705  * Set @flags in @worker->flags and adjust nr_running accordingly.  If
706  * nr_running becomes zero and @wakeup is %true, an idle worker is
707  * woken up.
708  *
709  * CONTEXT:
710  * spin_lock_irq(gcwq->lock)
711  */
712 static inline void worker_set_flags(struct worker *worker, unsigned int flags,
713                                     bool wakeup)
714 {
715         struct global_cwq *gcwq = worker->gcwq;
716
717         WARN_ON_ONCE(worker->task != current);
718
719         /*
720          * If transitioning into NOT_RUNNING, adjust nr_running and
721          * wake up an idle worker as necessary if requested by
722          * @wakeup.
723          */
724         if ((flags & WORKER_NOT_RUNNING) &&
725             !(worker->flags & WORKER_NOT_RUNNING)) {
726                 atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
727
728                 if (wakeup) {
729                         if (atomic_dec_and_test(nr_running) &&
730                             !list_empty(&gcwq->worklist))
731                                 wake_up_worker(gcwq);
732                 } else
733                         atomic_dec(nr_running);
734         }
735
736         worker->flags |= flags;
737 }
738
739 /**
740  * worker_clr_flags - clear worker flags and adjust nr_running accordingly
741  * @worker: self
742  * @flags: flags to clear
743  *
744  * Clear @flags in @worker->flags and adjust nr_running accordingly.
745  *
746  * CONTEXT:
747  * spin_lock_irq(gcwq->lock)
748  */
749 static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
750 {
751         struct global_cwq *gcwq = worker->gcwq;
752         unsigned int oflags = worker->flags;
753
754         WARN_ON_ONCE(worker->task != current);
755
756         worker->flags &= ~flags;
757
758         /* if transitioning out of NOT_RUNNING, increment nr_running */
759         if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
760                 if (!(worker->flags & WORKER_NOT_RUNNING))
761                         atomic_inc(get_gcwq_nr_running(gcwq->cpu));
762 }
763
764 /**
765  * busy_worker_head - return the busy hash head for a work
766  * @gcwq: gcwq of interest
767  * @work: work to be hashed
768  *
769  * Return hash head of @gcwq for @work.
770  *
771  * CONTEXT:
772  * spin_lock_irq(gcwq->lock).
773  *
774  * RETURNS:
775  * Pointer to the hash head.
776  */
777 static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
778                                            struct work_struct *work)
779 {
780         const int base_shift = ilog2(sizeof(struct work_struct));
781         unsigned long v = (unsigned long)work;
782
783         /* simple shift and fold hash, do we need something better? */
784         v >>= base_shift;
785         v += v >> BUSY_WORKER_HASH_ORDER;
786         v &= BUSY_WORKER_HASH_MASK;
787
788         return &gcwq->busy_hash[v];
789 }
790
791 /**
792  * __find_worker_executing_work - find worker which is executing a work
793  * @gcwq: gcwq of interest
794  * @bwh: hash head as returned by busy_worker_head()
795  * @work: work to find worker for
796  *
797  * Find a worker which is executing @work on @gcwq.  @bwh should be
798  * the hash head obtained by calling busy_worker_head() with the same
799  * work.
800  *
801  * CONTEXT:
802  * spin_lock_irq(gcwq->lock).
803  *
804  * RETURNS:
805  * Pointer to worker which is executing @work if found, NULL
806  * otherwise.
807  */
808 static struct worker *__find_worker_executing_work(struct global_cwq *gcwq,
809                                                    struct hlist_head *bwh,
810                                                    struct work_struct *work)
811 {
812         struct worker *worker;
813         struct hlist_node *tmp;
814
815         hlist_for_each_entry(worker, tmp, bwh, hentry)
816                 if (worker->current_work == work)
817                         return worker;
818         return NULL;
819 }
820
821 /**
822  * find_worker_executing_work - find worker which is executing a work
823  * @gcwq: gcwq of interest
824  * @work: work to find worker for
825  *
826  * Find a worker which is executing @work on @gcwq.  This function is
827  * identical to __find_worker_executing_work() except that this
828  * function calculates @bwh itself.
829  *
830  * CONTEXT:
831  * spin_lock_irq(gcwq->lock).
832  *
833  * RETURNS:
834  * Pointer to worker which is executing @work if found, NULL
835  * otherwise.
836  */
837 static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
838                                                  struct work_struct *work)
839 {
840         return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work),
841                                             work);
842 }
843
844 /**
845  * gcwq_determine_ins_pos - find insertion position
846  * @gcwq: gcwq of interest
847  * @cwq: cwq a work is being queued for
848  *
849  * A work for @cwq is about to be queued on @gcwq, determine insertion
850  * position for the work.  If @cwq is for HIGHPRI wq, the work is
851  * queued at the head of the queue but in FIFO order with respect to
852  * other HIGHPRI works; otherwise, at the end of the queue.  This
853  * function also sets GCWQ_HIGHPRI_PENDING flag to hint @gcwq that
854  * there are HIGHPRI works pending.
855  *
856  * CONTEXT:
857  * spin_lock_irq(gcwq->lock).
858  *
859  * RETURNS:
860  * Pointer to inserstion position.
861  */
862 static inline struct list_head *gcwq_determine_ins_pos(struct global_cwq *gcwq,
863                                                struct cpu_workqueue_struct *cwq)
864 {
865         struct work_struct *twork;
866
867         if (likely(!(cwq->wq->flags & WQ_HIGHPRI)))
868                 return &gcwq->worklist;
869
870         list_for_each_entry(twork, &gcwq->worklist, entry) {
871                 struct cpu_workqueue_struct *tcwq = get_work_cwq(twork);
872
873                 if (!(tcwq->wq->flags & WQ_HIGHPRI))
874                         break;
875         }
876
877         gcwq->flags |= GCWQ_HIGHPRI_PENDING;
878         return &twork->entry;
879 }
880
881 /**
882  * insert_work - insert a work into gcwq
883  * @cwq: cwq @work belongs to
884  * @work: work to insert
885  * @head: insertion point
886  * @extra_flags: extra WORK_STRUCT_* flags to set
887  *
888  * Insert @work which belongs to @cwq into @gcwq after @head.
889  * @extra_flags is or'd to work_struct flags.
890  *
891  * CONTEXT:
892  * spin_lock_irq(gcwq->lock).
893  */
894 static void insert_work(struct cpu_workqueue_struct *cwq,
895                         struct work_struct *work, struct list_head *head,
896                         unsigned int extra_flags)
897 {
898         struct global_cwq *gcwq = cwq->gcwq;
899
900         /* we own @work, set data and link */
901         set_work_cwq(work, cwq, extra_flags);
902
903         /*
904          * Ensure that we get the right work->data if we see the
905          * result of list_add() below, see try_to_grab_pending().
906          */
907         smp_wmb();
908
909         list_add_tail(&work->entry, head);
910
911         /*
912          * Ensure either worker_sched_deactivated() sees the above
913          * list_add_tail() or we see zero nr_running to avoid workers
914          * lying around lazily while there are works to be processed.
915          */
916         smp_mb();
917
918         if (__need_more_worker(gcwq))
919                 wake_up_worker(gcwq);
920 }
921
922 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
923                          struct work_struct *work)
924 {
925         struct global_cwq *gcwq;
926         struct cpu_workqueue_struct *cwq;
927         struct list_head *worklist;
928         unsigned long flags;
929
930         debug_work_activate(work);
931
932         /* determine gcwq to use */
933         if (!(wq->flags & WQ_UNBOUND)) {
934                 struct global_cwq *last_gcwq;
935
936                 if (unlikely(cpu == WORK_CPU_UNBOUND))
937                         cpu = raw_smp_processor_id();
938
939                 /*
940                  * It's multi cpu.  If @wq is non-reentrant and @work
941                  * was previously on a different cpu, it might still
942                  * be running there, in which case the work needs to
943                  * be queued on that cpu to guarantee non-reentrance.
944                  */
945                 gcwq = get_gcwq(cpu);
946                 if (wq->flags & WQ_NON_REENTRANT &&
947                     (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
948                         struct worker *worker;
949
950                         spin_lock_irqsave(&last_gcwq->lock, flags);
951
952                         worker = find_worker_executing_work(last_gcwq, work);
953
954                         if (worker && worker->current_cwq->wq == wq)
955                                 gcwq = last_gcwq;
956                         else {
957                                 /* meh... not running there, queue here */
958                                 spin_unlock_irqrestore(&last_gcwq->lock, flags);
959                                 spin_lock_irqsave(&gcwq->lock, flags);
960                         }
961                 } else
962                         spin_lock_irqsave(&gcwq->lock, flags);
963         } else {
964                 gcwq = get_gcwq(WORK_CPU_UNBOUND);
965                 spin_lock_irqsave(&gcwq->lock, flags);
966         }
967
968         /* gcwq determined, get cwq and queue */
969         cwq = get_cwq(gcwq->cpu, wq);
970
971         BUG_ON(!list_empty(&work->entry));
972
973         cwq->nr_in_flight[cwq->work_color]++;
974
975         if (likely(cwq->nr_active < cwq->max_active)) {
976                 cwq->nr_active++;
977                 worklist = gcwq_determine_ins_pos(gcwq, cwq);
978         } else
979                 worklist = &cwq->delayed_works;
980
981         insert_work(cwq, work, worklist, work_color_to_flags(cwq->work_color));
982
983         spin_unlock_irqrestore(&gcwq->lock, flags);
984 }
985
986 /**
987  * queue_work - queue work on a workqueue
988  * @wq: workqueue to use
989  * @work: work to queue
990  *
991  * Returns 0 if @work was already on a queue, non-zero otherwise.
992  *
993  * We queue the work to the CPU on which it was submitted, but if the CPU dies
994  * it can be processed by another CPU.
995  */
996 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
997 {
998         int ret;
999
1000         ret = queue_work_on(get_cpu(), wq, work);
1001         put_cpu();
1002
1003         return ret;
1004 }
1005 EXPORT_SYMBOL_GPL(queue_work);
1006
1007 /**
1008  * queue_work_on - queue work on specific cpu
1009  * @cpu: CPU number to execute work on
1010  * @wq: workqueue to use
1011  * @work: work to queue
1012  *
1013  * Returns 0 if @work was already on a queue, non-zero otherwise.
1014  *
1015  * We queue the work to a specific CPU, the caller must ensure it
1016  * can't go away.
1017  */
1018 int
1019 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
1020 {
1021         int ret = 0;
1022
1023         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1024                 __queue_work(cpu, wq, work);
1025                 ret = 1;
1026         }
1027         return ret;
1028 }
1029 EXPORT_SYMBOL_GPL(queue_work_on);
1030
1031 static void delayed_work_timer_fn(unsigned long __data)
1032 {
1033         struct delayed_work *dwork = (struct delayed_work *)__data;
1034         struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
1035
1036         __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
1037 }
1038
1039 /**
1040  * queue_delayed_work - queue work on a workqueue after delay
1041  * @wq: workqueue to use
1042  * @dwork: delayable work to queue
1043  * @delay: number of jiffies to wait before queueing
1044  *
1045  * Returns 0 if @work was already on a queue, non-zero otherwise.
1046  */
1047 int queue_delayed_work(struct workqueue_struct *wq,
1048                         struct delayed_work *dwork, unsigned long delay)
1049 {
1050         if (delay == 0)
1051                 return queue_work(wq, &dwork->work);
1052
1053         return queue_delayed_work_on(-1, wq, dwork, delay);
1054 }
1055 EXPORT_SYMBOL_GPL(queue_delayed_work);
1056
1057 /**
1058  * queue_delayed_work_on - queue work on specific CPU after delay
1059  * @cpu: CPU number to execute work on
1060  * @wq: workqueue to use
1061  * @dwork: work to queue
1062  * @delay: number of jiffies to wait before queueing
1063  *
1064  * Returns 0 if @work was already on a queue, non-zero otherwise.
1065  */
1066 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
1067                         struct delayed_work *dwork, unsigned long delay)
1068 {
1069         int ret = 0;
1070         struct timer_list *timer = &dwork->timer;
1071         struct work_struct *work = &dwork->work;
1072
1073         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1074                 unsigned int lcpu;
1075
1076                 BUG_ON(timer_pending(timer));
1077                 BUG_ON(!list_empty(&work->entry));
1078
1079                 timer_stats_timer_set_start_info(&dwork->timer);
1080
1081                 /*
1082                  * This stores cwq for the moment, for the timer_fn.
1083                  * Note that the work's gcwq is preserved to allow
1084                  * reentrance detection for delayed works.
1085                  */
1086                 if (!(wq->flags & WQ_UNBOUND)) {
1087                         struct global_cwq *gcwq = get_work_gcwq(work);
1088
1089                         if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND)
1090                                 lcpu = gcwq->cpu;
1091                         else
1092                                 lcpu = raw_smp_processor_id();
1093                 } else
1094                         lcpu = WORK_CPU_UNBOUND;
1095
1096                 set_work_cwq(work, get_cwq(lcpu, wq), 0);
1097
1098                 timer->expires = jiffies + delay;
1099                 timer->data = (unsigned long)dwork;
1100                 timer->function = delayed_work_timer_fn;
1101
1102                 if (unlikely(cpu >= 0))
1103                         add_timer_on(timer, cpu);
1104                 else
1105                         add_timer(timer);
1106                 ret = 1;
1107         }
1108         return ret;
1109 }
1110 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
1111
1112 /**
1113  * worker_enter_idle - enter idle state
1114  * @worker: worker which is entering idle state
1115  *
1116  * @worker is entering idle state.  Update stats and idle timer if
1117  * necessary.
1118  *
1119  * LOCKING:
1120  * spin_lock_irq(gcwq->lock).
1121  */
1122 static void worker_enter_idle(struct worker *worker)
1123 {
1124         struct global_cwq *gcwq = worker->gcwq;
1125
1126         BUG_ON(worker->flags & WORKER_IDLE);
1127         BUG_ON(!list_empty(&worker->entry) &&
1128                (worker->hentry.next || worker->hentry.pprev));
1129
1130         /* can't use worker_set_flags(), also called from start_worker() */
1131         worker->flags |= WORKER_IDLE;
1132         gcwq->nr_idle++;
1133         worker->last_active = jiffies;
1134
1135         /* idle_list is LIFO */
1136         list_add(&worker->entry, &gcwq->idle_list);
1137
1138         if (likely(!(worker->flags & WORKER_ROGUE))) {
1139                 if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
1140                         mod_timer(&gcwq->idle_timer,
1141                                   jiffies + IDLE_WORKER_TIMEOUT);
1142         } else
1143                 wake_up_all(&gcwq->trustee_wait);
1144
1145         /* sanity check nr_running */
1146         WARN_ON_ONCE(gcwq->nr_workers == gcwq->nr_idle &&
1147                      atomic_read(get_gcwq_nr_running(gcwq->cpu)));
1148 }
1149
1150 /**
1151  * worker_leave_idle - leave idle state
1152  * @worker: worker which is leaving idle state
1153  *
1154  * @worker is leaving idle state.  Update stats.
1155  *
1156  * LOCKING:
1157  * spin_lock_irq(gcwq->lock).
1158  */
1159 static void worker_leave_idle(struct worker *worker)
1160 {
1161         struct global_cwq *gcwq = worker->gcwq;
1162
1163         BUG_ON(!(worker->flags & WORKER_IDLE));
1164         worker_clr_flags(worker, WORKER_IDLE);
1165         gcwq->nr_idle--;
1166         list_del_init(&worker->entry);
1167 }
1168
1169 /**
1170  * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
1171  * @worker: self
1172  *
1173  * Works which are scheduled while the cpu is online must at least be
1174  * scheduled to a worker which is bound to the cpu so that if they are
1175  * flushed from cpu callbacks while cpu is going down, they are
1176  * guaranteed to execute on the cpu.
1177  *
1178  * This function is to be used by rogue workers and rescuers to bind
1179  * themselves to the target cpu and may race with cpu going down or
1180  * coming online.  kthread_bind() can't be used because it may put the
1181  * worker to already dead cpu and set_cpus_allowed_ptr() can't be used
1182  * verbatim as it's best effort and blocking and gcwq may be
1183  * [dis]associated in the meantime.
1184  *
1185  * This function tries set_cpus_allowed() and locks gcwq and verifies
1186  * the binding against GCWQ_DISASSOCIATED which is set during
1187  * CPU_DYING and cleared during CPU_ONLINE, so if the worker enters
1188  * idle state or fetches works without dropping lock, it can guarantee
1189  * the scheduling requirement described in the first paragraph.
1190  *
1191  * CONTEXT:
1192  * Might sleep.  Called without any lock but returns with gcwq->lock
1193  * held.
1194  *
1195  * RETURNS:
1196  * %true if the associated gcwq is online (@worker is successfully
1197  * bound), %false if offline.
1198  */
1199 static bool worker_maybe_bind_and_lock(struct worker *worker)
1200 {
1201         struct global_cwq *gcwq = worker->gcwq;
1202         struct task_struct *task = worker->task;
1203
1204         while (true) {
1205                 /*
1206                  * The following call may fail, succeed or succeed
1207                  * without actually migrating the task to the cpu if
1208                  * it races with cpu hotunplug operation.  Verify
1209                  * against GCWQ_DISASSOCIATED.
1210                  */
1211                 if (!(gcwq->flags & GCWQ_DISASSOCIATED))
1212                         set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
1213
1214                 spin_lock_irq(&gcwq->lock);
1215                 if (gcwq->flags & GCWQ_DISASSOCIATED)
1216                         return false;
1217                 if (task_cpu(task) == gcwq->cpu &&
1218                     cpumask_equal(&current->cpus_allowed,
1219                                   get_cpu_mask(gcwq->cpu)))
1220                         return true;
1221                 spin_unlock_irq(&gcwq->lock);
1222
1223                 /* CPU has come up inbetween, retry migration */
1224                 cpu_relax();
1225         }
1226 }
1227
1228 /*
1229  * Function for worker->rebind_work used to rebind rogue busy workers
1230  * to the associated cpu which is coming back online.  This is
1231  * scheduled by cpu up but can race with other cpu hotplug operations
1232  * and may be executed twice without intervening cpu down.
1233  */
1234 static void worker_rebind_fn(struct work_struct *work)
1235 {
1236         struct worker *worker = container_of(work, struct worker, rebind_work);
1237         struct global_cwq *gcwq = worker->gcwq;
1238
1239         if (worker_maybe_bind_and_lock(worker))
1240                 worker_clr_flags(worker, WORKER_REBIND);
1241
1242         spin_unlock_irq(&gcwq->lock);
1243 }
1244
1245 static struct worker *alloc_worker(void)
1246 {
1247         struct worker *worker;
1248
1249         worker = kzalloc(sizeof(*worker), GFP_KERNEL);
1250         if (worker) {
1251                 INIT_LIST_HEAD(&worker->entry);
1252                 INIT_LIST_HEAD(&worker->scheduled);
1253                 INIT_WORK(&worker->rebind_work, worker_rebind_fn);
1254                 /* on creation a worker is in !idle && prep state */
1255                 worker->flags = WORKER_PREP;
1256         }
1257         return worker;
1258 }
1259
1260 /**
1261  * create_worker - create a new workqueue worker
1262  * @gcwq: gcwq the new worker will belong to
1263  * @bind: whether to set affinity to @cpu or not
1264  *
1265  * Create a new worker which is bound to @gcwq.  The returned worker
1266  * can be started by calling start_worker() or destroyed using
1267  * destroy_worker().
1268  *
1269  * CONTEXT:
1270  * Might sleep.  Does GFP_KERNEL allocations.
1271  *
1272  * RETURNS:
1273  * Pointer to the newly created worker.
1274  */
1275 static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
1276 {
1277         bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND;
1278         struct worker *worker = NULL;
1279         int id = -1;
1280
1281         spin_lock_irq(&gcwq->lock);
1282         while (ida_get_new(&gcwq->worker_ida, &id)) {
1283                 spin_unlock_irq(&gcwq->lock);
1284                 if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))
1285                         goto fail;
1286                 spin_lock_irq(&gcwq->lock);
1287         }
1288         spin_unlock_irq(&gcwq->lock);
1289
1290         worker = alloc_worker();
1291         if (!worker)
1292                 goto fail;
1293
1294         worker->gcwq = gcwq;
1295         worker->id = id;
1296
1297         if (!on_unbound_cpu)
1298                 worker->task = kthread_create(worker_thread, worker,
1299                                               "kworker/%u:%d", gcwq->cpu, id);
1300         else
1301                 worker->task = kthread_create(worker_thread, worker,
1302                                               "kworker/u:%d", id);
1303         if (IS_ERR(worker->task))
1304                 goto fail;
1305
1306         /*
1307          * A rogue worker will become a regular one if CPU comes
1308          * online later on.  Make sure every worker has
1309          * PF_THREAD_BOUND set.
1310          */
1311         if (bind && !on_unbound_cpu)
1312                 kthread_bind(worker->task, gcwq->cpu);
1313         else {
1314                 worker->task->flags |= PF_THREAD_BOUND;
1315                 if (on_unbound_cpu)
1316                         worker->flags |= WORKER_UNBOUND;
1317         }
1318
1319         return worker;
1320 fail:
1321         if (id >= 0) {
1322                 spin_lock_irq(&gcwq->lock);
1323                 ida_remove(&gcwq->worker_ida, id);
1324                 spin_unlock_irq(&gcwq->lock);
1325         }
1326         kfree(worker);
1327         return NULL;
1328 }
1329
1330 /**
1331  * start_worker - start a newly created worker
1332  * @worker: worker to start
1333  *
1334  * Make the gcwq aware of @worker and start it.
1335  *
1336  * CONTEXT:
1337  * spin_lock_irq(gcwq->lock).
1338  */
1339 static void start_worker(struct worker *worker)
1340 {
1341         worker->flags |= WORKER_STARTED;
1342         worker->gcwq->nr_workers++;
1343         worker_enter_idle(worker);
1344         wake_up_process(worker->task);
1345 }
1346
1347 /**
1348  * destroy_worker - destroy a workqueue worker
1349  * @worker: worker to be destroyed
1350  *
1351  * Destroy @worker and adjust @gcwq stats accordingly.
1352  *
1353  * CONTEXT:
1354  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1355  */
1356 static void destroy_worker(struct worker *worker)
1357 {
1358         struct global_cwq *gcwq = worker->gcwq;
1359         int id = worker->id;
1360
1361         /* sanity check frenzy */
1362         BUG_ON(worker->current_work);
1363         BUG_ON(!list_empty(&worker->scheduled));
1364
1365         if (worker->flags & WORKER_STARTED)
1366                 gcwq->nr_workers--;
1367         if (worker->flags & WORKER_IDLE)
1368                 gcwq->nr_idle--;
1369
1370         list_del_init(&worker->entry);
1371         worker->flags |= WORKER_DIE;
1372
1373         spin_unlock_irq(&gcwq->lock);
1374
1375         kthread_stop(worker->task);
1376         kfree(worker);
1377
1378         spin_lock_irq(&gcwq->lock);
1379         ida_remove(&gcwq->worker_ida, id);
1380 }
1381
1382 static void idle_worker_timeout(unsigned long __gcwq)
1383 {
1384         struct global_cwq *gcwq = (void *)__gcwq;
1385
1386         spin_lock_irq(&gcwq->lock);
1387
1388         if (too_many_workers(gcwq)) {
1389                 struct worker *worker;
1390                 unsigned long expires;
1391
1392                 /* idle_list is kept in LIFO order, check the last one */
1393                 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1394                 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1395
1396                 if (time_before(jiffies, expires))
1397                         mod_timer(&gcwq->idle_timer, expires);
1398                 else {
1399                         /* it's been idle for too long, wake up manager */
1400                         gcwq->flags |= GCWQ_MANAGE_WORKERS;
1401                         wake_up_worker(gcwq);
1402                 }
1403         }
1404
1405         spin_unlock_irq(&gcwq->lock);
1406 }
1407
1408 static bool send_mayday(struct work_struct *work)
1409 {
1410         struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1411         struct workqueue_struct *wq = cwq->wq;
1412         unsigned int cpu;
1413
1414         if (!(wq->flags & WQ_RESCUER))
1415                 return false;
1416
1417         /* mayday mayday mayday */
1418         cpu = cwq->gcwq->cpu;
1419         /* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
1420         if (cpu == WORK_CPU_UNBOUND)
1421                 cpu = 0;
1422         if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask))
1423                 wake_up_process(wq->rescuer->task);
1424         return true;
1425 }
1426
1427 static void gcwq_mayday_timeout(unsigned long __gcwq)
1428 {
1429         struct global_cwq *gcwq = (void *)__gcwq;
1430         struct work_struct *work;
1431
1432         spin_lock_irq(&gcwq->lock);
1433
1434         if (need_to_create_worker(gcwq)) {
1435                 /*
1436                  * We've been trying to create a new worker but
1437                  * haven't been successful.  We might be hitting an
1438                  * allocation deadlock.  Send distress signals to
1439                  * rescuers.
1440                  */
1441                 list_for_each_entry(work, &gcwq->worklist, entry)
1442                         send_mayday(work);
1443         }
1444
1445         spin_unlock_irq(&gcwq->lock);
1446
1447         mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL);
1448 }
1449
1450 /**
1451  * maybe_create_worker - create a new worker if necessary
1452  * @gcwq: gcwq to create a new worker for
1453  *
1454  * Create a new worker for @gcwq if necessary.  @gcwq is guaranteed to
1455  * have at least one idle worker on return from this function.  If
1456  * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
1457  * sent to all rescuers with works scheduled on @gcwq to resolve
1458  * possible allocation deadlock.
1459  *
1460  * On return, need_to_create_worker() is guaranteed to be false and
1461  * may_start_working() true.
1462  *
1463  * LOCKING:
1464  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1465  * multiple times.  Does GFP_KERNEL allocations.  Called only from
1466  * manager.
1467  *
1468  * RETURNS:
1469  * false if no action was taken and gcwq->lock stayed locked, true
1470  * otherwise.
1471  */
1472 static bool maybe_create_worker(struct global_cwq *gcwq)
1473 {
1474         if (!need_to_create_worker(gcwq))
1475                 return false;
1476 restart:
1477         spin_unlock_irq(&gcwq->lock);
1478
1479         /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
1480         mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
1481
1482         while (true) {
1483                 struct worker *worker;
1484
1485                 worker = create_worker(gcwq, true);
1486                 if (worker) {
1487                         del_timer_sync(&gcwq->mayday_timer);
1488                         spin_lock_irq(&gcwq->lock);
1489                         start_worker(worker);
1490                         BUG_ON(need_to_create_worker(gcwq));
1491                         return true;
1492                 }
1493
1494                 if (!need_to_create_worker(gcwq))
1495                         break;
1496
1497                 __set_current_state(TASK_INTERRUPTIBLE);
1498                 schedule_timeout(CREATE_COOLDOWN);
1499
1500                 if (!need_to_create_worker(gcwq))
1501                         break;
1502         }
1503
1504         del_timer_sync(&gcwq->mayday_timer);
1505         spin_lock_irq(&gcwq->lock);
1506         if (need_to_create_worker(gcwq))
1507                 goto restart;
1508         return true;
1509 }
1510
1511 /**
1512  * maybe_destroy_worker - destroy workers which have been idle for a while
1513  * @gcwq: gcwq to destroy workers for
1514  *
1515  * Destroy @gcwq workers which have been idle for longer than
1516  * IDLE_WORKER_TIMEOUT.
1517  *
1518  * LOCKING:
1519  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1520  * multiple times.  Called only from manager.
1521  *
1522  * RETURNS:
1523  * false if no action was taken and gcwq->lock stayed locked, true
1524  * otherwise.
1525  */
1526 static bool maybe_destroy_workers(struct global_cwq *gcwq)
1527 {
1528         bool ret = false;
1529
1530         while (too_many_workers(gcwq)) {
1531                 struct worker *worker;
1532                 unsigned long expires;
1533
1534                 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1535                 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1536
1537                 if (time_before(jiffies, expires)) {
1538                         mod_timer(&gcwq->idle_timer, expires);
1539                         break;
1540                 }
1541
1542                 destroy_worker(worker);
1543                 ret = true;
1544         }
1545
1546         return ret;
1547 }
1548
1549 /**
1550  * manage_workers - manage worker pool
1551  * @worker: self
1552  *
1553  * Assume the manager role and manage gcwq worker pool @worker belongs
1554  * to.  At any given time, there can be only zero or one manager per
1555  * gcwq.  The exclusion is handled automatically by this function.
1556  *
1557  * The caller can safely start processing works on false return.  On
1558  * true return, it's guaranteed that need_to_create_worker() is false
1559  * and may_start_working() is true.
1560  *
1561  * CONTEXT:
1562  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1563  * multiple times.  Does GFP_KERNEL allocations.
1564  *
1565  * RETURNS:
1566  * false if no action was taken and gcwq->lock stayed locked, true if
1567  * some action was taken.
1568  */
1569 static bool manage_workers(struct worker *worker)
1570 {
1571         struct global_cwq *gcwq = worker->gcwq;
1572         bool ret = false;
1573
1574         if (gcwq->flags & GCWQ_MANAGING_WORKERS)
1575                 return ret;
1576
1577         gcwq->flags &= ~GCWQ_MANAGE_WORKERS;
1578         gcwq->flags |= GCWQ_MANAGING_WORKERS;
1579
1580         /*
1581          * Destroy and then create so that may_start_working() is true
1582          * on return.
1583          */
1584         ret |= maybe_destroy_workers(gcwq);
1585         ret |= maybe_create_worker(gcwq);
1586
1587         gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
1588
1589         /*
1590          * The trustee might be waiting to take over the manager
1591          * position, tell it we're done.
1592          */
1593         if (unlikely(gcwq->trustee))
1594                 wake_up_all(&gcwq->trustee_wait);
1595
1596         return ret;
1597 }
1598
1599 /**
1600  * move_linked_works - move linked works to a list
1601  * @work: start of series of works to be scheduled
1602  * @head: target list to append @work to
1603  * @nextp: out paramter for nested worklist walking
1604  *
1605  * Schedule linked works starting from @work to @head.  Work series to
1606  * be scheduled starts at @work and includes any consecutive work with
1607  * WORK_STRUCT_LINKED set in its predecessor.
1608  *
1609  * If @nextp is not NULL, it's updated to point to the next work of
1610  * the last scheduled work.  This allows move_linked_works() to be
1611  * nested inside outer list_for_each_entry_safe().
1612  *
1613  * CONTEXT:
1614  * spin_lock_irq(gcwq->lock).
1615  */
1616 static void move_linked_works(struct work_struct *work, struct list_head *head,
1617                               struct work_struct **nextp)
1618 {
1619         struct work_struct *n;
1620
1621         /*
1622          * Linked worklist will always end before the end of the list,
1623          * use NULL for list head.
1624          */
1625         list_for_each_entry_safe_from(work, n, NULL, entry) {
1626                 list_move_tail(&work->entry, head);
1627                 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
1628                         break;
1629         }
1630
1631         /*
1632          * If we're already inside safe list traversal and have moved
1633          * multiple works to the scheduled queue, the next position
1634          * needs to be updated.
1635          */
1636         if (nextp)
1637                 *nextp = n;
1638 }
1639
1640 static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
1641 {
1642         struct work_struct *work = list_first_entry(&cwq->delayed_works,
1643                                                     struct work_struct, entry);
1644         struct list_head *pos = gcwq_determine_ins_pos(cwq->gcwq, cwq);
1645
1646         move_linked_works(work, pos, NULL);
1647         cwq->nr_active++;
1648 }
1649
1650 /**
1651  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
1652  * @cwq: cwq of interest
1653  * @color: color of work which left the queue
1654  *
1655  * A work either has completed or is removed from pending queue,
1656  * decrement nr_in_flight of its cwq and handle workqueue flushing.
1657  *
1658  * CONTEXT:
1659  * spin_lock_irq(gcwq->lock).
1660  */
1661 static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
1662 {
1663         /* ignore uncolored works */
1664         if (color == WORK_NO_COLOR)
1665                 return;
1666
1667         cwq->nr_in_flight[color]--;
1668         cwq->nr_active--;
1669
1670         if (!list_empty(&cwq->delayed_works)) {
1671                 /* one down, submit a delayed one */
1672                 if (cwq->nr_active < cwq->max_active)
1673                         cwq_activate_first_delayed(cwq);
1674         }
1675
1676         /* is flush in progress and are we at the flushing tip? */
1677         if (likely(cwq->flush_color != color))
1678                 return;
1679
1680         /* are there still in-flight works? */
1681         if (cwq->nr_in_flight[color])
1682                 return;
1683
1684         /* this cwq is done, clear flush_color */
1685         cwq->flush_color = -1;
1686
1687         /*
1688          * If this was the last cwq, wake up the first flusher.  It
1689          * will handle the rest.
1690          */
1691         if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
1692                 complete(&cwq->wq->first_flusher->done);
1693 }
1694
1695 /**
1696  * process_one_work - process single work
1697  * @worker: self
1698  * @work: work to process
1699  *
1700  * Process @work.  This function contains all the logics necessary to
1701  * process a single work including synchronization against and
1702  * interaction with other workers on the same cpu, queueing and
1703  * flushing.  As long as context requirement is met, any worker can
1704  * call this function to process a work.
1705  *
1706  * CONTEXT:
1707  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1708  */
1709 static void process_one_work(struct worker *worker, struct work_struct *work)
1710 {
1711         struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1712         struct global_cwq *gcwq = cwq->gcwq;
1713         struct hlist_head *bwh = busy_worker_head(gcwq, work);
1714         bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE;
1715         work_func_t f = work->func;
1716         int work_color;
1717         struct worker *collision;
1718 #ifdef CONFIG_LOCKDEP
1719         /*
1720          * It is permissible to free the struct work_struct from
1721          * inside the function that is called from it, this we need to
1722          * take into account for lockdep too.  To avoid bogus "held
1723          * lock freed" warnings as well as problems when looking into
1724          * work->lockdep_map, make a copy and use that here.
1725          */
1726         struct lockdep_map lockdep_map = work->lockdep_map;
1727 #endif
1728         /*
1729          * A single work shouldn't be executed concurrently by
1730          * multiple workers on a single cpu.  Check whether anyone is
1731          * already processing the work.  If so, defer the work to the
1732          * currently executing one.
1733          */
1734         collision = __find_worker_executing_work(gcwq, bwh, work);
1735         if (unlikely(collision)) {
1736                 move_linked_works(work, &collision->scheduled, NULL);
1737                 return;
1738         }
1739
1740         /* claim and process */
1741         debug_work_deactivate(work);
1742         hlist_add_head(&worker->hentry, bwh);
1743         worker->current_work = work;
1744         worker->current_cwq = cwq;
1745         work_color = get_work_color(work);
1746
1747         /* record the current cpu number in the work data and dequeue */
1748         set_work_cpu(work, gcwq->cpu);
1749         list_del_init(&work->entry);
1750
1751         /*
1752          * If HIGHPRI_PENDING, check the next work, and, if HIGHPRI,
1753          * wake up another worker; otherwise, clear HIGHPRI_PENDING.
1754          */
1755         if (unlikely(gcwq->flags & GCWQ_HIGHPRI_PENDING)) {
1756                 struct work_struct *nwork = list_first_entry(&gcwq->worklist,
1757                                                 struct work_struct, entry);
1758
1759                 if (!list_empty(&gcwq->worklist) &&
1760                     get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI)
1761                         wake_up_worker(gcwq);
1762                 else
1763                         gcwq->flags &= ~GCWQ_HIGHPRI_PENDING;
1764         }
1765
1766         /*
1767          * CPU intensive works don't participate in concurrency
1768          * management.  They're the scheduler's responsibility.
1769          */
1770         if (unlikely(cpu_intensive))
1771                 worker_set_flags(worker, WORKER_CPU_INTENSIVE, true);
1772
1773         spin_unlock_irq(&gcwq->lock);
1774
1775         work_clear_pending(work);
1776         lock_map_acquire(&cwq->wq->lockdep_map);
1777         lock_map_acquire(&lockdep_map);
1778         f(work);
1779         lock_map_release(&lockdep_map);
1780         lock_map_release(&cwq->wq->lockdep_map);
1781
1782         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
1783                 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
1784                        "%s/0x%08x/%d\n",
1785                        current->comm, preempt_count(), task_pid_nr(current));
1786                 printk(KERN_ERR "    last function: ");
1787                 print_symbol("%s\n", (unsigned long)f);
1788                 debug_show_held_locks(current);
1789                 dump_stack();
1790         }
1791
1792         spin_lock_irq(&gcwq->lock);
1793
1794         /* clear cpu intensive status */
1795         if (unlikely(cpu_intensive))
1796                 worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
1797
1798         /* we're done with it, release */
1799         hlist_del_init(&worker->hentry);
1800         worker->current_work = NULL;
1801         worker->current_cwq = NULL;
1802         cwq_dec_nr_in_flight(cwq, work_color);
1803 }
1804
1805 /**
1806  * process_scheduled_works - process scheduled works
1807  * @worker: self
1808  *
1809  * Process all scheduled works.  Please note that the scheduled list
1810  * may change while processing a work, so this function repeatedly
1811  * fetches a work from the top and executes it.
1812  *
1813  * CONTEXT:
1814  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1815  * multiple times.
1816  */
1817 static void process_scheduled_works(struct worker *worker)
1818 {
1819         while (!list_empty(&worker->scheduled)) {
1820                 struct work_struct *work = list_first_entry(&worker->scheduled,
1821                                                 struct work_struct, entry);
1822                 process_one_work(worker, work);
1823         }
1824 }
1825
1826 /**
1827  * worker_thread - the worker thread function
1828  * @__worker: self
1829  *
1830  * The gcwq worker thread function.  There's a single dynamic pool of
1831  * these per each cpu.  These workers process all works regardless of
1832  * their specific target workqueue.  The only exception is works which
1833  * belong to workqueues with a rescuer which will be explained in
1834  * rescuer_thread().
1835  */
1836 static int worker_thread(void *__worker)
1837 {
1838         struct worker *worker = __worker;
1839         struct global_cwq *gcwq = worker->gcwq;
1840
1841         /* tell the scheduler that this is a workqueue worker */
1842         worker->task->flags |= PF_WQ_WORKER;
1843 woke_up:
1844         spin_lock_irq(&gcwq->lock);
1845
1846         /* DIE can be set only while we're idle, checking here is enough */
1847         if (worker->flags & WORKER_DIE) {
1848                 spin_unlock_irq(&gcwq->lock);
1849                 worker->task->flags &= ~PF_WQ_WORKER;
1850                 return 0;
1851         }
1852
1853         worker_leave_idle(worker);
1854 recheck:
1855         /* no more worker necessary? */
1856         if (!need_more_worker(gcwq))
1857                 goto sleep;
1858
1859         /* do we need to manage? */
1860         if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
1861                 goto recheck;
1862
1863         /*
1864          * ->scheduled list can only be filled while a worker is
1865          * preparing to process a work or actually processing it.
1866          * Make sure nobody diddled with it while I was sleeping.
1867          */
1868         BUG_ON(!list_empty(&worker->scheduled));
1869
1870         /*
1871          * When control reaches this point, we're guaranteed to have
1872          * at least one idle worker or that someone else has already
1873          * assumed the manager role.
1874          */
1875         worker_clr_flags(worker, WORKER_PREP);
1876
1877         do {
1878                 struct work_struct *work =
1879                         list_first_entry(&gcwq->worklist,
1880                                          struct work_struct, entry);
1881
1882                 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
1883                         /* optimization path, not strictly necessary */
1884                         process_one_work(worker, work);
1885                         if (unlikely(!list_empty(&worker->scheduled)))
1886                                 process_scheduled_works(worker);
1887                 } else {
1888                         move_linked_works(work, &worker->scheduled, NULL);
1889                         process_scheduled_works(worker);
1890                 }
1891         } while (keep_working(gcwq));
1892
1893         worker_set_flags(worker, WORKER_PREP, false);
1894 sleep:
1895         if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))
1896                 goto recheck;
1897
1898         /*
1899          * gcwq->lock is held and there's no work to process and no
1900          * need to manage, sleep.  Workers are woken up only while
1901          * holding gcwq->lock or from local cpu, so setting the
1902          * current state before releasing gcwq->lock is enough to
1903          * prevent losing any event.
1904          */
1905         worker_enter_idle(worker);
1906         __set_current_state(TASK_INTERRUPTIBLE);
1907         spin_unlock_irq(&gcwq->lock);
1908         schedule();
1909         goto woke_up;
1910 }
1911
1912 /**
1913  * rescuer_thread - the rescuer thread function
1914  * @__wq: the associated workqueue
1915  *
1916  * Workqueue rescuer thread function.  There's one rescuer for each
1917  * workqueue which has WQ_RESCUER set.
1918  *
1919  * Regular work processing on a gcwq may block trying to create a new
1920  * worker which uses GFP_KERNEL allocation which has slight chance of
1921  * developing into deadlock if some works currently on the same queue
1922  * need to be processed to satisfy the GFP_KERNEL allocation.  This is
1923  * the problem rescuer solves.
1924  *
1925  * When such condition is possible, the gcwq summons rescuers of all
1926  * workqueues which have works queued on the gcwq and let them process
1927  * those works so that forward progress can be guaranteed.
1928  *
1929  * This should happen rarely.
1930  */
1931 static int rescuer_thread(void *__wq)
1932 {
1933         struct workqueue_struct *wq = __wq;
1934         struct worker *rescuer = wq->rescuer;
1935         struct list_head *scheduled = &rescuer->scheduled;
1936         bool is_unbound = wq->flags & WQ_UNBOUND;
1937         unsigned int cpu;
1938
1939         set_user_nice(current, RESCUER_NICE_LEVEL);
1940 repeat:
1941         set_current_state(TASK_INTERRUPTIBLE);
1942
1943         if (kthread_should_stop())
1944                 return 0;
1945
1946         /*
1947          * See whether any cpu is asking for help.  Unbounded
1948          * workqueues use cpu 0 in mayday_mask for CPU_UNBOUND.
1949          */
1950         for_each_mayday_cpu(cpu, wq->mayday_mask) {
1951                 unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu;
1952                 struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq);
1953                 struct global_cwq *gcwq = cwq->gcwq;
1954                 struct work_struct *work, *n;
1955
1956                 __set_current_state(TASK_RUNNING);
1957                 mayday_clear_cpu(cpu, wq->mayday_mask);
1958
1959                 /* migrate to the target cpu if possible */
1960                 rescuer->gcwq = gcwq;
1961                 worker_maybe_bind_and_lock(rescuer);
1962
1963                 /*
1964                  * Slurp in all works issued via this workqueue and
1965                  * process'em.
1966                  */
1967                 BUG_ON(!list_empty(&rescuer->scheduled));
1968                 list_for_each_entry_safe(work, n, &gcwq->worklist, entry)
1969                         if (get_work_cwq(work) == cwq)
1970                                 move_linked_works(work, scheduled, &n);
1971
1972                 process_scheduled_works(rescuer);
1973                 spin_unlock_irq(&gcwq->lock);
1974         }
1975
1976         schedule();
1977         goto repeat;
1978 }
1979
1980 struct wq_barrier {
1981         struct work_struct      work;
1982         struct completion       done;
1983 };
1984
1985 static void wq_barrier_func(struct work_struct *work)
1986 {
1987         struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
1988         complete(&barr->done);
1989 }
1990
1991 /**
1992  * insert_wq_barrier - insert a barrier work
1993  * @cwq: cwq to insert barrier into
1994  * @barr: wq_barrier to insert
1995  * @target: target work to attach @barr to
1996  * @worker: worker currently executing @target, NULL if @target is not executing
1997  *
1998  * @barr is linked to @target such that @barr is completed only after
1999  * @target finishes execution.  Please note that the ordering
2000  * guarantee is observed only with respect to @target and on the local
2001  * cpu.
2002  *
2003  * Currently, a queued barrier can't be canceled.  This is because
2004  * try_to_grab_pending() can't determine whether the work to be
2005  * grabbed is at the head of the queue and thus can't clear LINKED
2006  * flag of the previous work while there must be a valid next work
2007  * after a work with LINKED flag set.
2008  *
2009  * Note that when @worker is non-NULL, @target may be modified
2010  * underneath us, so we can't reliably determine cwq from @target.
2011  *
2012  * CONTEXT:
2013  * spin_lock_irq(gcwq->lock).
2014  */
2015 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
2016                               struct wq_barrier *barr,
2017                               struct work_struct *target, struct worker *worker)
2018 {
2019         struct list_head *head;
2020         unsigned int linked = 0;
2021
2022         /*
2023          * debugobject calls are safe here even with gcwq->lock locked
2024          * as we know for sure that this will not trigger any of the
2025          * checks and call back into the fixup functions where we
2026          * might deadlock.
2027          */
2028         INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
2029         __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
2030         init_completion(&barr->done);
2031
2032         /*
2033          * If @target is currently being executed, schedule the
2034          * barrier to the worker; otherwise, put it after @target.
2035          */
2036         if (worker)
2037                 head = worker->scheduled.next;
2038         else {
2039                 unsigned long *bits = work_data_bits(target);
2040
2041                 head = target->entry.next;
2042                 /* there can already be other linked works, inherit and set */
2043                 linked = *bits & WORK_STRUCT_LINKED;
2044                 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
2045         }
2046
2047         debug_work_activate(&barr->work);
2048         insert_work(cwq, &barr->work, head,
2049                     work_color_to_flags(WORK_NO_COLOR) | linked);
2050 }
2051
2052 /**
2053  * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
2054  * @wq: workqueue being flushed
2055  * @flush_color: new flush color, < 0 for no-op
2056  * @work_color: new work color, < 0 for no-op
2057  *
2058  * Prepare cwqs for workqueue flushing.
2059  *
2060  * If @flush_color is non-negative, flush_color on all cwqs should be
2061  * -1.  If no cwq has in-flight commands at the specified color, all
2062  * cwq->flush_color's stay at -1 and %false is returned.  If any cwq
2063  * has in flight commands, its cwq->flush_color is set to
2064  * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
2065  * wakeup logic is armed and %true is returned.
2066  *
2067  * The caller should have initialized @wq->first_flusher prior to
2068  * calling this function with non-negative @flush_color.  If
2069  * @flush_color is negative, no flush color update is done and %false
2070  * is returned.
2071  *
2072  * If @work_color is non-negative, all cwqs should have the same
2073  * work_color which is previous to @work_color and all will be
2074  * advanced to @work_color.
2075  *
2076  * CONTEXT:
2077  * mutex_lock(wq->flush_mutex).
2078  *
2079  * RETURNS:
2080  * %true if @flush_color >= 0 and there's something to flush.  %false
2081  * otherwise.
2082  */
2083 static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
2084                                       int flush_color, int work_color)
2085 {
2086         bool wait = false;
2087         unsigned int cpu;
2088
2089         if (flush_color >= 0) {
2090                 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
2091                 atomic_set(&wq->nr_cwqs_to_flush, 1);
2092         }
2093
2094         for_each_cwq_cpu(cpu, wq) {
2095                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2096                 struct global_cwq *gcwq = cwq->gcwq;
2097
2098                 spin_lock_irq(&gcwq->lock);
2099
2100                 if (flush_color >= 0) {
2101                         BUG_ON(cwq->flush_color != -1);
2102
2103                         if (cwq->nr_in_flight[flush_color]) {
2104                                 cwq->flush_color = flush_color;
2105                                 atomic_inc(&wq->nr_cwqs_to_flush);
2106                                 wait = true;
2107                         }
2108                 }
2109
2110                 if (work_color >= 0) {
2111                         BUG_ON(work_color != work_next_color(cwq->work_color));
2112                         cwq->work_color = work_color;
2113                 }
2114
2115                 spin_unlock_irq(&gcwq->lock);
2116         }
2117
2118         if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
2119                 complete(&wq->first_flusher->done);
2120
2121         return wait;
2122 }
2123
2124 /**
2125  * flush_workqueue - ensure that any scheduled work has run to completion.
2126  * @wq: workqueue to flush
2127  *
2128  * Forces execution of the workqueue and blocks until its completion.
2129  * This is typically used in driver shutdown handlers.
2130  *
2131  * We sleep until all works which were queued on entry have been handled,
2132  * but we are not livelocked by new incoming ones.
2133  */
2134 void flush_workqueue(struct workqueue_struct *wq)
2135 {
2136         struct wq_flusher this_flusher = {
2137                 .list = LIST_HEAD_INIT(this_flusher.list),
2138                 .flush_color = -1,
2139                 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
2140         };
2141         int next_color;
2142
2143         lock_map_acquire(&wq->lockdep_map);
2144         lock_map_release(&wq->lockdep_map);
2145
2146         mutex_lock(&wq->flush_mutex);
2147
2148         /*
2149          * Start-to-wait phase
2150          */
2151         next_color = work_next_color(wq->work_color);
2152
2153         if (next_color != wq->flush_color) {
2154                 /*
2155                  * Color space is not full.  The current work_color
2156                  * becomes our flush_color and work_color is advanced
2157                  * by one.
2158                  */
2159                 BUG_ON(!list_empty(&wq->flusher_overflow));
2160                 this_flusher.flush_color = wq->work_color;
2161                 wq->work_color = next_color;
2162
2163                 if (!wq->first_flusher) {
2164                         /* no flush in progress, become the first flusher */
2165                         BUG_ON(wq->flush_color != this_flusher.flush_color);
2166
2167                         wq->first_flusher = &this_flusher;
2168
2169                         if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
2170                                                        wq->work_color)) {
2171                                 /* nothing to flush, done */
2172                                 wq->flush_color = next_color;
2173                                 wq->first_flusher = NULL;
2174                                 goto out_unlock;
2175                         }
2176                 } else {
2177                         /* wait in queue */
2178                         BUG_ON(wq->flush_color == this_flusher.flush_color);
2179                         list_add_tail(&this_flusher.list, &wq->flusher_queue);
2180                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2181                 }
2182         } else {
2183                 /*
2184                  * Oops, color space is full, wait on overflow queue.
2185                  * The next flush completion will assign us
2186                  * flush_color and transfer to flusher_queue.
2187                  */
2188                 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
2189         }
2190
2191         mutex_unlock(&wq->flush_mutex);
2192
2193         wait_for_completion(&this_flusher.done);
2194
2195         /*
2196          * Wake-up-and-cascade phase
2197          *
2198          * First flushers are responsible for cascading flushes and
2199          * handling overflow.  Non-first flushers can simply return.
2200          */
2201         if (wq->first_flusher != &this_flusher)
2202                 return;
2203
2204         mutex_lock(&wq->flush_mutex);
2205
2206         /* we might have raced, check again with mutex held */
2207         if (wq->first_flusher != &this_flusher)
2208                 goto out_unlock;
2209
2210         wq->first_flusher = NULL;
2211
2212         BUG_ON(!list_empty(&this_flusher.list));
2213         BUG_ON(wq->flush_color != this_flusher.flush_color);
2214
2215         while (true) {
2216                 struct wq_flusher *next, *tmp;
2217
2218                 /* complete all the flushers sharing the current flush color */
2219                 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
2220                         if (next->flush_color != wq->flush_color)
2221                                 break;
2222                         list_del_init(&next->list);
2223                         complete(&next->done);
2224                 }
2225
2226                 BUG_ON(!list_empty(&wq->flusher_overflow) &&
2227                        wq->flush_color != work_next_color(wq->work_color));
2228
2229                 /* this flush_color is finished, advance by one */
2230                 wq->flush_color = work_next_color(wq->flush_color);
2231
2232                 /* one color has been freed, handle overflow queue */
2233                 if (!list_empty(&wq->flusher_overflow)) {
2234                         /*
2235                          * Assign the same color to all overflowed
2236                          * flushers, advance work_color and append to
2237                          * flusher_queue.  This is the start-to-wait
2238                          * phase for these overflowed flushers.
2239                          */
2240                         list_for_each_entry(tmp, &wq->flusher_overflow, list)
2241                                 tmp->flush_color = wq->work_color;
2242
2243                         wq->work_color = work_next_color(wq->work_color);
2244
2245                         list_splice_tail_init(&wq->flusher_overflow,
2246                                               &wq->flusher_queue);
2247                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2248                 }
2249
2250                 if (list_empty(&wq->flusher_queue)) {
2251                         BUG_ON(wq->flush_color != wq->work_color);
2252                         break;
2253                 }
2254
2255                 /*
2256                  * Need to flush more colors.  Make the next flusher
2257                  * the new first flusher and arm cwqs.
2258                  */
2259                 BUG_ON(wq->flush_color == wq->work_color);
2260                 BUG_ON(wq->flush_color != next->flush_color);
2261
2262                 list_del_init(&next->list);
2263                 wq->first_flusher = next;
2264
2265                 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
2266                         break;
2267
2268                 /*
2269                  * Meh... this color is already done, clear first
2270                  * flusher and repeat cascading.
2271                  */
2272                 wq->first_flusher = NULL;
2273         }
2274
2275 out_unlock:
2276         mutex_unlock(&wq->flush_mutex);
2277 }
2278 EXPORT_SYMBOL_GPL(flush_workqueue);
2279
2280 /**
2281  * flush_work - block until a work_struct's callback has terminated
2282  * @work: the work which is to be flushed
2283  *
2284  * Returns false if @work has already terminated.
2285  *
2286  * It is expected that, prior to calling flush_work(), the caller has
2287  * arranged for the work to not be requeued, otherwise it doesn't make
2288  * sense to use this function.
2289  */
2290 int flush_work(struct work_struct *work)
2291 {
2292         struct worker *worker = NULL;
2293         struct global_cwq *gcwq;
2294         struct cpu_workqueue_struct *cwq;
2295         struct wq_barrier barr;
2296
2297         might_sleep();
2298         gcwq = get_work_gcwq(work);
2299         if (!gcwq)
2300                 return 0;
2301
2302         spin_lock_irq(&gcwq->lock);
2303         if (!list_empty(&work->entry)) {
2304                 /*
2305                  * See the comment near try_to_grab_pending()->smp_rmb().
2306                  * If it was re-queued to a different gcwq under us, we
2307                  * are not going to wait.
2308                  */
2309                 smp_rmb();
2310                 cwq = get_work_cwq(work);
2311                 if (unlikely(!cwq || gcwq != cwq->gcwq))
2312                         goto already_gone;
2313         } else {
2314                 worker = find_worker_executing_work(gcwq, work);
2315                 if (!worker)
2316                         goto already_gone;
2317                 cwq = worker->current_cwq;
2318         }
2319
2320         insert_wq_barrier(cwq, &barr, work, worker);
2321         spin_unlock_irq(&gcwq->lock);
2322
2323         lock_map_acquire(&cwq->wq->lockdep_map);
2324         lock_map_release(&cwq->wq->lockdep_map);
2325
2326         wait_for_completion(&barr.done);
2327         destroy_work_on_stack(&barr.work);
2328         return 1;
2329 already_gone:
2330         spin_unlock_irq(&gcwq->lock);
2331         return 0;
2332 }
2333 EXPORT_SYMBOL_GPL(flush_work);
2334
2335 /*
2336  * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
2337  * so this work can't be re-armed in any way.
2338  */
2339 static int try_to_grab_pending(struct work_struct *work)
2340 {
2341         struct global_cwq *gcwq;
2342         int ret = -1;
2343
2344         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
2345                 return 0;
2346
2347         /*
2348          * The queueing is in progress, or it is already queued. Try to
2349          * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
2350          */
2351         gcwq = get_work_gcwq(work);
2352         if (!gcwq)
2353                 return ret;
2354
2355         spin_lock_irq(&gcwq->lock);
2356         if (!list_empty(&work->entry)) {
2357                 /*
2358                  * This work is queued, but perhaps we locked the wrong gcwq.
2359                  * In that case we must see the new value after rmb(), see
2360                  * insert_work()->wmb().
2361                  */
2362                 smp_rmb();
2363                 if (gcwq == get_work_gcwq(work)) {
2364                         debug_work_deactivate(work);
2365                         list_del_init(&work->entry);
2366                         cwq_dec_nr_in_flight(get_work_cwq(work),
2367                                              get_work_color(work));
2368                         ret = 1;
2369                 }
2370         }
2371         spin_unlock_irq(&gcwq->lock);
2372
2373         return ret;
2374 }
2375
2376 static void wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
2377 {
2378         struct wq_barrier barr;
2379         struct worker *worker;
2380
2381         spin_lock_irq(&gcwq->lock);
2382
2383         worker = find_worker_executing_work(gcwq, work);
2384         if (unlikely(worker))
2385                 insert_wq_barrier(worker->current_cwq, &barr, work, worker);
2386
2387         spin_unlock_irq(&gcwq->lock);
2388
2389         if (unlikely(worker)) {
2390                 wait_for_completion(&barr.done);
2391                 destroy_work_on_stack(&barr.work);
2392         }
2393 }
2394
2395 static void wait_on_work(struct work_struct *work)
2396 {
2397         int cpu;
2398
2399         might_sleep();
2400
2401         lock_map_acquire(&work->lockdep_map);
2402         lock_map_release(&work->lockdep_map);
2403
2404         for_each_gcwq_cpu(cpu)
2405                 wait_on_cpu_work(get_gcwq(cpu), work);
2406 }
2407
2408 static int __cancel_work_timer(struct work_struct *work,
2409                                 struct timer_list* timer)
2410 {
2411         int ret;
2412
2413         do {
2414                 ret = (timer && likely(del_timer(timer)));
2415                 if (!ret)
2416                         ret = try_to_grab_pending(work);
2417                 wait_on_work(work);
2418         } while (unlikely(ret < 0));
2419
2420         clear_work_data(work);
2421         return ret;
2422 }
2423
2424 /**
2425  * cancel_work_sync - block until a work_struct's callback has terminated
2426  * @work: the work which is to be flushed
2427  *
2428  * Returns true if @work was pending.
2429  *
2430  * cancel_work_sync() will cancel the work if it is queued. If the work's
2431  * callback appears to be running, cancel_work_sync() will block until it
2432  * has completed.
2433  *
2434  * It is possible to use this function if the work re-queues itself. It can
2435  * cancel the work even if it migrates to another workqueue, however in that
2436  * case it only guarantees that work->func() has completed on the last queued
2437  * workqueue.
2438  *
2439  * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
2440  * pending, otherwise it goes into a busy-wait loop until the timer expires.
2441  *
2442  * The caller must ensure that workqueue_struct on which this work was last
2443  * queued can't be destroyed before this function returns.
2444  */
2445 int cancel_work_sync(struct work_struct *work)
2446 {
2447         return __cancel_work_timer(work, NULL);
2448 }
2449 EXPORT_SYMBOL_GPL(cancel_work_sync);
2450
2451 /**
2452  * cancel_delayed_work_sync - reliably kill off a delayed work.
2453  * @dwork: the delayed work struct
2454  *
2455  * Returns true if @dwork was pending.
2456  *
2457  * It is possible to use this function if @dwork rearms itself via queue_work()
2458  * or queue_delayed_work(). See also the comment for cancel_work_sync().
2459  */
2460 int cancel_delayed_work_sync(struct delayed_work *dwork)
2461 {
2462         return __cancel_work_timer(&dwork->work, &dwork->timer);
2463 }
2464 EXPORT_SYMBOL(cancel_delayed_work_sync);
2465
2466 /**
2467  * schedule_work - put work task in global workqueue
2468  * @work: job to be done
2469  *
2470  * Returns zero if @work was already on the kernel-global workqueue and
2471  * non-zero otherwise.
2472  *
2473  * This puts a job in the kernel-global workqueue if it was not already
2474  * queued and leaves it in the same position on the kernel-global
2475  * workqueue otherwise.
2476  */
2477 int schedule_work(struct work_struct *work)
2478 {
2479         return queue_work(system_wq, work);
2480 }
2481 EXPORT_SYMBOL(schedule_work);
2482
2483 /*
2484  * schedule_work_on - put work task on a specific cpu
2485  * @cpu: cpu to put the work task on
2486  * @work: job to be done
2487  *
2488  * This puts a job on a specific cpu
2489  */
2490 int schedule_work_on(int cpu, struct work_struct *work)
2491 {
2492         return queue_work_on(cpu, system_wq, work);
2493 }
2494 EXPORT_SYMBOL(schedule_work_on);
2495
2496 /**
2497  * schedule_delayed_work - put work task in global workqueue after delay
2498  * @dwork: job to be done
2499  * @delay: number of jiffies to wait or 0 for immediate execution
2500  *
2501  * After waiting for a given time this puts a job in the kernel-global
2502  * workqueue.
2503  */
2504 int schedule_delayed_work(struct delayed_work *dwork,
2505                                         unsigned long delay)
2506 {
2507         return queue_delayed_work(system_wq, dwork, delay);
2508 }
2509 EXPORT_SYMBOL(schedule_delayed_work);
2510
2511 /**
2512  * flush_delayed_work - block until a dwork_struct's callback has terminated
2513  * @dwork: the delayed work which is to be flushed
2514  *
2515  * Any timeout is cancelled, and any pending work is run immediately.
2516  */
2517 void flush_delayed_work(struct delayed_work *dwork)
2518 {
2519         if (del_timer_sync(&dwork->timer)) {
2520                 __queue_work(get_cpu(), get_work_cwq(&dwork->work)->wq,
2521                              &dwork->work);
2522                 put_cpu();
2523         }
2524         flush_work(&dwork->work);
2525 }
2526 EXPORT_SYMBOL(flush_delayed_work);
2527
2528 /**
2529  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
2530  * @cpu: cpu to use
2531  * @dwork: job to be done
2532  * @delay: number of jiffies to wait
2533  *
2534  * After waiting for a given time this puts a job in the kernel-global
2535  * workqueue on the specified CPU.
2536  */
2537 int schedule_delayed_work_on(int cpu,
2538                         struct delayed_work *dwork, unsigned long delay)
2539 {
2540         return queue_delayed_work_on(cpu, system_wq, dwork, delay);
2541 }
2542 EXPORT_SYMBOL(schedule_delayed_work_on);
2543
2544 /**
2545  * schedule_on_each_cpu - call a function on each online CPU from keventd
2546  * @func: the function to call
2547  *
2548  * Returns zero on success.
2549  * Returns -ve errno on failure.
2550  *
2551  * schedule_on_each_cpu() is very slow.
2552  */
2553 int schedule_on_each_cpu(work_func_t func)
2554 {
2555         int cpu;
2556         struct work_struct *works;
2557
2558         works = alloc_percpu(struct work_struct);
2559         if (!works)
2560                 return -ENOMEM;
2561
2562         get_online_cpus();
2563
2564         for_each_online_cpu(cpu) {
2565                 struct work_struct *work = per_cpu_ptr(works, cpu);
2566
2567                 INIT_WORK(work, func);
2568                 schedule_work_on(cpu, work);
2569         }
2570
2571         for_each_online_cpu(cpu)
2572                 flush_work(per_cpu_ptr(works, cpu));
2573
2574         put_online_cpus();
2575         free_percpu(works);
2576         return 0;
2577 }
2578
2579 /**
2580  * flush_scheduled_work - ensure that any scheduled work has run to completion.
2581  *
2582  * Forces execution of the kernel-global workqueue and blocks until its
2583  * completion.
2584  *
2585  * Think twice before calling this function!  It's very easy to get into
2586  * trouble if you don't take great care.  Either of the following situations
2587  * will lead to deadlock:
2588  *
2589  *      One of the work items currently on the workqueue needs to acquire
2590  *      a lock held by your code or its caller.
2591  *
2592  *      Your code is running in the context of a work routine.
2593  *
2594  * They will be detected by lockdep when they occur, but the first might not
2595  * occur very often.  It depends on what work items are on the workqueue and
2596  * what locks they need, which you have no control over.
2597  *
2598  * In most situations flushing the entire workqueue is overkill; you merely
2599  * need to know that a particular work item isn't queued and isn't running.
2600  * In such cases you should use cancel_delayed_work_sync() or
2601  * cancel_work_sync() instead.
2602  */
2603 void flush_scheduled_work(void)
2604 {
2605         flush_workqueue(system_wq);
2606 }
2607 EXPORT_SYMBOL(flush_scheduled_work);
2608
2609 /**
2610  * execute_in_process_context - reliably execute the routine with user context
2611  * @fn:         the function to execute
2612  * @ew:         guaranteed storage for the execute work structure (must
2613  *              be available when the work executes)
2614  *
2615  * Executes the function immediately if process context is available,
2616  * otherwise schedules the function for delayed execution.
2617  *
2618  * Returns:     0 - function was executed
2619  *              1 - function was scheduled for execution
2620  */
2621 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
2622 {
2623         if (!in_interrupt()) {
2624                 fn(&ew->work);
2625                 return 0;
2626         }
2627
2628         INIT_WORK(&ew->work, fn);
2629         schedule_work(&ew->work);
2630
2631         return 1;
2632 }
2633 EXPORT_SYMBOL_GPL(execute_in_process_context);
2634
2635 int keventd_up(void)
2636 {
2637         return system_wq != NULL;
2638 }
2639
2640 static int alloc_cwqs(struct workqueue_struct *wq)
2641 {
2642         /*
2643          * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
2644          * Make sure that the alignment isn't lower than that of
2645          * unsigned long long.
2646          */
2647         const size_t size = sizeof(struct cpu_workqueue_struct);
2648         const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
2649                                    __alignof__(unsigned long long));
2650 #ifdef CONFIG_SMP
2651         bool percpu = !(wq->flags & WQ_UNBOUND);
2652 #else
2653         bool percpu = false;
2654 #endif
2655
2656         if (percpu)
2657                 wq->cpu_wq.pcpu = __alloc_percpu(size, align);
2658         else {
2659                 void *ptr;
2660
2661                 /*
2662                  * Allocate enough room to align cwq and put an extra
2663                  * pointer at the end pointing back to the originally
2664                  * allocated pointer which will be used for free.
2665                  */
2666                 ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL);
2667                 if (ptr) {
2668                         wq->cpu_wq.single = PTR_ALIGN(ptr, align);
2669                         *(void **)(wq->cpu_wq.single + 1) = ptr;
2670                 }
2671         }
2672
2673         /* just in case, make sure it's actually aligned */
2674         BUG_ON(!IS_ALIGNED(wq->cpu_wq.v, align));
2675         return wq->cpu_wq.v ? 0 : -ENOMEM;
2676 }
2677
2678 static void free_cwqs(struct workqueue_struct *wq)
2679 {
2680 #ifdef CONFIG_SMP
2681         bool percpu = !(wq->flags & WQ_UNBOUND);
2682 #else
2683         bool percpu = false;
2684 #endif
2685
2686         if (percpu)
2687                 free_percpu(wq->cpu_wq.pcpu);
2688         else if (wq->cpu_wq.single) {
2689                 /* the pointer to free is stored right after the cwq */
2690                 kfree(*(void **)(wq->cpu_wq.single + 1));
2691         }
2692 }
2693
2694 static int wq_clamp_max_active(int max_active, unsigned int flags,
2695                                const char *name)
2696 {
2697         int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
2698
2699         if (max_active < 1 || max_active > lim)
2700                 printk(KERN_WARNING "workqueue: max_active %d requested for %s "
2701                        "is out of range, clamping between %d and %d\n",
2702                        max_active, name, 1, lim);
2703
2704         return clamp_val(max_active, 1, lim);
2705 }
2706
2707 struct workqueue_struct *__alloc_workqueue_key(const char *name,
2708                                                unsigned int flags,
2709                                                int max_active,
2710                                                struct lock_class_key *key,
2711                                                const char *lock_name)
2712 {
2713         struct workqueue_struct *wq;
2714         unsigned int cpu;
2715
2716         /*
2717          * Unbound workqueues aren't concurrency managed and should be
2718          * dispatched to workers immediately.
2719          */
2720         if (flags & WQ_UNBOUND)
2721                 flags |= WQ_HIGHPRI;
2722
2723         max_active = max_active ?: WQ_DFL_ACTIVE;
2724         max_active = wq_clamp_max_active(max_active, flags, name);
2725
2726         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
2727         if (!wq)
2728                 goto err;
2729
2730         wq->flags = flags;
2731         wq->saved_max_active = max_active;
2732         mutex_init(&wq->flush_mutex);
2733         atomic_set(&wq->nr_cwqs_to_flush, 0);
2734         INIT_LIST_HEAD(&wq->flusher_queue);
2735         INIT_LIST_HEAD(&wq->flusher_overflow);
2736
2737         wq->name = name;
2738         lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
2739         INIT_LIST_HEAD(&wq->list);
2740
2741         if (alloc_cwqs(wq) < 0)
2742                 goto err;
2743
2744         for_each_cwq_cpu(cpu, wq) {
2745                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2746                 struct global_cwq *gcwq = get_gcwq(cpu);
2747
2748                 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
2749                 cwq->gcwq = gcwq;
2750                 cwq->wq = wq;
2751                 cwq->flush_color = -1;
2752                 cwq->max_active = max_active;
2753                 INIT_LIST_HEAD(&cwq->delayed_works);
2754         }
2755
2756         if (flags & WQ_RESCUER) {
2757                 struct worker *rescuer;
2758
2759                 if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))
2760                         goto err;
2761
2762                 wq->rescuer = rescuer = alloc_worker();
2763                 if (!rescuer)
2764                         goto err;
2765
2766                 rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);
2767                 if (IS_ERR(rescuer->task))
2768                         goto err;
2769
2770                 wq->rescuer = rescuer;
2771                 rescuer->task->flags |= PF_THREAD_BOUND;
2772                 wake_up_process(rescuer->task);
2773         }
2774
2775         /*
2776          * workqueue_lock protects global freeze state and workqueues
2777          * list.  Grab it, set max_active accordingly and add the new
2778          * workqueue to workqueues list.
2779          */
2780         spin_lock(&workqueue_lock);
2781
2782         if (workqueue_freezing && wq->flags & WQ_FREEZEABLE)
2783                 for_each_cwq_cpu(cpu, wq)
2784                         get_cwq(cpu, wq)->max_active = 0;
2785
2786         list_add(&wq->list, &workqueues);
2787
2788         spin_unlock(&workqueue_lock);
2789
2790         return wq;
2791 err:
2792         if (wq) {
2793                 free_cwqs(wq);
2794                 free_mayday_mask(wq->mayday_mask);
2795                 kfree(wq->rescuer);
2796                 kfree(wq);
2797         }
2798         return NULL;
2799 }
2800 EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
2801
2802 /**
2803  * destroy_workqueue - safely terminate a workqueue
2804  * @wq: target workqueue
2805  *
2806  * Safely destroy a workqueue. All work currently pending will be done first.
2807  */
2808 void destroy_workqueue(struct workqueue_struct *wq)
2809 {
2810         unsigned int cpu;
2811
2812         flush_workqueue(wq);
2813
2814         /*
2815          * wq list is used to freeze wq, remove from list after
2816          * flushing is complete in case freeze races us.
2817          */
2818         spin_lock(&workqueue_lock);
2819         list_del(&wq->list);
2820         spin_unlock(&workqueue_lock);
2821
2822         /* sanity check */
2823         for_each_cwq_cpu(cpu, wq) {
2824                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2825                 int i;
2826
2827                 for (i = 0; i < WORK_NR_COLORS; i++)
2828                         BUG_ON(cwq->nr_in_flight[i]);
2829                 BUG_ON(cwq->nr_active);
2830                 BUG_ON(!list_empty(&cwq->delayed_works));
2831         }
2832
2833         if (wq->flags & WQ_RESCUER) {
2834                 kthread_stop(wq->rescuer->task);
2835                 free_mayday_mask(wq->mayday_mask);
2836         }
2837
2838         free_cwqs(wq);
2839         kfree(wq);
2840 }
2841 EXPORT_SYMBOL_GPL(destroy_workqueue);
2842
2843 /**
2844  * workqueue_set_max_active - adjust max_active of a workqueue
2845  * @wq: target workqueue
2846  * @max_active: new max_active value.
2847  *
2848  * Set max_active of @wq to @max_active.
2849  *
2850  * CONTEXT:
2851  * Don't call from IRQ context.
2852  */
2853 void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
2854 {
2855         unsigned int cpu;
2856
2857         max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
2858
2859         spin_lock(&workqueue_lock);
2860
2861         wq->saved_max_active = max_active;
2862
2863         for_each_cwq_cpu(cpu, wq) {
2864                 struct global_cwq *gcwq = get_gcwq(cpu);
2865
2866                 spin_lock_irq(&gcwq->lock);
2867
2868                 if (!(wq->flags & WQ_FREEZEABLE) ||
2869                     !(gcwq->flags & GCWQ_FREEZING))
2870                         get_cwq(gcwq->cpu, wq)->max_active = max_active;
2871
2872                 spin_unlock_irq(&gcwq->lock);
2873         }
2874
2875         spin_unlock(&workqueue_lock);
2876 }
2877 EXPORT_SYMBOL_GPL(workqueue_set_max_active);
2878
2879 /**
2880  * workqueue_congested - test whether a workqueue is congested
2881  * @cpu: CPU in question
2882  * @wq: target workqueue
2883  *
2884  * Test whether @wq's cpu workqueue for @cpu is congested.  There is
2885  * no synchronization around this function and the test result is
2886  * unreliable and only useful as advisory hints or for debugging.
2887  *
2888  * RETURNS:
2889  * %true if congested, %false otherwise.
2890  */
2891 bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq)
2892 {
2893         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2894
2895         return !list_empty(&cwq->delayed_works);
2896 }
2897 EXPORT_SYMBOL_GPL(workqueue_congested);
2898
2899 /**
2900  * work_cpu - return the last known associated cpu for @work
2901  * @work: the work of interest
2902  *
2903  * RETURNS:
2904  * CPU number if @work was ever queued.  WORK_CPU_NONE otherwise.
2905  */
2906 unsigned int work_cpu(struct work_struct *work)
2907 {
2908         struct global_cwq *gcwq = get_work_gcwq(work);
2909
2910         return gcwq ? gcwq->cpu : WORK_CPU_NONE;
2911 }
2912 EXPORT_SYMBOL_GPL(work_cpu);
2913
2914 /**
2915  * work_busy - test whether a work is currently pending or running
2916  * @work: the work to be tested
2917  *
2918  * Test whether @work is currently pending or running.  There is no
2919  * synchronization around this function and the test result is
2920  * unreliable and only useful as advisory hints or for debugging.
2921  * Especially for reentrant wqs, the pending state might hide the
2922  * running state.
2923  *
2924  * RETURNS:
2925  * OR'd bitmask of WORK_BUSY_* bits.
2926  */
2927 unsigned int work_busy(struct work_struct *work)
2928 {
2929         struct global_cwq *gcwq = get_work_gcwq(work);
2930         unsigned long flags;
2931         unsigned int ret = 0;
2932
2933         if (!gcwq)
2934                 return false;
2935
2936         spin_lock_irqsave(&gcwq->lock, flags);
2937
2938         if (work_pending(work))
2939                 ret |= WORK_BUSY_PENDING;
2940         if (find_worker_executing_work(gcwq, work))
2941                 ret |= WORK_BUSY_RUNNING;
2942
2943         spin_unlock_irqrestore(&gcwq->lock, flags);
2944
2945         return ret;
2946 }
2947 EXPORT_SYMBOL_GPL(work_busy);
2948
2949 /*
2950  * CPU hotplug.
2951  *
2952  * There are two challenges in supporting CPU hotplug.  Firstly, there
2953  * are a lot of assumptions on strong associations among work, cwq and
2954  * gcwq which make migrating pending and scheduled works very
2955  * difficult to implement without impacting hot paths.  Secondly,
2956  * gcwqs serve mix of short, long and very long running works making
2957  * blocked draining impractical.
2958  *
2959  * This is solved by allowing a gcwq to be detached from CPU, running
2960  * it with unbound (rogue) workers and allowing it to be reattached
2961  * later if the cpu comes back online.  A separate thread is created
2962  * to govern a gcwq in such state and is called the trustee of the
2963  * gcwq.
2964  *
2965  * Trustee states and their descriptions.
2966  *
2967  * START        Command state used on startup.  On CPU_DOWN_PREPARE, a
2968  *              new trustee is started with this state.
2969  *
2970  * IN_CHARGE    Once started, trustee will enter this state after
2971  *              assuming the manager role and making all existing
2972  *              workers rogue.  DOWN_PREPARE waits for trustee to
2973  *              enter this state.  After reaching IN_CHARGE, trustee
2974  *              tries to execute the pending worklist until it's empty
2975  *              and the state is set to BUTCHER, or the state is set
2976  *              to RELEASE.
2977  *
2978  * BUTCHER      Command state which is set by the cpu callback after
2979  *              the cpu has went down.  Once this state is set trustee
2980  *              knows that there will be no new works on the worklist
2981  *              and once the worklist is empty it can proceed to
2982  *              killing idle workers.
2983  *
2984  * RELEASE      Command state which is set by the cpu callback if the
2985  *              cpu down has been canceled or it has come online
2986  *              again.  After recognizing this state, trustee stops
2987  *              trying to drain or butcher and clears ROGUE, rebinds
2988  *              all remaining workers back to the cpu and releases
2989  *              manager role.
2990  *
2991  * DONE         Trustee will enter this state after BUTCHER or RELEASE
2992  *              is complete.
2993  *
2994  *          trustee                 CPU                draining
2995  *         took over                down               complete
2996  * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
2997  *                        |                     |                  ^
2998  *                        | CPU is back online  v   return workers |
2999  *                         ----------------> RELEASE --------------
3000  */
3001
3002 /**
3003  * trustee_wait_event_timeout - timed event wait for trustee
3004  * @cond: condition to wait for
3005  * @timeout: timeout in jiffies
3006  *
3007  * wait_event_timeout() for trustee to use.  Handles locking and
3008  * checks for RELEASE request.
3009  *
3010  * CONTEXT:
3011  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3012  * multiple times.  To be used by trustee.
3013  *
3014  * RETURNS:
3015  * Positive indicating left time if @cond is satisfied, 0 if timed
3016  * out, -1 if canceled.
3017  */
3018 #define trustee_wait_event_timeout(cond, timeout) ({                    \
3019         long __ret = (timeout);                                         \
3020         while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \
3021                __ret) {                                                 \
3022                 spin_unlock_irq(&gcwq->lock);                           \
3023                 __wait_event_timeout(gcwq->trustee_wait, (cond) ||      \
3024                         (gcwq->trustee_state == TRUSTEE_RELEASE),       \
3025                         __ret);                                         \
3026                 spin_lock_irq(&gcwq->lock);                             \
3027         }                                                               \
3028         gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret);          \
3029 })
3030
3031 /**
3032  * trustee_wait_event - event wait for trustee
3033  * @cond: condition to wait for
3034  *
3035  * wait_event() for trustee to use.  Automatically handles locking and
3036  * checks for CANCEL request.
3037  *
3038  * CONTEXT:
3039  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3040  * multiple times.  To be used by trustee.
3041  *
3042  * RETURNS:
3043  * 0 if @cond is satisfied, -1 if canceled.
3044  */
3045 #define trustee_wait_event(cond) ({                                     \
3046         long __ret1;                                                    \
3047         __ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
3048         __ret1 < 0 ? -1 : 0;                                            \
3049 })
3050
3051 static int __cpuinit trustee_thread(void *__gcwq)
3052 {
3053         struct global_cwq *gcwq = __gcwq;
3054         struct worker *worker;
3055         struct work_struct *work;
3056         struct hlist_node *pos;
3057         long rc;
3058         int i;
3059
3060         BUG_ON(gcwq->cpu != smp_processor_id());
3061
3062         spin_lock_irq(&gcwq->lock);
3063         /*
3064          * Claim the manager position and make all workers rogue.
3065          * Trustee must be bound to the target cpu and can't be
3066          * cancelled.
3067          */
3068         BUG_ON(gcwq->cpu != smp_processor_id());
3069         rc = trustee_wait_event(!(gcwq->flags & GCWQ_MANAGING_WORKERS));
3070         BUG_ON(rc < 0);
3071
3072         gcwq->flags |= GCWQ_MANAGING_WORKERS;
3073
3074         list_for_each_entry(worker, &gcwq->idle_list, entry)
3075                 worker->flags |= WORKER_ROGUE;
3076
3077         for_each_busy_worker(worker, i, pos, gcwq)
3078                 worker->flags |= WORKER_ROGUE;
3079
3080         /*
3081          * Call schedule() so that we cross rq->lock and thus can
3082          * guarantee sched callbacks see the rogue flag.  This is
3083          * necessary as scheduler callbacks may be invoked from other
3084          * cpus.
3085          */
3086         spin_unlock_irq(&gcwq->lock);
3087         schedule();
3088         spin_lock_irq(&gcwq->lock);
3089
3090         /*
3091          * Sched callbacks are disabled now.  Zap nr_running.  After
3092          * this, nr_running stays zero and need_more_worker() and
3093          * keep_working() are always true as long as the worklist is
3094          * not empty.
3095          */
3096         atomic_set(get_gcwq_nr_running(gcwq->cpu), 0);
3097
3098         spin_unlock_irq(&gcwq->lock);
3099         del_timer_sync(&gcwq->idle_timer);
3100         spin_lock_irq(&gcwq->lock);
3101
3102         /*
3103          * We're now in charge.  Notify and proceed to drain.  We need
3104          * to keep the gcwq running during the whole CPU down
3105          * procedure as other cpu hotunplug callbacks may need to
3106          * flush currently running tasks.
3107          */
3108         gcwq->trustee_state = TRUSTEE_IN_CHARGE;
3109         wake_up_all(&gcwq->trustee_wait);
3110
3111         /*
3112          * The original cpu is in the process of dying and may go away
3113          * anytime now.  When that happens, we and all workers would
3114          * be migrated to other cpus.  Try draining any left work.  We
3115          * want to get it over with ASAP - spam rescuers, wake up as
3116          * many idlers as necessary and create new ones till the
3117          * worklist is empty.  Note that if the gcwq is frozen, there
3118          * may be frozen works in freezeable cwqs.  Don't declare
3119          * completion while frozen.
3120          */
3121         while (gcwq->nr_workers != gcwq->nr_idle ||
3122                gcwq->flags & GCWQ_FREEZING ||
3123                gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
3124                 int nr_works = 0;
3125
3126                 list_for_each_entry(work, &gcwq->worklist, entry) {
3127                         send_mayday(work);
3128                         nr_works++;
3129                 }
3130
3131                 list_for_each_entry(worker, &gcwq->idle_list, entry) {
3132                         if (!nr_works--)
3133                                 break;
3134                         wake_up_process(worker->task);
3135                 }
3136
3137                 if (need_to_create_worker(gcwq)) {
3138                         spin_unlock_irq(&gcwq->lock);
3139                         worker = create_worker(gcwq, false);
3140                         spin_lock_irq(&gcwq->lock);
3141                         if (worker) {
3142                                 worker->flags |= WORKER_ROGUE;
3143                                 start_worker(worker);
3144                         }
3145                 }
3146
3147                 /* give a breather */
3148                 if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
3149                         break;
3150         }
3151
3152         /*
3153          * Either all works have been scheduled and cpu is down, or
3154          * cpu down has already been canceled.  Wait for and butcher
3155          * all workers till we're canceled.
3156          */
3157         do {
3158                 rc = trustee_wait_event(!list_empty(&gcwq->idle_list));
3159                 while (!list_empty(&gcwq->idle_list))
3160                         destroy_worker(list_first_entry(&gcwq->idle_list,
3161                                                         struct worker, entry));
3162         } while (gcwq->nr_workers && rc >= 0);
3163
3164         /*
3165          * At this point, either draining has completed and no worker
3166          * is left, or cpu down has been canceled or the cpu is being
3167          * brought back up.  There shouldn't be any idle one left.
3168          * Tell the remaining busy ones to rebind once it finishes the
3169          * currently scheduled works by scheduling the rebind_work.
3170          */
3171         WARN_ON(!list_empty(&gcwq->idle_list));
3172
3173         for_each_busy_worker(worker, i, pos, gcwq) {
3174                 struct work_struct *rebind_work = &worker->rebind_work;
3175
3176                 /*
3177                  * Rebind_work may race with future cpu hotplug
3178                  * operations.  Use a separate flag to mark that
3179                  * rebinding is scheduled.
3180                  */
3181                 worker->flags |= WORKER_REBIND;
3182                 worker->flags &= ~WORKER_ROGUE;
3183
3184                 /* queue rebind_work, wq doesn't matter, use the default one */
3185                 if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
3186                                      work_data_bits(rebind_work)))
3187                         continue;
3188
3189                 debug_work_activate(rebind_work);
3190                 insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
3191                             worker->scheduled.next,
3192                             work_color_to_flags(WORK_NO_COLOR));
3193         }
3194
3195         /* relinquish manager role */
3196         gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
3197
3198         /* notify completion */
3199         gcwq->trustee = NULL;
3200         gcwq->trustee_state = TRUSTEE_DONE;
3201         wake_up_all(&gcwq->trustee_wait);
3202         spin_unlock_irq(&gcwq->lock);
3203         return 0;
3204 }
3205
3206 /**
3207  * wait_trustee_state - wait for trustee to enter the specified state
3208  * @gcwq: gcwq the trustee of interest belongs to
3209  * @state: target state to wait for
3210  *
3211  * Wait for the trustee to reach @state.  DONE is already matched.
3212  *
3213  * CONTEXT:
3214  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3215  * multiple times.  To be used by cpu_callback.
3216  */
3217 static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
3218 {
3219         if (!(gcwq->trustee_state == state ||
3220               gcwq->trustee_state == TRUSTEE_DONE)) {
3221                 spin_unlock_irq(&gcwq->lock);
3222                 __wait_event(gcwq->trustee_wait,
3223                              gcwq->trustee_state == state ||
3224                              gcwq->trustee_state == TRUSTEE_DONE);
3225                 spin_lock_irq(&gcwq->lock);
3226         }
3227 }
3228
3229 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
3230                                                 unsigned long action,
3231                                                 void *hcpu)
3232 {
3233         unsigned int cpu = (unsigned long)hcpu;
3234         struct global_cwq *gcwq = get_gcwq(cpu);
3235         struct task_struct *new_trustee = NULL;
3236         struct worker *uninitialized_var(new_worker);
3237         unsigned long flags;
3238
3239         action &= ~CPU_TASKS_FROZEN;
3240
3241         switch (action) {
3242         case CPU_DOWN_PREPARE:
3243                 new_trustee = kthread_create(trustee_thread, gcwq,
3244                                              "workqueue_trustee/%d\n", cpu);
3245                 if (IS_ERR(new_trustee))
3246                         return notifier_from_errno(PTR_ERR(new_trustee));
3247                 kthread_bind(new_trustee, cpu);
3248                 /* fall through */
3249         case CPU_UP_PREPARE:
3250                 BUG_ON(gcwq->first_idle);
3251                 new_worker = create_worker(gcwq, false);
3252                 if (!new_worker) {
3253                         if (new_trustee)
3254                                 kthread_stop(new_trustee);
3255                         return NOTIFY_BAD;
3256                 }
3257         }
3258
3259         /* some are called w/ irq disabled, don't disturb irq status */
3260         spin_lock_irqsave(&gcwq->lock, flags);
3261
3262         switch (action) {
3263         case CPU_DOWN_PREPARE:
3264                 /* initialize trustee and tell it to acquire the gcwq */
3265                 BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
3266                 gcwq->trustee = new_trustee;
3267                 gcwq->trustee_state = TRUSTEE_START;
3268                 wake_up_process(gcwq->trustee);
3269                 wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
3270                 /* fall through */
3271         case CPU_UP_PREPARE:
3272                 BUG_ON(gcwq->first_idle);
3273                 gcwq->first_idle = new_worker;
3274                 break;
3275
3276         case CPU_DYING:
3277                 /*
3278                  * Before this, the trustee and all workers except for
3279                  * the ones which are still executing works from
3280                  * before the last CPU down must be on the cpu.  After
3281                  * this, they'll all be diasporas.
3282                  */
3283                 gcwq->flags |= GCWQ_DISASSOCIATED;
3284                 break;
3285
3286         case CPU_POST_DEAD:
3287                 gcwq->trustee_state = TRUSTEE_BUTCHER;
3288                 /* fall through */
3289         case CPU_UP_CANCELED:
3290                 destroy_worker(gcwq->first_idle);
3291                 gcwq->first_idle = NULL;
3292                 break;
3293
3294         case CPU_DOWN_FAILED:
3295         case CPU_ONLINE:
3296                 gcwq->flags &= ~GCWQ_DISASSOCIATED;
3297                 if (gcwq->trustee_state != TRUSTEE_DONE) {
3298                         gcwq->trustee_state = TRUSTEE_RELEASE;
3299                         wake_up_process(gcwq->trustee);
3300                         wait_trustee_state(gcwq, TRUSTEE_DONE);
3301                 }
3302
3303                 /*
3304                  * Trustee is done and there might be no worker left.
3305                  * Put the first_idle in and request a real manager to
3306                  * take a look.
3307                  */
3308                 spin_unlock_irq(&gcwq->lock);
3309                 kthread_bind(gcwq->first_idle->task, cpu);
3310                 spin_lock_irq(&gcwq->lock);
3311                 gcwq->flags |= GCWQ_MANAGE_WORKERS;
3312                 start_worker(gcwq->first_idle);
3313                 gcwq->first_idle = NULL;
3314                 break;
3315         }
3316
3317         spin_unlock_irqrestore(&gcwq->lock, flags);
3318
3319         return notifier_from_errno(0);
3320 }
3321
3322 #ifdef CONFIG_SMP
3323
3324 struct work_for_cpu {
3325         struct completion completion;
3326         long (*fn)(void *);
3327         void *arg;
3328         long ret;
3329 };
3330
3331 static int do_work_for_cpu(void *_wfc)
3332 {
3333         struct work_for_cpu *wfc = _wfc;
3334         wfc->ret = wfc->fn(wfc->arg);
3335         complete(&wfc->completion);
3336         return 0;
3337 }
3338
3339 /**
3340  * work_on_cpu - run a function in user context on a particular cpu
3341  * @cpu: the cpu to run on
3342  * @fn: the function to run
3343  * @arg: the function arg
3344  *
3345  * This will return the value @fn returns.
3346  * It is up to the caller to ensure that the cpu doesn't go offline.
3347  * The caller must not hold any locks which would prevent @fn from completing.
3348  */
3349 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
3350 {
3351         struct task_struct *sub_thread;
3352         struct work_for_cpu wfc = {
3353                 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
3354                 .fn = fn,
3355                 .arg = arg,
3356         };
3357
3358         sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
3359         if (IS_ERR(sub_thread))
3360                 return PTR_ERR(sub_thread);
3361         kthread_bind(sub_thread, cpu);
3362         wake_up_process(sub_thread);
3363         wait_for_completion(&wfc.completion);
3364         return wfc.ret;
3365 }
3366 EXPORT_SYMBOL_GPL(work_on_cpu);
3367 #endif /* CONFIG_SMP */
3368
3369 #ifdef CONFIG_FREEZER
3370
3371 /**
3372  * freeze_workqueues_begin - begin freezing workqueues
3373  *
3374  * Start freezing workqueues.  After this function returns, all
3375  * freezeable workqueues will queue new works to their frozen_works
3376  * list instead of gcwq->worklist.
3377  *
3378  * CONTEXT:
3379  * Grabs and releases workqueue_lock and gcwq->lock's.
3380  */
3381 void freeze_workqueues_begin(void)
3382 {
3383         unsigned int cpu;
3384
3385         spin_lock(&workqueue_lock);
3386
3387         BUG_ON(workqueue_freezing);
3388         workqueue_freezing = true;
3389
3390         for_each_gcwq_cpu(cpu) {
3391                 struct global_cwq *gcwq = get_gcwq(cpu);
3392                 struct workqueue_struct *wq;
3393
3394                 spin_lock_irq(&gcwq->lock);
3395
3396                 BUG_ON(gcwq->flags & GCWQ_FREEZING);
3397                 gcwq->flags |= GCWQ_FREEZING;
3398
3399                 list_for_each_entry(wq, &workqueues, list) {
3400                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3401
3402                         if (cwq && wq->flags & WQ_FREEZEABLE)
3403                                 cwq->max_active = 0;
3404                 }
3405
3406                 spin_unlock_irq(&gcwq->lock);
3407         }
3408
3409         spin_unlock(&workqueue_lock);
3410 }
3411
3412 /**
3413  * freeze_workqueues_busy - are freezeable workqueues still busy?
3414  *
3415  * Check whether freezing is complete.  This function must be called
3416  * between freeze_workqueues_begin() and thaw_workqueues().
3417  *
3418  * CONTEXT:
3419  * Grabs and releases workqueue_lock.
3420  *
3421  * RETURNS:
3422  * %true if some freezeable workqueues are still busy.  %false if
3423  * freezing is complete.
3424  */
3425 bool freeze_workqueues_busy(void)
3426 {
3427         unsigned int cpu;
3428         bool busy = false;
3429
3430         spin_lock(&workqueue_lock);
3431
3432         BUG_ON(!workqueue_freezing);
3433
3434         for_each_gcwq_cpu(cpu) {
3435                 struct workqueue_struct *wq;
3436                 /*
3437                  * nr_active is monotonically decreasing.  It's safe
3438                  * to peek without lock.
3439                  */
3440                 list_for_each_entry(wq, &workqueues, list) {
3441                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3442
3443                         if (!cwq || !(wq->flags & WQ_FREEZEABLE))
3444                                 continue;
3445
3446                         BUG_ON(cwq->nr_active < 0);
3447                         if (cwq->nr_active) {
3448                                 busy = true;
3449                                 goto out_unlock;
3450                         }
3451                 }
3452         }
3453 out_unlock:
3454         spin_unlock(&workqueue_lock);
3455         return busy;
3456 }
3457
3458 /**
3459  * thaw_workqueues - thaw workqueues
3460  *
3461  * Thaw workqueues.  Normal queueing is restored and all collected
3462  * frozen works are transferred to their respective gcwq worklists.
3463  *
3464  * CONTEXT:
3465  * Grabs and releases workqueue_lock and gcwq->lock's.
3466  */
3467 void thaw_workqueues(void)
3468 {
3469         unsigned int cpu;
3470
3471         spin_lock(&workqueue_lock);
3472
3473         if (!workqueue_freezing)
3474                 goto out_unlock;
3475
3476         for_each_gcwq_cpu(cpu) {
3477                 struct global_cwq *gcwq = get_gcwq(cpu);
3478                 struct workqueue_struct *wq;
3479
3480                 spin_lock_irq(&gcwq->lock);
3481
3482                 BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
3483                 gcwq->flags &= ~GCWQ_FREEZING;
3484
3485                 list_for_each_entry(wq, &workqueues, list) {
3486                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3487
3488                         if (!cwq || !(wq->flags & WQ_FREEZEABLE))
3489                                 continue;
3490
3491                         /* restore max_active and repopulate worklist */
3492                         cwq->max_active = wq->saved_max_active;
3493
3494                         while (!list_empty(&cwq->delayed_works) &&
3495                                cwq->nr_active < cwq->max_active)
3496                                 cwq_activate_first_delayed(cwq);
3497                 }
3498
3499                 wake_up_worker(gcwq);
3500
3501                 spin_unlock_irq(&gcwq->lock);
3502         }
3503
3504         workqueue_freezing = false;
3505 out_unlock:
3506         spin_unlock(&workqueue_lock);
3507 }
3508 #endif /* CONFIG_FREEZER */
3509
3510 static int __init init_workqueues(void)
3511 {
3512         unsigned int cpu;
3513         int i;
3514
3515         hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
3516
3517         /* initialize gcwqs */
3518         for_each_gcwq_cpu(cpu) {
3519                 struct global_cwq *gcwq = get_gcwq(cpu);
3520
3521                 spin_lock_init(&gcwq->lock);
3522                 INIT_LIST_HEAD(&gcwq->worklist);
3523                 gcwq->cpu = cpu;
3524                 if (cpu == WORK_CPU_UNBOUND)
3525                         gcwq->flags |= GCWQ_DISASSOCIATED;
3526
3527                 INIT_LIST_HEAD(&gcwq->idle_list);
3528                 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
3529                         INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
3530
3531                 init_timer_deferrable(&gcwq->idle_timer);
3532                 gcwq->idle_timer.function = idle_worker_timeout;
3533                 gcwq->idle_timer.data = (unsigned long)gcwq;
3534
3535                 setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
3536                             (unsigned long)gcwq);
3537
3538                 ida_init(&gcwq->worker_ida);
3539
3540                 gcwq->trustee_state = TRUSTEE_DONE;
3541                 init_waitqueue_head(&gcwq->trustee_wait);
3542         }
3543
3544         /* create the initial worker */
3545         for_each_online_gcwq_cpu(cpu) {
3546                 struct global_cwq *gcwq = get_gcwq(cpu);
3547                 struct worker *worker;
3548
3549                 worker = create_worker(gcwq, true);
3550                 BUG_ON(!worker);
3551                 spin_lock_irq(&gcwq->lock);
3552                 start_worker(worker);
3553                 spin_unlock_irq(&gcwq->lock);
3554         }
3555
3556         system_wq = alloc_workqueue("events", 0, 0);
3557         system_long_wq = alloc_workqueue("events_long", 0, 0);
3558         system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
3559         system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
3560                                             WQ_UNBOUND_MAX_ACTIVE);
3561         BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq);
3562         return 0;
3563 }
3564 early_initcall(init_workqueues);