workqueue: make single thread workqueue shared worker pool friendly
1 /*
2  * linux/kernel/workqueue.c
3  *
4  * Generic mechanism for defining kernel helper threads for running
5  * arbitrary tasks in process context.
6  *
7  * Started by Ingo Molnar, Copyright (C) 2002
8  *
9  * Derived from the taskqueue/keventd code by:
10  *
11  *   David Woodhouse <dwmw2@infradead.org>
12  *   Andrew Morton
13  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
14  *   Theodore Ts'o <tytso@mit.edu>
15  *
16  * Made to use alloc_percpu by Christoph Lameter.
17  */
18
19 #include <linux/module.h>
20 #include <linux/kernel.h>
21 #include <linux/sched.h>
22 #include <linux/init.h>
23 #include <linux/signal.h>
24 #include <linux/completion.h>
25 #include <linux/workqueue.h>
26 #include <linux/slab.h>
27 #include <linux/cpu.h>
28 #include <linux/notifier.h>
29 #include <linux/kthread.h>
30 #include <linux/hardirq.h>
31 #include <linux/mempolicy.h>
32 #include <linux/freezer.h>
33 #include <linux/kallsyms.h>
34 #include <linux/debug_locks.h>
35 #include <linux/lockdep.h>
36 #include <linux/idr.h>
37
38 enum {
39         /* global_cwq flags */
40         GCWQ_FREEZING           = 1 << 3,       /* freeze in progress */
41
42         /* worker flags */
43         WORKER_STARTED          = 1 << 0,       /* started */
44         WORKER_DIE              = 1 << 1,       /* die die die */
45         WORKER_IDLE             = 1 << 2,       /* is idle */
46         WORKER_ROGUE            = 1 << 4,       /* not bound to any cpu */
47
48         /* gcwq->trustee_state */
49         TRUSTEE_START           = 0,            /* start */
50         TRUSTEE_IN_CHARGE       = 1,            /* trustee in charge of gcwq */
51         TRUSTEE_BUTCHER         = 2,            /* butcher workers */
52         TRUSTEE_RELEASE         = 3,            /* release workers */
53         TRUSTEE_DONE            = 4,            /* trustee is done */
54
55         BUSY_WORKER_HASH_ORDER  = 6,            /* 64 pointers */
56         BUSY_WORKER_HASH_SIZE   = 1 << BUSY_WORKER_HASH_ORDER,
57         BUSY_WORKER_HASH_MASK   = BUSY_WORKER_HASH_SIZE - 1,
58
59         TRUSTEE_COOLDOWN        = HZ / 10,      /* for trustee draining */
60 };
61
62 /*
63  * Structure fields follow one of the following exclusion rules.
64  *
65  * I: Set during initialization and read-only afterwards.
66  *
67  * L: gcwq->lock protected.  Access with gcwq->lock held.
68  *
69  * F: wq->flush_mutex protected.
70  *
71  * W: workqueue_lock protected.
72  */
73
74 struct global_cwq;
75 struct cpu_workqueue_struct;
76
77 struct worker {
78         /* on idle list while idle, on busy hash table while busy */
79         union {
80                 struct list_head        entry;  /* L: while idle */
81                 struct hlist_node       hentry; /* L: while busy */
82         };
83
84         struct work_struct      *current_work;  /* L: work being processed */
85         struct list_head        scheduled;      /* L: scheduled works */
86         struct task_struct      *task;          /* I: worker task */
87         struct global_cwq       *gcwq;          /* I: the associated gcwq */
88         struct cpu_workqueue_struct *cwq;       /* I: the associated cwq */
89         unsigned int            flags;          /* L: flags */
90         int                     id;             /* I: worker id */
91 };
92
93 /*
94  * Global per-cpu workqueue.
95  */
96 struct global_cwq {
97         spinlock_t              lock;           /* the gcwq lock */
98         unsigned int            cpu;            /* I: the associated cpu */
99         unsigned int            flags;          /* L: GCWQ_* flags */
100
101         int                     nr_workers;     /* L: total number of workers */
102         int                     nr_idle;        /* L: currently idle ones */
103
104         /* workers are chained either in the idle_list or busy_hash */
105         struct list_head        idle_list;      /* L: list of idle workers */
106         struct hlist_head       busy_hash[BUSY_WORKER_HASH_SIZE];
107                                                 /* L: hash of busy workers */
108
109         struct ida              worker_ida;     /* L: for worker IDs */
110
111         struct task_struct      *trustee;       /* L: for gcwq shutdown */
112         unsigned int            trustee_state;  /* L: trustee state */
113         wait_queue_head_t       trustee_wait;   /* trustee wait */
114 } ____cacheline_aligned_in_smp;
115
116 /*
117  * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
118  * work_struct->data are used for flags and thus cwqs need to be
119  * aligned on a two's-power boundary, i.e. (1 << WORK_STRUCT_FLAG_BITS).
120  */
121 struct cpu_workqueue_struct {
122         struct global_cwq       *gcwq;          /* I: the associated gcwq */
123         struct list_head worklist;
124         struct worker           *worker;
125         struct workqueue_struct *wq;            /* I: the owning workqueue */
126         int                     work_color;     /* L: current color */
127         int                     flush_color;    /* L: flushing color */
128         int                     nr_in_flight[WORK_NR_COLORS];
129                                                 /* L: nr of in_flight works */
130         int                     nr_active;      /* L: nr of active works */
131         int                     max_active;     /* L: max active works */
132         struct list_head        delayed_works;  /* L: delayed works */
133 };
134
135 /*
136  * Structure used to wait for workqueue flush.
137  */
138 struct wq_flusher {
139         struct list_head        list;           /* F: list of flushers */
140         int                     flush_color;    /* F: flush color waiting for */
141         struct completion       done;           /* flush completion */
142 };
143
144 /*
145  * The externally visible workqueue abstraction is an array of
146  * per-CPU workqueues:
147  */
148 struct workqueue_struct {
149         unsigned int            flags;          /* I: WQ_* flags */
150         struct cpu_workqueue_struct *cpu_wq;    /* I: cwq's */
151         struct list_head        list;           /* W: list of all workqueues */
152
153         struct mutex            flush_mutex;    /* protects wq flushing */
154         int                     work_color;     /* F: current work color */
155         int                     flush_color;    /* F: current flush color */
156         atomic_t                nr_cwqs_to_flush; /* flush in progress */
157         struct wq_flusher       *first_flusher; /* F: first flusher */
158         struct list_head        flusher_queue;  /* F: flush waiters */
159         struct list_head        flusher_overflow; /* F: flush overflow list */
160
161         unsigned long           single_cpu;     /* cpu for single cpu wq */
162
163         int                     saved_max_active; /* I: saved cwq max_active */
164         const char              *name;          /* I: workqueue name */
165 #ifdef CONFIG_LOCKDEP
166         struct lockdep_map      lockdep_map;
167 #endif
168 };
169
170 #define for_each_busy_worker(worker, i, pos, gcwq)                      \
171         for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)                     \
172                 hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
173
174 #ifdef CONFIG_DEBUG_OBJECTS_WORK
175
176 static struct debug_obj_descr work_debug_descr;
177
178 /*
179  * fixup_init is called when:
180  * - an active object is initialized
181  */
182 static int work_fixup_init(void *addr, enum debug_obj_state state)
183 {
184         struct work_struct *work = addr;
185
186         switch (state) {
187         case ODEBUG_STATE_ACTIVE:
188                 cancel_work_sync(work);
189                 debug_object_init(work, &work_debug_descr);
190                 return 1;
191         default:
192                 return 0;
193         }
194 }
195
196 /*
197  * fixup_activate is called when:
198  * - an active object is activated
199  * - an unknown object is activated (might be a statically initialized object)
200  */
201 static int work_fixup_activate(void *addr, enum debug_obj_state state)
202 {
203         struct work_struct *work = addr;
204
205         switch (state) {
206
207         case ODEBUG_STATE_NOTAVAILABLE:
208                 /*
209                  * This is not really a fixup. The work struct was
210                  * statically initialized. We just make sure that it
211                  * is tracked in the object tracker.
212                  */
213                 if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
214                         debug_object_init(work, &work_debug_descr);
215                         debug_object_activate(work, &work_debug_descr);
216                         return 0;
217                 }
218                 WARN_ON_ONCE(1);
219                 return 0;
220
221         case ODEBUG_STATE_ACTIVE:
222                 WARN_ON(1);
223
224         default:
225                 return 0;
226         }
227 }
228
229 /*
230  * fixup_free is called when:
231  * - an active object is freed
232  */
233 static int work_fixup_free(void *addr, enum debug_obj_state state)
234 {
235         struct work_struct *work = addr;
236
237         switch (state) {
238         case ODEBUG_STATE_ACTIVE:
239                 cancel_work_sync(work);
240                 debug_object_free(work, &work_debug_descr);
241                 return 1;
242         default:
243                 return 0;
244         }
245 }
246
247 static struct debug_obj_descr work_debug_descr = {
248         .name           = "work_struct",
249         .fixup_init     = work_fixup_init,
250         .fixup_activate = work_fixup_activate,
251         .fixup_free     = work_fixup_free,
252 };
253
254 static inline void debug_work_activate(struct work_struct *work)
255 {
256         debug_object_activate(work, &work_debug_descr);
257 }
258
259 static inline void debug_work_deactivate(struct work_struct *work)
260 {
261         debug_object_deactivate(work, &work_debug_descr);
262 }
263
264 void __init_work(struct work_struct *work, int onstack)
265 {
266         if (onstack)
267                 debug_object_init_on_stack(work, &work_debug_descr);
268         else
269                 debug_object_init(work, &work_debug_descr);
270 }
271 EXPORT_SYMBOL_GPL(__init_work);
272
273 void destroy_work_on_stack(struct work_struct *work)
274 {
275         debug_object_free(work, &work_debug_descr);
276 }
277 EXPORT_SYMBOL_GPL(destroy_work_on_stack);
278
279 #else
280 static inline void debug_work_activate(struct work_struct *work) { }
281 static inline void debug_work_deactivate(struct work_struct *work) { }
282 #endif
283
284 /* Serializes the accesses to the list of workqueues. */
285 static DEFINE_SPINLOCK(workqueue_lock);
286 static LIST_HEAD(workqueues);
287 static bool workqueue_freezing;         /* W: have wqs started freezing? */
288
289 static DEFINE_PER_CPU(struct global_cwq, global_cwq);
290
291 static int worker_thread(void *__worker);
292
293 static struct global_cwq *get_gcwq(unsigned int cpu)
294 {
295         return &per_cpu(global_cwq, cpu);
296 }
297
298 static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
299                                             struct workqueue_struct *wq)
300 {
301         return per_cpu_ptr(wq->cpu_wq, cpu);
302 }
303
304 static unsigned int work_color_to_flags(int color)
305 {
306         return color << WORK_STRUCT_COLOR_SHIFT;
307 }
308
309 static int get_work_color(struct work_struct *work)
310 {
311         return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
312                 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
313 }
314
315 static int work_next_color(int color)
316 {
317         return (color + 1) % WORK_NR_COLORS;
318 }
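
/*
 * Illustrative example of the color helpers above, assuming the usual
 * definitions in workqueue.h (WORK_STRUCT_COLOR_BITS == 4, hence
 * WORK_NR_COLORS == 15 with color 15 reserved as WORK_NO_COLOR):
 *
 *	work_color_to_flags(3) == 3 << WORK_STRUCT_COLOR_SHIFT
 *	get_work_color(work)   == 3	(for a work queued with the above)
 *	work_next_color(3)     == 4
 *	work_next_color(14)    == 0	(wraps around, never yields 15)
 *
 * flush_workqueue() below waits for all works of the current color
 * while new works keep getting queued with the next color.
 */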
319
320 /*
321  * Set the workqueue on which a work item is to be run
322  * - Must *only* be called if the pending flag is set
323  */
324 static inline void set_wq_data(struct work_struct *work,
325                                struct cpu_workqueue_struct *cwq,
326                                unsigned long extra_flags)
327 {
328         BUG_ON(!work_pending(work));
329
330         atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) |
331                         WORK_STRUCT_PENDING | extra_flags);
332 }
333
334 /*
335  * Clear WORK_STRUCT_PENDING and the workqueue on which it was queued.
336  */
337 static inline void clear_wq_data(struct work_struct *work)
338 {
339         atomic_long_set(&work->data, work_static(work));
340 }
341
342 static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
343 {
344         return (void *)(atomic_long_read(&work->data) &
345                         WORK_STRUCT_WQ_DATA_MASK);
346 }
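
/*
 * Sketch of how the two helpers above share work->data (this just
 * restates the code, using the standard WORK_STRUCT_* macros from
 * workqueue.h):
 *
 *	set_wq_data(work, cwq, extra_flags);
 *		work->data == (unsigned long)cwq | work_static(work) |
 *			      WORK_STRUCT_PENDING | extra_flags
 *
 *	get_wq_data(work);
 *		masks work->data with WORK_STRUCT_WQ_DATA_MASK and hands
 *		back the cwq pointer, which only works because cwqs are
 *		aligned as described above struct cpu_workqueue_struct.
 */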
347
348 /**
349  * busy_worker_head - return the busy hash head for a work
350  * @gcwq: gcwq of interest
351  * @work: work to be hashed
352  *
353  * Return hash head of @gcwq for @work.
354  *
355  * CONTEXT:
356  * spin_lock_irq(gcwq->lock).
357  *
358  * RETURNS:
359  * Pointer to the hash head.
360  */
361 static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
362                                            struct work_struct *work)
363 {
364         const int base_shift = ilog2(sizeof(struct work_struct));
365         unsigned long v = (unsigned long)work;
366
367         /* simple shift and fold hash, do we need something better? */
368         v >>= base_shift;
369         v += v >> BUSY_WORKER_HASH_ORDER;
370         v &= BUSY_WORKER_HASH_MASK;
371
372         return &gcwq->busy_hash[v];
373 }
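
/*
 * Worked example for the hash above, with made-up numbers: suppose
 * sizeof(struct work_struct) is 32 bytes (so base_shift == 5) and a
 * work sits at address 0x1234560:
 *
 *	v   = 0x1234560 >> 5			== 0x91a2b
 *	v  += v >> BUSY_WORKER_HASH_ORDER	(0x91a2b + 0x2468 == 0x93e93)
 *	v  &= BUSY_WORKER_HASH_MASK		== 0x13
 *
 * i.e. the work lands in bucket 19 of the 64 busy_hash buckets.
 */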
374
375 /**
376  * insert_work - insert a work into cwq
377  * @cwq: cwq @work belongs to
378  * @work: work to insert
379  * @head: insertion point
380  * @extra_flags: extra WORK_STRUCT_* flags to set
381  *
382  * Insert @work into @cwq after @head.
383  *
384  * CONTEXT:
385  * spin_lock_irq(gcwq->lock).
386  */
387 static void insert_work(struct cpu_workqueue_struct *cwq,
388                         struct work_struct *work, struct list_head *head,
389                         unsigned int extra_flags)
390 {
391         /* we own @work, set data and link */
392         set_wq_data(work, cwq, extra_flags);
393
394         /*
395          * Ensure that we get the right work->data if we see the
396          * result of list_add() below, see try_to_grab_pending().
397          */
398         smp_wmb();
399
400         list_add_tail(&work->entry, head);
401         wake_up_process(cwq->worker->task);
402 }
403
404 /**
405  * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
406  * @cwq: cwq to unbind
407  *
408  * Try to unbind @cwq from single cpu workqueue processing.  If
409  * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
410  *
411  * CONTEXT:
412  * spin_lock_irq(gcwq->lock).
413  */
414 static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
415 {
416         struct workqueue_struct *wq = cwq->wq;
417         struct global_cwq *gcwq = cwq->gcwq;
418
419         BUG_ON(wq->single_cpu != gcwq->cpu);
420         /*
421          * Unbind from workqueue if @cwq is not frozen.  If frozen,
422          * thaw_workqueues() will either restart processing on this
423          * cpu or unbind if empty.  This keeps works queued while
424          * frozen fully ordered and flushable.
425          */
426         if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
427                 smp_wmb();      /* paired with cmpxchg() in __queue_work() */
428                 wq->single_cpu = NR_CPUS;
429         }
430 }
431
432 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
433                          struct work_struct *work)
434 {
435         struct global_cwq *gcwq;
436         struct cpu_workqueue_struct *cwq;
437         struct list_head *worklist;
438         unsigned long flags;
439         bool arbitrate;
440
441         debug_work_activate(work);
442
443         /* determine gcwq to use */
444         if (!(wq->flags & WQ_SINGLE_CPU)) {
445                 /* just use the requested cpu for multicpu workqueues */
446                 gcwq = get_gcwq(cpu);
447                 spin_lock_irqsave(&gcwq->lock, flags);
448         } else {
449                 unsigned int req_cpu = cpu;
450
451                 /*
452                  * It's a bit more complex for single cpu workqueues.
453                  * We first need to determine which cpu is going to be
454                  * used.  If no cpu is currently serving this
455                  * workqueue, arbitrate using atomic accesses to
456                  * wq->single_cpu; otherwise, use the current one.
457                  */
458         retry:
459                 cpu = wq->single_cpu;
460                 arbitrate = cpu == NR_CPUS;
461                 if (arbitrate)
462                         cpu = req_cpu;
463
464                 gcwq = get_gcwq(cpu);
465                 spin_lock_irqsave(&gcwq->lock, flags);
466
467                 /*
468                  * The following cmpxchg() is a full barrier paired
469                  * with smp_wmb() in cwq_unbind_single_cpu() and
470                  * guarantees that all changes to wq->single_cpu are
471                  * visible on the new cpu after this point.
472                  */
473                 if (arbitrate)
474                         cmpxchg(&wq->single_cpu, NR_CPUS, cpu);
475
476                 if (unlikely(wq->single_cpu != cpu)) {
477                         spin_unlock_irqrestore(&gcwq->lock, flags);
478                         goto retry;
479                 }
480         }
481
482         /* gcwq determined, get cwq and queue */
483         cwq = get_cwq(gcwq->cpu, wq);
484
485         BUG_ON(!list_empty(&work->entry));
486
487         cwq->nr_in_flight[cwq->work_color]++;
488
489         if (likely(cwq->nr_active < cwq->max_active)) {
490                 cwq->nr_active++;
491                 worklist = &cwq->worklist;
492         } else
493                 worklist = &cwq->delayed_works;
494
495         insert_work(cwq, work, worklist, work_color_to_flags(cwq->work_color));
496
497         spin_unlock_irqrestore(&gcwq->lock, flags);
498 }
499
500 /**
501  * queue_work - queue work on a workqueue
502  * @wq: workqueue to use
503  * @work: work to queue
504  *
505  * Returns 0 if @work was already on a queue, non-zero otherwise.
506  *
507  * We queue the work to the CPU on which it was submitted, but if the CPU dies
508  * it can be processed by another CPU.
509  */
510 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
511 {
512         int ret;
513
514         ret = queue_work_on(get_cpu(), wq, work);
515         put_cpu();
516
517         return ret;
518 }
519 EXPORT_SYMBOL_GPL(queue_work);
520
521 /**
522  * queue_work_on - queue work on specific cpu
523  * @cpu: CPU number to execute work on
524  * @wq: workqueue to use
525  * @work: work to queue
526  *
527  * Returns 0 if @work was already on a queue, non-zero otherwise.
528  *
529  * We queue the work to a specific CPU, the caller must ensure it
530  * can't go away.
531  */
532 int
533 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
534 {
535         int ret = 0;
536
537         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
538                 __queue_work(cpu, wq, work);
539                 ret = 1;
540         }
541         return ret;
542 }
543 EXPORT_SYMBOL_GPL(queue_work_on);
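
/*
 * Typical usage sketch for the two queueing functions above; the
 * workqueue, work item and handler names are made up:
 *
 *	static void my_work_fn(struct work_struct *work);
 *	static DECLARE_WORK(my_work, my_work_fn);
 *	static struct workqueue_struct *my_wq;
 *
 *	my_wq = create_workqueue("my_wq");
 *	queue_work(my_wq, &my_work);
 *
 * or, to run on a particular CPU which the caller keeps online:
 *
 *	queue_work_on(1, my_wq, &my_work);
 *
 * my_work_fn() then runs in process context on a kworker thread.  A
 * second queue_work() before it has started returns 0 because
 * WORK_STRUCT_PENDING is still set.
 */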
544
545 static void delayed_work_timer_fn(unsigned long __data)
546 {
547         struct delayed_work *dwork = (struct delayed_work *)__data;
548         struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
549
550         __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
551 }
552
553 /**
554  * queue_delayed_work - queue work on a workqueue after delay
555  * @wq: workqueue to use
556  * @dwork: delayable work to queue
557  * @delay: number of jiffies to wait before queueing
558  *
559  * Returns 0 if @dwork was already on a queue, non-zero otherwise.
560  */
561 int queue_delayed_work(struct workqueue_struct *wq,
562                         struct delayed_work *dwork, unsigned long delay)
563 {
564         if (delay == 0)
565                 return queue_work(wq, &dwork->work);
566
567         return queue_delayed_work_on(-1, wq, dwork, delay);
568 }
569 EXPORT_SYMBOL_GPL(queue_delayed_work);
570
571 /**
572  * queue_delayed_work_on - queue work on specific CPU after delay
573  * @cpu: CPU number to execute work on
574  * @wq: workqueue to use
575  * @dwork: work to queue
576  * @delay: number of jiffies to wait before queueing
577  *
578  * Returns 0 if @dwork was already on a queue, non-zero otherwise.
579  */
580 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
581                         struct delayed_work *dwork, unsigned long delay)
582 {
583         int ret = 0;
584         struct timer_list *timer = &dwork->timer;
585         struct work_struct *work = &dwork->work;
586
587         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
588                 BUG_ON(timer_pending(timer));
589                 BUG_ON(!list_empty(&work->entry));
590
591                 timer_stats_timer_set_start_info(&dwork->timer);
592
593                 /* This stores cwq for the moment, for the timer_fn */
594                 set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
595                 timer->expires = jiffies + delay;
596                 timer->data = (unsigned long)dwork;
597                 timer->function = delayed_work_timer_fn;
598
599                 if (unlikely(cpu >= 0))
600                         add_timer_on(timer, cpu);
601                 else
602                         add_timer(timer);
603                 ret = 1;
604         }
605         return ret;
606 }
607 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
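
/*
 * Usage sketch for delayed works (names are made up, my_wq as in the
 * example further above):
 *
 *	static void my_timeout_fn(struct work_struct *work);
 *	static DECLARE_DELAYED_WORK(my_dwork, my_timeout_fn);
 *
 *	queue_delayed_work(my_wq, &my_dwork, msecs_to_jiffies(100));
 *
 * The timer armed above fires after roughly 100ms and
 * delayed_work_timer_fn() requeues the underlying work on whichever
 * CPU the timer ran on.
 */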
608
609 /**
610  * worker_enter_idle - enter idle state
611  * @worker: worker which is entering idle state
612  *
613  * @worker is entering idle state.  Update stats and idle timer if
614  * necessary.
615  *
616  * LOCKING:
617  * spin_lock_irq(gcwq->lock).
618  */
619 static void worker_enter_idle(struct worker *worker)
620 {
621         struct global_cwq *gcwq = worker->gcwq;
622
623         BUG_ON(worker->flags & WORKER_IDLE);
624         BUG_ON(!list_empty(&worker->entry) &&
625                (worker->hentry.next || worker->hentry.pprev));
626
627         worker->flags |= WORKER_IDLE;
628         gcwq->nr_idle++;
629
630         /* idle_list is LIFO */
631         list_add(&worker->entry, &gcwq->idle_list);
632
633         if (unlikely(worker->flags & WORKER_ROGUE))
634                 wake_up_all(&gcwq->trustee_wait);
635 }
636
637 /**
638  * worker_leave_idle - leave idle state
639  * @worker: worker which is leaving idle state
640  *
641  * @worker is leaving idle state.  Update stats.
642  *
643  * LOCKING:
644  * spin_lock_irq(gcwq->lock).
645  */
646 static void worker_leave_idle(struct worker *worker)
647 {
648         struct global_cwq *gcwq = worker->gcwq;
649
650         BUG_ON(!(worker->flags & WORKER_IDLE));
651         worker->flags &= ~WORKER_IDLE;
652         gcwq->nr_idle--;
653         list_del_init(&worker->entry);
654 }
655
656 static struct worker *alloc_worker(void)
657 {
658         struct worker *worker;
659
660         worker = kzalloc(sizeof(*worker), GFP_KERNEL);
661         if (worker) {
662                 INIT_LIST_HEAD(&worker->entry);
663                 INIT_LIST_HEAD(&worker->scheduled);
664         }
665         return worker;
666 }
667
668 /**
669  * create_worker - create a new workqueue worker
670  * @cwq: cwq the new worker will belong to
671  * @bind: whether to bind the new worker to the gcwq's cpu or not
672  *
673  * Create a new worker which is bound to @cwq.  The returned worker
674  * can be started by calling start_worker() or destroyed using
675  * destroy_worker().
676  *
677  * CONTEXT:
678  * Might sleep.  Does GFP_KERNEL allocations.
679  *
680  * RETURNS:
681  * Pointer to the newly created worker.
682  */
683 static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
684 {
685         struct global_cwq *gcwq = cwq->gcwq;
686         int id = -1;
687         struct worker *worker = NULL;
688
689         spin_lock_irq(&gcwq->lock);
690         while (ida_get_new(&gcwq->worker_ida, &id)) {
691                 spin_unlock_irq(&gcwq->lock);
692                 if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))
693                         goto fail;
694                 spin_lock_irq(&gcwq->lock);
695         }
696         spin_unlock_irq(&gcwq->lock);
697
698         worker = alloc_worker();
699         if (!worker)
700                 goto fail;
701
702         worker->gcwq = gcwq;
703         worker->cwq = cwq;
704         worker->id = id;
705
706         worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
707                                       gcwq->cpu, id);
708         if (IS_ERR(worker->task))
709                 goto fail;
710
711         /*
712          * A rogue worker will become a regular one if CPU comes
713          * online later on.  Make sure every worker has
714          * PF_THREAD_BOUND set.
715          */
716         if (bind)
717                 kthread_bind(worker->task, gcwq->cpu);
718         else
719                 worker->task->flags |= PF_THREAD_BOUND;
720
721         return worker;
722 fail:
723         if (id >= 0) {
724                 spin_lock_irq(&gcwq->lock);
725                 ida_remove(&gcwq->worker_ida, id);
726                 spin_unlock_irq(&gcwq->lock);
727         }
728         kfree(worker);
729         return NULL;
730 }
731
732 /**
733  * start_worker - start a newly created worker
734  * @worker: worker to start
735  *
736  * Make the gcwq aware of @worker and start it.
737  *
738  * CONTEXT:
739  * spin_lock_irq(gcwq->lock).
740  */
741 static void start_worker(struct worker *worker)
742 {
743         worker->flags |= WORKER_STARTED;
744         worker->gcwq->nr_workers++;
745         worker_enter_idle(worker);
746         wake_up_process(worker->task);
747 }
748
749 /**
750  * destroy_worker - destroy a workqueue worker
751  * @worker: worker to be destroyed
752  *
753  * Destroy @worker and adjust @gcwq stats accordingly.
754  *
755  * CONTEXT:
756  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
757  */
758 static void destroy_worker(struct worker *worker)
759 {
760         struct global_cwq *gcwq = worker->gcwq;
761         int id = worker->id;
762
763         /* sanity check frenzy */
764         BUG_ON(worker->current_work);
765         BUG_ON(!list_empty(&worker->scheduled));
766
767         if (worker->flags & WORKER_STARTED)
768                 gcwq->nr_workers--;
769         if (worker->flags & WORKER_IDLE)
770                 gcwq->nr_idle--;
771
772         list_del_init(&worker->entry);
773         worker->flags |= WORKER_DIE;
774
775         spin_unlock_irq(&gcwq->lock);
776
777         kthread_stop(worker->task);
778         kfree(worker);
779
780         spin_lock_irq(&gcwq->lock);
781         ida_remove(&gcwq->worker_ida, id);
782 }
783
784 /**
785  * move_linked_works - move linked works to a list
786  * @work: start of series of works to be scheduled
787  * @head: target list to append @work to
788  * @nextp: out parameter for nested worklist walking
789  *
790  * Schedule linked works starting from @work to @head.  Work series to
791  * be scheduled starts at @work and includes any consecutive work with
792  * WORK_STRUCT_LINKED set in its predecessor.
793  *
794  * If @nextp is not NULL, it's updated to point to the next work of
795  * the last scheduled work.  This allows move_linked_works() to be
796  * nested inside outer list_for_each_entry_safe().
797  *
798  * CONTEXT:
799  * spin_lock_irq(gcwq->lock).
800  */
801 static void move_linked_works(struct work_struct *work, struct list_head *head,
802                               struct work_struct **nextp)
803 {
804         struct work_struct *n;
805
806         /*
807          * A linked worklist always ends before the end of the list;
808          * use NULL as the list head.
809          */
810         list_for_each_entry_safe_from(work, n, NULL, entry) {
811                 list_move_tail(&work->entry, head);
812                 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
813                         break;
814         }
815
816         /*
817          * If we're already inside safe list traversal and have moved
818          * multiple works to the scheduled queue, the next position
819          * needs to be updated.
820          */
821         if (nextp)
822                 *nextp = n;
823 }
824
825 static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
826 {
827         struct work_struct *work = list_first_entry(&cwq->delayed_works,
828                                                     struct work_struct, entry);
829
830         move_linked_works(work, &cwq->worklist, NULL);
831         cwq->nr_active++;
832 }
833
834 /**
835  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
836  * @cwq: cwq of interest
837  * @color: color of work which left the queue
838  *
839  * A work has either completed or been removed from the pending queue;
840  * decrement nr_in_flight of its cwq and handle workqueue flushing.
841  *
842  * CONTEXT:
843  * spin_lock_irq(gcwq->lock).
844  */
845 static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
846 {
847         /* ignore uncolored works */
848         if (color == WORK_NO_COLOR)
849                 return;
850
851         cwq->nr_in_flight[color]--;
852         cwq->nr_active--;
853
854         if (!list_empty(&cwq->delayed_works)) {
855                 /* one down, submit a delayed one */
856                 if (cwq->nr_active < cwq->max_active)
857                         cwq_activate_first_delayed(cwq);
858         } else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
859                 /* this was the last work, unbind from single cpu */
860                 cwq_unbind_single_cpu(cwq);
861         }
862
863         /* is flush in progress and are we at the flushing tip? */
864         if (likely(cwq->flush_color != color))
865                 return;
866
867         /* are there still in-flight works? */
868         if (cwq->nr_in_flight[color])
869                 return;
870
871         /* this cwq is done, clear flush_color */
872         cwq->flush_color = -1;
873
874         /*
875          * If this was the last cwq, wake up the first flusher.  It
876          * will handle the rest.
877          */
878         if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
879                 complete(&cwq->wq->first_flusher->done);
880 }
881
882 /**
883  * process_one_work - process single work
884  * @worker: self
885  * @work: work to process
886  *
887  * Process @work.  This function contains all the logic necessary to
888  * process a single work item, including synchronization against and
889  * interaction with other workers on the same cpu, queueing and
890  * flushing.  As long as the context requirement is met, any worker can
891  * call this function to process a work item.
892  *
893  * CONTEXT:
894  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
895  */
896 static void process_one_work(struct worker *worker, struct work_struct *work)
897 {
898         struct cpu_workqueue_struct *cwq = worker->cwq;
899         struct global_cwq *gcwq = cwq->gcwq;
900         struct hlist_head *bwh = busy_worker_head(gcwq, work);
901         work_func_t f = work->func;
902         int work_color;
903 #ifdef CONFIG_LOCKDEP
904         /*
905          * It is permissible to free the struct work_struct from
906          * inside the function that is called from it; we need to take
907          * this into account for lockdep too.  To avoid bogus "held
908          * lock freed" warnings as well as problems when looking into
909          * work->lockdep_map, make a copy and use that here.
910          */
911         struct lockdep_map lockdep_map = work->lockdep_map;
912 #endif
913         /* claim and process */
914         debug_work_deactivate(work);
915         hlist_add_head(&worker->hentry, bwh);
916         worker->current_work = work;
917         work_color = get_work_color(work);
918         list_del_init(&work->entry);
919
920         spin_unlock_irq(&gcwq->lock);
921
922         BUG_ON(get_wq_data(work) != cwq);
923         work_clear_pending(work);
924         lock_map_acquire(&cwq->wq->lockdep_map);
925         lock_map_acquire(&lockdep_map);
926         f(work);
927         lock_map_release(&lockdep_map);
928         lock_map_release(&cwq->wq->lockdep_map);
929
930         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
931                 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
932                        "%s/0x%08x/%d\n",
933                        current->comm, preempt_count(), task_pid_nr(current));
934                 printk(KERN_ERR "    last function: ");
935                 print_symbol("%s\n", (unsigned long)f);
936                 debug_show_held_locks(current);
937                 dump_stack();
938         }
939
940         spin_lock_irq(&gcwq->lock);
941
942         /* we're done with it, release */
943         hlist_del_init(&worker->hentry);
944         worker->current_work = NULL;
945         cwq_dec_nr_in_flight(cwq, work_color);
946 }
947
948 /**
949  * process_scheduled_works - process scheduled works
950  * @worker: self
951  *
952  * Process all scheduled works.  Please note that the scheduled list
953  * may change while processing a work, so this function repeatedly
954  * fetches a work from the top and executes it.
955  *
956  * CONTEXT:
957  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
958  * multiple times.
959  */
960 static void process_scheduled_works(struct worker *worker)
961 {
962         while (!list_empty(&worker->scheduled)) {
963                 struct work_struct *work = list_first_entry(&worker->scheduled,
964                                                 struct work_struct, entry);
965                 process_one_work(worker, work);
966         }
967 }
968
969 /**
970  * worker_thread - the worker thread function
971  * @__worker: self
972  *
973  * The cwq worker thread function.
974  */
975 static int worker_thread(void *__worker)
976 {
977         struct worker *worker = __worker;
978         struct global_cwq *gcwq = worker->gcwq;
979         struct cpu_workqueue_struct *cwq = worker->cwq;
980
981 woke_up:
982         spin_lock_irq(&gcwq->lock);
983
984         /* DIE can be set only while we're idle, checking here is enough */
985         if (worker->flags & WORKER_DIE) {
986                 spin_unlock_irq(&gcwq->lock);
987                 return 0;
988         }
989
990         worker_leave_idle(worker);
991 recheck:
992         /*
993          * ->scheduled list can only be filled while a worker is
994          * preparing to process a work or actually processing it.
995          * Make sure nobody diddled with it while I was sleeping.
996          */
997         BUG_ON(!list_empty(&worker->scheduled));
998
999         while (!list_empty(&cwq->worklist)) {
1000                 struct work_struct *work =
1001                         list_first_entry(&cwq->worklist,
1002                                          struct work_struct, entry);
1003
1004                 /*
1005                  * The following is a rather inefficient way to close the
1006                  * race window against cpu hotplug operations.  It will
1007                  * be replaced soon.
1008                  */
1009                 if (unlikely(!(worker->flags & WORKER_ROGUE) &&
1010                              !cpumask_equal(&worker->task->cpus_allowed,
1011                                             get_cpu_mask(gcwq->cpu)))) {
1012                         spin_unlock_irq(&gcwq->lock);
1013                         set_cpus_allowed_ptr(worker->task,
1014                                              get_cpu_mask(gcwq->cpu));
1015                         cpu_relax();
1016                         spin_lock_irq(&gcwq->lock);
1017                         goto recheck;
1018                 }
1019
1020                 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
1021                         /* optimization path, not strictly necessary */
1022                         process_one_work(worker, work);
1023                         if (unlikely(!list_empty(&worker->scheduled)))
1024                                 process_scheduled_works(worker);
1025                 } else {
1026                         move_linked_works(work, &worker->scheduled, NULL);
1027                         process_scheduled_works(worker);
1028                 }
1029         }
1030
1031         /*
1032          * gcwq->lock is held and there's no work to process, sleep.
1033          * Workers are woken up only while holding gcwq->lock, so
1034          * setting the current state before releasing gcwq->lock is
1035          * enough to prevent losing any event.
1036          */
1037         worker_enter_idle(worker);
1038         __set_current_state(TASK_INTERRUPTIBLE);
1039         spin_unlock_irq(&gcwq->lock);
1040         schedule();
1041         goto woke_up;
1042 }
1043
1044 struct wq_barrier {
1045         struct work_struct      work;
1046         struct completion       done;
1047 };
1048
1049 static void wq_barrier_func(struct work_struct *work)
1050 {
1051         struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
1052         complete(&barr->done);
1053 }
1054
1055 /**
1056  * insert_wq_barrier - insert a barrier work
1057  * @cwq: cwq to insert barrier into
1058  * @barr: wq_barrier to insert
1059  * @target: target work to attach @barr to
1060  * @worker: worker currently executing @target, NULL if @target is not executing
1061  *
1062  * @barr is linked to @target such that @barr is completed only after
1063  * @target finishes execution.  Please note that the ordering
1064  * guarantee is observed only with respect to @target and on the local
1065  * cpu.
1066  *
1067  * Currently, a queued barrier can't be canceled.  This is because
1068  * try_to_grab_pending() can't determine whether the work to be
1069  * grabbed is at the head of the queue and thus can't clear LINKED
1070  * flag of the previous work while there must be a valid next work
1071  * after a work with LINKED flag set.
1072  *
1073  * Note that when @worker is non-NULL, @target may be modified
1074  * underneath us, so we can't reliably determine cwq from @target.
1075  *
1076  * CONTEXT:
1077  * spin_lock_irq(gcwq->lock).
1078  */
1079 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
1080                               struct wq_barrier *barr,
1081                               struct work_struct *target, struct worker *worker)
1082 {
1083         struct list_head *head;
1084         unsigned int linked = 0;
1085
1086         /*
1087          * debugobject calls are safe here even with gcwq->lock locked
1088          * as we know for sure that this will not trigger any of the
1089          * checks and call back into the fixup functions where we
1090          * might deadlock.
1091          */
1092         INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
1093         __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
1094         init_completion(&barr->done);
1095
1096         /*
1097          * If @target is currently being executed, schedule the
1098          * barrier to the worker; otherwise, put it after @target.
1099          */
1100         if (worker)
1101                 head = worker->scheduled.next;
1102         else {
1103                 unsigned long *bits = work_data_bits(target);
1104
1105                 head = target->entry.next;
1106                 /* there can already be other linked works, inherit and set */
1107                 linked = *bits & WORK_STRUCT_LINKED;
1108                 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
1109         }
1110
1111         debug_work_activate(&barr->work);
1112         insert_work(cwq, &barr->work, head,
1113                     work_color_to_flags(WORK_NO_COLOR) | linked);
1114 }
1115
1116 /**
1117  * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
1118  * @wq: workqueue being flushed
1119  * @flush_color: new flush color, < 0 for no-op
1120  * @work_color: new work color, < 0 for no-op
1121  *
1122  * Prepare cwqs for workqueue flushing.
1123  *
1124  * If @flush_color is non-negative, flush_color on all cwqs should be
1125  * -1.  If no cwq has in-flight commands at the specified color, all
1126  * cwq->flush_color's stay at -1 and %false is returned.  If any cwq
1127  * has in flight commands, its cwq->flush_color is set to
1128  * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
1129  * wakeup logic is armed and %true is returned.
1130  *
1131  * The caller should have initialized @wq->first_flusher prior to
1132  * calling this function with non-negative @flush_color.  If
1133  * @flush_color is negative, no flush color update is done and %false
1134  * is returned.
1135  *
1136  * If @work_color is non-negative, all cwqs should have the same
1137  * work_color which is previous to @work_color and all will be
1138  * advanced to @work_color.
1139  *
1140  * CONTEXT:
1141  * mutex_lock(wq->flush_mutex).
1142  *
1143  * RETURNS:
1144  * %true if @flush_color >= 0 and there's something to flush.  %false
1145  * otherwise.
1146  */
1147 static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
1148                                       int flush_color, int work_color)
1149 {
1150         bool wait = false;
1151         unsigned int cpu;
1152
1153         if (flush_color >= 0) {
1154                 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
1155                 atomic_set(&wq->nr_cwqs_to_flush, 1);
1156         }
1157
1158         for_each_possible_cpu(cpu) {
1159                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1160                 struct global_cwq *gcwq = cwq->gcwq;
1161
1162                 spin_lock_irq(&gcwq->lock);
1163
1164                 if (flush_color >= 0) {
1165                         BUG_ON(cwq->flush_color != -1);
1166
1167                         if (cwq->nr_in_flight[flush_color]) {
1168                                 cwq->flush_color = flush_color;
1169                                 atomic_inc(&wq->nr_cwqs_to_flush);
1170                                 wait = true;
1171                         }
1172                 }
1173
1174                 if (work_color >= 0) {
1175                         BUG_ON(work_color != work_next_color(cwq->work_color));
1176                         cwq->work_color = work_color;
1177                 }
1178
1179                 spin_unlock_irq(&gcwq->lock);
1180         }
1181
1182         if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
1183                 complete(&wq->first_flusher->done);
1184
1185         return wait;
1186 }
1187
1188 /**
1189  * flush_workqueue - ensure that any scheduled work has run to completion.
1190  * @wq: workqueue to flush
1191  *
1192  * Forces execution of the workqueue and blocks until its completion.
1193  * This is typically used in driver shutdown handlers.
1194  *
1195  * We sleep until all works which were queued on entry have been handled,
1196  * but we are not livelocked by new incoming ones.
1197  */
1198 void flush_workqueue(struct workqueue_struct *wq)
1199 {
1200         struct wq_flusher this_flusher = {
1201                 .list = LIST_HEAD_INIT(this_flusher.list),
1202                 .flush_color = -1,
1203                 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
1204         };
1205         int next_color;
1206
1207         lock_map_acquire(&wq->lockdep_map);
1208         lock_map_release(&wq->lockdep_map);
1209
1210         mutex_lock(&wq->flush_mutex);
1211
1212         /*
1213          * Start-to-wait phase
1214          */
1215         next_color = work_next_color(wq->work_color);
1216
1217         if (next_color != wq->flush_color) {
1218                 /*
1219                  * Color space is not full.  The current work_color
1220                  * becomes our flush_color and work_color is advanced
1221                  * by one.
1222                  */
1223                 BUG_ON(!list_empty(&wq->flusher_overflow));
1224                 this_flusher.flush_color = wq->work_color;
1225                 wq->work_color = next_color;
1226
1227                 if (!wq->first_flusher) {
1228                         /* no flush in progress, become the first flusher */
1229                         BUG_ON(wq->flush_color != this_flusher.flush_color);
1230
1231                         wq->first_flusher = &this_flusher;
1232
1233                         if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
1234                                                        wq->work_color)) {
1235                                 /* nothing to flush, done */
1236                                 wq->flush_color = next_color;
1237                                 wq->first_flusher = NULL;
1238                                 goto out_unlock;
1239                         }
1240                 } else {
1241                         /* wait in queue */
1242                         BUG_ON(wq->flush_color == this_flusher.flush_color);
1243                         list_add_tail(&this_flusher.list, &wq->flusher_queue);
1244                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
1245                 }
1246         } else {
1247                 /*
1248                  * Oops, color space is full, wait on overflow queue.
1249                  * The next flush completion will assign us
1250                  * flush_color and transfer to flusher_queue.
1251                  */
1252                 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
1253         }
1254
1255         mutex_unlock(&wq->flush_mutex);
1256
1257         wait_for_completion(&this_flusher.done);
1258
1259         /*
1260          * Wake-up-and-cascade phase
1261          *
1262          * First flushers are responsible for cascading flushes and
1263          * handling overflow.  Non-first flushers can simply return.
1264          */
1265         if (wq->first_flusher != &this_flusher)
1266                 return;
1267
1268         mutex_lock(&wq->flush_mutex);
1269
1270         wq->first_flusher = NULL;
1271
1272         BUG_ON(!list_empty(&this_flusher.list));
1273         BUG_ON(wq->flush_color != this_flusher.flush_color);
1274
1275         while (true) {
1276                 struct wq_flusher *next, *tmp;
1277
1278                 /* complete all the flushers sharing the current flush color */
1279                 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
1280                         if (next->flush_color != wq->flush_color)
1281                                 break;
1282                         list_del_init(&next->list);
1283                         complete(&next->done);
1284                 }
1285
1286                 BUG_ON(!list_empty(&wq->flusher_overflow) &&
1287                        wq->flush_color != work_next_color(wq->work_color));
1288
1289                 /* this flush_color is finished, advance by one */
1290                 wq->flush_color = work_next_color(wq->flush_color);
1291
1292                 /* one color has been freed, handle overflow queue */
1293                 if (!list_empty(&wq->flusher_overflow)) {
1294                         /*
1295                          * Assign the same color to all overflowed
1296                          * flushers, advance work_color and append to
1297                          * flusher_queue.  This is the start-to-wait
1298                          * phase for these overflowed flushers.
1299                          */
1300                         list_for_each_entry(tmp, &wq->flusher_overflow, list)
1301                                 tmp->flush_color = wq->work_color;
1302
1303                         wq->work_color = work_next_color(wq->work_color);
1304
1305                         list_splice_tail_init(&wq->flusher_overflow,
1306                                               &wq->flusher_queue);
1307                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
1308                 }
1309
1310                 if (list_empty(&wq->flusher_queue)) {
1311                         BUG_ON(wq->flush_color != wq->work_color);
1312                         break;
1313                 }
1314
1315                 /*
1316                  * Need to flush more colors.  Make the next flusher
1317                  * the new first flusher and arm cwqs.
1318                  */
1319                 BUG_ON(wq->flush_color == wq->work_color);
1320                 BUG_ON(wq->flush_color != next->flush_color);
1321
1322                 list_del_init(&next->list);
1323                 wq->first_flusher = next;
1324
1325                 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
1326                         break;
1327
1328                 /*
1329                  * Meh... this color is already done, clear first
1330                  * flusher and repeat cascading.
1331                  */
1332                 wq->first_flusher = NULL;
1333         }
1334
1335 out_unlock:
1336         mutex_unlock(&wq->flush_mutex);
1337 }
1338 EXPORT_SYMBOL_GPL(flush_workqueue);
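
/*
 * Usage sketch (made-up names): a typical driver teardown path drains
 * its private workqueue before freeing the objects its works touch:
 *
 *	cancel_delayed_work_sync(&dev->poll_dwork);
 *	flush_workqueue(dev->wq);
 *	destroy_workqueue(dev->wq);
 *
 * The first call stops the work from requeueing itself, the second
 * waits for everything queued before the call; works queued afterwards
 * may still be pending or running when flush_workqueue() returns.
 */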
1339
1340 /**
1341  * flush_work - block until a work_struct's callback has terminated
1342  * @work: the work which is to be flushed
1343  *
1344  * Returns false if @work has already terminated.
1345  *
1346  * It is expected that, prior to calling flush_work(), the caller has
1347  * arranged for the work to not be requeued, otherwise it doesn't make
1348  * sense to use this function.
1349  */
1350 int flush_work(struct work_struct *work)
1351 {
1352         struct worker *worker = NULL;
1353         struct cpu_workqueue_struct *cwq;
1354         struct global_cwq *gcwq;
1355         struct wq_barrier barr;
1356
1357         might_sleep();
1358         cwq = get_wq_data(work);
1359         if (!cwq)
1360                 return 0;
1361         gcwq = cwq->gcwq;
1362
1363         lock_map_acquire(&cwq->wq->lockdep_map);
1364         lock_map_release(&cwq->wq->lockdep_map);
1365
1366         spin_lock_irq(&gcwq->lock);
1367         if (!list_empty(&work->entry)) {
1368                 /*
1369                  * See the comment near try_to_grab_pending()->smp_rmb().
1370                  * If it was re-queued under us we are not going to wait.
1371                  */
1372                 smp_rmb();
1373                 if (unlikely(cwq != get_wq_data(work)))
1374                         goto already_gone;
1375         } else {
1376                 if (cwq->worker && cwq->worker->current_work == work)
1377                         worker = cwq->worker;
1378                 if (!worker)
1379                         goto already_gone;
1380         }
1381
1382         insert_wq_barrier(cwq, &barr, work, worker);
1383         spin_unlock_irq(&gcwq->lock);
1384         wait_for_completion(&barr.done);
1385         destroy_work_on_stack(&barr.work);
1386         return 1;
1387 already_gone:
1388         spin_unlock_irq(&gcwq->lock);
1389         return 0;
1390 }
1391 EXPORT_SYMBOL_GPL(flush_work);
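
/*
 * Usage sketch (made-up names): wait for one specific work item rather
 * than a whole workqueue:
 *
 *	dev->stop = true;	(the handler checks this and stops requeueing)
 *	flush_work(&dev->io_work);
 *
 * Unlike flush_workqueue(), this waits only for that one work item and
 * only on the cwq it was last queued on; see the requeueing caveat in
 * the comment above.
 */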
1392
1393 /*
1394  * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
1395  * so this work can't be re-armed in any way.
1396  */
1397 static int try_to_grab_pending(struct work_struct *work)
1398 {
1399         struct global_cwq *gcwq;
1400         struct cpu_workqueue_struct *cwq;
1401         int ret = -1;
1402
1403         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
1404                 return 0;
1405
1406         /*
1407          * The queueing is in progress, or it is already queued. Try to
1408          * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
1409          */
1410
1411         cwq = get_wq_data(work);
1412         if (!cwq)
1413                 return ret;
1414         gcwq = cwq->gcwq;
1415
1416         spin_lock_irq(&gcwq->lock);
1417         if (!list_empty(&work->entry)) {
1418                 /*
1419                  * This work is queued, but perhaps we locked the wrong cwq.
1420                  * In that case we must see the new value after rmb(), see
1421                  * insert_work()->wmb().
1422                  */
1423                 smp_rmb();
1424                 if (cwq == get_wq_data(work)) {
1425                         debug_work_deactivate(work);
1426                         list_del_init(&work->entry);
1427                         cwq_dec_nr_in_flight(cwq, get_work_color(work));
1428                         ret = 1;
1429                 }
1430         }
1431         spin_unlock_irq(&gcwq->lock);
1432
1433         return ret;
1434 }
1435
1436 static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
1437                                 struct work_struct *work)
1438 {
1439         struct global_cwq *gcwq = cwq->gcwq;
1440         struct wq_barrier barr;
1441         struct worker *worker;
1442
1443         spin_lock_irq(&gcwq->lock);
1444
1445         worker = NULL;
1446         if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
1447                 worker = cwq->worker;
1448                 insert_wq_barrier(cwq, &barr, work, worker);
1449         }
1450
1451         spin_unlock_irq(&gcwq->lock);
1452
1453         if (unlikely(worker)) {
1454                 wait_for_completion(&barr.done);
1455                 destroy_work_on_stack(&barr.work);
1456         }
1457 }
1458
1459 static void wait_on_work(struct work_struct *work)
1460 {
1461         struct cpu_workqueue_struct *cwq;
1462         struct workqueue_struct *wq;
1463         int cpu;
1464
1465         might_sleep();
1466
1467         lock_map_acquire(&work->lockdep_map);
1468         lock_map_release(&work->lockdep_map);
1469
1470         cwq = get_wq_data(work);
1471         if (!cwq)
1472                 return;
1473
1474         wq = cwq->wq;
1475
1476         for_each_possible_cpu(cpu)
1477                 wait_on_cpu_work(get_cwq(cpu, wq), work);
1478 }
1479
1480 static int __cancel_work_timer(struct work_struct *work,
1481                                 struct timer_list* timer)
1482 {
1483         int ret;
1484
1485         do {
1486                 ret = (timer && likely(del_timer(timer)));
1487                 if (!ret)
1488                         ret = try_to_grab_pending(work);
1489                 wait_on_work(work);
1490         } while (unlikely(ret < 0));
1491
1492         clear_wq_data(work);
1493         return ret;
1494 }
1495
1496 /**
1497  * cancel_work_sync - block until a work_struct's callback has terminated
1498  * @work: the work which is to be flushed
1499  *
1500  * Returns true if @work was pending.
1501  *
1502  * cancel_work_sync() will cancel the work if it is queued. If the work's
1503  * callback appears to be running, cancel_work_sync() will block until it
1504  * has completed.
1505  *
1506  * It is possible to use this function if the work re-queues itself. It can
1507  * cancel the work even if it migrates to another workqueue, however in that
1508  * case it only guarantees that work->func() has completed on the last queued
1509  * workqueue.
1510  *
1511  * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
1512  * pending, otherwise it goes into a busy-wait loop until the timer expires.
1513  *
1514  * The caller must ensure that workqueue_struct on which this work was last
1515  * queued can't be destroyed before this function returns.
1516  */
1517 int cancel_work_sync(struct work_struct *work)
1518 {
1519         return __cancel_work_timer(work, NULL);
1520 }
1521 EXPORT_SYMBOL_GPL(cancel_work_sync);
1522
1523 /**
1524  * cancel_delayed_work_sync - reliably kill off a delayed work.
1525  * @dwork: the delayed work struct
1526  *
1527  * Returns true if @dwork was pending.
1528  *
1529  * It is possible to use this function if @dwork rearms itself via queue_work()
1530  * or queue_delayed_work(). See also the comment for cancel_work_sync().
1531  */
1532 int cancel_delayed_work_sync(struct delayed_work *dwork)
1533 {
1534         return __cancel_work_timer(&dwork->work, &dwork->timer);
1535 }
1536 EXPORT_SYMBOL(cancel_delayed_work_sync);
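
/*
 * Usage sketch (made-up names): stopping a self-requeueing work for
 * good combines clearing the condition it checks with a synchronous
 * cancel:
 *
 *	dev->shutting_down = true;
 *	cancel_work_sync(&dev->event_work);
 *	cancel_delayed_work_sync(&dev->retry_dwork);
 *
 * Once these return, neither handler is running nor pending, and the
 * delayed work's timer is guaranteed to be off as well.
 */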
1537
1538 static struct workqueue_struct *keventd_wq __read_mostly;
1539
1540 /**
1541  * schedule_work - put work task in global workqueue
1542  * @work: job to be done
1543  *
1544  * Returns zero if @work was already on the kernel-global workqueue and
1545  * non-zero otherwise.
1546  *
1547  * This puts a job in the kernel-global workqueue if it was not already
1548  * queued and leaves it in the same position on the kernel-global
1549  * workqueue otherwise.
1550  */
1551 int schedule_work(struct work_struct *work)
1552 {
1553         return queue_work(keventd_wq, work);
1554 }
1555 EXPORT_SYMBOL(schedule_work);
1556
1557 /**
1558  * schedule_work_on - put work task on a specific cpu
1559  * @cpu: cpu to put the work task on
1560  * @work: job to be done
1561  *
1562  * This puts a job on a specific cpu.
1563  */
1564 int schedule_work_on(int cpu, struct work_struct *work)
1565 {
1566         return queue_work_on(cpu, keventd_wq, work);
1567 }
1568 EXPORT_SYMBOL(schedule_work_on);
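
/*
 * Illustrative sketch, compiled out: queueing a job on the kernel-global
 * workqueue, either wherever it happens to be picked up or pinned to a
 * specific CPU.  The example_* names are hypothetical.
 */
#if 0
static void example_event_fn(struct work_struct *work)
{
        pr_info("example event running in keventd context\n");
}

static DECLARE_WORK(example_event, example_event_fn);

static void example_kick(bool pin_to_cpu0)
{
        if (pin_to_cpu0)
                schedule_work_on(0, &example_event);
        else
                schedule_work(&example_event);
}
#endif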
1569
1570 /**
1571  * schedule_delayed_work - put work task in global workqueue after delay
1572  * @dwork: job to be done
1573  * @delay: number of jiffies to wait or 0 for immediate execution
1574  *
1575  * After waiting for a given time this puts a job in the kernel-global
1576  * workqueue.
1577  */
1578 int schedule_delayed_work(struct delayed_work *dwork,
1579                                         unsigned long delay)
1580 {
1581         return queue_delayed_work(keventd_wq, dwork, delay);
1582 }
1583 EXPORT_SYMBOL(schedule_delayed_work);
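
/*
 * Illustrative sketch, compiled out: deferring a job by roughly one second
 * on the kernel-global workqueue.  The example_* names are hypothetical.
 */
#if 0
static void example_retry_fn(struct work_struct *work)
{
        /* ... retry the failed operation in process context ... */
}

static DECLARE_DELAYED_WORK(example_retry, example_retry_fn);

static void example_schedule_retry(void)
{
        /* runs example_retry_fn() roughly one second from now */
        schedule_delayed_work(&example_retry, HZ);
}
#endif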
1584
1585 /**
1586  * flush_delayed_work - block until a delayed work's callback has terminated
1587  * @dwork: the delayed work which is to be flushed
1588  *
1589  * Any timeout is cancelled, and any pending work is run immediately.
1590  */
1591 void flush_delayed_work(struct delayed_work *dwork)
1592 {
1593         if (del_timer_sync(&dwork->timer)) {
1594                 __queue_work(get_cpu(), get_wq_data(&dwork->work)->wq,
1595                              &dwork->work);
1596                 put_cpu();
1597         }
1598         flush_work(&dwork->work);
1599 }
1600 EXPORT_SYMBOL(flush_delayed_work);
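
/*
 * Illustrative sketch, compiled out: forcing a pending delayed work to run
 * right away instead of waiting out its timer, e.g. on an explicit sync
 * request.  The example_* names are hypothetical.
 */
#if 0
static void example_writeback_fn(struct work_struct *work)
{
        /* ... write cached state back to the device ... */
}

static DECLARE_DELAYED_WORK(example_writeback, example_writeback_fn);

static void example_sync(void)
{
        schedule_delayed_work(&example_writeback, 10 * HZ);

        /*
         * Don't wait ten seconds: cancel the timer, queue the work on
         * this CPU and return once example_writeback_fn() has finished.
         */
        flush_delayed_work(&example_writeback);
}
#endif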
1601
1602 /**
1603  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
1604  * @cpu: cpu to use
1605  * @dwork: job to be done
1606  * @delay: number of jiffies to wait
1607  *
1608  * After waiting for a given time this puts a job in the kernel-global
1609  * workqueue on the specified CPU.
1610  */
1611 int schedule_delayed_work_on(int cpu,
1612                         struct delayed_work *dwork, unsigned long delay)
1613 {
1614         return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
1615 }
1616 EXPORT_SYMBOL(schedule_delayed_work_on);
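
/*
 * Illustrative sketch, compiled out: arming a per-CPU delayed work on the
 * CPU it is meant for.  The example_* names are hypothetical.
 */
#if 0
static void example_cpu_tick_fn(struct work_struct *work)
{
        /* executes on the CPU it was queued on (barring hotplug) */
}

static DEFINE_PER_CPU(struct delayed_work, example_cpu_tick);

static void example_start_tick(int cpu)
{
        struct delayed_work *dwork = &per_cpu(example_cpu_tick, cpu);

        INIT_DELAYED_WORK(dwork, example_cpu_tick_fn);
        schedule_delayed_work_on(cpu, dwork, HZ);
}
#endif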
1617
1618 /**
1619  * schedule_on_each_cpu - call a function on each online CPU from keventd
1620  * @func: the function to call
1621  *
1622  * Returns zero on success.
1623  * Returns a negative errno value on failure.
1624  *
1625  * schedule_on_each_cpu() is very slow; it waits for @func on every online CPU.
1626  */
1627 int schedule_on_each_cpu(work_func_t func)
1628 {
1629         int cpu;
1630         int orig = -1;
1631         struct work_struct *works;
1632
1633         works = alloc_percpu(struct work_struct);
1634         if (!works)
1635                 return -ENOMEM;
1636
1637         get_online_cpus();
1638
1639         /*
1640          * When running in keventd, don't schedule a work item on
1641          * itself.  We can call the function directly because keventd
1642          * is already bound to this CPU.  This is also faster.
1643          */
1644         if (current_is_keventd())
1645                 orig = raw_smp_processor_id();
1646
1647         for_each_online_cpu(cpu) {
1648                 struct work_struct *work = per_cpu_ptr(works, cpu);
1649
1650                 INIT_WORK(work, func);
1651                 if (cpu != orig)
1652                         schedule_work_on(cpu, work);
1653         }
1654         if (orig >= 0)
1655                 func(per_cpu_ptr(works, orig));
1656
1657         for_each_online_cpu(cpu)
1658                 flush_work(per_cpu_ptr(works, cpu));
1659
1660         put_online_cpus();
1661         free_percpu(works);
1662         return 0;
1663 }
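
/*
 * Illustrative sketch, compiled out: running a function once on every
 * online CPU and waiting for all of them, e.g. to drain per-cpu caches.
 * The example_* names are hypothetical.
 */
#if 0
static void example_drain_fn(struct work_struct *work)
{
        pr_info("draining caches on cpu %d\n", raw_smp_processor_id());
}

static int example_drain_all(void)
{
        /* returns 0 on success or -ENOMEM if the per-cpu works failed */
        return schedule_on_each_cpu(example_drain_fn);
}
#endif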
1664
1665 /**
1666  * flush_scheduled_work - ensure that any scheduled work has run to completion.
1667  *
1668  * Forces execution of the kernel-global workqueue and blocks until its
1669  * completion.
1670  *
1671  * Think twice before calling this function!  It's very easy to get into
1672  * trouble if you don't take great care.  Either of the following situations
1673  * will lead to deadlock:
1674  *
1675  *      One of the work items currently on the workqueue needs to acquire
1676  *      a lock held by your code or its caller.
1677  *
1678  *      Your code is running in the context of a work routine.
1679  *
1680  * They will be detected by lockdep when they occur, but the first might not
1681  * occur very often.  It depends on what work items are on the workqueue and
1682  * what locks they need, which you have no control over.
1683  *
1684  * In most situations flushing the entire workqueue is overkill; you merely
1685  * need to know that a particular work item isn't queued and isn't running.
1686  * In such cases you should use cancel_delayed_work_sync() or
1687  * cancel_work_sync() instead.
1688  */
1689 void flush_scheduled_work(void)
1690 {
1691         flush_workqueue(keventd_wq);
1692 }
1693 EXPORT_SYMBOL(flush_scheduled_work);
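
/*
 * Illustrative sketch, compiled out: the first deadlock described above,
 * plus the recommended alternative.  The example_* names are hypothetical.
 */
#if 0
static DEFINE_MUTEX(example_lock);

static void example_work_fn(struct work_struct *work)
{
        mutex_lock(&example_lock);              /* blocks here ... */
        /* ... */
        mutex_unlock(&example_lock);
}

static DECLARE_WORK(example_work, example_work_fn);

static void example_broken_teardown(void)
{
        mutex_lock(&example_lock);
        flush_scheduled_work();                 /* ... on the lock we hold */
        mutex_unlock(&example_lock);
}

static void example_fixed_teardown(void)
{
        /* only example_work matters; call without example_lock held */
        cancel_work_sync(&example_work);
}
#endif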
1694
1695 /**
1696  * execute_in_process_context - reliably execute the routine with user context
1697  * @fn:         the function to execute
1698  * @ew:         guaranteed storage for the execute work structure (must
1699  *              be available when the work executes)
1700  *
1701  * Executes the function immediately if process context is available,
1702  * otherwise schedules the function for delayed execution.
1703  *
1704  * Returns:     0 - function was executed
1705  *              1 - function was scheduled for execution
1706  */
1707 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
1708 {
1709         if (!in_interrupt()) {
1710                 fn(&ew->work);
1711                 return 0;
1712         }
1713
1714         INIT_WORK(&ew->work, fn);
1715         schedule_work(&ew->work);
1716
1717         return 1;
1718 }
1719 EXPORT_SYMBOL_GPL(execute_in_process_context);
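
/*
 * Illustrative sketch, compiled out: deferring a release routine to process
 * context when the caller might be in an interrupt.  The execute_work is
 * embedded in the object so its storage outlives the caller.  The
 * example_* names are hypothetical.
 */
#if 0
struct example_dev {
        struct execute_work release_ew;
        /* ... */
};

static void example_release_fn(struct work_struct *work)
{
        struct example_dev *dev =
                container_of(work, struct example_dev, release_ew.work);

        kfree(dev);
}

static void example_put(struct example_dev *dev)
{
        /* runs immediately in process context, otherwise via keventd */
        execute_in_process_context(example_release_fn, &dev->release_ew);
}
#endif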
1720
1721 int keventd_up(void)
1722 {
1723         return keventd_wq != NULL;
1724 }
1725
1726 int current_is_keventd(void)
1727 {
1728         struct cpu_workqueue_struct *cwq;
1729         int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
1730         int ret = 0;
1731
1732         BUG_ON(!keventd_wq);
1733
1734         cwq = get_cwq(cpu, keventd_wq);
1735         if (current == cwq->worker->task)
1736                 ret = 1;
1737
1738         return ret;
1739
1740 }
1741
1742 static struct cpu_workqueue_struct *alloc_cwqs(void)
1743 {
1744         /*
1745          * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
1746          * Make sure that the alignment isn't lower than that of
1747          * unsigned long long.
1748          */
1749         const size_t size = sizeof(struct cpu_workqueue_struct);
1750         const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
1751                                    __alignof__(unsigned long long));
1752         struct cpu_workqueue_struct *cwqs;
1753 #ifndef CONFIG_SMP
1754         void *ptr;
1755
1756         /*
1757          * On UP, percpu allocator doesn't honor alignment parameter
1758          * and simply uses arch-dependent default.  Allocate enough
1759          * room to align cwq and put an extra pointer at the end
1760          * pointing back to the originally allocated pointer which
1761          * will be used for free.
1762          *
1763          * FIXME: This really belongs to UP percpu code.  Update UP
1764          * percpu code to honor alignment and remove this ugliness.
1765          */
1766         ptr = __alloc_percpu(size + align + sizeof(void *), 1);
1767         cwqs = PTR_ALIGN(ptr, align);
1768         *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
1769 #else
1770         /* On SMP, percpu allocator can do it itself */
1771         cwqs = __alloc_percpu(size, align);
1772 #endif
1773         /* just in case, make sure it's actually aligned */
1774         BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
1775         return cwqs;
1776 }
1777
1778 static void free_cwqs(struct cpu_workqueue_struct *cwqs)
1779 {
1780 #ifndef CONFIG_SMP
1781         /* on UP, the pointer to free is stored right after the cwq */
1782         if (cwqs)
1783                 free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
1784 #else
1785         free_percpu(cwqs);
1786 #endif
1787 }
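
/*
 * Illustrative sketch, compiled out: why the forced alignment above matters.
 * A cwq pointer is stored in work->data together with WORK_STRUCT_FLAG_BITS
 * worth of flag bits, so its low bits must be clear.  The helper below is
 * hypothetical and only restates that invariant.
 */
#if 0
static void example_check_cwq_alignment(struct cpu_workqueue_struct *cwq)
{
        /* the low flag bits of the pointer must be free for flag use */
        BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
}
#endif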
1788
1789 struct workqueue_struct *__create_workqueue_key(const char *name,
1790                                                 unsigned int flags,
1791                                                 int max_active,
1792                                                 struct lock_class_key *key,
1793                                                 const char *lock_name)
1794 {
1795         struct workqueue_struct *wq;
1796         bool failed = false;
1797         unsigned int cpu;
1798
1799         max_active = clamp_val(max_active, 1, INT_MAX);
1800
1801         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
1802         if (!wq)
1803                 goto err;
1804
1805         wq->cpu_wq = alloc_cwqs();
1806         if (!wq->cpu_wq)
1807                 goto err;
1808
1809         wq->flags = flags;
1810         wq->saved_max_active = max_active;
1811         mutex_init(&wq->flush_mutex);
1812         atomic_set(&wq->nr_cwqs_to_flush, 0);
1813         INIT_LIST_HEAD(&wq->flusher_queue);
1814         INIT_LIST_HEAD(&wq->flusher_overflow);
1815         wq->single_cpu = NR_CPUS;
1816
1817         wq->name = name;
1818         lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
1819         INIT_LIST_HEAD(&wq->list);
1820
1821         cpu_maps_update_begin();
1822         /*
1823          * We must initialize cwqs for each possible cpu even if we
1824          * are eventually going to call destroy_workqueue().  Otherwise
1825          * cpu_up() can hit the uninitialized cwq once we drop the
1826          * lock.
1827          */
1828         for_each_possible_cpu(cpu) {
1829                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1830                 struct global_cwq *gcwq = get_gcwq(cpu);
1831
1832                 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
1833                 cwq->gcwq = gcwq;
1834                 cwq->wq = wq;
1835                 cwq->flush_color = -1;
1836                 cwq->max_active = max_active;
1837                 INIT_LIST_HEAD(&cwq->worklist);
1838                 INIT_LIST_HEAD(&cwq->delayed_works);
1839
1840                 if (failed)
1841                         continue;
1842                 cwq->worker = create_worker(cwq, cpu_online(cpu));
1843                 if (cwq->worker)
1844                         start_worker(cwq->worker);
1845                 else
1846                         failed = true;
1847         }
1848
1849         /*
1850          * workqueue_lock protects global freeze state and workqueues
1851          * list.  Grab it, set max_active accordingly and add the new
1852          * workqueue to workqueues list.
1853          */
1854         spin_lock(&workqueue_lock);
1855
1856         if (workqueue_freezing && wq->flags & WQ_FREEZEABLE)
1857                 for_each_possible_cpu(cpu)
1858                         get_cwq(cpu, wq)->max_active = 0;
1859
1860         list_add(&wq->list, &workqueues);
1861
1862         spin_unlock(&workqueue_lock);
1863
1864         cpu_maps_update_done();
1865
1866         if (failed) {
1867                 destroy_workqueue(wq);
1868                 wq = NULL;
1869         }
1870         return wq;
1871 err:
1872         if (wq) {
1873                 free_cwqs(wq->cpu_wq);
1874                 kfree(wq);
1875         }
1876         return NULL;
1877 }
1878 EXPORT_SYMBOL_GPL(__create_workqueue_key);
1879
1880 /**
1881  * destroy_workqueue - safely terminate a workqueue
1882  * @wq: target workqueue
1883  *
1884  * Safely destroy a workqueue. All work currently pending will be done first.
1885  */
1886 void destroy_workqueue(struct workqueue_struct *wq)
1887 {
1888         unsigned int cpu;
1889
1890         flush_workqueue(wq);
1891
1892         /*
1893          * wq list is used to freeze wq, remove from list after
1894          * flushing is complete in case freeze races us.
1895          */
1896         cpu_maps_update_begin();
1897         spin_lock(&workqueue_lock);
1898         list_del(&wq->list);
1899         spin_unlock(&workqueue_lock);
1900         cpu_maps_update_done();
1901
1902         for_each_possible_cpu(cpu) {
1903                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1904                 int i;
1905
1906                 if (cwq->worker) {
1907                         spin_lock_irq(&cwq->gcwq->lock);
1908                         destroy_worker(cwq->worker);
1909                         cwq->worker = NULL;
1910                         spin_unlock_irq(&cwq->gcwq->lock);
1911                 }
1912
1913                 for (i = 0; i < WORK_NR_COLORS; i++)
1914                         BUG_ON(cwq->nr_in_flight[i]);
1915                 BUG_ON(cwq->nr_active);
1916                 BUG_ON(!list_empty(&cwq->delayed_works));
1917         }
1918
1919         free_cwqs(wq->cpu_wq);
1920         kfree(wq);
1921 }
1922 EXPORT_SYMBOL_GPL(destroy_workqueue);
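
/*
 * Illustrative sketch, compiled out: the usual create / queue / destroy
 * lifecycle for a dedicated workqueue.  The example_* names are
 * hypothetical.
 */
#if 0
static void example_io_fn(struct work_struct *work)
{
        /* long-running, possibly sleeping I/O */
}

static DECLARE_WORK(example_io, example_io_fn);

static int example_init(void)
{
        struct workqueue_struct *wq;

        wq = create_singlethread_workqueue("example_io");
        if (!wq)
                return -ENOMEM;

        queue_work(wq, &example_io);

        /* flushes pending work, then frees the workers and the wq */
        destroy_workqueue(wq);
        return 0;
}
#endif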
1923
1924 /*
1925  * CPU hotplug.
1926  *
1927  * CPU hotplug is implemented by allowing cwqs to be detached from
1928  * CPU, running with unbound workers and allowing them to be
1929  * reattached later if the cpu comes back online.  A separate thread
1930  * is created to govern cwqs in such state and is called the trustee.
1931  *
1932  * Trustee states and their descriptions.
1933  *
1934  * START        Command state used on startup.  On CPU_DOWN_PREPARE, a
1935  *              new trustee is started with this state.
1936  *
1937  * IN_CHARGE    Once started, trustee will enter this state after
1938  *              making all existing workers rogue.  DOWN_PREPARE waits
1939  *              for trustee to enter this state.  After reaching
1940  *              IN_CHARGE, trustee tries to execute the pending
1941  *              worklist until it's empty and the state is set to
1942  *              BUTCHER, or the state is set to RELEASE.
1943  *
1944  * BUTCHER      Command state which is set by the cpu callback after
1945  *              the cpu has gone down.  Once this state is set, trustee
1946  *              knows that there will be no new works on the worklist
1947  *              and once the worklist is empty it can proceed to
1948  *              killing idle workers.
1949  *
1950  * RELEASE      Command state which is set by the cpu callback if the
1951  *              cpu down has been canceled or it has come online
1952  *              again.  After recognizing this state, trustee stops
1953  *              trying to drain or butcher and transitions to DONE.
1954  *
1955  * DONE         Trustee will enter this state after BUTCHER or RELEASE
1956  *              is complete.
1957  *
1958  *          trustee                 CPU                draining
1959  *         took over                down               complete
1960  * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
1961  *                        |                     |                  ^
1962  *                        | CPU is back online  v   return workers |
1963  *                         ----------------> RELEASE --------------
1964  */
1965
1966 /**
1967  * trustee_wait_event_timeout - timed event wait for trustee
1968  * @cond: condition to wait for
1969  * @timeout: timeout in jiffies
1970  *
1971  * wait_event_timeout() for trustee to use.  Handles locking and
1972  * checks for RELEASE request.
1973  *
1974  * CONTEXT:
1975  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1976  * multiple times.  To be used by trustee.
1977  *
1978  * RETURNS:
1979  * The remaining time (positive) if @cond is satisfied, 0 if timed
1980  * out, -1 if canceled.
1981  */
1982 #define trustee_wait_event_timeout(cond, timeout) ({                    \
1983         long __ret = (timeout);                                         \
1984         while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \
1985                __ret) {                                                 \
1986                 spin_unlock_irq(&gcwq->lock);                           \
1987                 __wait_event_timeout(gcwq->trustee_wait, (cond) ||      \
1988                         (gcwq->trustee_state == TRUSTEE_RELEASE),       \
1989                         __ret);                                         \
1990                 spin_lock_irq(&gcwq->lock);                             \
1991         }                                                               \
1992         gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret);          \
1993 })
1994
1995 /**
1996  * trustee_wait_event - event wait for trustee
1997  * @cond: condition to wait for
1998  *
1999  * wait_event() for trustee to use.  Automatically handles locking and
2000  * checks for RELEASE request.
2001  *
2002  * CONTEXT:
2003  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
2004  * multiple times.  To be used by trustee.
2005  *
2006  * RETURNS:
2007  * 0 if @cond is satisfied, -1 if canceled.
2008  */
2009 #define trustee_wait_event(cond) ({                                     \
2010         long __ret1;                                                    \
2011         __ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
2012         __ret1 < 0 ? -1 : 0;                                            \
2013 })
2014
2015 static int __cpuinit trustee_thread(void *__gcwq)
2016 {
2017         struct global_cwq *gcwq = __gcwq;
2018         struct worker *worker;
2019         struct hlist_node *pos;
2020         int i;
2021
2022         BUG_ON(gcwq->cpu != smp_processor_id());
2023
2024         spin_lock_irq(&gcwq->lock);
2025         /*
2026          * Make all workers rogue.  Trustee must be bound to the
2027          * target cpu and can't be cancelled.
2028          */
2029         BUG_ON(gcwq->cpu != smp_processor_id());
2030
2031         list_for_each_entry(worker, &gcwq->idle_list, entry)
2032                 worker->flags |= WORKER_ROGUE;
2033
2034         for_each_busy_worker(worker, i, pos, gcwq)
2035                 worker->flags |= WORKER_ROGUE;
2036
2037         /*
2038          * We're now in charge.  Notify and proceed to drain.  We need
2039          * to keep the gcwq running during the whole CPU down
2040          * procedure as other cpu hotunplug callbacks may need to
2041          * flush currently running tasks.
2042          */
2043         gcwq->trustee_state = TRUSTEE_IN_CHARGE;
2044         wake_up_all(&gcwq->trustee_wait);
2045
2046         /*
2047          * The original cpu is in the process of dying and may go away
2048          * anytime now.  When that happens, we and all workers would
2049          * be migrated to other cpus.  Try draining any left work.
2050          * Note that if the gcwq is frozen, there may be frozen works
2051          * in freezeable cwqs.  Don't declare completion while frozen.
2052          */
2053         while (gcwq->nr_workers != gcwq->nr_idle ||
2054                gcwq->flags & GCWQ_FREEZING ||
2055                gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
2056                 /* give a breather */
2057                 if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
2058                         break;
2059         }
2060
2061         /* notify completion */
2062         gcwq->trustee = NULL;
2063         gcwq->trustee_state = TRUSTEE_DONE;
2064         wake_up_all(&gcwq->trustee_wait);
2065         spin_unlock_irq(&gcwq->lock);
2066         return 0;
2067 }
2068
2069 /**
2070  * wait_trustee_state - wait for trustee to enter the specified state
2071  * @gcwq: gcwq the trustee of interest belongs to
2072  * @state: target state to wait for
2073  *
2074  * Wait for the trustee to reach @state.  DONE is already matched.
2075  *
2076  * CONTEXT:
2077  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
2078  * multiple times.  To be used by cpu_callback.
2079  */
2080 static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
2081 {
2082         if (!(gcwq->trustee_state == state ||
2083               gcwq->trustee_state == TRUSTEE_DONE)) {
2084                 spin_unlock_irq(&gcwq->lock);
2085                 __wait_event(gcwq->trustee_wait,
2086                              gcwq->trustee_state == state ||
2087                              gcwq->trustee_state == TRUSTEE_DONE);
2088                 spin_lock_irq(&gcwq->lock);
2089         }
2090 }
2091
2092 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
2093                                                 unsigned long action,
2094                                                 void *hcpu)
2095 {
2096         unsigned int cpu = (unsigned long)hcpu;
2097         struct global_cwq *gcwq = get_gcwq(cpu);
2098         struct task_struct *new_trustee = NULL;
2099         struct worker *worker;
2100         struct hlist_node *pos;
2101         unsigned long flags;
2102         int i;
2103
2104         action &= ~CPU_TASKS_FROZEN;
2105
2106         switch (action) {
2107         case CPU_DOWN_PREPARE:
2108                 new_trustee = kthread_create(trustee_thread, gcwq,
2109                                              "workqueue_trustee/%d", cpu);
2110                 if (IS_ERR(new_trustee))
2111                         return notifier_from_errno(PTR_ERR(new_trustee));
2112                 kthread_bind(new_trustee, cpu);
2113         }
2114
2115         /* some are called w/ irq disabled, don't disturb irq status */
2116         spin_lock_irqsave(&gcwq->lock, flags);
2117
2118         switch (action) {
2119         case CPU_DOWN_PREPARE:
2120                 /* initialize trustee and tell it to acquire the gcwq */
2121                 BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
2122                 gcwq->trustee = new_trustee;
2123                 gcwq->trustee_state = TRUSTEE_START;
2124                 wake_up_process(gcwq->trustee);
2125                 wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
2126                 break;
2127
2128         case CPU_POST_DEAD:
2129                 gcwq->trustee_state = TRUSTEE_BUTCHER;
2130                 break;
2131
2132         case CPU_DOWN_FAILED:
2133         case CPU_ONLINE:
2134                 if (gcwq->trustee_state != TRUSTEE_DONE) {
2135                         gcwq->trustee_state = TRUSTEE_RELEASE;
2136                         wake_up_process(gcwq->trustee);
2137                         wait_trustee_state(gcwq, TRUSTEE_DONE);
2138                 }
2139
2140                 /* clear ROGUE from all workers */
2141                 list_for_each_entry(worker, &gcwq->idle_list, entry)
2142                         worker->flags &= ~WORKER_ROGUE;
2143
2144                 for_each_busy_worker(worker, i, pos, gcwq)
2145                         worker->flags &= ~WORKER_ROGUE;
2146                 break;
2147         }
2148
2149         spin_unlock_irqrestore(&gcwq->lock, flags);
2150
2151         return notifier_from_errno(0);
2152 }
2153
2154 #ifdef CONFIG_SMP
2155
2156 struct work_for_cpu {
2157         struct completion completion;
2158         long (*fn)(void *);
2159         void *arg;
2160         long ret;
2161 };
2162
2163 static int do_work_for_cpu(void *_wfc)
2164 {
2165         struct work_for_cpu *wfc = _wfc;
2166         wfc->ret = wfc->fn(wfc->arg);
2167         complete(&wfc->completion);
2168         return 0;
2169 }
2170
2171 /**
2172  * work_on_cpu - run a function in user context on a particular cpu
2173  * @cpu: the cpu to run on
2174  * @fn: the function to run
2175  * @arg: the function arg
2176  *
2177  * This will return the value @fn returns.
2178  * It is up to the caller to ensure that the cpu doesn't go offline.
2179  * The caller must not hold any locks which would prevent @fn from completing.
2180  */
2181 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
2182 {
2183         struct task_struct *sub_thread;
2184         struct work_for_cpu wfc = {
2185                 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
2186                 .fn = fn,
2187                 .arg = arg,
2188         };
2189
2190         sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
2191         if (IS_ERR(sub_thread))
2192                 return PTR_ERR(sub_thread);
2193         kthread_bind(sub_thread, cpu);
2194         wake_up_process(sub_thread);
2195         wait_for_completion(&wfc.completion);
2196         return wfc.ret;
2197 }
2198 EXPORT_SYMBOL_GPL(work_on_cpu);
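
/*
 * Illustrative sketch, compiled out: running an accessor on a specific CPU
 * and collecting its return value.  The example_* names are hypothetical;
 * note the caller pins CPU hotplug as required by the comment above.
 */
#if 0
static long example_cpu_query_fn(void *arg)
{
        /* runs in a kthread bound to the requested CPU */
        return (long)raw_smp_processor_id();
}

static long example_cpu_query(unsigned int cpu)
{
        long ret;

        get_online_cpus();
        ret = cpu_online(cpu) ? work_on_cpu(cpu, example_cpu_query_fn, NULL)
                              : -ENODEV;
        put_online_cpus();
        return ret;
}
#endif
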
2199 #endif /* CONFIG_SMP */
2200
2201 #ifdef CONFIG_FREEZER
2202
2203 /**
2204  * freeze_workqueues_begin - begin freezing workqueues
2205  *
2206  * Start freezing workqueues.  After this function returns, all
2207  * freezeable workqueues will hold new works on their cwq delayed_works
2208  * list instead of executing them, until thaw_workqueues() is called.
2209  *
2210  * CONTEXT:
2211  * Grabs and releases workqueue_lock and gcwq->lock's.
2212  */
2213 void freeze_workqueues_begin(void)
2214 {
2215         struct workqueue_struct *wq;
2216         unsigned int cpu;
2217
2218         spin_lock(&workqueue_lock);
2219
2220         BUG_ON(workqueue_freezing);
2221         workqueue_freezing = true;
2222
2223         for_each_possible_cpu(cpu) {
2224                 struct global_cwq *gcwq = get_gcwq(cpu);
2225
2226                 spin_lock_irq(&gcwq->lock);
2227
2228                 BUG_ON(gcwq->flags & GCWQ_FREEZING);
2229                 gcwq->flags |= GCWQ_FREEZING;
2230
2231                 list_for_each_entry(wq, &workqueues, list) {
2232                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2233
2234                         if (wq->flags & WQ_FREEZEABLE)
2235                                 cwq->max_active = 0;
2236                 }
2237
2238                 spin_unlock_irq(&gcwq->lock);
2239         }
2240
2241         spin_unlock(&workqueue_lock);
2242 }
2243
2244 /**
2245  * freeze_workqueues_busy - are freezeable workqueues still busy?
2246  *
2247  * Check whether freezing is complete.  This function must be called
2248  * between freeze_workqueues_begin() and thaw_workqueues().
2249  *
2250  * CONTEXT:
2251  * Grabs and releases workqueue_lock.
2252  *
2253  * RETURNS:
2254  * %true if some freezeable workqueues are still busy.  %false if
2255  * freezing is complete.
2256  */
2257 bool freeze_workqueues_busy(void)
2258 {
2259         struct workqueue_struct *wq;
2260         unsigned int cpu;
2261         bool busy = false;
2262
2263         spin_lock(&workqueue_lock);
2264
2265         BUG_ON(!workqueue_freezing);
2266
2267         for_each_possible_cpu(cpu) {
2268                 /*
2269                  * nr_active is monotonically decreasing.  It's safe
2270                  * to peek without lock.
2271                  */
2272                 list_for_each_entry(wq, &workqueues, list) {
2273                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2274
2275                         if (!(wq->flags & WQ_FREEZEABLE))
2276                                 continue;
2277
2278                         BUG_ON(cwq->nr_active < 0);
2279                         if (cwq->nr_active) {
2280                                 busy = true;
2281                                 goto out_unlock;
2282                         }
2283                 }
2284         }
2285 out_unlock:
2286         spin_unlock(&workqueue_lock);
2287         return busy;
2288 }
2289
2290 /**
2291  * thaw_workqueues - thaw workqueues
2292  *
2293  * Thaw workqueues.  Normal queueing is restored and all collected
2294  * frozen works are transferred to their respective cwq worklists.
2295  *
2296  * CONTEXT:
2297  * Grabs and releases workqueue_lock and gcwq->lock's.
2298  */
2299 void thaw_workqueues(void)
2300 {
2301         struct workqueue_struct *wq;
2302         unsigned int cpu;
2303
2304         spin_lock(&workqueue_lock);
2305
2306         if (!workqueue_freezing)
2307                 goto out_unlock;
2308
2309         for_each_possible_cpu(cpu) {
2310                 struct global_cwq *gcwq = get_gcwq(cpu);
2311
2312                 spin_lock_irq(&gcwq->lock);
2313
2314                 BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
2315                 gcwq->flags &= ~GCWQ_FREEZING;
2316
2317                 list_for_each_entry(wq, &workqueues, list) {
2318                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2319
2320                         if (!(wq->flags & WQ_FREEZEABLE))
2321                                 continue;
2322
2323                         /* restore max_active and repopulate worklist */
2324                         cwq->max_active = wq->saved_max_active;
2325
2326                         while (!list_empty(&cwq->delayed_works) &&
2327                                cwq->nr_active < cwq->max_active)
2328                                 cwq_activate_first_delayed(cwq);
2329
2330                         /* perform delayed unbind from single cpu if empty */
2331                         if (wq->single_cpu == gcwq->cpu &&
2332                             !cwq->nr_active && list_empty(&cwq->delayed_works))
2333                                 cwq_unbind_single_cpu(cwq);
2334
2335                         wake_up_process(cwq->worker->task);
2336                 }
2337
2338                 spin_unlock_irq(&gcwq->lock);
2339         }
2340
2341         workqueue_freezing = false;
2342 out_unlock:
2343         spin_unlock(&workqueue_lock);
2344 }
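
/*
 * Illustrative sketch, compiled out: the order in which a caller like the
 * suspend/hibernation freezer is expected to drive the three entry points
 * above.  The polling loop and timeout below are hypothetical
 * simplifications, not the actual freezer code.
 */
#if 0
static int example_freeze_wqs(void)
{
        unsigned long timeout = jiffies + 10 * HZ;

        freeze_workqueues_begin();

        /* wait for in-flight works on freezeable workqueues to drain */
        while (freeze_workqueues_busy()) {
                if (time_after(jiffies, timeout)) {
                        thaw_workqueues();
                        return -EBUSY;
                }
                schedule_timeout_uninterruptible(HZ / 100);
        }
        return 0;
}
#endif
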
2345 #endif /* CONFIG_FREEZER */
2346
2347 void __init init_workqueues(void)
2348 {
2349         unsigned int cpu;
2350         int i;
2351
2352         hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
2353
2354         /* initialize gcwqs */
2355         for_each_possible_cpu(cpu) {
2356                 struct global_cwq *gcwq = get_gcwq(cpu);
2357
2358                 spin_lock_init(&gcwq->lock);
2359                 gcwq->cpu = cpu;
2360
2361                 INIT_LIST_HEAD(&gcwq->idle_list);
2362                 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
2363                         INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
2364
2365                 ida_init(&gcwq->worker_ida);
2366
2367                 gcwq->trustee_state = TRUSTEE_DONE;
2368                 init_waitqueue_head(&gcwq->trustee_wait);
2369         }
2370
2371         keventd_wq = create_workqueue("events");
2372         BUG_ON(!keventd_wq);
2373 }