workqueue: s/__create_workqueue()/alloc_workqueue()/, and add system workqueues
[linux-2.6.git] / kernel / workqueue.c
1 /*
2  * linux/kernel/workqueue.c
3  *
4  * Generic mechanism for defining kernel helper threads for running
5  * arbitrary tasks in process context.
6  *
7  * Started by Ingo Molnar, Copyright (C) 2002
8  *
9  * Derived from the taskqueue/keventd code by:
10  *
11  *   David Woodhouse <dwmw2@infradead.org>
12  *   Andrew Morton
13  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
14  *   Theodore Ts'o <tytso@mit.edu>
15  *
16  * Made to use alloc_percpu by Christoph Lameter.
17  */
18
19 #include <linux/module.h>
20 #include <linux/kernel.h>
21 #include <linux/sched.h>
22 #include <linux/init.h>
23 #include <linux/signal.h>
24 #include <linux/completion.h>
25 #include <linux/workqueue.h>
26 #include <linux/slab.h>
27 #include <linux/cpu.h>
28 #include <linux/notifier.h>
29 #include <linux/kthread.h>
30 #include <linux/hardirq.h>
31 #include <linux/mempolicy.h>
32 #include <linux/freezer.h>
33 #include <linux/kallsyms.h>
34 #include <linux/debug_locks.h>
35 #include <linux/lockdep.h>
36 #include <linux/idr.h>
37
38 #include "workqueue_sched.h"
39
40 enum {
41         /* global_cwq flags */
42         GCWQ_MANAGE_WORKERS     = 1 << 0,       /* need to manage workers */
43         GCWQ_MANAGING_WORKERS   = 1 << 1,       /* managing workers */
44         GCWQ_DISASSOCIATED      = 1 << 2,       /* cpu can't serve workers */
45         GCWQ_FREEZING           = 1 << 3,       /* freeze in progress */
46
47         /* worker flags */
48         WORKER_STARTED          = 1 << 0,       /* started */
49         WORKER_DIE              = 1 << 1,       /* die die die */
50         WORKER_IDLE             = 1 << 2,       /* is idle */
51         WORKER_PREP             = 1 << 3,       /* preparing to run works */
52         WORKER_ROGUE            = 1 << 4,       /* not bound to any cpu */
53         WORKER_REBIND           = 1 << 5,       /* mom is home, come back */
54
55         WORKER_NOT_RUNNING      = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND,
56
57         /* gcwq->trustee_state */
58         TRUSTEE_START           = 0,            /* start */
59         TRUSTEE_IN_CHARGE       = 1,            /* trustee in charge of gcwq */
60         TRUSTEE_BUTCHER         = 2,            /* butcher workers */
61         TRUSTEE_RELEASE         = 3,            /* release workers */
62         TRUSTEE_DONE            = 4,            /* trustee is done */
63
64         BUSY_WORKER_HASH_ORDER  = 6,            /* 64 pointers */
65         BUSY_WORKER_HASH_SIZE   = 1 << BUSY_WORKER_HASH_ORDER,
66         BUSY_WORKER_HASH_MASK   = BUSY_WORKER_HASH_SIZE - 1,
67
68         MAX_IDLE_WORKERS_RATIO  = 4,            /* 1/4 of busy can be idle */
69         IDLE_WORKER_TIMEOUT     = 300 * HZ,     /* keep idle ones for 5 mins */
70
71         MAYDAY_INITIAL_TIMEOUT  = HZ / 100,     /* call for help after 10ms */
72         MAYDAY_INTERVAL         = HZ / 10,      /* and then every 100ms */
73         CREATE_COOLDOWN         = HZ,           /* time to breath after fail */
74         TRUSTEE_COOLDOWN        = HZ / 10,      /* for trustee draining */
75
76         /*
77          * Rescue workers are used only on emergencies and shared by
78          * all cpus.  Give -20.
79          */
80         RESCUER_NICE_LEVEL      = -20,
81 };
82
83 /*
84  * Structure fields follow one of the following exclusion rules.
85  *
86  * I: Set during initialization and read-only afterwards.
87  *
88  * P: Preemption protected.  Disabling preemption is enough and should
89  *    only be modified and accessed from the local cpu.
90  *
91  * L: gcwq->lock protected.  Access with gcwq->lock held.
92  *
93  * X: During normal operation, modification requires gcwq->lock and
94  *    should be done only from local cpu.  Either disabling preemption
95  *    on local cpu or grabbing gcwq->lock is enough for read access.
96  *    While trustee is in charge, it's identical to L.
97  *
98  * F: wq->flush_mutex protected.
99  *
100  * W: workqueue_lock protected.
101  */
102
103 struct global_cwq;
104
105 /*
106  * The poor guys doing the actual heavy lifting.  All on-duty workers
107  * are either serving the manager role, on idle list or on busy hash.
108  */
109 struct worker {
110         /* on idle list while idle, on busy hash table while busy */
111         union {
112                 struct list_head        entry;  /* L: while idle */
113                 struct hlist_node       hentry; /* L: while busy */
114         };
115
116         struct work_struct      *current_work;  /* L: work being processed */
117         struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */
118         struct list_head        scheduled;      /* L: scheduled works */
119         struct task_struct      *task;          /* I: worker task */
120         struct global_cwq       *gcwq;          /* I: the associated gcwq */
121         /* 64 bytes boundary on 64bit, 32 on 32bit */
122         unsigned long           last_active;    /* L: last active timestamp */
123         unsigned int            flags;          /* X: flags */
124         int                     id;             /* I: worker id */
125         struct work_struct      rebind_work;    /* L: rebind worker to cpu */
126 };
127
128 /*
129  * Global per-cpu workqueue.  There's one and only one for each cpu
130  * and all works are queued and processed here regardless of their
131  * target workqueues.
132  */
133 struct global_cwq {
134         spinlock_t              lock;           /* the gcwq lock */
135         struct list_head        worklist;       /* L: list of pending works */
136         unsigned int            cpu;            /* I: the associated cpu */
137         unsigned int            flags;          /* L: GCWQ_* flags */
138
139         int                     nr_workers;     /* L: total number of workers */
140         int                     nr_idle;        /* L: currently idle ones */
141
142         /* workers are chained either in the idle_list or busy_hash */
143         struct list_head        idle_list;      /* X: list of idle workers */
144         struct hlist_head       busy_hash[BUSY_WORKER_HASH_SIZE];
145                                                 /* L: hash of busy workers */
146
147         struct timer_list       idle_timer;     /* L: worker idle timeout */
148         struct timer_list       mayday_timer;   /* L: SOS timer for dworkers */
149
150         struct ida              worker_ida;     /* L: for worker IDs */
151
152         struct task_struct      *trustee;       /* L: for gcwq shutdown */
153         unsigned int            trustee_state;  /* L: trustee state */
154         wait_queue_head_t       trustee_wait;   /* trustee wait */
155         struct worker           *first_idle;    /* L: first idle worker */
156 } ____cacheline_aligned_in_smp;
157
158 /*
159  * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
160  * work_struct->data are used for flags and thus cwqs need to be
161  * aligned at two's power of the number of flag bits.
162  */
163 struct cpu_workqueue_struct {
164         struct global_cwq       *gcwq;          /* I: the associated gcwq */
165         struct workqueue_struct *wq;            /* I: the owning workqueue */
166         int                     work_color;     /* L: current color */
167         int                     flush_color;    /* L: flushing color */
168         int                     nr_in_flight[WORK_NR_COLORS];
169                                                 /* L: nr of in_flight works */
170         int                     nr_active;      /* L: nr of active works */
171         int                     max_active;     /* L: max active works */
172         struct list_head        delayed_works;  /* L: delayed works */
173 };
174
175 /*
176  * Structure used to wait for workqueue flush.
177  */
178 struct wq_flusher {
179         struct list_head        list;           /* F: list of flushers */
180         int                     flush_color;    /* F: flush color waiting for */
181         struct completion       done;           /* flush completion */
182 };
183
184 /*
185  * The externally visible workqueue abstraction is an array of
186  * per-CPU workqueues:
187  */
188 struct workqueue_struct {
189         unsigned int            flags;          /* I: WQ_* flags */
190         struct cpu_workqueue_struct *cpu_wq;    /* I: cwq's */
191         struct list_head        list;           /* W: list of all workqueues */
192
193         struct mutex            flush_mutex;    /* protects wq flushing */
194         int                     work_color;     /* F: current work color */
195         int                     flush_color;    /* F: current flush color */
196         atomic_t                nr_cwqs_to_flush; /* flush in progress */
197         struct wq_flusher       *first_flusher; /* F: first flusher */
198         struct list_head        flusher_queue;  /* F: flush waiters */
199         struct list_head        flusher_overflow; /* F: flush overflow list */
200
201         unsigned long           single_cpu;     /* cpu for single cpu wq */
202
203         cpumask_var_t           mayday_mask;    /* cpus requesting rescue */
204         struct worker           *rescuer;       /* I: rescue worker */
205
206         int                     saved_max_active; /* I: saved cwq max_active */
207         const char              *name;          /* I: workqueue name */
208 #ifdef CONFIG_LOCKDEP
209         struct lockdep_map      lockdep_map;
210 #endif
211 };
212
213 struct workqueue_struct *system_wq __read_mostly;
214 struct workqueue_struct *system_long_wq __read_mostly;
215 struct workqueue_struct *system_nrt_wq __read_mostly;
216 EXPORT_SYMBOL_GPL(system_wq);
217 EXPORT_SYMBOL_GPL(system_long_wq);
218 EXPORT_SYMBOL_GPL(system_nrt_wq);
219
220 #define for_each_busy_worker(worker, i, pos, gcwq)                      \
221         for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)                     \
222                 hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
223
224 #ifdef CONFIG_DEBUG_OBJECTS_WORK
225
226 static struct debug_obj_descr work_debug_descr;
227
228 /*
229  * fixup_init is called when:
230  * - an active object is initialized
231  */
232 static int work_fixup_init(void *addr, enum debug_obj_state state)
233 {
234         struct work_struct *work = addr;
235
236         switch (state) {
237         case ODEBUG_STATE_ACTIVE:
238                 cancel_work_sync(work);
239                 debug_object_init(work, &work_debug_descr);
240                 return 1;
241         default:
242                 return 0;
243         }
244 }
245
246 /*
247  * fixup_activate is called when:
248  * - an active object is activated
249  * - an unknown object is activated (might be a statically initialized object)
250  */
251 static int work_fixup_activate(void *addr, enum debug_obj_state state)
252 {
253         struct work_struct *work = addr;
254
255         switch (state) {
256
257         case ODEBUG_STATE_NOTAVAILABLE:
258                 /*
259                  * This is not really a fixup. The work struct was
260                  * statically initialized. We just make sure that it
261                  * is tracked in the object tracker.
262                  */
263                 if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
264                         debug_object_init(work, &work_debug_descr);
265                         debug_object_activate(work, &work_debug_descr);
266                         return 0;
267                 }
268                 WARN_ON_ONCE(1);
269                 return 0;
270
271         case ODEBUG_STATE_ACTIVE:
272                 WARN_ON(1);
273
274         default:
275                 return 0;
276         }
277 }
278
279 /*
280  * fixup_free is called when:
281  * - an active object is freed
282  */
283 static int work_fixup_free(void *addr, enum debug_obj_state state)
284 {
285         struct work_struct *work = addr;
286
287         switch (state) {
288         case ODEBUG_STATE_ACTIVE:
289                 cancel_work_sync(work);
290                 debug_object_free(work, &work_debug_descr);
291                 return 1;
292         default:
293                 return 0;
294         }
295 }
296
297 static struct debug_obj_descr work_debug_descr = {
298         .name           = "work_struct",
299         .fixup_init     = work_fixup_init,
300         .fixup_activate = work_fixup_activate,
301         .fixup_free     = work_fixup_free,
302 };
303
304 static inline void debug_work_activate(struct work_struct *work)
305 {
306         debug_object_activate(work, &work_debug_descr);
307 }
308
309 static inline void debug_work_deactivate(struct work_struct *work)
310 {
311         debug_object_deactivate(work, &work_debug_descr);
312 }
313
314 void __init_work(struct work_struct *work, int onstack)
315 {
316         if (onstack)
317                 debug_object_init_on_stack(work, &work_debug_descr);
318         else
319                 debug_object_init(work, &work_debug_descr);
320 }
321 EXPORT_SYMBOL_GPL(__init_work);
322
323 void destroy_work_on_stack(struct work_struct *work)
324 {
325         debug_object_free(work, &work_debug_descr);
326 }
327 EXPORT_SYMBOL_GPL(destroy_work_on_stack);
328
329 #else
330 static inline void debug_work_activate(struct work_struct *work) { }
331 static inline void debug_work_deactivate(struct work_struct *work) { }
332 #endif
333
334 /* Serializes the accesses to the list of workqueues. */
335 static DEFINE_SPINLOCK(workqueue_lock);
336 static LIST_HEAD(workqueues);
337 static bool workqueue_freezing;         /* W: have wqs started freezing? */
338
339 /*
340  * The almighty global cpu workqueues.  nr_running is the only field
341  * which is expected to be used frequently by other cpus via
342  * try_to_wake_up().  Put it in a separate cacheline.
343  */
344 static DEFINE_PER_CPU(struct global_cwq, global_cwq);
345 static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
346
347 static int worker_thread(void *__worker);
348
349 static struct global_cwq *get_gcwq(unsigned int cpu)
350 {
351         return &per_cpu(global_cwq, cpu);
352 }
353
354 static atomic_t *get_gcwq_nr_running(unsigned int cpu)
355 {
356         return &per_cpu(gcwq_nr_running, cpu);
357 }
358
359 static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
360                                             struct workqueue_struct *wq)
361 {
362         return per_cpu_ptr(wq->cpu_wq, cpu);
363 }
364
365 static unsigned int work_color_to_flags(int color)
366 {
367         return color << WORK_STRUCT_COLOR_SHIFT;
368 }
369
370 static int get_work_color(struct work_struct *work)
371 {
372         return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
373                 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
374 }
375
376 static int work_next_color(int color)
377 {
378         return (color + 1) % WORK_NR_COLORS;
379 }
380
381 /*
382  * Work data points to the cwq while a work is on queue.  Once
383  * execution starts, it points to the cpu the work was last on.  This
384  * can be distinguished by comparing the data value against
385  * PAGE_OFFSET.
386  *
387  * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
388  * cwq, cpu or clear work->data.  These functions should only be
389  * called while the work is owned - ie. while the PENDING bit is set.
390  *
391  * get_work_[g]cwq() can be used to obtain the gcwq or cwq
392  * corresponding to a work.  gcwq is available once the work has been
393  * queued anywhere after initialization.  cwq is available only from
394  * queueing until execution starts.
395  */
396 static inline void set_work_data(struct work_struct *work, unsigned long data,
397                                  unsigned long flags)
398 {
399         BUG_ON(!work_pending(work));
400         atomic_long_set(&work->data, data | flags | work_static(work));
401 }
402
403 static void set_work_cwq(struct work_struct *work,
404                          struct cpu_workqueue_struct *cwq,
405                          unsigned long extra_flags)
406 {
407         set_work_data(work, (unsigned long)cwq,
408                       WORK_STRUCT_PENDING | extra_flags);
409 }
410
411 static void set_work_cpu(struct work_struct *work, unsigned int cpu)
412 {
413         set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
414 }
415
416 static void clear_work_data(struct work_struct *work)
417 {
418         set_work_data(work, WORK_STRUCT_NO_CPU, 0);
419 }
420
421 static inline unsigned long get_work_data(struct work_struct *work)
422 {
423         return atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK;
424 }
425
426 static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
427 {
428         unsigned long data = get_work_data(work);
429
430         return data >= PAGE_OFFSET ? (void *)data : NULL;
431 }
432
433 static struct global_cwq *get_work_gcwq(struct work_struct *work)
434 {
435         unsigned long data = get_work_data(work);
436         unsigned int cpu;
437
438         if (data >= PAGE_OFFSET)
439                 return ((struct cpu_workqueue_struct *)data)->gcwq;
440
441         cpu = data >> WORK_STRUCT_FLAG_BITS;
442         if (cpu == NR_CPUS)
443                 return NULL;
444
445         BUG_ON(cpu >= num_possible_cpus());
446         return get_gcwq(cpu);
447 }
448
449 /*
450  * Policy functions.  These define the policies on how the global
451  * worker pool is managed.  Unless noted otherwise, these functions
452  * assume that they're being called with gcwq->lock held.
453  */
454
455 /*
456  * Need to wake up a worker?  Called from anything but currently
457  * running workers.
458  */
459 static bool need_more_worker(struct global_cwq *gcwq)
460 {
461         atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
462
463         return !list_empty(&gcwq->worklist) && !atomic_read(nr_running);
464 }
465
466 /* Can I start working?  Called from busy but !running workers. */
467 static bool may_start_working(struct global_cwq *gcwq)
468 {
469         return gcwq->nr_idle;
470 }
471
472 /* Do I need to keep working?  Called from currently running workers. */
473 static bool keep_working(struct global_cwq *gcwq)
474 {
475         atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
476
477         return !list_empty(&gcwq->worklist) && atomic_read(nr_running) <= 1;
478 }
479
480 /* Do we need a new worker?  Called from manager. */
481 static bool need_to_create_worker(struct global_cwq *gcwq)
482 {
483         return need_more_worker(gcwq) && !may_start_working(gcwq);
484 }
485
486 /* Do I need to be the manager? */
487 static bool need_to_manage_workers(struct global_cwq *gcwq)
488 {
489         return need_to_create_worker(gcwq) || gcwq->flags & GCWQ_MANAGE_WORKERS;
490 }
491
492 /* Do we have too many workers and should some go away? */
493 static bool too_many_workers(struct global_cwq *gcwq)
494 {
495         bool managing = gcwq->flags & GCWQ_MANAGING_WORKERS;
496         int nr_idle = gcwq->nr_idle + managing; /* manager is considered idle */
497         int nr_busy = gcwq->nr_workers - nr_idle;
498
499         return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
500 }
501
502 /*
503  * Wake up functions.
504  */
505
506 /* Return the first worker.  Safe with preemption disabled */
507 static struct worker *first_worker(struct global_cwq *gcwq)
508 {
509         if (unlikely(list_empty(&gcwq->idle_list)))
510                 return NULL;
511
512         return list_first_entry(&gcwq->idle_list, struct worker, entry);
513 }
514
515 /**
516  * wake_up_worker - wake up an idle worker
517  * @gcwq: gcwq to wake worker for
518  *
519  * Wake up the first idle worker of @gcwq.
520  *
521  * CONTEXT:
522  * spin_lock_irq(gcwq->lock).
523  */
524 static void wake_up_worker(struct global_cwq *gcwq)
525 {
526         struct worker *worker = first_worker(gcwq);
527
528         if (likely(worker))
529                 wake_up_process(worker->task);
530 }
531
532 /**
533  * wq_worker_waking_up - a worker is waking up
534  * @task: task waking up
535  * @cpu: CPU @task is waking up to
536  *
537  * This function is called during try_to_wake_up() when a worker is
538  * being awoken.
539  *
540  * CONTEXT:
541  * spin_lock_irq(rq->lock)
542  */
543 void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
544 {
545         struct worker *worker = kthread_data(task);
546
547         if (likely(!(worker->flags & WORKER_NOT_RUNNING)))
548                 atomic_inc(get_gcwq_nr_running(cpu));
549 }
550
551 /**
552  * wq_worker_sleeping - a worker is going to sleep
553  * @task: task going to sleep
554  * @cpu: CPU in question, must be the current CPU number
555  *
556  * This function is called during schedule() when a busy worker is
557  * going to sleep.  Worker on the same cpu can be woken up by
558  * returning pointer to its task.
559  *
560  * CONTEXT:
561  * spin_lock_irq(rq->lock)
562  *
563  * RETURNS:
564  * Worker task on @cpu to wake up, %NULL if none.
565  */
566 struct task_struct *wq_worker_sleeping(struct task_struct *task,
567                                        unsigned int cpu)
568 {
569         struct worker *worker = kthread_data(task), *to_wakeup = NULL;
570         struct global_cwq *gcwq = get_gcwq(cpu);
571         atomic_t *nr_running = get_gcwq_nr_running(cpu);
572
573         if (unlikely(worker->flags & WORKER_NOT_RUNNING))
574                 return NULL;
575
576         /* this can only happen on the local cpu */
577         BUG_ON(cpu != raw_smp_processor_id());
578
579         /*
580          * The counterpart of the following dec_and_test, implied mb,
581          * worklist not empty test sequence is in insert_work().
582          * Please read comment there.
583          *
584          * NOT_RUNNING is clear.  This means that trustee is not in
585          * charge and we're running on the local cpu w/ rq lock held
586          * and preemption disabled, which in turn means that none else
587          * could be manipulating idle_list, so dereferencing idle_list
588          * without gcwq lock is safe.
589          */
590         if (atomic_dec_and_test(nr_running) && !list_empty(&gcwq->worklist))
591                 to_wakeup = first_worker(gcwq);
592         return to_wakeup ? to_wakeup->task : NULL;
593 }
594
595 /**
596  * worker_set_flags - set worker flags and adjust nr_running accordingly
597  * @worker: worker to set flags for
598  * @flags: flags to set
599  * @wakeup: wakeup an idle worker if necessary
600  *
601  * Set @flags in @worker->flags and adjust nr_running accordingly.  If
602  * nr_running becomes zero and @wakeup is %true, an idle worker is
603  * woken up.
604  *
605  * LOCKING:
606  * spin_lock_irq(gcwq->lock).
607  */
608 static inline void worker_set_flags(struct worker *worker, unsigned int flags,
609                                     bool wakeup)
610 {
611         struct global_cwq *gcwq = worker->gcwq;
612
613         /*
614          * If transitioning into NOT_RUNNING, adjust nr_running and
615          * wake up an idle worker as necessary if requested by
616          * @wakeup.
617          */
618         if ((flags & WORKER_NOT_RUNNING) &&
619             !(worker->flags & WORKER_NOT_RUNNING)) {
620                 atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
621
622                 if (wakeup) {
623                         if (atomic_dec_and_test(nr_running) &&
624                             !list_empty(&gcwq->worklist))
625                                 wake_up_worker(gcwq);
626                 } else
627                         atomic_dec(nr_running);
628         }
629
630         worker->flags |= flags;
631 }
632
633 /**
634  * worker_clr_flags - clear worker flags and adjust nr_running accordingly
635  * @worker: worker to set flags for
636  * @flags: flags to clear
637  *
638  * Clear @flags in @worker->flags and adjust nr_running accordingly.
639  *
640  * LOCKING:
641  * spin_lock_irq(gcwq->lock).
642  */
643 static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
644 {
645         struct global_cwq *gcwq = worker->gcwq;
646         unsigned int oflags = worker->flags;
647
648         worker->flags &= ~flags;
649
650         /* if transitioning out of NOT_RUNNING, increment nr_running */
651         if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
652                 if (!(worker->flags & WORKER_NOT_RUNNING))
653                         atomic_inc(get_gcwq_nr_running(gcwq->cpu));
654 }
655
656 /**
657  * busy_worker_head - return the busy hash head for a work
658  * @gcwq: gcwq of interest
659  * @work: work to be hashed
660  *
661  * Return hash head of @gcwq for @work.
662  *
663  * CONTEXT:
664  * spin_lock_irq(gcwq->lock).
665  *
666  * RETURNS:
667  * Pointer to the hash head.
668  */
669 static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
670                                            struct work_struct *work)
671 {
672         const int base_shift = ilog2(sizeof(struct work_struct));
673         unsigned long v = (unsigned long)work;
674
675         /* simple shift and fold hash, do we need something better? */
676         v >>= base_shift;
677         v += v >> BUSY_WORKER_HASH_ORDER;
678         v &= BUSY_WORKER_HASH_MASK;
679
680         return &gcwq->busy_hash[v];
681 }
682
683 /**
684  * __find_worker_executing_work - find worker which is executing a work
685  * @gcwq: gcwq of interest
686  * @bwh: hash head as returned by busy_worker_head()
687  * @work: work to find worker for
688  *
689  * Find a worker which is executing @work on @gcwq.  @bwh should be
690  * the hash head obtained by calling busy_worker_head() with the same
691  * work.
692  *
693  * CONTEXT:
694  * spin_lock_irq(gcwq->lock).
695  *
696  * RETURNS:
697  * Pointer to worker which is executing @work if found, NULL
698  * otherwise.
699  */
700 static struct worker *__find_worker_executing_work(struct global_cwq *gcwq,
701                                                    struct hlist_head *bwh,
702                                                    struct work_struct *work)
703 {
704         struct worker *worker;
705         struct hlist_node *tmp;
706
707         hlist_for_each_entry(worker, tmp, bwh, hentry)
708                 if (worker->current_work == work)
709                         return worker;
710         return NULL;
711 }
712
713 /**
714  * find_worker_executing_work - find worker which is executing a work
715  * @gcwq: gcwq of interest
716  * @work: work to find worker for
717  *
718  * Find a worker which is executing @work on @gcwq.  This function is
719  * identical to __find_worker_executing_work() except that this
720  * function calculates @bwh itself.
721  *
722  * CONTEXT:
723  * spin_lock_irq(gcwq->lock).
724  *
725  * RETURNS:
726  * Pointer to worker which is executing @work if found, NULL
727  * otherwise.
728  */
729 static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
730                                                  struct work_struct *work)
731 {
732         return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work),
733                                             work);
734 }
735
736 /**
737  * insert_work - insert a work into gcwq
738  * @cwq: cwq @work belongs to
739  * @work: work to insert
740  * @head: insertion point
741  * @extra_flags: extra WORK_STRUCT_* flags to set
742  *
743  * Insert @work which belongs to @cwq into @gcwq after @head.
744  * @extra_flags is or'd to work_struct flags.
745  *
746  * CONTEXT:
747  * spin_lock_irq(gcwq->lock).
748  */
749 static void insert_work(struct cpu_workqueue_struct *cwq,
750                         struct work_struct *work, struct list_head *head,
751                         unsigned int extra_flags)
752 {
753         struct global_cwq *gcwq = cwq->gcwq;
754
755         /* we own @work, set data and link */
756         set_work_cwq(work, cwq, extra_flags);
757
758         /*
759          * Ensure that we get the right work->data if we see the
760          * result of list_add() below, see try_to_grab_pending().
761          */
762         smp_wmb();
763
764         list_add_tail(&work->entry, head);
765
766         /*
767          * Ensure either worker_sched_deactivated() sees the above
768          * list_add_tail() or we see zero nr_running to avoid workers
769          * lying around lazily while there are works to be processed.
770          */
771         smp_mb();
772
773         if (!atomic_read(get_gcwq_nr_running(gcwq->cpu)))
774                 wake_up_worker(gcwq);
775 }
776
777 /**
778  * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
779  * @cwq: cwq to unbind
780  *
781  * Try to unbind @cwq from single cpu workqueue processing.  If
782  * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
783  *
784  * CONTEXT:
785  * spin_lock_irq(gcwq->lock).
786  */
787 static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
788 {
789         struct workqueue_struct *wq = cwq->wq;
790         struct global_cwq *gcwq = cwq->gcwq;
791
792         BUG_ON(wq->single_cpu != gcwq->cpu);
793         /*
794          * Unbind from workqueue if @cwq is not frozen.  If frozen,
795          * thaw_workqueues() will either restart processing on this
796          * cpu or unbind if empty.  This keeps works queued while
797          * frozen fully ordered and flushable.
798          */
799         if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
800                 smp_wmb();      /* paired with cmpxchg() in __queue_work() */
801                 wq->single_cpu = NR_CPUS;
802         }
803 }
804
805 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
806                          struct work_struct *work)
807 {
808         struct global_cwq *gcwq;
809         struct cpu_workqueue_struct *cwq;
810         struct list_head *worklist;
811         unsigned long flags;
812         bool arbitrate;
813
814         debug_work_activate(work);
815
816         /*
817          * Determine gcwq to use.  SINGLE_CPU is inherently
818          * NON_REENTRANT, so test it first.
819          */
820         if (!(wq->flags & WQ_SINGLE_CPU)) {
821                 struct global_cwq *last_gcwq;
822
823                 /*
824                  * It's multi cpu.  If @wq is non-reentrant and @work
825                  * was previously on a different cpu, it might still
826                  * be running there, in which case the work needs to
827                  * be queued on that cpu to guarantee non-reentrance.
828                  */
829                 gcwq = get_gcwq(cpu);
830                 if (wq->flags & WQ_NON_REENTRANT &&
831                     (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
832                         struct worker *worker;
833
834                         spin_lock_irqsave(&last_gcwq->lock, flags);
835
836                         worker = find_worker_executing_work(last_gcwq, work);
837
838                         if (worker && worker->current_cwq->wq == wq)
839                                 gcwq = last_gcwq;
840                         else {
841                                 /* meh... not running there, queue here */
842                                 spin_unlock_irqrestore(&last_gcwq->lock, flags);
843                                 spin_lock_irqsave(&gcwq->lock, flags);
844                         }
845                 } else
846                         spin_lock_irqsave(&gcwq->lock, flags);
847         } else {
848                 unsigned int req_cpu = cpu;
849
850                 /*
851                  * It's a bit more complex for single cpu workqueues.
852                  * We first need to determine which cpu is going to be
853                  * used.  If no cpu is currently serving this
854                  * workqueue, arbitrate using atomic accesses to
855                  * wq->single_cpu; otherwise, use the current one.
856                  */
857         retry:
858                 cpu = wq->single_cpu;
859                 arbitrate = cpu == NR_CPUS;
860                 if (arbitrate)
861                         cpu = req_cpu;
862
863                 gcwq = get_gcwq(cpu);
864                 spin_lock_irqsave(&gcwq->lock, flags);
865
866                 /*
867                  * The following cmpxchg() is a full barrier paired
868                  * with smp_wmb() in cwq_unbind_single_cpu() and
869                  * guarantees that all changes to wq->st_* fields are
870                  * visible on the new cpu after this point.
871                  */
872                 if (arbitrate)
873                         cmpxchg(&wq->single_cpu, NR_CPUS, cpu);
874
875                 if (unlikely(wq->single_cpu != cpu)) {
876                         spin_unlock_irqrestore(&gcwq->lock, flags);
877                         goto retry;
878                 }
879         }
880
881         /* gcwq determined, get cwq and queue */
882         cwq = get_cwq(gcwq->cpu, wq);
883
884         BUG_ON(!list_empty(&work->entry));
885
886         cwq->nr_in_flight[cwq->work_color]++;
887
888         if (likely(cwq->nr_active < cwq->max_active)) {
889                 cwq->nr_active++;
890                 worklist = &gcwq->worklist;
891         } else
892                 worklist = &cwq->delayed_works;
893
894         insert_work(cwq, work, worklist, work_color_to_flags(cwq->work_color));
895
896         spin_unlock_irqrestore(&gcwq->lock, flags);
897 }
898
899 /**
900  * queue_work - queue work on a workqueue
901  * @wq: workqueue to use
902  * @work: work to queue
903  *
904  * Returns 0 if @work was already on a queue, non-zero otherwise.
905  *
906  * We queue the work to the CPU on which it was submitted, but if the CPU dies
907  * it can be processed by another CPU.
908  */
909 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
910 {
911         int ret;
912
913         ret = queue_work_on(get_cpu(), wq, work);
914         put_cpu();
915
916         return ret;
917 }
918 EXPORT_SYMBOL_GPL(queue_work);
919
920 /**
921  * queue_work_on - queue work on specific cpu
922  * @cpu: CPU number to execute work on
923  * @wq: workqueue to use
924  * @work: work to queue
925  *
926  * Returns 0 if @work was already on a queue, non-zero otherwise.
927  *
928  * We queue the work to a specific CPU, the caller must ensure it
929  * can't go away.
930  */
931 int
932 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
933 {
934         int ret = 0;
935
936         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
937                 __queue_work(cpu, wq, work);
938                 ret = 1;
939         }
940         return ret;
941 }
942 EXPORT_SYMBOL_GPL(queue_work_on);
943
944 static void delayed_work_timer_fn(unsigned long __data)
945 {
946         struct delayed_work *dwork = (struct delayed_work *)__data;
947         struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
948
949         __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
950 }
951
952 /**
953  * queue_delayed_work - queue work on a workqueue after delay
954  * @wq: workqueue to use
955  * @dwork: delayable work to queue
956  * @delay: number of jiffies to wait before queueing
957  *
958  * Returns 0 if @work was already on a queue, non-zero otherwise.
959  */
960 int queue_delayed_work(struct workqueue_struct *wq,
961                         struct delayed_work *dwork, unsigned long delay)
962 {
963         if (delay == 0)
964                 return queue_work(wq, &dwork->work);
965
966         return queue_delayed_work_on(-1, wq, dwork, delay);
967 }
968 EXPORT_SYMBOL_GPL(queue_delayed_work);
969
970 /**
971  * queue_delayed_work_on - queue work on specific CPU after delay
972  * @cpu: CPU number to execute work on
973  * @wq: workqueue to use
974  * @dwork: work to queue
975  * @delay: number of jiffies to wait before queueing
976  *
977  * Returns 0 if @work was already on a queue, non-zero otherwise.
978  */
979 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
980                         struct delayed_work *dwork, unsigned long delay)
981 {
982         int ret = 0;
983         struct timer_list *timer = &dwork->timer;
984         struct work_struct *work = &dwork->work;
985
986         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
987                 struct global_cwq *gcwq = get_work_gcwq(work);
988                 unsigned int lcpu = gcwq ? gcwq->cpu : raw_smp_processor_id();
989
990                 BUG_ON(timer_pending(timer));
991                 BUG_ON(!list_empty(&work->entry));
992
993                 timer_stats_timer_set_start_info(&dwork->timer);
994                 /*
995                  * This stores cwq for the moment, for the timer_fn.
996                  * Note that the work's gcwq is preserved to allow
997                  * reentrance detection for delayed works.
998                  */
999                 set_work_cwq(work, get_cwq(lcpu, wq), 0);
1000                 timer->expires = jiffies + delay;
1001                 timer->data = (unsigned long)dwork;
1002                 timer->function = delayed_work_timer_fn;
1003
1004                 if (unlikely(cpu >= 0))
1005                         add_timer_on(timer, cpu);
1006                 else
1007                         add_timer(timer);
1008                 ret = 1;
1009         }
1010         return ret;
1011 }
1012 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
1013
1014 /**
1015  * worker_enter_idle - enter idle state
1016  * @worker: worker which is entering idle state
1017  *
1018  * @worker is entering idle state.  Update stats and idle timer if
1019  * necessary.
1020  *
1021  * LOCKING:
1022  * spin_lock_irq(gcwq->lock).
1023  */
1024 static void worker_enter_idle(struct worker *worker)
1025 {
1026         struct global_cwq *gcwq = worker->gcwq;
1027
1028         BUG_ON(worker->flags & WORKER_IDLE);
1029         BUG_ON(!list_empty(&worker->entry) &&
1030                (worker->hentry.next || worker->hentry.pprev));
1031
1032         worker_set_flags(worker, WORKER_IDLE, false);
1033         gcwq->nr_idle++;
1034         worker->last_active = jiffies;
1035
1036         /* idle_list is LIFO */
1037         list_add(&worker->entry, &gcwq->idle_list);
1038
1039         if (likely(!(worker->flags & WORKER_ROGUE))) {
1040                 if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
1041                         mod_timer(&gcwq->idle_timer,
1042                                   jiffies + IDLE_WORKER_TIMEOUT);
1043         } else
1044                 wake_up_all(&gcwq->trustee_wait);
1045 }
1046
1047 /**
1048  * worker_leave_idle - leave idle state
1049  * @worker: worker which is leaving idle state
1050  *
1051  * @worker is leaving idle state.  Update stats.
1052  *
1053  * LOCKING:
1054  * spin_lock_irq(gcwq->lock).
1055  */
1056 static void worker_leave_idle(struct worker *worker)
1057 {
1058         struct global_cwq *gcwq = worker->gcwq;
1059
1060         BUG_ON(!(worker->flags & WORKER_IDLE));
1061         worker_clr_flags(worker, WORKER_IDLE);
1062         gcwq->nr_idle--;
1063         list_del_init(&worker->entry);
1064 }
1065
1066 /**
1067  * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
1068  * @worker: self
1069  *
1070  * Works which are scheduled while the cpu is online must at least be
1071  * scheduled to a worker which is bound to the cpu so that if they are
1072  * flushed from cpu callbacks while cpu is going down, they are
1073  * guaranteed to execute on the cpu.
1074  *
1075  * This function is to be used by rogue workers and rescuers to bind
1076  * themselves to the target cpu and may race with cpu going down or
1077  * coming online.  kthread_bind() can't be used because it may put the
1078  * worker to already dead cpu and set_cpus_allowed_ptr() can't be used
1079  * verbatim as it's best effort and blocking and gcwq may be
1080  * [dis]associated in the meantime.
1081  *
1082  * This function tries set_cpus_allowed() and locks gcwq and verifies
1083  * the binding against GCWQ_DISASSOCIATED which is set during
1084  * CPU_DYING and cleared during CPU_ONLINE, so if the worker enters
1085  * idle state or fetches works without dropping lock, it can guarantee
1086  * the scheduling requirement described in the first paragraph.
1087  *
1088  * CONTEXT:
1089  * Might sleep.  Called without any lock but returns with gcwq->lock
1090  * held.
1091  *
1092  * RETURNS:
1093  * %true if the associated gcwq is online (@worker is successfully
1094  * bound), %false if offline.
1095  */
1096 static bool worker_maybe_bind_and_lock(struct worker *worker)
1097 {
1098         struct global_cwq *gcwq = worker->gcwq;
1099         struct task_struct *task = worker->task;
1100
1101         while (true) {
1102                 /*
1103                  * The following call may fail, succeed or succeed
1104                  * without actually migrating the task to the cpu if
1105                  * it races with cpu hotunplug operation.  Verify
1106                  * against GCWQ_DISASSOCIATED.
1107                  */
1108                 set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
1109
1110                 spin_lock_irq(&gcwq->lock);
1111                 if (gcwq->flags & GCWQ_DISASSOCIATED)
1112                         return false;
1113                 if (task_cpu(task) == gcwq->cpu &&
1114                     cpumask_equal(&current->cpus_allowed,
1115                                   get_cpu_mask(gcwq->cpu)))
1116                         return true;
1117                 spin_unlock_irq(&gcwq->lock);
1118
1119                 /* CPU has come up inbetween, retry migration */
1120                 cpu_relax();
1121         }
1122 }
1123
1124 /*
1125  * Function for worker->rebind_work used to rebind rogue busy workers
1126  * to the associated cpu which is coming back online.  This is
1127  * scheduled by cpu up but can race with other cpu hotplug operations
1128  * and may be executed twice without intervening cpu down.
1129  */
1130 static void worker_rebind_fn(struct work_struct *work)
1131 {
1132         struct worker *worker = container_of(work, struct worker, rebind_work);
1133         struct global_cwq *gcwq = worker->gcwq;
1134
1135         if (worker_maybe_bind_and_lock(worker))
1136                 worker_clr_flags(worker, WORKER_REBIND);
1137
1138         spin_unlock_irq(&gcwq->lock);
1139 }
1140
1141 static struct worker *alloc_worker(void)
1142 {
1143         struct worker *worker;
1144
1145         worker = kzalloc(sizeof(*worker), GFP_KERNEL);
1146         if (worker) {
1147                 INIT_LIST_HEAD(&worker->entry);
1148                 INIT_LIST_HEAD(&worker->scheduled);
1149                 INIT_WORK(&worker->rebind_work, worker_rebind_fn);
1150                 /* on creation a worker is in !idle && prep state */
1151                 worker->flags = WORKER_PREP;
1152         }
1153         return worker;
1154 }
1155
1156 /**
1157  * create_worker - create a new workqueue worker
1158  * @gcwq: gcwq the new worker will belong to
1159  * @bind: whether to set affinity to @cpu or not
1160  *
1161  * Create a new worker which is bound to @gcwq.  The returned worker
1162  * can be started by calling start_worker() or destroyed using
1163  * destroy_worker().
1164  *
1165  * CONTEXT:
1166  * Might sleep.  Does GFP_KERNEL allocations.
1167  *
1168  * RETURNS:
1169  * Pointer to the newly created worker.
1170  */
1171 static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
1172 {
1173         int id = -1;
1174         struct worker *worker = NULL;
1175
1176         spin_lock_irq(&gcwq->lock);
1177         while (ida_get_new(&gcwq->worker_ida, &id)) {
1178                 spin_unlock_irq(&gcwq->lock);
1179                 if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))
1180                         goto fail;
1181                 spin_lock_irq(&gcwq->lock);
1182         }
1183         spin_unlock_irq(&gcwq->lock);
1184
1185         worker = alloc_worker();
1186         if (!worker)
1187                 goto fail;
1188
1189         worker->gcwq = gcwq;
1190         worker->id = id;
1191
1192         worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
1193                                       gcwq->cpu, id);
1194         if (IS_ERR(worker->task))
1195                 goto fail;
1196
1197         /*
1198          * A rogue worker will become a regular one if CPU comes
1199          * online later on.  Make sure every worker has
1200          * PF_THREAD_BOUND set.
1201          */
1202         if (bind)
1203                 kthread_bind(worker->task, gcwq->cpu);
1204         else
1205                 worker->task->flags |= PF_THREAD_BOUND;
1206
1207         return worker;
1208 fail:
1209         if (id >= 0) {
1210                 spin_lock_irq(&gcwq->lock);
1211                 ida_remove(&gcwq->worker_ida, id);
1212                 spin_unlock_irq(&gcwq->lock);
1213         }
1214         kfree(worker);
1215         return NULL;
1216 }
1217
1218 /**
1219  * start_worker - start a newly created worker
1220  * @worker: worker to start
1221  *
1222  * Make the gcwq aware of @worker and start it.
1223  *
1224  * CONTEXT:
1225  * spin_lock_irq(gcwq->lock).
1226  */
1227 static void start_worker(struct worker *worker)
1228 {
1229         worker_set_flags(worker, WORKER_STARTED, false);
1230         worker->gcwq->nr_workers++;
1231         worker_enter_idle(worker);
1232         wake_up_process(worker->task);
1233 }
1234
1235 /**
1236  * destroy_worker - destroy a workqueue worker
1237  * @worker: worker to be destroyed
1238  *
1239  * Destroy @worker and adjust @gcwq stats accordingly.
1240  *
1241  * CONTEXT:
1242  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1243  */
1244 static void destroy_worker(struct worker *worker)
1245 {
1246         struct global_cwq *gcwq = worker->gcwq;
1247         int id = worker->id;
1248
1249         /* sanity check frenzy */
1250         BUG_ON(worker->current_work);
1251         BUG_ON(!list_empty(&worker->scheduled));
1252
1253         if (worker->flags & WORKER_STARTED)
1254                 gcwq->nr_workers--;
1255         if (worker->flags & WORKER_IDLE)
1256                 gcwq->nr_idle--;
1257
1258         list_del_init(&worker->entry);
1259         worker_set_flags(worker, WORKER_DIE, false);
1260
1261         spin_unlock_irq(&gcwq->lock);
1262
1263         kthread_stop(worker->task);
1264         kfree(worker);
1265
1266         spin_lock_irq(&gcwq->lock);
1267         ida_remove(&gcwq->worker_ida, id);
1268 }
1269
1270 static void idle_worker_timeout(unsigned long __gcwq)
1271 {
1272         struct global_cwq *gcwq = (void *)__gcwq;
1273
1274         spin_lock_irq(&gcwq->lock);
1275
1276         if (too_many_workers(gcwq)) {
1277                 struct worker *worker;
1278                 unsigned long expires;
1279
1280                 /* idle_list is kept in LIFO order, check the last one */
1281                 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1282                 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1283
1284                 if (time_before(jiffies, expires))
1285                         mod_timer(&gcwq->idle_timer, expires);
1286                 else {
1287                         /* it's been idle for too long, wake up manager */
1288                         gcwq->flags |= GCWQ_MANAGE_WORKERS;
1289                         wake_up_worker(gcwq);
1290                 }
1291         }
1292
1293         spin_unlock_irq(&gcwq->lock);
1294 }
1295
1296 static bool send_mayday(struct work_struct *work)
1297 {
1298         struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1299         struct workqueue_struct *wq = cwq->wq;
1300
1301         if (!(wq->flags & WQ_RESCUER))
1302                 return false;
1303
1304         /* mayday mayday mayday */
1305         if (!cpumask_test_and_set_cpu(cwq->gcwq->cpu, wq->mayday_mask))
1306                 wake_up_process(wq->rescuer->task);
1307         return true;
1308 }
1309
1310 static void gcwq_mayday_timeout(unsigned long __gcwq)
1311 {
1312         struct global_cwq *gcwq = (void *)__gcwq;
1313         struct work_struct *work;
1314
1315         spin_lock_irq(&gcwq->lock);
1316
1317         if (need_to_create_worker(gcwq)) {
1318                 /*
1319                  * We've been trying to create a new worker but
1320                  * haven't been successful.  We might be hitting an
1321                  * allocation deadlock.  Send distress signals to
1322                  * rescuers.
1323                  */
1324                 list_for_each_entry(work, &gcwq->worklist, entry)
1325                         send_mayday(work);
1326         }
1327
1328         spin_unlock_irq(&gcwq->lock);
1329
1330         mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL);
1331 }
1332
1333 /**
1334  * maybe_create_worker - create a new worker if necessary
1335  * @gcwq: gcwq to create a new worker for
1336  *
1337  * Create a new worker for @gcwq if necessary.  @gcwq is guaranteed to
1338  * have at least one idle worker on return from this function.  If
1339  * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
1340  * sent to all rescuers with works scheduled on @gcwq to resolve
1341  * possible allocation deadlock.
1342  *
1343  * On return, need_to_create_worker() is guaranteed to be false and
1344  * may_start_working() true.
1345  *
1346  * LOCKING:
1347  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1348  * multiple times.  Does GFP_KERNEL allocations.  Called only from
1349  * manager.
1350  *
1351  * RETURNS:
1352  * false if no action was taken and gcwq->lock stayed locked, true
1353  * otherwise.
1354  */
1355 static bool maybe_create_worker(struct global_cwq *gcwq)
1356 {
1357         if (!need_to_create_worker(gcwq))
1358                 return false;
1359 restart:
1360         /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
1361         mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
1362
1363         while (true) {
1364                 struct worker *worker;
1365
1366                 spin_unlock_irq(&gcwq->lock);
1367
1368                 worker = create_worker(gcwq, true);
1369                 if (worker) {
1370                         del_timer_sync(&gcwq->mayday_timer);
1371                         spin_lock_irq(&gcwq->lock);
1372                         start_worker(worker);
1373                         BUG_ON(need_to_create_worker(gcwq));
1374                         return true;
1375                 }
1376
1377                 if (!need_to_create_worker(gcwq))
1378                         break;
1379
1380                 spin_unlock_irq(&gcwq->lock);
1381                 __set_current_state(TASK_INTERRUPTIBLE);
1382                 schedule_timeout(CREATE_COOLDOWN);
1383                 spin_lock_irq(&gcwq->lock);
1384                 if (!need_to_create_worker(gcwq))
1385                         break;
1386         }
1387
1388         spin_unlock_irq(&gcwq->lock);
1389         del_timer_sync(&gcwq->mayday_timer);
1390         spin_lock_irq(&gcwq->lock);
1391         if (need_to_create_worker(gcwq))
1392                 goto restart;
1393         return true;
1394 }
1395
1396 /**
1397  * maybe_destroy_worker - destroy workers which have been idle for a while
1398  * @gcwq: gcwq to destroy workers for
1399  *
1400  * Destroy @gcwq workers which have been idle for longer than
1401  * IDLE_WORKER_TIMEOUT.
1402  *
1403  * LOCKING:
1404  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1405  * multiple times.  Called only from manager.
1406  *
1407  * RETURNS:
1408  * false if no action was taken and gcwq->lock stayed locked, true
1409  * otherwise.
1410  */
1411 static bool maybe_destroy_workers(struct global_cwq *gcwq)
1412 {
1413         bool ret = false;
1414
1415         while (too_many_workers(gcwq)) {
1416                 struct worker *worker;
1417                 unsigned long expires;
1418
1419                 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1420                 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1421
1422                 if (time_before(jiffies, expires)) {
1423                         mod_timer(&gcwq->idle_timer, expires);
1424                         break;
1425                 }
1426
1427                 destroy_worker(worker);
1428                 ret = true;
1429         }
1430
1431         return ret;
1432 }
1433
1434 /**
1435  * manage_workers - manage worker pool
1436  * @worker: self
1437  *
1438  * Assume the manager role and manage gcwq worker pool @worker belongs
1439  * to.  At any given time, there can be only zero or one manager per
1440  * gcwq.  The exclusion is handled automatically by this function.
1441  *
1442  * The caller can safely start processing works on false return.  On
1443  * true return, it's guaranteed that need_to_create_worker() is false
1444  * and may_start_working() is true.
1445  *
1446  * CONTEXT:
1447  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1448  * multiple times.  Does GFP_KERNEL allocations.
1449  *
1450  * RETURNS:
1451  * false if no action was taken and gcwq->lock stayed locked, true if
1452  * some action was taken.
1453  */
1454 static bool manage_workers(struct worker *worker)
1455 {
1456         struct global_cwq *gcwq = worker->gcwq;
1457         bool ret = false;
1458
1459         if (gcwq->flags & GCWQ_MANAGING_WORKERS)
1460                 return ret;
1461
1462         gcwq->flags &= ~GCWQ_MANAGE_WORKERS;
1463         gcwq->flags |= GCWQ_MANAGING_WORKERS;
1464
1465         /*
1466          * Destroy and then create so that may_start_working() is true
1467          * on return.
1468          */
1469         ret |= maybe_destroy_workers(gcwq);
1470         ret |= maybe_create_worker(gcwq);
1471
1472         gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
1473
1474         /*
1475          * The trustee might be waiting to take over the manager
1476          * position, tell it we're done.
1477          */
1478         if (unlikely(gcwq->trustee))
1479                 wake_up_all(&gcwq->trustee_wait);
1480
1481         return ret;
1482 }
1483
1484 /**
1485  * move_linked_works - move linked works to a list
1486  * @work: start of series of works to be scheduled
1487  * @head: target list to append @work to
1488  * @nextp: out paramter for nested worklist walking
1489  *
1490  * Schedule linked works starting from @work to @head.  Work series to
1491  * be scheduled starts at @work and includes any consecutive work with
1492  * WORK_STRUCT_LINKED set in its predecessor.
1493  *
1494  * If @nextp is not NULL, it's updated to point to the next work of
1495  * the last scheduled work.  This allows move_linked_works() to be
1496  * nested inside outer list_for_each_entry_safe().
1497  *
1498  * CONTEXT:
1499  * spin_lock_irq(gcwq->lock).
1500  */
1501 static void move_linked_works(struct work_struct *work, struct list_head *head,
1502                               struct work_struct **nextp)
1503 {
1504         struct work_struct *n;
1505
1506         /*
1507          * Linked worklist will always end before the end of the list,
1508          * use NULL for list head.
1509          */
1510         list_for_each_entry_safe_from(work, n, NULL, entry) {
1511                 list_move_tail(&work->entry, head);
1512                 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
1513                         break;
1514         }
1515
1516         /*
1517          * If we're already inside safe list traversal and have moved
1518          * multiple works to the scheduled queue, the next position
1519          * needs to be updated.
1520          */
1521         if (nextp)
1522                 *nextp = n;
1523 }
1524
1525 static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
1526 {
1527         struct work_struct *work = list_first_entry(&cwq->delayed_works,
1528                                                     struct work_struct, entry);
1529
1530         move_linked_works(work, &cwq->gcwq->worklist, NULL);
1531         cwq->nr_active++;
1532 }
1533
1534 /**
1535  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
1536  * @cwq: cwq of interest
1537  * @color: color of work which left the queue
1538  *
1539  * A work either has completed or is removed from pending queue,
1540  * decrement nr_in_flight of its cwq and handle workqueue flushing.
1541  *
1542  * CONTEXT:
1543  * spin_lock_irq(gcwq->lock).
1544  */
1545 static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
1546 {
1547         /* ignore uncolored works */
1548         if (color == WORK_NO_COLOR)
1549                 return;
1550
1551         cwq->nr_in_flight[color]--;
1552         cwq->nr_active--;
1553
1554         if (!list_empty(&cwq->delayed_works)) {
1555                 /* one down, submit a delayed one */
1556                 if (cwq->nr_active < cwq->max_active)
1557                         cwq_activate_first_delayed(cwq);
1558         } else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
1559                 /* this was the last work, unbind from single cpu */
1560                 cwq_unbind_single_cpu(cwq);
1561         }
1562
1563         /* is flush in progress and are we at the flushing tip? */
1564         if (likely(cwq->flush_color != color))
1565                 return;
1566
1567         /* are there still in-flight works? */
1568         if (cwq->nr_in_flight[color])
1569                 return;
1570
1571         /* this cwq is done, clear flush_color */
1572         cwq->flush_color = -1;
1573
1574         /*
1575          * If this was the last cwq, wake up the first flusher.  It
1576          * will handle the rest.
1577          */
1578         if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
1579                 complete(&cwq->wq->first_flusher->done);
1580 }
1581
1582 /**
1583  * process_one_work - process single work
1584  * @worker: self
1585  * @work: work to process
1586  *
1587  * Process @work.  This function contains all the logics necessary to
1588  * process a single work including synchronization against and
1589  * interaction with other workers on the same cpu, queueing and
1590  * flushing.  As long as context requirement is met, any worker can
1591  * call this function to process a work.
1592  *
1593  * CONTEXT:
1594  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1595  */
1596 static void process_one_work(struct worker *worker, struct work_struct *work)
1597 {
1598         struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1599         struct global_cwq *gcwq = cwq->gcwq;
1600         struct hlist_head *bwh = busy_worker_head(gcwq, work);
1601         work_func_t f = work->func;
1602         int work_color;
1603         struct worker *collision;
1604 #ifdef CONFIG_LOCKDEP
1605         /*
1606          * It is permissible to free the struct work_struct from
1607          * inside the function that is called from it, this we need to
1608          * take into account for lockdep too.  To avoid bogus "held
1609          * lock freed" warnings as well as problems when looking into
1610          * work->lockdep_map, make a copy and use that here.
1611          */
1612         struct lockdep_map lockdep_map = work->lockdep_map;
1613 #endif
1614         /*
1615          * A single work shouldn't be executed concurrently by
1616          * multiple workers on a single cpu.  Check whether anyone is
1617          * already processing the work.  If so, defer the work to the
1618          * currently executing one.
1619          */
1620         collision = __find_worker_executing_work(gcwq, bwh, work);
1621         if (unlikely(collision)) {
1622                 move_linked_works(work, &collision->scheduled, NULL);
1623                 return;
1624         }
1625
1626         /* claim and process */
1627         debug_work_deactivate(work);
1628         hlist_add_head(&worker->hentry, bwh);
1629         worker->current_work = work;
1630         worker->current_cwq = cwq;
1631         work_color = get_work_color(work);
1632
1633         /* record the current cpu number in the work data and dequeue */
1634         set_work_cpu(work, gcwq->cpu);
1635         list_del_init(&work->entry);
1636
1637         spin_unlock_irq(&gcwq->lock);
1638
1639         work_clear_pending(work);
1640         lock_map_acquire(&cwq->wq->lockdep_map);
1641         lock_map_acquire(&lockdep_map);
1642         f(work);
1643         lock_map_release(&lockdep_map);
1644         lock_map_release(&cwq->wq->lockdep_map);
1645
1646         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
1647                 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
1648                        "%s/0x%08x/%d\n",
1649                        current->comm, preempt_count(), task_pid_nr(current));
1650                 printk(KERN_ERR "    last function: ");
1651                 print_symbol("%s\n", (unsigned long)f);
1652                 debug_show_held_locks(current);
1653                 dump_stack();
1654         }
1655
1656         spin_lock_irq(&gcwq->lock);
1657
1658         /* we're done with it, release */
1659         hlist_del_init(&worker->hentry);
1660         worker->current_work = NULL;
1661         worker->current_cwq = NULL;
1662         cwq_dec_nr_in_flight(cwq, work_color);
1663 }
1664
1665 /**
1666  * process_scheduled_works - process scheduled works
1667  * @worker: self
1668  *
1669  * Process all scheduled works.  Please note that the scheduled list
1670  * may change while processing a work, so this function repeatedly
1671  * fetches a work from the top and executes it.
1672  *
1673  * CONTEXT:
1674  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1675  * multiple times.
1676  */
1677 static void process_scheduled_works(struct worker *worker)
1678 {
1679         while (!list_empty(&worker->scheduled)) {
1680                 struct work_struct *work = list_first_entry(&worker->scheduled,
1681                                                 struct work_struct, entry);
1682                 process_one_work(worker, work);
1683         }
1684 }
1685
1686 /**
1687  * worker_thread - the worker thread function
1688  * @__worker: self
1689  *
1690  * The gcwq worker thread function.  There's a single dynamic pool of
1691  * these per each cpu.  These workers process all works regardless of
1692  * their specific target workqueue.  The only exception is works which
1693  * belong to workqueues with a rescuer which will be explained in
1694  * rescuer_thread().
1695  */
1696 static int worker_thread(void *__worker)
1697 {
1698         struct worker *worker = __worker;
1699         struct global_cwq *gcwq = worker->gcwq;
1700
1701         /* tell the scheduler that this is a workqueue worker */
1702         worker->task->flags |= PF_WQ_WORKER;
1703 woke_up:
1704         spin_lock_irq(&gcwq->lock);
1705
1706         /* DIE can be set only while we're idle, checking here is enough */
1707         if (worker->flags & WORKER_DIE) {
1708                 spin_unlock_irq(&gcwq->lock);
1709                 worker->task->flags &= ~PF_WQ_WORKER;
1710                 return 0;
1711         }
1712
1713         worker_leave_idle(worker);
1714 recheck:
1715         /* no more worker necessary? */
1716         if (!need_more_worker(gcwq))
1717                 goto sleep;
1718
1719         /* do we need to manage? */
1720         if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
1721                 goto recheck;
1722
1723         /*
1724          * ->scheduled list can only be filled while a worker is
1725          * preparing to process a work or actually processing it.
1726          * Make sure nobody diddled with it while I was sleeping.
1727          */
1728         BUG_ON(!list_empty(&worker->scheduled));
1729
1730         /*
1731          * When control reaches this point, we're guaranteed to have
1732          * at least one idle worker or that someone else has already
1733          * assumed the manager role.
1734          */
1735         worker_clr_flags(worker, WORKER_PREP);
1736
1737         do {
1738                 struct work_struct *work =
1739                         list_first_entry(&gcwq->worklist,
1740                                          struct work_struct, entry);
1741
1742                 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
1743                         /* optimization path, not strictly necessary */
1744                         process_one_work(worker, work);
1745                         if (unlikely(!list_empty(&worker->scheduled)))
1746                                 process_scheduled_works(worker);
1747                 } else {
1748                         move_linked_works(work, &worker->scheduled, NULL);
1749                         process_scheduled_works(worker);
1750                 }
1751         } while (keep_working(gcwq));
1752
1753         worker_set_flags(worker, WORKER_PREP, false);
1754
1755         if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))
1756                 goto recheck;
1757 sleep:
1758         /*
1759          * gcwq->lock is held and there's no work to process and no
1760          * need to manage, sleep.  Workers are woken up only while
1761          * holding gcwq->lock or from local cpu, so setting the
1762          * current state before releasing gcwq->lock is enough to
1763          * prevent losing any event.
1764          */
1765         worker_enter_idle(worker);
1766         __set_current_state(TASK_INTERRUPTIBLE);
1767         spin_unlock_irq(&gcwq->lock);
1768         schedule();
1769         goto woke_up;
1770 }
1771
1772 /**
1773  * rescuer_thread - the rescuer thread function
1774  * @__wq: the associated workqueue
1775  *
1776  * Workqueue rescuer thread function.  There's one rescuer for each
1777  * workqueue which has WQ_RESCUER set.
1778  *
1779  * Regular work processing on a gcwq may block trying to create a new
1780  * worker which uses GFP_KERNEL allocation which has slight chance of
1781  * developing into deadlock if some works currently on the same queue
1782  * need to be processed to satisfy the GFP_KERNEL allocation.  This is
1783  * the problem rescuer solves.
1784  *
1785  * When such condition is possible, the gcwq summons rescuers of all
1786  * workqueues which have works queued on the gcwq and let them process
1787  * those works so that forward progress can be guaranteed.
1788  *
1789  * This should happen rarely.
1790  */
1791 static int rescuer_thread(void *__wq)
1792 {
1793         struct workqueue_struct *wq = __wq;
1794         struct worker *rescuer = wq->rescuer;
1795         struct list_head *scheduled = &rescuer->scheduled;
1796         unsigned int cpu;
1797
1798         set_user_nice(current, RESCUER_NICE_LEVEL);
1799 repeat:
1800         set_current_state(TASK_INTERRUPTIBLE);
1801
1802         if (kthread_should_stop())
1803                 return 0;
1804
1805         for_each_cpu(cpu, wq->mayday_mask) {
1806                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1807                 struct global_cwq *gcwq = cwq->gcwq;
1808                 struct work_struct *work, *n;
1809
1810                 __set_current_state(TASK_RUNNING);
1811                 cpumask_clear_cpu(cpu, wq->mayday_mask);
1812
1813                 /* migrate to the target cpu if possible */
1814                 rescuer->gcwq = gcwq;
1815                 worker_maybe_bind_and_lock(rescuer);
1816
1817                 /*
1818                  * Slurp in all works issued via this workqueue and
1819                  * process'em.
1820                  */
1821                 BUG_ON(!list_empty(&rescuer->scheduled));
1822                 list_for_each_entry_safe(work, n, &gcwq->worklist, entry)
1823                         if (get_work_cwq(work) == cwq)
1824                                 move_linked_works(work, scheduled, &n);
1825
1826                 process_scheduled_works(rescuer);
1827                 spin_unlock_irq(&gcwq->lock);
1828         }
1829
1830         schedule();
1831         goto repeat;
1832 }
1833
1834 struct wq_barrier {
1835         struct work_struct      work;
1836         struct completion       done;
1837 };
1838
1839 static void wq_barrier_func(struct work_struct *work)
1840 {
1841         struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
1842         complete(&barr->done);
1843 }
1844
1845 /**
1846  * insert_wq_barrier - insert a barrier work
1847  * @cwq: cwq to insert barrier into
1848  * @barr: wq_barrier to insert
1849  * @target: target work to attach @barr to
1850  * @worker: worker currently executing @target, NULL if @target is not executing
1851  *
1852  * @barr is linked to @target such that @barr is completed only after
1853  * @target finishes execution.  Please note that the ordering
1854  * guarantee is observed only with respect to @target and on the local
1855  * cpu.
1856  *
1857  * Currently, a queued barrier can't be canceled.  This is because
1858  * try_to_grab_pending() can't determine whether the work to be
1859  * grabbed is at the head of the queue and thus can't clear LINKED
1860  * flag of the previous work while there must be a valid next work
1861  * after a work with LINKED flag set.
1862  *
1863  * Note that when @worker is non-NULL, @target may be modified
1864  * underneath us, so we can't reliably determine cwq from @target.
1865  *
1866  * CONTEXT:
1867  * spin_lock_irq(gcwq->lock).
1868  */
1869 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
1870                               struct wq_barrier *barr,
1871                               struct work_struct *target, struct worker *worker)
1872 {
1873         struct list_head *head;
1874         unsigned int linked = 0;
1875
1876         /*
1877          * debugobject calls are safe here even with gcwq->lock locked
1878          * as we know for sure that this will not trigger any of the
1879          * checks and call back into the fixup functions where we
1880          * might deadlock.
1881          */
1882         INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
1883         __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
1884         init_completion(&barr->done);
1885
1886         /*
1887          * If @target is currently being executed, schedule the
1888          * barrier to the worker; otherwise, put it after @target.
1889          */
1890         if (worker)
1891                 head = worker->scheduled.next;
1892         else {
1893                 unsigned long *bits = work_data_bits(target);
1894
1895                 head = target->entry.next;
1896                 /* there can already be other linked works, inherit and set */
1897                 linked = *bits & WORK_STRUCT_LINKED;
1898                 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
1899         }
1900
1901         debug_work_activate(&barr->work);
1902         insert_work(cwq, &barr->work, head,
1903                     work_color_to_flags(WORK_NO_COLOR) | linked);
1904 }
1905
1906 /**
1907  * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
1908  * @wq: workqueue being flushed
1909  * @flush_color: new flush color, < 0 for no-op
1910  * @work_color: new work color, < 0 for no-op
1911  *
1912  * Prepare cwqs for workqueue flushing.
1913  *
1914  * If @flush_color is non-negative, flush_color on all cwqs should be
1915  * -1.  If no cwq has in-flight commands at the specified color, all
1916  * cwq->flush_color's stay at -1 and %false is returned.  If any cwq
1917  * has in flight commands, its cwq->flush_color is set to
1918  * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
1919  * wakeup logic is armed and %true is returned.
1920  *
1921  * The caller should have initialized @wq->first_flusher prior to
1922  * calling this function with non-negative @flush_color.  If
1923  * @flush_color is negative, no flush color update is done and %false
1924  * is returned.
1925  *
1926  * If @work_color is non-negative, all cwqs should have the same
1927  * work_color which is previous to @work_color and all will be
1928  * advanced to @work_color.
1929  *
1930  * CONTEXT:
1931  * mutex_lock(wq->flush_mutex).
1932  *
1933  * RETURNS:
1934  * %true if @flush_color >= 0 and there's something to flush.  %false
1935  * otherwise.
1936  */
1937 static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
1938                                       int flush_color, int work_color)
1939 {
1940         bool wait = false;
1941         unsigned int cpu;
1942
1943         if (flush_color >= 0) {
1944                 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
1945                 atomic_set(&wq->nr_cwqs_to_flush, 1);
1946         }
1947
1948         for_each_possible_cpu(cpu) {
1949                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1950                 struct global_cwq *gcwq = cwq->gcwq;
1951
1952                 spin_lock_irq(&gcwq->lock);
1953
1954                 if (flush_color >= 0) {
1955                         BUG_ON(cwq->flush_color != -1);
1956
1957                         if (cwq->nr_in_flight[flush_color]) {
1958                                 cwq->flush_color = flush_color;
1959                                 atomic_inc(&wq->nr_cwqs_to_flush);
1960                                 wait = true;
1961                         }
1962                 }
1963
1964                 if (work_color >= 0) {
1965                         BUG_ON(work_color != work_next_color(cwq->work_color));
1966                         cwq->work_color = work_color;
1967                 }
1968
1969                 spin_unlock_irq(&gcwq->lock);
1970         }
1971
1972         if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
1973                 complete(&wq->first_flusher->done);
1974
1975         return wait;
1976 }
1977
1978 /**
1979  * flush_workqueue - ensure that any scheduled work has run to completion.
1980  * @wq: workqueue to flush
1981  *
1982  * Forces execution of the workqueue and blocks until its completion.
1983  * This is typically used in driver shutdown handlers.
1984  *
1985  * We sleep until all works which were queued on entry have been handled,
1986  * but we are not livelocked by new incoming ones.
1987  */
1988 void flush_workqueue(struct workqueue_struct *wq)
1989 {
1990         struct wq_flusher this_flusher = {
1991                 .list = LIST_HEAD_INIT(this_flusher.list),
1992                 .flush_color = -1,
1993                 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
1994         };
1995         int next_color;
1996
1997         lock_map_acquire(&wq->lockdep_map);
1998         lock_map_release(&wq->lockdep_map);
1999
2000         mutex_lock(&wq->flush_mutex);
2001
2002         /*
2003          * Start-to-wait phase
2004          */
2005         next_color = work_next_color(wq->work_color);
2006
2007         if (next_color != wq->flush_color) {
2008                 /*
2009                  * Color space is not full.  The current work_color
2010                  * becomes our flush_color and work_color is advanced
2011                  * by one.
2012                  */
2013                 BUG_ON(!list_empty(&wq->flusher_overflow));
2014                 this_flusher.flush_color = wq->work_color;
2015                 wq->work_color = next_color;
2016
2017                 if (!wq->first_flusher) {
2018                         /* no flush in progress, become the first flusher */
2019                         BUG_ON(wq->flush_color != this_flusher.flush_color);
2020
2021                         wq->first_flusher = &this_flusher;
2022
2023                         if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
2024                                                        wq->work_color)) {
2025                                 /* nothing to flush, done */
2026                                 wq->flush_color = next_color;
2027                                 wq->first_flusher = NULL;
2028                                 goto out_unlock;
2029                         }
2030                 } else {
2031                         /* wait in queue */
2032                         BUG_ON(wq->flush_color == this_flusher.flush_color);
2033                         list_add_tail(&this_flusher.list, &wq->flusher_queue);
2034                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2035                 }
2036         } else {
2037                 /*
2038                  * Oops, color space is full, wait on overflow queue.
2039                  * The next flush completion will assign us
2040                  * flush_color and transfer to flusher_queue.
2041                  */
2042                 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
2043         }
2044
2045         mutex_unlock(&wq->flush_mutex);
2046
2047         wait_for_completion(&this_flusher.done);
2048
2049         /*
2050          * Wake-up-and-cascade phase
2051          *
2052          * First flushers are responsible for cascading flushes and
2053          * handling overflow.  Non-first flushers can simply return.
2054          */
2055         if (wq->first_flusher != &this_flusher)
2056                 return;
2057
2058         mutex_lock(&wq->flush_mutex);
2059
2060         wq->first_flusher = NULL;
2061
2062         BUG_ON(!list_empty(&this_flusher.list));
2063         BUG_ON(wq->flush_color != this_flusher.flush_color);
2064
2065         while (true) {
2066                 struct wq_flusher *next, *tmp;
2067
2068                 /* complete all the flushers sharing the current flush color */
2069                 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
2070                         if (next->flush_color != wq->flush_color)
2071                                 break;
2072                         list_del_init(&next->list);
2073                         complete(&next->done);
2074                 }
2075
2076                 BUG_ON(!list_empty(&wq->flusher_overflow) &&
2077                        wq->flush_color != work_next_color(wq->work_color));
2078
2079                 /* this flush_color is finished, advance by one */
2080                 wq->flush_color = work_next_color(wq->flush_color);
2081
2082                 /* one color has been freed, handle overflow queue */
2083                 if (!list_empty(&wq->flusher_overflow)) {
2084                         /*
2085                          * Assign the same color to all overflowed
2086                          * flushers, advance work_color and append to
2087                          * flusher_queue.  This is the start-to-wait
2088                          * phase for these overflowed flushers.
2089                          */
2090                         list_for_each_entry(tmp, &wq->flusher_overflow, list)
2091                                 tmp->flush_color = wq->work_color;
2092
2093                         wq->work_color = work_next_color(wq->work_color);
2094
2095                         list_splice_tail_init(&wq->flusher_overflow,
2096                                               &wq->flusher_queue);
2097                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2098                 }
2099
2100                 if (list_empty(&wq->flusher_queue)) {
2101                         BUG_ON(wq->flush_color != wq->work_color);
2102                         break;
2103                 }
2104
2105                 /*
2106                  * Need to flush more colors.  Make the next flusher
2107                  * the new first flusher and arm cwqs.
2108                  */
2109                 BUG_ON(wq->flush_color == wq->work_color);
2110                 BUG_ON(wq->flush_color != next->flush_color);
2111
2112                 list_del_init(&next->list);
2113                 wq->first_flusher = next;
2114
2115                 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
2116                         break;
2117
2118                 /*
2119                  * Meh... this color is already done, clear first
2120                  * flusher and repeat cascading.
2121                  */
2122                 wq->first_flusher = NULL;
2123         }
2124
2125 out_unlock:
2126         mutex_unlock(&wq->flush_mutex);
2127 }
2128 EXPORT_SYMBOL_GPL(flush_workqueue);
2129
2130 /**
2131  * flush_work - block until a work_struct's callback has terminated
2132  * @work: the work which is to be flushed
2133  *
2134  * Returns false if @work has already terminated.
2135  *
2136  * It is expected that, prior to calling flush_work(), the caller has
2137  * arranged for the work to not be requeued, otherwise it doesn't make
2138  * sense to use this function.
2139  */
2140 int flush_work(struct work_struct *work)
2141 {
2142         struct worker *worker = NULL;
2143         struct global_cwq *gcwq;
2144         struct cpu_workqueue_struct *cwq;
2145         struct wq_barrier barr;
2146
2147         might_sleep();
2148         gcwq = get_work_gcwq(work);
2149         if (!gcwq)
2150                 return 0;
2151
2152         spin_lock_irq(&gcwq->lock);
2153         if (!list_empty(&work->entry)) {
2154                 /*
2155                  * See the comment near try_to_grab_pending()->smp_rmb().
2156                  * If it was re-queued to a different gcwq under us, we
2157                  * are not going to wait.
2158                  */
2159                 smp_rmb();
2160                 cwq = get_work_cwq(work);
2161                 if (unlikely(!cwq || gcwq != cwq->gcwq))
2162                         goto already_gone;
2163         } else {
2164                 worker = find_worker_executing_work(gcwq, work);
2165                 if (!worker)
2166                         goto already_gone;
2167                 cwq = worker->current_cwq;
2168         }
2169
2170         insert_wq_barrier(cwq, &barr, work, worker);
2171         spin_unlock_irq(&gcwq->lock);
2172
2173         lock_map_acquire(&cwq->wq->lockdep_map);
2174         lock_map_release(&cwq->wq->lockdep_map);
2175
2176         wait_for_completion(&barr.done);
2177         destroy_work_on_stack(&barr.work);
2178         return 1;
2179 already_gone:
2180         spin_unlock_irq(&gcwq->lock);
2181         return 0;
2182 }
2183 EXPORT_SYMBOL_GPL(flush_work);
2184
2185 /*
2186  * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
2187  * so this work can't be re-armed in any way.
2188  */
2189 static int try_to_grab_pending(struct work_struct *work)
2190 {
2191         struct global_cwq *gcwq;
2192         int ret = -1;
2193
2194         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
2195                 return 0;
2196
2197         /*
2198          * The queueing is in progress, or it is already queued. Try to
2199          * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
2200          */
2201         gcwq = get_work_gcwq(work);
2202         if (!gcwq)
2203                 return ret;
2204
2205         spin_lock_irq(&gcwq->lock);
2206         if (!list_empty(&work->entry)) {
2207                 /*
2208                  * This work is queued, but perhaps we locked the wrong gcwq.
2209                  * In that case we must see the new value after rmb(), see
2210                  * insert_work()->wmb().
2211                  */
2212                 smp_rmb();
2213                 if (gcwq == get_work_gcwq(work)) {
2214                         debug_work_deactivate(work);
2215                         list_del_init(&work->entry);
2216                         cwq_dec_nr_in_flight(get_work_cwq(work),
2217                                              get_work_color(work));
2218                         ret = 1;
2219                 }
2220         }
2221         spin_unlock_irq(&gcwq->lock);
2222
2223         return ret;
2224 }
2225
2226 static void wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
2227 {
2228         struct wq_barrier barr;
2229         struct worker *worker;
2230
2231         spin_lock_irq(&gcwq->lock);
2232
2233         worker = find_worker_executing_work(gcwq, work);
2234         if (unlikely(worker))
2235                 insert_wq_barrier(worker->current_cwq, &barr, work, worker);
2236
2237         spin_unlock_irq(&gcwq->lock);
2238
2239         if (unlikely(worker)) {
2240                 wait_for_completion(&barr.done);
2241                 destroy_work_on_stack(&barr.work);
2242         }
2243 }
2244
2245 static void wait_on_work(struct work_struct *work)
2246 {
2247         int cpu;
2248
2249         might_sleep();
2250
2251         lock_map_acquire(&work->lockdep_map);
2252         lock_map_release(&work->lockdep_map);
2253
2254         for_each_possible_cpu(cpu)
2255                 wait_on_cpu_work(get_gcwq(cpu), work);
2256 }
2257
2258 static int __cancel_work_timer(struct work_struct *work,
2259                                 struct timer_list* timer)
2260 {
2261         int ret;
2262
2263         do {
2264                 ret = (timer && likely(del_timer(timer)));
2265                 if (!ret)
2266                         ret = try_to_grab_pending(work);
2267                 wait_on_work(work);
2268         } while (unlikely(ret < 0));
2269
2270         clear_work_data(work);
2271         return ret;
2272 }
2273
2274 /**
2275  * cancel_work_sync - block until a work_struct's callback has terminated
2276  * @work: the work which is to be flushed
2277  *
2278  * Returns true if @work was pending.
2279  *
2280  * cancel_work_sync() will cancel the work if it is queued. If the work's
2281  * callback appears to be running, cancel_work_sync() will block until it
2282  * has completed.
2283  *
2284  * It is possible to use this function if the work re-queues itself. It can
2285  * cancel the work even if it migrates to another workqueue, however in that
2286  * case it only guarantees that work->func() has completed on the last queued
2287  * workqueue.
2288  *
2289  * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
2290  * pending, otherwise it goes into a busy-wait loop until the timer expires.
2291  *
2292  * The caller must ensure that workqueue_struct on which this work was last
2293  * queued can't be destroyed before this function returns.
2294  */
2295 int cancel_work_sync(struct work_struct *work)
2296 {
2297         return __cancel_work_timer(work, NULL);
2298 }
2299 EXPORT_SYMBOL_GPL(cancel_work_sync);
2300
2301 /**
2302  * cancel_delayed_work_sync - reliably kill off a delayed work.
2303  * @dwork: the delayed work struct
2304  *
2305  * Returns true if @dwork was pending.
2306  *
2307  * It is possible to use this function if @dwork rearms itself via queue_work()
2308  * or queue_delayed_work(). See also the comment for cancel_work_sync().
2309  */
2310 int cancel_delayed_work_sync(struct delayed_work *dwork)
2311 {
2312         return __cancel_work_timer(&dwork->work, &dwork->timer);
2313 }
2314 EXPORT_SYMBOL(cancel_delayed_work_sync);
2315
2316 /**
2317  * schedule_work - put work task in global workqueue
2318  * @work: job to be done
2319  *
2320  * Returns zero if @work was already on the kernel-global workqueue and
2321  * non-zero otherwise.
2322  *
2323  * This puts a job in the kernel-global workqueue if it was not already
2324  * queued and leaves it in the same position on the kernel-global
2325  * workqueue otherwise.
2326  */
2327 int schedule_work(struct work_struct *work)
2328 {
2329         return queue_work(system_wq, work);
2330 }
2331 EXPORT_SYMBOL(schedule_work);
2332
2333 /*
2334  * schedule_work_on - put work task on a specific cpu
2335  * @cpu: cpu to put the work task on
2336  * @work: job to be done
2337  *
2338  * This puts a job on a specific cpu
2339  */
2340 int schedule_work_on(int cpu, struct work_struct *work)
2341 {
2342         return queue_work_on(cpu, system_wq, work);
2343 }
2344 EXPORT_SYMBOL(schedule_work_on);
2345
2346 /**
2347  * schedule_delayed_work - put work task in global workqueue after delay
2348  * @dwork: job to be done
2349  * @delay: number of jiffies to wait or 0 for immediate execution
2350  *
2351  * After waiting for a given time this puts a job in the kernel-global
2352  * workqueue.
2353  */
2354 int schedule_delayed_work(struct delayed_work *dwork,
2355                                         unsigned long delay)
2356 {
2357         return queue_delayed_work(system_wq, dwork, delay);
2358 }
2359 EXPORT_SYMBOL(schedule_delayed_work);
2360
2361 /**
2362  * flush_delayed_work - block until a dwork_struct's callback has terminated
2363  * @dwork: the delayed work which is to be flushed
2364  *
2365  * Any timeout is cancelled, and any pending work is run immediately.
2366  */
2367 void flush_delayed_work(struct delayed_work *dwork)
2368 {
2369         if (del_timer_sync(&dwork->timer)) {
2370                 __queue_work(get_cpu(), get_work_cwq(&dwork->work)->wq,
2371                              &dwork->work);
2372                 put_cpu();
2373         }
2374         flush_work(&dwork->work);
2375 }
2376 EXPORT_SYMBOL(flush_delayed_work);
2377
2378 /**
2379  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
2380  * @cpu: cpu to use
2381  * @dwork: job to be done
2382  * @delay: number of jiffies to wait
2383  *
2384  * After waiting for a given time this puts a job in the kernel-global
2385  * workqueue on the specified CPU.
2386  */
2387 int schedule_delayed_work_on(int cpu,
2388                         struct delayed_work *dwork, unsigned long delay)
2389 {
2390         return queue_delayed_work_on(cpu, system_wq, dwork, delay);
2391 }
2392 EXPORT_SYMBOL(schedule_delayed_work_on);
2393
2394 /**
2395  * schedule_on_each_cpu - call a function on each online CPU from keventd
2396  * @func: the function to call
2397  *
2398  * Returns zero on success.
2399  * Returns -ve errno on failure.
2400  *
2401  * schedule_on_each_cpu() is very slow.
2402  */
2403 int schedule_on_each_cpu(work_func_t func)
2404 {
2405         int cpu;
2406         struct work_struct *works;
2407
2408         works = alloc_percpu(struct work_struct);
2409         if (!works)
2410                 return -ENOMEM;
2411
2412         get_online_cpus();
2413
2414         for_each_online_cpu(cpu) {
2415                 struct work_struct *work = per_cpu_ptr(works, cpu);
2416
2417                 INIT_WORK(work, func);
2418                 schedule_work_on(cpu, work);
2419         }
2420
2421         for_each_online_cpu(cpu)
2422                 flush_work(per_cpu_ptr(works, cpu));
2423
2424         put_online_cpus();
2425         free_percpu(works);
2426         return 0;
2427 }
2428
2429 /**
2430  * flush_scheduled_work - ensure that any scheduled work has run to completion.
2431  *
2432  * Forces execution of the kernel-global workqueue and blocks until its
2433  * completion.
2434  *
2435  * Think twice before calling this function!  It's very easy to get into
2436  * trouble if you don't take great care.  Either of the following situations
2437  * will lead to deadlock:
2438  *
2439  *      One of the work items currently on the workqueue needs to acquire
2440  *      a lock held by your code or its caller.
2441  *
2442  *      Your code is running in the context of a work routine.
2443  *
2444  * They will be detected by lockdep when they occur, but the first might not
2445  * occur very often.  It depends on what work items are on the workqueue and
2446  * what locks they need, which you have no control over.
2447  *
2448  * In most situations flushing the entire workqueue is overkill; you merely
2449  * need to know that a particular work item isn't queued and isn't running.
2450  * In such cases you should use cancel_delayed_work_sync() or
2451  * cancel_work_sync() instead.
2452  */
2453 void flush_scheduled_work(void)
2454 {
2455         flush_workqueue(system_wq);
2456 }
2457 EXPORT_SYMBOL(flush_scheduled_work);
2458
2459 /**
2460  * execute_in_process_context - reliably execute the routine with user context
2461  * @fn:         the function to execute
2462  * @ew:         guaranteed storage for the execute work structure (must
2463  *              be available when the work executes)
2464  *
2465  * Executes the function immediately if process context is available,
2466  * otherwise schedules the function for delayed execution.
2467  *
2468  * Returns:     0 - function was executed
2469  *              1 - function was scheduled for execution
2470  */
2471 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
2472 {
2473         if (!in_interrupt()) {
2474                 fn(&ew->work);
2475                 return 0;
2476         }
2477
2478         INIT_WORK(&ew->work, fn);
2479         schedule_work(&ew->work);
2480
2481         return 1;
2482 }
2483 EXPORT_SYMBOL_GPL(execute_in_process_context);
2484
2485 int keventd_up(void)
2486 {
2487         return system_wq != NULL;
2488 }
2489
2490 static struct cpu_workqueue_struct *alloc_cwqs(void)
2491 {
2492         /*
2493          * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
2494          * Make sure that the alignment isn't lower than that of
2495          * unsigned long long.
2496          */
2497         const size_t size = sizeof(struct cpu_workqueue_struct);
2498         const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
2499                                    __alignof__(unsigned long long));
2500         struct cpu_workqueue_struct *cwqs;
2501 #ifndef CONFIG_SMP
2502         void *ptr;
2503
2504         /*
2505          * On UP, percpu allocator doesn't honor alignment parameter
2506          * and simply uses arch-dependent default.  Allocate enough
2507          * room to align cwq and put an extra pointer at the end
2508          * pointing back to the originally allocated pointer which
2509          * will be used for free.
2510          *
2511          * FIXME: This really belongs to UP percpu code.  Update UP
2512          * percpu code to honor alignment and remove this ugliness.
2513          */
2514         ptr = __alloc_percpu(size + align + sizeof(void *), 1);
2515         cwqs = PTR_ALIGN(ptr, align);
2516         *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
2517 #else
2518         /* On SMP, percpu allocator can do it itself */
2519         cwqs = __alloc_percpu(size, align);
2520 #endif
2521         /* just in case, make sure it's actually aligned */
2522         BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
2523         return cwqs;
2524 }
2525
2526 static void free_cwqs(struct cpu_workqueue_struct *cwqs)
2527 {
2528 #ifndef CONFIG_SMP
2529         /* on UP, the pointer to free is stored right after the cwq */
2530         if (cwqs)
2531                 free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
2532 #else
2533         free_percpu(cwqs);
2534 #endif
2535 }
2536
2537 static int wq_clamp_max_active(int max_active, const char *name)
2538 {
2539         if (max_active < 1 || max_active > WQ_MAX_ACTIVE)
2540                 printk(KERN_WARNING "workqueue: max_active %d requested for %s "
2541                        "is out of range, clamping between %d and %d\n",
2542                        max_active, name, 1, WQ_MAX_ACTIVE);
2543
2544         return clamp_val(max_active, 1, WQ_MAX_ACTIVE);
2545 }
2546
2547 struct workqueue_struct *__alloc_workqueue_key(const char *name,
2548                                                unsigned int flags,
2549                                                int max_active,
2550                                                struct lock_class_key *key,
2551                                                const char *lock_name)
2552 {
2553         struct workqueue_struct *wq;
2554         unsigned int cpu;
2555
2556         max_active = max_active ?: WQ_DFL_ACTIVE;
2557         max_active = wq_clamp_max_active(max_active, name);
2558
2559         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
2560         if (!wq)
2561                 goto err;
2562
2563         wq->cpu_wq = alloc_cwqs();
2564         if (!wq->cpu_wq)
2565                 goto err;
2566
2567         wq->flags = flags;
2568         wq->saved_max_active = max_active;
2569         mutex_init(&wq->flush_mutex);
2570         atomic_set(&wq->nr_cwqs_to_flush, 0);
2571         INIT_LIST_HEAD(&wq->flusher_queue);
2572         INIT_LIST_HEAD(&wq->flusher_overflow);
2573         wq->single_cpu = NR_CPUS;
2574
2575         wq->name = name;
2576         lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
2577         INIT_LIST_HEAD(&wq->list);
2578
2579         for_each_possible_cpu(cpu) {
2580                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2581                 struct global_cwq *gcwq = get_gcwq(cpu);
2582
2583                 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
2584                 cwq->gcwq = gcwq;
2585                 cwq->wq = wq;
2586                 cwq->flush_color = -1;
2587                 cwq->max_active = max_active;
2588                 INIT_LIST_HEAD(&cwq->delayed_works);
2589         }
2590
2591         if (flags & WQ_RESCUER) {
2592                 struct worker *rescuer;
2593
2594                 if (!alloc_cpumask_var(&wq->mayday_mask, GFP_KERNEL))
2595                         goto err;
2596
2597                 wq->rescuer = rescuer = alloc_worker();
2598                 if (!rescuer)
2599                         goto err;
2600
2601                 rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);
2602                 if (IS_ERR(rescuer->task))
2603                         goto err;
2604
2605                 wq->rescuer = rescuer;
2606                 rescuer->task->flags |= PF_THREAD_BOUND;
2607                 wake_up_process(rescuer->task);
2608         }
2609
2610         /*
2611          * workqueue_lock protects global freeze state and workqueues
2612          * list.  Grab it, set max_active accordingly and add the new
2613          * workqueue to workqueues list.
2614          */
2615         spin_lock(&workqueue_lock);
2616
2617         if (workqueue_freezing && wq->flags & WQ_FREEZEABLE)
2618                 for_each_possible_cpu(cpu)
2619                         get_cwq(cpu, wq)->max_active = 0;
2620
2621         list_add(&wq->list, &workqueues);
2622
2623         spin_unlock(&workqueue_lock);
2624
2625         return wq;
2626 err:
2627         if (wq) {
2628                 free_cwqs(wq->cpu_wq);
2629                 free_cpumask_var(wq->mayday_mask);
2630                 kfree(wq->rescuer);
2631                 kfree(wq);
2632         }
2633         return NULL;
2634 }
2635 EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
2636
2637 /**
2638  * destroy_workqueue - safely terminate a workqueue
2639  * @wq: target workqueue
2640  *
2641  * Safely destroy a workqueue. All work currently pending will be done first.
2642  */
2643 void destroy_workqueue(struct workqueue_struct *wq)
2644 {
2645         unsigned int cpu;
2646
2647         flush_workqueue(wq);
2648
2649         /*
2650          * wq list is used to freeze wq, remove from list after
2651          * flushing is complete in case freeze races us.
2652          */
2653         spin_lock(&workqueue_lock);
2654         list_del(&wq->list);
2655         spin_unlock(&workqueue_lock);
2656
2657         /* sanity check */
2658         for_each_possible_cpu(cpu) {
2659                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2660                 int i;
2661
2662                 for (i = 0; i < WORK_NR_COLORS; i++)
2663                         BUG_ON(cwq->nr_in_flight[i]);
2664                 BUG_ON(cwq->nr_active);
2665                 BUG_ON(!list_empty(&cwq->delayed_works));
2666         }
2667
2668         if (wq->flags & WQ_RESCUER) {
2669                 kthread_stop(wq->rescuer->task);
2670                 free_cpumask_var(wq->mayday_mask);
2671         }
2672
2673         free_cwqs(wq->cpu_wq);
2674         kfree(wq);
2675 }
2676 EXPORT_SYMBOL_GPL(destroy_workqueue);
2677
2678 /*
2679  * CPU hotplug.
2680  *
2681  * There are two challenges in supporting CPU hotplug.  Firstly, there
2682  * are a lot of assumptions on strong associations among work, cwq and
2683  * gcwq which make migrating pending and scheduled works very
2684  * difficult to implement without impacting hot paths.  Secondly,
2685  * gcwqs serve mix of short, long and very long running works making
2686  * blocked draining impractical.
2687  *
2688  * This is solved by allowing a gcwq to be detached from CPU, running
2689  * it with unbound (rogue) workers and allowing it to be reattached
2690  * later if the cpu comes back online.  A separate thread is created
2691  * to govern a gcwq in such state and is called the trustee of the
2692  * gcwq.
2693  *
2694  * Trustee states and their descriptions.
2695  *
2696  * START        Command state used on startup.  On CPU_DOWN_PREPARE, a
2697  *              new trustee is started with this state.
2698  *
2699  * IN_CHARGE    Once started, trustee will enter this state after
2700  *              assuming the manager role and making all existing
2701  *              workers rogue.  DOWN_PREPARE waits for trustee to
2702  *              enter this state.  After reaching IN_CHARGE, trustee
2703  *              tries to execute the pending worklist until it's empty
2704  *              and the state is set to BUTCHER, or the state is set
2705  *              to RELEASE.
2706  *
2707  * BUTCHER      Command state which is set by the cpu callback after
2708  *              the cpu has went down.  Once this state is set trustee
2709  *              knows that there will be no new works on the worklist
2710  *              and once the worklist is empty it can proceed to
2711  *              killing idle workers.
2712  *
2713  * RELEASE      Command state which is set by the cpu callback if the
2714  *              cpu down has been canceled or it has come online
2715  *              again.  After recognizing this state, trustee stops
2716  *              trying to drain or butcher and clears ROGUE, rebinds
2717  *              all remaining workers back to the cpu and releases
2718  *              manager role.
2719  *
2720  * DONE         Trustee will enter this state after BUTCHER or RELEASE
2721  *              is complete.
2722  *
2723  *          trustee                 CPU                draining
2724  *         took over                down               complete
2725  * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
2726  *                        |                     |                  ^
2727  *                        | CPU is back online  v   return workers |
2728  *                         ----------------> RELEASE --------------
2729  */
2730
2731 /**
2732  * trustee_wait_event_timeout - timed event wait for trustee
2733  * @cond: condition to wait for
2734  * @timeout: timeout in jiffies
2735  *
2736  * wait_event_timeout() for trustee to use.  Handles locking and
2737  * checks for RELEASE request.
2738  *
2739  * CONTEXT:
2740  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
2741  * multiple times.  To be used by trustee.
2742  *
2743  * RETURNS:
2744  * Positive indicating left time if @cond is satisfied, 0 if timed
2745  * out, -1 if canceled.
2746  */
2747 #define trustee_wait_event_timeout(cond, timeout) ({                    \
2748         long __ret = (timeout);                                         \
2749         while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \
2750                __ret) {                                                 \
2751                 spin_unlock_irq(&gcwq->lock);                           \
2752                 __wait_event_timeout(gcwq->trustee_wait, (cond) ||      \
2753                         (gcwq->trustee_state == TRUSTEE_RELEASE),       \
2754                         __ret);                                         \
2755                 spin_lock_irq(&gcwq->lock);                             \
2756         }                                                               \
2757         gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret);          \
2758 })
2759
2760 /**
2761  * trustee_wait_event - event wait for trustee
2762  * @cond: condition to wait for
2763  *
2764  * wait_event() for trustee to use.  Automatically handles locking and
2765  * checks for CANCEL request.
2766  *
2767  * CONTEXT:
2768  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
2769  * multiple times.  To be used by trustee.
2770  *
2771  * RETURNS:
2772  * 0 if @cond is satisfied, -1 if canceled.
2773  */
2774 #define trustee_wait_event(cond) ({                                     \
2775         long __ret1;                                                    \
2776         __ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
2777         __ret1 < 0 ? -1 : 0;                                            \
2778 })
2779
2780 static int __cpuinit trustee_thread(void *__gcwq)
2781 {
2782         struct global_cwq *gcwq = __gcwq;
2783         struct worker *worker;
2784         struct work_struct *work;
2785         struct hlist_node *pos;
2786         long rc;
2787         int i;
2788
2789         BUG_ON(gcwq->cpu != smp_processor_id());
2790
2791         spin_lock_irq(&gcwq->lock);
2792         /*
2793          * Claim the manager position and make all workers rogue.
2794          * Trustee must be bound to the target cpu and can't be
2795          * cancelled.
2796          */
2797         BUG_ON(gcwq->cpu != smp_processor_id());
2798         rc = trustee_wait_event(!(gcwq->flags & GCWQ_MANAGING_WORKERS));
2799         BUG_ON(rc < 0);
2800
2801         gcwq->flags |= GCWQ_MANAGING_WORKERS;
2802
2803         list_for_each_entry(worker, &gcwq->idle_list, entry)
2804                 worker_set_flags(worker, WORKER_ROGUE, false);
2805
2806         for_each_busy_worker(worker, i, pos, gcwq)
2807                 worker_set_flags(worker, WORKER_ROGUE, false);
2808
2809         /*
2810          * Call schedule() so that we cross rq->lock and thus can
2811          * guarantee sched callbacks see the rogue flag.  This is
2812          * necessary as scheduler callbacks may be invoked from other
2813          * cpus.
2814          */
2815         spin_unlock_irq(&gcwq->lock);
2816         schedule();
2817         spin_lock_irq(&gcwq->lock);
2818
2819         /*
2820          * Sched callbacks are disabled now.  gcwq->nr_running should
2821          * be zero and will stay that way, making need_more_worker()
2822          * and keep_working() always return true as long as the
2823          * worklist is not empty.
2824          */
2825         WARN_ON_ONCE(atomic_read(get_gcwq_nr_running(gcwq->cpu)) != 0);
2826
2827         spin_unlock_irq(&gcwq->lock);
2828         del_timer_sync(&gcwq->idle_timer);
2829         spin_lock_irq(&gcwq->lock);
2830
2831         /*
2832          * We're now in charge.  Notify and proceed to drain.  We need
2833          * to keep the gcwq running during the whole CPU down
2834          * procedure as other cpu hotunplug callbacks may need to
2835          * flush currently running tasks.
2836          */
2837         gcwq->trustee_state = TRUSTEE_IN_CHARGE;
2838         wake_up_all(&gcwq->trustee_wait);
2839
2840         /*
2841          * The original cpu is in the process of dying and may go away
2842          * anytime now.  When that happens, we and all workers would
2843          * be migrated to other cpus.  Try draining any left work.  We
2844          * want to get it over with ASAP - spam rescuers, wake up as
2845          * many idlers as necessary and create new ones till the
2846          * worklist is empty.  Note that if the gcwq is frozen, there
2847          * may be frozen works in freezeable cwqs.  Don't declare
2848          * completion while frozen.
2849          */
2850         while (gcwq->nr_workers != gcwq->nr_idle ||
2851                gcwq->flags & GCWQ_FREEZING ||
2852                gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
2853                 int nr_works = 0;
2854
2855                 list_for_each_entry(work, &gcwq->worklist, entry) {
2856                         send_mayday(work);
2857                         nr_works++;
2858                 }
2859
2860                 list_for_each_entry(worker, &gcwq->idle_list, entry) {
2861                         if (!nr_works--)
2862                                 break;
2863                         wake_up_process(worker->task);
2864                 }
2865
2866                 if (need_to_create_worker(gcwq)) {
2867                         spin_unlock_irq(&gcwq->lock);
2868                         worker = create_worker(gcwq, false);
2869                         spin_lock_irq(&gcwq->lock);
2870                         if (worker) {
2871                                 worker_set_flags(worker, WORKER_ROGUE, false);
2872                                 start_worker(worker);
2873                         }
2874                 }
2875
2876                 /* give a breather */
2877                 if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
2878                         break;
2879         }
2880
2881         /*
2882          * Either all works have been scheduled and cpu is down, or
2883          * cpu down has already been canceled.  Wait for and butcher
2884          * all workers till we're canceled.
2885          */
2886         do {
2887                 rc = trustee_wait_event(!list_empty(&gcwq->idle_list));
2888                 while (!list_empty(&gcwq->idle_list))
2889                         destroy_worker(list_first_entry(&gcwq->idle_list,
2890                                                         struct worker, entry));
2891         } while (gcwq->nr_workers && rc >= 0);
2892
2893         /*
2894          * At this point, either draining has completed and no worker
2895          * is left, or cpu down has been canceled or the cpu is being
2896          * brought back up.  There shouldn't be any idle one left.
2897          * Tell the remaining busy ones to rebind once it finishes the
2898          * currently scheduled works by scheduling the rebind_work.
2899          */
2900         WARN_ON(!list_empty(&gcwq->idle_list));
2901
2902         for_each_busy_worker(worker, i, pos, gcwq) {
2903                 struct work_struct *rebind_work = &worker->rebind_work;
2904
2905                 /*
2906                  * Rebind_work may race with future cpu hotplug
2907                  * operations.  Use a separate flag to mark that
2908                  * rebinding is scheduled.
2909                  */
2910                 worker_set_flags(worker, WORKER_REBIND, false);
2911                 worker_clr_flags(worker, WORKER_ROGUE);
2912
2913                 /* queue rebind_work, wq doesn't matter, use the default one */
2914                 if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
2915                                      work_data_bits(rebind_work)))
2916                         continue;
2917
2918                 debug_work_activate(rebind_work);
2919                 insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
2920                             worker->scheduled.next,
2921                             work_color_to_flags(WORK_NO_COLOR));
2922         }
2923
2924         /* relinquish manager role */
2925         gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
2926
2927         /* notify completion */
2928         gcwq->trustee = NULL;
2929         gcwq->trustee_state = TRUSTEE_DONE;
2930         wake_up_all(&gcwq->trustee_wait);
2931         spin_unlock_irq(&gcwq->lock);
2932         return 0;
2933 }
2934
2935 /**
2936  * wait_trustee_state - wait for trustee to enter the specified state
2937  * @gcwq: gcwq the trustee of interest belongs to
2938  * @state: target state to wait for
2939  *
2940  * Wait for the trustee to reach @state.  DONE is already matched.
2941  *
2942  * CONTEXT:
2943  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
2944  * multiple times.  To be used by cpu_callback.
2945  */
2946 static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
2947 {
2948         if (!(gcwq->trustee_state == state ||
2949               gcwq->trustee_state == TRUSTEE_DONE)) {
2950                 spin_unlock_irq(&gcwq->lock);
2951                 __wait_event(gcwq->trustee_wait,
2952                              gcwq->trustee_state == state ||
2953                              gcwq->trustee_state == TRUSTEE_DONE);
2954                 spin_lock_irq(&gcwq->lock);
2955         }
2956 }
2957
2958 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
2959                                                 unsigned long action,
2960                                                 void *hcpu)
2961 {
2962         unsigned int cpu = (unsigned long)hcpu;
2963         struct global_cwq *gcwq = get_gcwq(cpu);
2964         struct task_struct *new_trustee = NULL;
2965         struct worker *uninitialized_var(new_worker);
2966         unsigned long flags;
2967
2968         action &= ~CPU_TASKS_FROZEN;
2969
2970         switch (action) {
2971         case CPU_DOWN_PREPARE:
2972                 new_trustee = kthread_create(trustee_thread, gcwq,
2973                                              "workqueue_trustee/%d\n", cpu);
2974                 if (IS_ERR(new_trustee))
2975                         return notifier_from_errno(PTR_ERR(new_trustee));
2976                 kthread_bind(new_trustee, cpu);
2977                 /* fall through */
2978         case CPU_UP_PREPARE:
2979                 BUG_ON(gcwq->first_idle);
2980                 new_worker = create_worker(gcwq, false);
2981                 if (!new_worker) {
2982                         if (new_trustee)
2983                                 kthread_stop(new_trustee);
2984                         return NOTIFY_BAD;
2985                 }
2986         }
2987
2988         /* some are called w/ irq disabled, don't disturb irq status */
2989         spin_lock_irqsave(&gcwq->lock, flags);
2990
2991         switch (action) {
2992         case CPU_DOWN_PREPARE:
2993                 /* initialize trustee and tell it to acquire the gcwq */
2994                 BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
2995                 gcwq->trustee = new_trustee;
2996                 gcwq->trustee_state = TRUSTEE_START;
2997                 wake_up_process(gcwq->trustee);
2998                 wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
2999                 /* fall through */
3000         case CPU_UP_PREPARE:
3001                 BUG_ON(gcwq->first_idle);
3002                 gcwq->first_idle = new_worker;
3003                 break;
3004
3005         case CPU_DYING:
3006                 /*
3007                  * Before this, the trustee and all workers except for
3008                  * the ones which are still executing works from
3009                  * before the last CPU down must be on the cpu.  After
3010                  * this, they'll all be diasporas.
3011                  */
3012                 gcwq->flags |= GCWQ_DISASSOCIATED;
3013                 break;
3014
3015         case CPU_POST_DEAD:
3016                 gcwq->trustee_state = TRUSTEE_BUTCHER;
3017                 /* fall through */
3018         case CPU_UP_CANCELED:
3019                 destroy_worker(gcwq->first_idle);
3020                 gcwq->first_idle = NULL;
3021                 break;
3022
3023         case CPU_DOWN_FAILED:
3024         case CPU_ONLINE:
3025                 gcwq->flags &= ~GCWQ_DISASSOCIATED;
3026                 if (gcwq->trustee_state != TRUSTEE_DONE) {
3027                         gcwq->trustee_state = TRUSTEE_RELEASE;
3028                         wake_up_process(gcwq->trustee);
3029                         wait_trustee_state(gcwq, TRUSTEE_DONE);
3030                 }
3031
3032                 /*
3033                  * Trustee is done and there might be no worker left.
3034                  * Put the first_idle in and request a real manager to
3035                  * take a look.
3036                  */
3037                 spin_unlock_irq(&gcwq->lock);
3038                 kthread_bind(gcwq->first_idle->task, cpu);
3039                 spin_lock_irq(&gcwq->lock);
3040                 gcwq->flags |= GCWQ_MANAGE_WORKERS;
3041                 start_worker(gcwq->first_idle);
3042                 gcwq->first_idle = NULL;
3043                 break;
3044         }
3045
3046         spin_unlock_irqrestore(&gcwq->lock, flags);
3047
3048         return notifier_from_errno(0);
3049 }
3050
3051 #ifdef CONFIG_SMP
3052
3053 struct work_for_cpu {
3054         struct completion completion;
3055         long (*fn)(void *);
3056         void *arg;
3057         long ret;
3058 };
3059
3060 static int do_work_for_cpu(void *_wfc)
3061 {
3062         struct work_for_cpu *wfc = _wfc;
3063         wfc->ret = wfc->fn(wfc->arg);
3064         complete(&wfc->completion);
3065         return 0;
3066 }
3067
3068 /**
3069  * work_on_cpu - run a function in user context on a particular cpu
3070  * @cpu: the cpu to run on
3071  * @fn: the function to run
3072  * @arg: the function arg
3073  *
3074  * This will return the value @fn returns.
3075  * It is up to the caller to ensure that the cpu doesn't go offline.
3076  * The caller must not hold any locks which would prevent @fn from completing.
3077  */
3078 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
3079 {
3080         struct task_struct *sub_thread;
3081         struct work_for_cpu wfc = {
3082                 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
3083                 .fn = fn,
3084                 .arg = arg,
3085         };
3086
3087         sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
3088         if (IS_ERR(sub_thread))
3089                 return PTR_ERR(sub_thread);
3090         kthread_bind(sub_thread, cpu);
3091         wake_up_process(sub_thread);
3092         wait_for_completion(&wfc.completion);
3093         return wfc.ret;
3094 }
3095 EXPORT_SYMBOL_GPL(work_on_cpu);
3096 #endif /* CONFIG_SMP */
3097
3098 #ifdef CONFIG_FREEZER
3099
3100 /**
3101  * freeze_workqueues_begin - begin freezing workqueues
3102  *
3103  * Start freezing workqueues.  After this function returns, all
3104  * freezeable workqueues will queue new works to their frozen_works
3105  * list instead of gcwq->worklist.
3106  *
3107  * CONTEXT:
3108  * Grabs and releases workqueue_lock and gcwq->lock's.
3109  */
3110 void freeze_workqueues_begin(void)
3111 {
3112         struct workqueue_struct *wq;
3113         unsigned int cpu;
3114
3115         spin_lock(&workqueue_lock);
3116
3117         BUG_ON(workqueue_freezing);
3118         workqueue_freezing = true;
3119
3120         for_each_possible_cpu(cpu) {
3121                 struct global_cwq *gcwq = get_gcwq(cpu);
3122
3123                 spin_lock_irq(&gcwq->lock);
3124
3125                 BUG_ON(gcwq->flags & GCWQ_FREEZING);
3126                 gcwq->flags |= GCWQ_FREEZING;
3127
3128                 list_for_each_entry(wq, &workqueues, list) {
3129                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3130
3131                         if (wq->flags & WQ_FREEZEABLE)
3132                                 cwq->max_active = 0;
3133                 }
3134
3135                 spin_unlock_irq(&gcwq->lock);
3136         }
3137
3138         spin_unlock(&workqueue_lock);
3139 }
3140
3141 /**
3142  * freeze_workqueues_busy - are freezeable workqueues still busy?
3143  *
3144  * Check whether freezing is complete.  This function must be called
3145  * between freeze_workqueues_begin() and thaw_workqueues().
3146  *
3147  * CONTEXT:
3148  * Grabs and releases workqueue_lock.
3149  *
3150  * RETURNS:
3151  * %true if some freezeable workqueues are still busy.  %false if
3152  * freezing is complete.
3153  */
3154 bool freeze_workqueues_busy(void)
3155 {
3156         struct workqueue_struct *wq;
3157         unsigned int cpu;
3158         bool busy = false;
3159
3160         spin_lock(&workqueue_lock);
3161
3162         BUG_ON(!workqueue_freezing);
3163
3164         for_each_possible_cpu(cpu) {
3165                 /*
3166                  * nr_active is monotonically decreasing.  It's safe
3167                  * to peek without lock.
3168                  */
3169                 list_for_each_entry(wq, &workqueues, list) {
3170                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3171
3172                         if (!(wq->flags & WQ_FREEZEABLE))
3173                                 continue;
3174
3175                         BUG_ON(cwq->nr_active < 0);
3176                         if (cwq->nr_active) {
3177                                 busy = true;
3178                                 goto out_unlock;
3179                         }
3180                 }
3181         }
3182 out_unlock:
3183         spin_unlock(&workqueue_lock);
3184         return busy;
3185 }
3186
3187 /**
3188  * thaw_workqueues - thaw workqueues
3189  *
3190  * Thaw workqueues.  Normal queueing is restored and all collected
3191  * frozen works are transferred to their respective gcwq worklists.
3192  *
3193  * CONTEXT:
3194  * Grabs and releases workqueue_lock and gcwq->lock's.
3195  */
3196 void thaw_workqueues(void)
3197 {
3198         struct workqueue_struct *wq;
3199         unsigned int cpu;
3200
3201         spin_lock(&workqueue_lock);
3202
3203         if (!workqueue_freezing)
3204                 goto out_unlock;
3205
3206         for_each_possible_cpu(cpu) {
3207                 struct global_cwq *gcwq = get_gcwq(cpu);
3208
3209                 spin_lock_irq(&gcwq->lock);
3210
3211                 BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
3212                 gcwq->flags &= ~GCWQ_FREEZING;
3213
3214                 list_for_each_entry(wq, &workqueues, list) {
3215                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3216
3217                         if (!(wq->flags & WQ_FREEZEABLE))
3218                                 continue;
3219
3220                         /* restore max_active and repopulate worklist */
3221                         cwq->max_active = wq->saved_max_active;
3222
3223                         while (!list_empty(&cwq->delayed_works) &&
3224                                cwq->nr_active < cwq->max_active)
3225                                 cwq_activate_first_delayed(cwq);
3226
3227                         /* perform delayed unbind from single cpu if empty */
3228                         if (wq->single_cpu == gcwq->cpu &&
3229                             !cwq->nr_active && list_empty(&cwq->delayed_works))
3230                                 cwq_unbind_single_cpu(cwq);
3231                 }
3232
3233                 wake_up_worker(gcwq);
3234
3235                 spin_unlock_irq(&gcwq->lock);
3236         }
3237
3238         workqueue_freezing = false;
3239 out_unlock:
3240         spin_unlock(&workqueue_lock);
3241 }
3242 #endif /* CONFIG_FREEZER */
3243
3244 void __init init_workqueues(void)
3245 {
3246         unsigned int cpu;
3247         int i;
3248
3249         /*
3250          * The pointer part of work->data is either pointing to the
3251          * cwq or contains the cpu number the work ran last on.  Make
3252          * sure cpu number won't overflow into kernel pointer area so
3253          * that they can be distinguished.
3254          */
3255         BUILD_BUG_ON(NR_CPUS << WORK_STRUCT_FLAG_BITS >= PAGE_OFFSET);
3256
3257         hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
3258
3259         /* initialize gcwqs */
3260         for_each_possible_cpu(cpu) {
3261                 struct global_cwq *gcwq = get_gcwq(cpu);
3262
3263                 spin_lock_init(&gcwq->lock);
3264                 INIT_LIST_HEAD(&gcwq->worklist);
3265                 gcwq->cpu = cpu;
3266
3267                 INIT_LIST_HEAD(&gcwq->idle_list);
3268                 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
3269                         INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
3270
3271                 init_timer_deferrable(&gcwq->idle_timer);
3272                 gcwq->idle_timer.function = idle_worker_timeout;
3273                 gcwq->idle_timer.data = (unsigned long)gcwq;
3274
3275                 setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
3276                             (unsigned long)gcwq);
3277
3278                 ida_init(&gcwq->worker_ida);
3279
3280                 gcwq->trustee_state = TRUSTEE_DONE;
3281                 init_waitqueue_head(&gcwq->trustee_wait);
3282         }
3283
3284         /* create the initial worker */
3285         for_each_online_cpu(cpu) {
3286                 struct global_cwq *gcwq = get_gcwq(cpu);
3287                 struct worker *worker;
3288
3289                 worker = create_worker(gcwq, true);
3290                 BUG_ON(!worker);
3291                 spin_lock_irq(&gcwq->lock);
3292                 start_worker(worker);
3293                 spin_unlock_irq(&gcwq->lock);
3294         }
3295
3296         system_wq = alloc_workqueue("events", 0, 0);
3297         system_long_wq = alloc_workqueue("events_long", 0, 0);
3298         system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
3299         BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq);
3300 }