workqueue: reimplement work flushing using linked works
[linux-2.6.git] / kernel / workqueue.c
1 /*
2  * linux/kernel/workqueue.c
3  *
4  * Generic mechanism for defining kernel helper threads for running
5  * arbitrary tasks in process context.
6  *
7  * Started by Ingo Molnar, Copyright (C) 2002
8  *
9  * Derived from the taskqueue/keventd code by:
10  *
11  *   David Woodhouse <dwmw2@infradead.org>
12  *   Andrew Morton
13  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
14  *   Theodore Ts'o <tytso@mit.edu>
15  *
16  * Made to use alloc_percpu by Christoph Lameter.
17  */
18
19 #include <linux/module.h>
20 #include <linux/kernel.h>
21 #include <linux/sched.h>
22 #include <linux/init.h>
23 #include <linux/signal.h>
24 #include <linux/completion.h>
25 #include <linux/workqueue.h>
26 #include <linux/slab.h>
27 #include <linux/cpu.h>
28 #include <linux/notifier.h>
29 #include <linux/kthread.h>
30 #include <linux/hardirq.h>
31 #include <linux/mempolicy.h>
32 #include <linux/freezer.h>
33 #include <linux/kallsyms.h>
34 #include <linux/debug_locks.h>
35 #include <linux/lockdep.h>
36 #include <linux/idr.h>
37
38 /*
39  * Structure fields follow one of the following exclusion rules.
40  *
41  * I: Set during initialization and read-only afterwards.
42  *
43  * L: cwq->lock protected.  Access with cwq->lock held.
44  *
45  * F: wq->flush_mutex protected.
46  *
47  * W: workqueue_lock protected.
48  */
49
50 struct cpu_workqueue_struct;
51
52 struct worker {
53         struct work_struct      *current_work;  /* L: work being processed */
54         struct list_head        scheduled;      /* L: scheduled works */
55         struct task_struct      *task;          /* I: worker task */
56         struct cpu_workqueue_struct *cwq;       /* I: the associated cwq */
57         int                     id;             /* I: worker id */
58 };
59
60 /*
61  * The per-CPU workqueue (if single thread, we always use the first
62  * possible cpu).  The lower WORK_STRUCT_FLAG_BITS of
63  * work_struct->data are used for flags and thus cwqs need to be
64  * aligned at two's power of the number of flag bits.
65  */
66 struct cpu_workqueue_struct {
67
68         spinlock_t lock;
69
70         struct list_head worklist;
71         wait_queue_head_t more_work;
72         unsigned int            cpu;
73         struct worker           *worker;
74
75         struct workqueue_struct *wq;            /* I: the owning workqueue */
76         int                     work_color;     /* L: current color */
77         int                     flush_color;    /* L: flushing color */
78         int                     nr_in_flight[WORK_NR_COLORS];
79                                                 /* L: nr of in_flight works */
80 };
81
82 /*
83  * Structure used to wait for workqueue flush.
84  */
85 struct wq_flusher {
86         struct list_head        list;           /* F: list of flushers */
87         int                     flush_color;    /* F: flush color waiting for */
88         struct completion       done;           /* flush completion */
89 };
90
91 /*
92  * The externally visible workqueue abstraction is an array of
93  * per-CPU workqueues:
94  */
95 struct workqueue_struct {
96         unsigned int            flags;          /* I: WQ_* flags */
97         struct cpu_workqueue_struct *cpu_wq;    /* I: cwq's */
98         struct list_head        list;           /* W: list of all workqueues */
99
100         struct mutex            flush_mutex;    /* protects wq flushing */
101         int                     work_color;     /* F: current work color */
102         int                     flush_color;    /* F: current flush color */
103         atomic_t                nr_cwqs_to_flush; /* flush in progress */
104         struct wq_flusher       *first_flusher; /* F: first flusher */
105         struct list_head        flusher_queue;  /* F: flush waiters */
106         struct list_head        flusher_overflow; /* F: flush overflow list */
107
108         const char              *name;          /* I: workqueue name */
109 #ifdef CONFIG_LOCKDEP
110         struct lockdep_map      lockdep_map;
111 #endif
112 };
113
114 #ifdef CONFIG_DEBUG_OBJECTS_WORK
115
116 static struct debug_obj_descr work_debug_descr;
117
118 /*
119  * fixup_init is called when:
120  * - an active object is initialized
121  */
122 static int work_fixup_init(void *addr, enum debug_obj_state state)
123 {
124         struct work_struct *work = addr;
125
126         switch (state) {
127         case ODEBUG_STATE_ACTIVE:
128                 cancel_work_sync(work);
129                 debug_object_init(work, &work_debug_descr);
130                 return 1;
131         default:
132                 return 0;
133         }
134 }
135
136 /*
137  * fixup_activate is called when:
138  * - an active object is activated
139  * - an unknown object is activated (might be a statically initialized object)
140  */
141 static int work_fixup_activate(void *addr, enum debug_obj_state state)
142 {
143         struct work_struct *work = addr;
144
145         switch (state) {
146
147         case ODEBUG_STATE_NOTAVAILABLE:
148                 /*
149                  * This is not really a fixup. The work struct was
150                  * statically initialized. We just make sure that it
151                  * is tracked in the object tracker.
152                  */
153                 if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
154                         debug_object_init(work, &work_debug_descr);
155                         debug_object_activate(work, &work_debug_descr);
156                         return 0;
157                 }
158                 WARN_ON_ONCE(1);
159                 return 0;
160
161         case ODEBUG_STATE_ACTIVE:
162                 WARN_ON(1);
163
164         default:
165                 return 0;
166         }
167 }
168
169 /*
170  * fixup_free is called when:
171  * - an active object is freed
172  */
173 static int work_fixup_free(void *addr, enum debug_obj_state state)
174 {
175         struct work_struct *work = addr;
176
177         switch (state) {
178         case ODEBUG_STATE_ACTIVE:
179                 cancel_work_sync(work);
180                 debug_object_free(work, &work_debug_descr);
181                 return 1;
182         default:
183                 return 0;
184         }
185 }
186
187 static struct debug_obj_descr work_debug_descr = {
188         .name           = "work_struct",
189         .fixup_init     = work_fixup_init,
190         .fixup_activate = work_fixup_activate,
191         .fixup_free     = work_fixup_free,
192 };
193
194 static inline void debug_work_activate(struct work_struct *work)
195 {
196         debug_object_activate(work, &work_debug_descr);
197 }
198
199 static inline void debug_work_deactivate(struct work_struct *work)
200 {
201         debug_object_deactivate(work, &work_debug_descr);
202 }
203
204 void __init_work(struct work_struct *work, int onstack)
205 {
206         if (onstack)
207                 debug_object_init_on_stack(work, &work_debug_descr);
208         else
209                 debug_object_init(work, &work_debug_descr);
210 }
211 EXPORT_SYMBOL_GPL(__init_work);
212
213 void destroy_work_on_stack(struct work_struct *work)
214 {
215         debug_object_free(work, &work_debug_descr);
216 }
217 EXPORT_SYMBOL_GPL(destroy_work_on_stack);
218
219 #else
220 static inline void debug_work_activate(struct work_struct *work) { }
221 static inline void debug_work_deactivate(struct work_struct *work) { }
222 #endif
223
224 /* Serializes the accesses to the list of workqueues. */
225 static DEFINE_SPINLOCK(workqueue_lock);
226 static LIST_HEAD(workqueues);
227 static DEFINE_PER_CPU(struct ida, worker_ida);
228
229 static int worker_thread(void *__worker);
230
231 static int singlethread_cpu __read_mostly;
232
233 static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
234                                             struct workqueue_struct *wq)
235 {
236         return per_cpu_ptr(wq->cpu_wq, cpu);
237 }
238
239 static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
240                                                struct workqueue_struct *wq)
241 {
242         if (unlikely(wq->flags & WQ_SINGLE_THREAD))
243                 cpu = singlethread_cpu;
244         return get_cwq(cpu, wq);
245 }
246
247 static unsigned int work_color_to_flags(int color)
248 {
249         return color << WORK_STRUCT_COLOR_SHIFT;
250 }
251
252 static int get_work_color(struct work_struct *work)
253 {
254         return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
255                 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
256 }
257
258 static int work_next_color(int color)
259 {
260         return (color + 1) % WORK_NR_COLORS;
261 }
262
263 /*
264  * Set the workqueue on which a work item is to be run
265  * - Must *only* be called if the pending flag is set
266  */
267 static inline void set_wq_data(struct work_struct *work,
268                                struct cpu_workqueue_struct *cwq,
269                                unsigned long extra_flags)
270 {
271         BUG_ON(!work_pending(work));
272
273         atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) |
274                         WORK_STRUCT_PENDING | extra_flags);
275 }
276
277 /*
278  * Clear WORK_STRUCT_PENDING and the workqueue on which it was queued.
279  */
280 static inline void clear_wq_data(struct work_struct *work)
281 {
282         atomic_long_set(&work->data, work_static(work));
283 }
284
285 static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
286 {
287         return (void *)(atomic_long_read(&work->data) &
288                         WORK_STRUCT_WQ_DATA_MASK);
289 }
290
291 /**
292  * insert_work - insert a work into cwq
293  * @cwq: cwq @work belongs to
294  * @work: work to insert
295  * @head: insertion point
296  * @extra_flags: extra WORK_STRUCT_* flags to set
297  *
298  * Insert @work into @cwq after @head.
299  *
300  * CONTEXT:
301  * spin_lock_irq(cwq->lock).
302  */
303 static void insert_work(struct cpu_workqueue_struct *cwq,
304                         struct work_struct *work, struct list_head *head,
305                         unsigned int extra_flags)
306 {
307         /* we own @work, set data and link */
308         set_wq_data(work, cwq, extra_flags);
309
310         /*
311          * Ensure that we get the right work->data if we see the
312          * result of list_add() below, see try_to_grab_pending().
313          */
314         smp_wmb();
315
316         list_add_tail(&work->entry, head);
317         wake_up(&cwq->more_work);
318 }
319
320 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
321                          struct work_struct *work)
322 {
323         struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
324         unsigned long flags;
325
326         debug_work_activate(work);
327         spin_lock_irqsave(&cwq->lock, flags);
328         BUG_ON(!list_empty(&work->entry));
329         cwq->nr_in_flight[cwq->work_color]++;
330         insert_work(cwq, work, &cwq->worklist,
331                     work_color_to_flags(cwq->work_color));
332         spin_unlock_irqrestore(&cwq->lock, flags);
333 }
334
335 /**
336  * queue_work - queue work on a workqueue
337  * @wq: workqueue to use
338  * @work: work to queue
339  *
340  * Returns 0 if @work was already on a queue, non-zero otherwise.
341  *
342  * We queue the work to the CPU on which it was submitted, but if the CPU dies
343  * it can be processed by another CPU.
344  */
345 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
346 {
347         int ret;
348
349         ret = queue_work_on(get_cpu(), wq, work);
350         put_cpu();
351
352         return ret;
353 }
354 EXPORT_SYMBOL_GPL(queue_work);
355
356 /**
357  * queue_work_on - queue work on specific cpu
358  * @cpu: CPU number to execute work on
359  * @wq: workqueue to use
360  * @work: work to queue
361  *
362  * Returns 0 if @work was already on a queue, non-zero otherwise.
363  *
364  * We queue the work to a specific CPU, the caller must ensure it
365  * can't go away.
366  */
367 int
368 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
369 {
370         int ret = 0;
371
372         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
373                 __queue_work(cpu, wq, work);
374                 ret = 1;
375         }
376         return ret;
377 }
378 EXPORT_SYMBOL_GPL(queue_work_on);
379
380 static void delayed_work_timer_fn(unsigned long __data)
381 {
382         struct delayed_work *dwork = (struct delayed_work *)__data;
383         struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
384
385         __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
386 }
387
388 /**
389  * queue_delayed_work - queue work on a workqueue after delay
390  * @wq: workqueue to use
391  * @dwork: delayable work to queue
392  * @delay: number of jiffies to wait before queueing
393  *
394  * Returns 0 if @work was already on a queue, non-zero otherwise.
395  */
396 int queue_delayed_work(struct workqueue_struct *wq,
397                         struct delayed_work *dwork, unsigned long delay)
398 {
399         if (delay == 0)
400                 return queue_work(wq, &dwork->work);
401
402         return queue_delayed_work_on(-1, wq, dwork, delay);
403 }
404 EXPORT_SYMBOL_GPL(queue_delayed_work);
405
406 /**
407  * queue_delayed_work_on - queue work on specific CPU after delay
408  * @cpu: CPU number to execute work on
409  * @wq: workqueue to use
410  * @dwork: work to queue
411  * @delay: number of jiffies to wait before queueing
412  *
413  * Returns 0 if @work was already on a queue, non-zero otherwise.
414  */
415 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
416                         struct delayed_work *dwork, unsigned long delay)
417 {
418         int ret = 0;
419         struct timer_list *timer = &dwork->timer;
420         struct work_struct *work = &dwork->work;
421
422         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
423                 BUG_ON(timer_pending(timer));
424                 BUG_ON(!list_empty(&work->entry));
425
426                 timer_stats_timer_set_start_info(&dwork->timer);
427
428                 /* This stores cwq for the moment, for the timer_fn */
429                 set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
430                 timer->expires = jiffies + delay;
431                 timer->data = (unsigned long)dwork;
432                 timer->function = delayed_work_timer_fn;
433
434                 if (unlikely(cpu >= 0))
435                         add_timer_on(timer, cpu);
436                 else
437                         add_timer(timer);
438                 ret = 1;
439         }
440         return ret;
441 }
442 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
443
444 static struct worker *alloc_worker(void)
445 {
446         struct worker *worker;
447
448         worker = kzalloc(sizeof(*worker), GFP_KERNEL);
449         if (worker)
450                 INIT_LIST_HEAD(&worker->scheduled);
451         return worker;
452 }
453
454 /**
455  * create_worker - create a new workqueue worker
456  * @cwq: cwq the new worker will belong to
457  * @bind: whether to set affinity to @cpu or not
458  *
459  * Create a new worker which is bound to @cwq.  The returned worker
460  * can be started by calling start_worker() or destroyed using
461  * destroy_worker().
462  *
463  * CONTEXT:
464  * Might sleep.  Does GFP_KERNEL allocations.
465  *
466  * RETURNS:
467  * Pointer to the newly created worker.
468  */
469 static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
470 {
471         int id = -1;
472         struct worker *worker = NULL;
473
474         spin_lock(&workqueue_lock);
475         while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
476                 spin_unlock(&workqueue_lock);
477                 if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
478                         goto fail;
479                 spin_lock(&workqueue_lock);
480         }
481         spin_unlock(&workqueue_lock);
482
483         worker = alloc_worker();
484         if (!worker)
485                 goto fail;
486
487         worker->cwq = cwq;
488         worker->id = id;
489
490         worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
491                                       cwq->cpu, id);
492         if (IS_ERR(worker->task))
493                 goto fail;
494
495         if (bind)
496                 kthread_bind(worker->task, cwq->cpu);
497
498         return worker;
499 fail:
500         if (id >= 0) {
501                 spin_lock(&workqueue_lock);
502                 ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
503                 spin_unlock(&workqueue_lock);
504         }
505         kfree(worker);
506         return NULL;
507 }
508
509 /**
510  * start_worker - start a newly created worker
511  * @worker: worker to start
512  *
513  * Start @worker.
514  *
515  * CONTEXT:
516  * spin_lock_irq(cwq->lock).
517  */
518 static void start_worker(struct worker *worker)
519 {
520         wake_up_process(worker->task);
521 }
522
523 /**
524  * destroy_worker - destroy a workqueue worker
525  * @worker: worker to be destroyed
526  *
527  * Destroy @worker.
528  */
529 static void destroy_worker(struct worker *worker)
530 {
531         int cpu = worker->cwq->cpu;
532         int id = worker->id;
533
534         /* sanity check frenzy */
535         BUG_ON(worker->current_work);
536         BUG_ON(!list_empty(&worker->scheduled));
537
538         kthread_stop(worker->task);
539         kfree(worker);
540
541         spin_lock(&workqueue_lock);
542         ida_remove(&per_cpu(worker_ida, cpu), id);
543         spin_unlock(&workqueue_lock);
544 }
545
546 /**
547  * move_linked_works - move linked works to a list
548  * @work: start of series of works to be scheduled
549  * @head: target list to append @work to
550  * @nextp: out paramter for nested worklist walking
551  *
552  * Schedule linked works starting from @work to @head.  Work series to
553  * be scheduled starts at @work and includes any consecutive work with
554  * WORK_STRUCT_LINKED set in its predecessor.
555  *
556  * If @nextp is not NULL, it's updated to point to the next work of
557  * the last scheduled work.  This allows move_linked_works() to be
558  * nested inside outer list_for_each_entry_safe().
559  *
560  * CONTEXT:
561  * spin_lock_irq(cwq->lock).
562  */
563 static void move_linked_works(struct work_struct *work, struct list_head *head,
564                               struct work_struct **nextp)
565 {
566         struct work_struct *n;
567
568         /*
569          * Linked worklist will always end before the end of the list,
570          * use NULL for list head.
571          */
572         list_for_each_entry_safe_from(work, n, NULL, entry) {
573                 list_move_tail(&work->entry, head);
574                 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
575                         break;
576         }
577
578         /*
579          * If we're already inside safe list traversal and have moved
580          * multiple works to the scheduled queue, the next position
581          * needs to be updated.
582          */
583         if (nextp)
584                 *nextp = n;
585 }
586
587 /**
588  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
589  * @cwq: cwq of interest
590  * @color: color of work which left the queue
591  *
592  * A work either has completed or is removed from pending queue,
593  * decrement nr_in_flight of its cwq and handle workqueue flushing.
594  *
595  * CONTEXT:
596  * spin_lock_irq(cwq->lock).
597  */
598 static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
599 {
600         /* ignore uncolored works */
601         if (color == WORK_NO_COLOR)
602                 return;
603
604         cwq->nr_in_flight[color]--;
605
606         /* is flush in progress and are we at the flushing tip? */
607         if (likely(cwq->flush_color != color))
608                 return;
609
610         /* are there still in-flight works? */
611         if (cwq->nr_in_flight[color])
612                 return;
613
614         /* this cwq is done, clear flush_color */
615         cwq->flush_color = -1;
616
617         /*
618          * If this was the last cwq, wake up the first flusher.  It
619          * will handle the rest.
620          */
621         if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
622                 complete(&cwq->wq->first_flusher->done);
623 }
624
625 /**
626  * process_one_work - process single work
627  * @worker: self
628  * @work: work to process
629  *
630  * Process @work.  This function contains all the logics necessary to
631  * process a single work including synchronization against and
632  * interaction with other workers on the same cpu, queueing and
633  * flushing.  As long as context requirement is met, any worker can
634  * call this function to process a work.
635  *
636  * CONTEXT:
637  * spin_lock_irq(cwq->lock) which is released and regrabbed.
638  */
639 static void process_one_work(struct worker *worker, struct work_struct *work)
640 {
641         struct cpu_workqueue_struct *cwq = worker->cwq;
642         work_func_t f = work->func;
643         int work_color;
644 #ifdef CONFIG_LOCKDEP
645         /*
646          * It is permissible to free the struct work_struct from
647          * inside the function that is called from it, this we need to
648          * take into account for lockdep too.  To avoid bogus "held
649          * lock freed" warnings as well as problems when looking into
650          * work->lockdep_map, make a copy and use that here.
651          */
652         struct lockdep_map lockdep_map = work->lockdep_map;
653 #endif
654         /* claim and process */
655         debug_work_deactivate(work);
656         worker->current_work = work;
657         work_color = get_work_color(work);
658         list_del_init(&work->entry);
659
660         spin_unlock_irq(&cwq->lock);
661
662         BUG_ON(get_wq_data(work) != cwq);
663         work_clear_pending(work);
664         lock_map_acquire(&cwq->wq->lockdep_map);
665         lock_map_acquire(&lockdep_map);
666         f(work);
667         lock_map_release(&lockdep_map);
668         lock_map_release(&cwq->wq->lockdep_map);
669
670         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
671                 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
672                        "%s/0x%08x/%d\n",
673                        current->comm, preempt_count(), task_pid_nr(current));
674                 printk(KERN_ERR "    last function: ");
675                 print_symbol("%s\n", (unsigned long)f);
676                 debug_show_held_locks(current);
677                 dump_stack();
678         }
679
680         spin_lock_irq(&cwq->lock);
681
682         /* we're done with it, release */
683         worker->current_work = NULL;
684         cwq_dec_nr_in_flight(cwq, work_color);
685 }
686
687 /**
688  * process_scheduled_works - process scheduled works
689  * @worker: self
690  *
691  * Process all scheduled works.  Please note that the scheduled list
692  * may change while processing a work, so this function repeatedly
693  * fetches a work from the top and executes it.
694  *
695  * CONTEXT:
696  * spin_lock_irq(cwq->lock) which may be released and regrabbed
697  * multiple times.
698  */
699 static void process_scheduled_works(struct worker *worker)
700 {
701         while (!list_empty(&worker->scheduled)) {
702                 struct work_struct *work = list_first_entry(&worker->scheduled,
703                                                 struct work_struct, entry);
704                 process_one_work(worker, work);
705         }
706 }
707
708 /**
709  * worker_thread - the worker thread function
710  * @__worker: self
711  *
712  * The cwq worker thread function.
713  */
714 static int worker_thread(void *__worker)
715 {
716         struct worker *worker = __worker;
717         struct cpu_workqueue_struct *cwq = worker->cwq;
718         DEFINE_WAIT(wait);
719
720         if (cwq->wq->flags & WQ_FREEZEABLE)
721                 set_freezable();
722
723         for (;;) {
724                 prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
725                 if (!freezing(current) &&
726                     !kthread_should_stop() &&
727                     list_empty(&cwq->worklist))
728                         schedule();
729                 finish_wait(&cwq->more_work, &wait);
730
731                 try_to_freeze();
732
733                 if (kthread_should_stop())
734                         break;
735
736                 if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
737                                             get_cpu_mask(cwq->cpu))))
738                         set_cpus_allowed_ptr(worker->task,
739                                              get_cpu_mask(cwq->cpu));
740
741                 spin_lock_irq(&cwq->lock);
742
743                 while (!list_empty(&cwq->worklist)) {
744                         struct work_struct *work =
745                                 list_first_entry(&cwq->worklist,
746                                                  struct work_struct, entry);
747
748                         if (likely(!(*work_data_bits(work) &
749                                      WORK_STRUCT_LINKED))) {
750                                 /* optimization path, not strictly necessary */
751                                 process_one_work(worker, work);
752                                 if (unlikely(!list_empty(&worker->scheduled)))
753                                         process_scheduled_works(worker);
754                         } else {
755                                 move_linked_works(work, &worker->scheduled,
756                                                   NULL);
757                                 process_scheduled_works(worker);
758                         }
759                 }
760
761                 spin_unlock_irq(&cwq->lock);
762         }
763
764         return 0;
765 }
766
767 struct wq_barrier {
768         struct work_struct      work;
769         struct completion       done;
770 };
771
772 static void wq_barrier_func(struct work_struct *work)
773 {
774         struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
775         complete(&barr->done);
776 }
777
778 /**
779  * insert_wq_barrier - insert a barrier work
780  * @cwq: cwq to insert barrier into
781  * @barr: wq_barrier to insert
782  * @target: target work to attach @barr to
783  * @worker: worker currently executing @target, NULL if @target is not executing
784  *
785  * @barr is linked to @target such that @barr is completed only after
786  * @target finishes execution.  Please note that the ordering
787  * guarantee is observed only with respect to @target and on the local
788  * cpu.
789  *
790  * Currently, a queued barrier can't be canceled.  This is because
791  * try_to_grab_pending() can't determine whether the work to be
792  * grabbed is at the head of the queue and thus can't clear LINKED
793  * flag of the previous work while there must be a valid next work
794  * after a work with LINKED flag set.
795  *
796  * Note that when @worker is non-NULL, @target may be modified
797  * underneath us, so we can't reliably determine cwq from @target.
798  *
799  * CONTEXT:
800  * spin_lock_irq(cwq->lock).
801  */
802 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
803                               struct wq_barrier *barr,
804                               struct work_struct *target, struct worker *worker)
805 {
806         struct list_head *head;
807         unsigned int linked = 0;
808
809         /*
810          * debugobject calls are safe here even with cwq->lock locked
811          * as we know for sure that this will not trigger any of the
812          * checks and call back into the fixup functions where we
813          * might deadlock.
814          */
815         INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
816         __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
817         init_completion(&barr->done);
818
819         /*
820          * If @target is currently being executed, schedule the
821          * barrier to the worker; otherwise, put it after @target.
822          */
823         if (worker)
824                 head = worker->scheduled.next;
825         else {
826                 unsigned long *bits = work_data_bits(target);
827
828                 head = target->entry.next;
829                 /* there can already be other linked works, inherit and set */
830                 linked = *bits & WORK_STRUCT_LINKED;
831                 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
832         }
833
834         debug_work_activate(&barr->work);
835         insert_work(cwq, &barr->work, head,
836                     work_color_to_flags(WORK_NO_COLOR) | linked);
837 }
838
839 /**
840  * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
841  * @wq: workqueue being flushed
842  * @flush_color: new flush color, < 0 for no-op
843  * @work_color: new work color, < 0 for no-op
844  *
845  * Prepare cwqs for workqueue flushing.
846  *
847  * If @flush_color is non-negative, flush_color on all cwqs should be
848  * -1.  If no cwq has in-flight commands at the specified color, all
849  * cwq->flush_color's stay at -1 and %false is returned.  If any cwq
850  * has in flight commands, its cwq->flush_color is set to
851  * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
852  * wakeup logic is armed and %true is returned.
853  *
854  * The caller should have initialized @wq->first_flusher prior to
855  * calling this function with non-negative @flush_color.  If
856  * @flush_color is negative, no flush color update is done and %false
857  * is returned.
858  *
859  * If @work_color is non-negative, all cwqs should have the same
860  * work_color which is previous to @work_color and all will be
861  * advanced to @work_color.
862  *
863  * CONTEXT:
864  * mutex_lock(wq->flush_mutex).
865  *
866  * RETURNS:
867  * %true if @flush_color >= 0 and there's something to flush.  %false
868  * otherwise.
869  */
870 static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
871                                       int flush_color, int work_color)
872 {
873         bool wait = false;
874         unsigned int cpu;
875
876         if (flush_color >= 0) {
877                 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
878                 atomic_set(&wq->nr_cwqs_to_flush, 1);
879         }
880
881         for_each_possible_cpu(cpu) {
882                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
883
884                 spin_lock_irq(&cwq->lock);
885
886                 if (flush_color >= 0) {
887                         BUG_ON(cwq->flush_color != -1);
888
889                         if (cwq->nr_in_flight[flush_color]) {
890                                 cwq->flush_color = flush_color;
891                                 atomic_inc(&wq->nr_cwqs_to_flush);
892                                 wait = true;
893                         }
894                 }
895
896                 if (work_color >= 0) {
897                         BUG_ON(work_color != work_next_color(cwq->work_color));
898                         cwq->work_color = work_color;
899                 }
900
901                 spin_unlock_irq(&cwq->lock);
902         }
903
904         if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
905                 complete(&wq->first_flusher->done);
906
907         return wait;
908 }
909
910 /**
911  * flush_workqueue - ensure that any scheduled work has run to completion.
912  * @wq: workqueue to flush
913  *
914  * Forces execution of the workqueue and blocks until its completion.
915  * This is typically used in driver shutdown handlers.
916  *
917  * We sleep until all works which were queued on entry have been handled,
918  * but we are not livelocked by new incoming ones.
919  */
920 void flush_workqueue(struct workqueue_struct *wq)
921 {
922         struct wq_flusher this_flusher = {
923                 .list = LIST_HEAD_INIT(this_flusher.list),
924                 .flush_color = -1,
925                 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
926         };
927         int next_color;
928
929         lock_map_acquire(&wq->lockdep_map);
930         lock_map_release(&wq->lockdep_map);
931
932         mutex_lock(&wq->flush_mutex);
933
934         /*
935          * Start-to-wait phase
936          */
937         next_color = work_next_color(wq->work_color);
938
939         if (next_color != wq->flush_color) {
940                 /*
941                  * Color space is not full.  The current work_color
942                  * becomes our flush_color and work_color is advanced
943                  * by one.
944                  */
945                 BUG_ON(!list_empty(&wq->flusher_overflow));
946                 this_flusher.flush_color = wq->work_color;
947                 wq->work_color = next_color;
948
949                 if (!wq->first_flusher) {
950                         /* no flush in progress, become the first flusher */
951                         BUG_ON(wq->flush_color != this_flusher.flush_color);
952
953                         wq->first_flusher = &this_flusher;
954
955                         if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
956                                                        wq->work_color)) {
957                                 /* nothing to flush, done */
958                                 wq->flush_color = next_color;
959                                 wq->first_flusher = NULL;
960                                 goto out_unlock;
961                         }
962                 } else {
963                         /* wait in queue */
964                         BUG_ON(wq->flush_color == this_flusher.flush_color);
965                         list_add_tail(&this_flusher.list, &wq->flusher_queue);
966                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
967                 }
968         } else {
969                 /*
970                  * Oops, color space is full, wait on overflow queue.
971                  * The next flush completion will assign us
972                  * flush_color and transfer to flusher_queue.
973                  */
974                 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
975         }
976
977         mutex_unlock(&wq->flush_mutex);
978
979         wait_for_completion(&this_flusher.done);
980
981         /*
982          * Wake-up-and-cascade phase
983          *
984          * First flushers are responsible for cascading flushes and
985          * handling overflow.  Non-first flushers can simply return.
986          */
987         if (wq->first_flusher != &this_flusher)
988                 return;
989
990         mutex_lock(&wq->flush_mutex);
991
992         wq->first_flusher = NULL;
993
994         BUG_ON(!list_empty(&this_flusher.list));
995         BUG_ON(wq->flush_color != this_flusher.flush_color);
996
997         while (true) {
998                 struct wq_flusher *next, *tmp;
999
1000                 /* complete all the flushers sharing the current flush color */
1001                 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
1002                         if (next->flush_color != wq->flush_color)
1003                                 break;
1004                         list_del_init(&next->list);
1005                         complete(&next->done);
1006                 }
1007
1008                 BUG_ON(!list_empty(&wq->flusher_overflow) &&
1009                        wq->flush_color != work_next_color(wq->work_color));
1010
1011                 /* this flush_color is finished, advance by one */
1012                 wq->flush_color = work_next_color(wq->flush_color);
1013
1014                 /* one color has been freed, handle overflow queue */
1015                 if (!list_empty(&wq->flusher_overflow)) {
1016                         /*
1017                          * Assign the same color to all overflowed
1018                          * flushers, advance work_color and append to
1019                          * flusher_queue.  This is the start-to-wait
1020                          * phase for these overflowed flushers.
1021                          */
1022                         list_for_each_entry(tmp, &wq->flusher_overflow, list)
1023                                 tmp->flush_color = wq->work_color;
1024
1025                         wq->work_color = work_next_color(wq->work_color);
1026
1027                         list_splice_tail_init(&wq->flusher_overflow,
1028                                               &wq->flusher_queue);
1029                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
1030                 }
1031
1032                 if (list_empty(&wq->flusher_queue)) {
1033                         BUG_ON(wq->flush_color != wq->work_color);
1034                         break;
1035                 }
1036
1037                 /*
1038                  * Need to flush more colors.  Make the next flusher
1039                  * the new first flusher and arm cwqs.
1040                  */
1041                 BUG_ON(wq->flush_color == wq->work_color);
1042                 BUG_ON(wq->flush_color != next->flush_color);
1043
1044                 list_del_init(&next->list);
1045                 wq->first_flusher = next;
1046
1047                 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
1048                         break;
1049
1050                 /*
1051                  * Meh... this color is already done, clear first
1052                  * flusher and repeat cascading.
1053                  */
1054                 wq->first_flusher = NULL;
1055         }
1056
1057 out_unlock:
1058         mutex_unlock(&wq->flush_mutex);
1059 }
1060 EXPORT_SYMBOL_GPL(flush_workqueue);
1061
1062 /**
1063  * flush_work - block until a work_struct's callback has terminated
1064  * @work: the work which is to be flushed
1065  *
1066  * Returns false if @work has already terminated.
1067  *
1068  * It is expected that, prior to calling flush_work(), the caller has
1069  * arranged for the work to not be requeued, otherwise it doesn't make
1070  * sense to use this function.
1071  */
1072 int flush_work(struct work_struct *work)
1073 {
1074         struct worker *worker = NULL;
1075         struct cpu_workqueue_struct *cwq;
1076         struct wq_barrier barr;
1077
1078         might_sleep();
1079         cwq = get_wq_data(work);
1080         if (!cwq)
1081                 return 0;
1082
1083         lock_map_acquire(&cwq->wq->lockdep_map);
1084         lock_map_release(&cwq->wq->lockdep_map);
1085
1086         spin_lock_irq(&cwq->lock);
1087         if (!list_empty(&work->entry)) {
1088                 /*
1089                  * See the comment near try_to_grab_pending()->smp_rmb().
1090                  * If it was re-queued under us we are not going to wait.
1091                  */
1092                 smp_rmb();
1093                 if (unlikely(cwq != get_wq_data(work)))
1094                         goto already_gone;
1095         } else {
1096                 if (cwq->worker && cwq->worker->current_work == work)
1097                         worker = cwq->worker;
1098                 if (!worker)
1099                         goto already_gone;
1100         }
1101
1102         insert_wq_barrier(cwq, &barr, work, worker);
1103         spin_unlock_irq(&cwq->lock);
1104         wait_for_completion(&barr.done);
1105         destroy_work_on_stack(&barr.work);
1106         return 1;
1107 already_gone:
1108         spin_unlock_irq(&cwq->lock);
1109         return 0;
1110 }
1111 EXPORT_SYMBOL_GPL(flush_work);
1112
1113 /*
1114  * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
1115  * so this work can't be re-armed in any way.
1116  */
1117 static int try_to_grab_pending(struct work_struct *work)
1118 {
1119         struct cpu_workqueue_struct *cwq;
1120         int ret = -1;
1121
1122         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
1123                 return 0;
1124
1125         /*
1126          * The queueing is in progress, or it is already queued. Try to
1127          * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
1128          */
1129
1130         cwq = get_wq_data(work);
1131         if (!cwq)
1132                 return ret;
1133
1134         spin_lock_irq(&cwq->lock);
1135         if (!list_empty(&work->entry)) {
1136                 /*
1137                  * This work is queued, but perhaps we locked the wrong cwq.
1138                  * In that case we must see the new value after rmb(), see
1139                  * insert_work()->wmb().
1140                  */
1141                 smp_rmb();
1142                 if (cwq == get_wq_data(work)) {
1143                         debug_work_deactivate(work);
1144                         list_del_init(&work->entry);
1145                         cwq_dec_nr_in_flight(cwq, get_work_color(work));
1146                         ret = 1;
1147                 }
1148         }
1149         spin_unlock_irq(&cwq->lock);
1150
1151         return ret;
1152 }
1153
1154 static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
1155                                 struct work_struct *work)
1156 {
1157         struct wq_barrier barr;
1158         struct worker *worker;
1159
1160         spin_lock_irq(&cwq->lock);
1161
1162         worker = NULL;
1163         if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
1164                 worker = cwq->worker;
1165                 insert_wq_barrier(cwq, &barr, work, worker);
1166         }
1167
1168         spin_unlock_irq(&cwq->lock);
1169
1170         if (unlikely(worker)) {
1171                 wait_for_completion(&barr.done);
1172                 destroy_work_on_stack(&barr.work);
1173         }
1174 }
1175
1176 static void wait_on_work(struct work_struct *work)
1177 {
1178         struct cpu_workqueue_struct *cwq;
1179         struct workqueue_struct *wq;
1180         int cpu;
1181
1182         might_sleep();
1183
1184         lock_map_acquire(&work->lockdep_map);
1185         lock_map_release(&work->lockdep_map);
1186
1187         cwq = get_wq_data(work);
1188         if (!cwq)
1189                 return;
1190
1191         wq = cwq->wq;
1192
1193         for_each_possible_cpu(cpu)
1194                 wait_on_cpu_work(get_cwq(cpu, wq), work);
1195 }
1196
1197 static int __cancel_work_timer(struct work_struct *work,
1198                                 struct timer_list* timer)
1199 {
1200         int ret;
1201
1202         do {
1203                 ret = (timer && likely(del_timer(timer)));
1204                 if (!ret)
1205                         ret = try_to_grab_pending(work);
1206                 wait_on_work(work);
1207         } while (unlikely(ret < 0));
1208
1209         clear_wq_data(work);
1210         return ret;
1211 }
1212
1213 /**
1214  * cancel_work_sync - block until a work_struct's callback has terminated
1215  * @work: the work which is to be flushed
1216  *
1217  * Returns true if @work was pending.
1218  *
1219  * cancel_work_sync() will cancel the work if it is queued. If the work's
1220  * callback appears to be running, cancel_work_sync() will block until it
1221  * has completed.
1222  *
1223  * It is possible to use this function if the work re-queues itself. It can
1224  * cancel the work even if it migrates to another workqueue, however in that
1225  * case it only guarantees that work->func() has completed on the last queued
1226  * workqueue.
1227  *
1228  * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
1229  * pending, otherwise it goes into a busy-wait loop until the timer expires.
1230  *
1231  * The caller must ensure that workqueue_struct on which this work was last
1232  * queued can't be destroyed before this function returns.
1233  */
1234 int cancel_work_sync(struct work_struct *work)
1235 {
1236         return __cancel_work_timer(work, NULL);
1237 }
1238 EXPORT_SYMBOL_GPL(cancel_work_sync);
1239
1240 /**
1241  * cancel_delayed_work_sync - reliably kill off a delayed work.
1242  * @dwork: the delayed work struct
1243  *
1244  * Returns true if @dwork was pending.
1245  *
1246  * It is possible to use this function if @dwork rearms itself via queue_work()
1247  * or queue_delayed_work(). See also the comment for cancel_work_sync().
1248  */
1249 int cancel_delayed_work_sync(struct delayed_work *dwork)
1250 {
1251         return __cancel_work_timer(&dwork->work, &dwork->timer);
1252 }
1253 EXPORT_SYMBOL(cancel_delayed_work_sync);
1254
1255 static struct workqueue_struct *keventd_wq __read_mostly;
1256
1257 /**
1258  * schedule_work - put work task in global workqueue
1259  * @work: job to be done
1260  *
1261  * Returns zero if @work was already on the kernel-global workqueue and
1262  * non-zero otherwise.
1263  *
1264  * This puts a job in the kernel-global workqueue if it was not already
1265  * queued and leaves it in the same position on the kernel-global
1266  * workqueue otherwise.
1267  */
1268 int schedule_work(struct work_struct *work)
1269 {
1270         return queue_work(keventd_wq, work);
1271 }
1272 EXPORT_SYMBOL(schedule_work);
1273
1274 /*
1275  * schedule_work_on - put work task on a specific cpu
1276  * @cpu: cpu to put the work task on
1277  * @work: job to be done
1278  *
1279  * This puts a job on a specific cpu
1280  */
1281 int schedule_work_on(int cpu, struct work_struct *work)
1282 {
1283         return queue_work_on(cpu, keventd_wq, work);
1284 }
1285 EXPORT_SYMBOL(schedule_work_on);
1286
1287 /**
1288  * schedule_delayed_work - put work task in global workqueue after delay
1289  * @dwork: job to be done
1290  * @delay: number of jiffies to wait or 0 for immediate execution
1291  *
1292  * After waiting for a given time this puts a job in the kernel-global
1293  * workqueue.
1294  */
1295 int schedule_delayed_work(struct delayed_work *dwork,
1296                                         unsigned long delay)
1297 {
1298         return queue_delayed_work(keventd_wq, dwork, delay);
1299 }
1300 EXPORT_SYMBOL(schedule_delayed_work);
1301
1302 /**
1303  * flush_delayed_work - block until a dwork_struct's callback has terminated
1304  * @dwork: the delayed work which is to be flushed
1305  *
1306  * Any timeout is cancelled, and any pending work is run immediately.
1307  */
1308 void flush_delayed_work(struct delayed_work *dwork)
1309 {
1310         if (del_timer_sync(&dwork->timer)) {
1311                 __queue_work(get_cpu(), get_wq_data(&dwork->work)->wq,
1312                              &dwork->work);
1313                 put_cpu();
1314         }
1315         flush_work(&dwork->work);
1316 }
1317 EXPORT_SYMBOL(flush_delayed_work);
1318
1319 /**
1320  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
1321  * @cpu: cpu to use
1322  * @dwork: job to be done
1323  * @delay: number of jiffies to wait
1324  *
1325  * After waiting for a given time this puts a job in the kernel-global
1326  * workqueue on the specified CPU.
1327  */
1328 int schedule_delayed_work_on(int cpu,
1329                         struct delayed_work *dwork, unsigned long delay)
1330 {
1331         return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
1332 }
1333 EXPORT_SYMBOL(schedule_delayed_work_on);
1334
1335 /**
1336  * schedule_on_each_cpu - call a function on each online CPU from keventd
1337  * @func: the function to call
1338  *
1339  * Returns zero on success.
1340  * Returns -ve errno on failure.
1341  *
1342  * schedule_on_each_cpu() is very slow.
1343  */
1344 int schedule_on_each_cpu(work_func_t func)
1345 {
1346         int cpu;
1347         int orig = -1;
1348         struct work_struct *works;
1349
1350         works = alloc_percpu(struct work_struct);
1351         if (!works)
1352                 return -ENOMEM;
1353
1354         get_online_cpus();
1355
1356         /*
1357          * When running in keventd don't schedule a work item on
1358          * itself.  Can just call directly because the work queue is
1359          * already bound.  This also is faster.
1360          */
1361         if (current_is_keventd())
1362                 orig = raw_smp_processor_id();
1363
1364         for_each_online_cpu(cpu) {
1365                 struct work_struct *work = per_cpu_ptr(works, cpu);
1366
1367                 INIT_WORK(work, func);
1368                 if (cpu != orig)
1369                         schedule_work_on(cpu, work);
1370         }
1371         if (orig >= 0)
1372                 func(per_cpu_ptr(works, orig));
1373
1374         for_each_online_cpu(cpu)
1375                 flush_work(per_cpu_ptr(works, cpu));
1376
1377         put_online_cpus();
1378         free_percpu(works);
1379         return 0;
1380 }
1381
1382 /**
1383  * flush_scheduled_work - ensure that any scheduled work has run to completion.
1384  *
1385  * Forces execution of the kernel-global workqueue and blocks until its
1386  * completion.
1387  *
1388  * Think twice before calling this function!  It's very easy to get into
1389  * trouble if you don't take great care.  Either of the following situations
1390  * will lead to deadlock:
1391  *
1392  *      One of the work items currently on the workqueue needs to acquire
1393  *      a lock held by your code or its caller.
1394  *
1395  *      Your code is running in the context of a work routine.
1396  *
1397  * They will be detected by lockdep when they occur, but the first might not
1398  * occur very often.  It depends on what work items are on the workqueue and
1399  * what locks they need, which you have no control over.
1400  *
1401  * In most situations flushing the entire workqueue is overkill; you merely
1402  * need to know that a particular work item isn't queued and isn't running.
1403  * In such cases you should use cancel_delayed_work_sync() or
1404  * cancel_work_sync() instead.
1405  */
1406 void flush_scheduled_work(void)
1407 {
1408         flush_workqueue(keventd_wq);
1409 }
1410 EXPORT_SYMBOL(flush_scheduled_work);
1411
1412 /**
1413  * execute_in_process_context - reliably execute the routine with user context
1414  * @fn:         the function to execute
1415  * @ew:         guaranteed storage for the execute work structure (must
1416  *              be available when the work executes)
1417  *
1418  * Executes the function immediately if process context is available,
1419  * otherwise schedules the function for delayed execution.
1420  *
1421  * Returns:     0 - function was executed
1422  *              1 - function was scheduled for execution
1423  */
1424 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
1425 {
1426         if (!in_interrupt()) {
1427                 fn(&ew->work);
1428                 return 0;
1429         }
1430
1431         INIT_WORK(&ew->work, fn);
1432         schedule_work(&ew->work);
1433
1434         return 1;
1435 }
1436 EXPORT_SYMBOL_GPL(execute_in_process_context);
1437
1438 int keventd_up(void)
1439 {
1440         return keventd_wq != NULL;
1441 }
1442
1443 int current_is_keventd(void)
1444 {
1445         struct cpu_workqueue_struct *cwq;
1446         int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
1447         int ret = 0;
1448
1449         BUG_ON(!keventd_wq);
1450
1451         cwq = get_cwq(cpu, keventd_wq);
1452         if (current == cwq->worker->task)
1453                 ret = 1;
1454
1455         return ret;
1456
1457 }
1458
1459 static struct cpu_workqueue_struct *alloc_cwqs(void)
1460 {
1461         /*
1462          * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
1463          * Make sure that the alignment isn't lower than that of
1464          * unsigned long long.
1465          */
1466         const size_t size = sizeof(struct cpu_workqueue_struct);
1467         const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
1468                                    __alignof__(unsigned long long));
1469         struct cpu_workqueue_struct *cwqs;
1470 #ifndef CONFIG_SMP
1471         void *ptr;
1472
1473         /*
1474          * On UP, percpu allocator doesn't honor alignment parameter
1475          * and simply uses arch-dependent default.  Allocate enough
1476          * room to align cwq and put an extra pointer at the end
1477          * pointing back to the originally allocated pointer which
1478          * will be used for free.
1479          *
1480          * FIXME: This really belongs to UP percpu code.  Update UP
1481          * percpu code to honor alignment and remove this ugliness.
1482          */
1483         ptr = __alloc_percpu(size + align + sizeof(void *), 1);
1484         cwqs = PTR_ALIGN(ptr, align);
1485         *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
1486 #else
1487         /* On SMP, percpu allocator can do it itself */
1488         cwqs = __alloc_percpu(size, align);
1489 #endif
1490         /* just in case, make sure it's actually aligned */
1491         BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
1492         return cwqs;
1493 }
1494
1495 static void free_cwqs(struct cpu_workqueue_struct *cwqs)
1496 {
1497 #ifndef CONFIG_SMP
1498         /* on UP, the pointer to free is stored right after the cwq */
1499         if (cwqs)
1500                 free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
1501 #else
1502         free_percpu(cwqs);
1503 #endif
1504 }
1505
1506 struct workqueue_struct *__create_workqueue_key(const char *name,
1507                                                 unsigned int flags,
1508                                                 struct lock_class_key *key,
1509                                                 const char *lock_name)
1510 {
1511         bool singlethread = flags & WQ_SINGLE_THREAD;
1512         struct workqueue_struct *wq;
1513         bool failed = false;
1514         unsigned int cpu;
1515
1516         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
1517         if (!wq)
1518                 goto err;
1519
1520         wq->cpu_wq = alloc_cwqs();
1521         if (!wq->cpu_wq)
1522                 goto err;
1523
1524         wq->flags = flags;
1525         mutex_init(&wq->flush_mutex);
1526         atomic_set(&wq->nr_cwqs_to_flush, 0);
1527         INIT_LIST_HEAD(&wq->flusher_queue);
1528         INIT_LIST_HEAD(&wq->flusher_overflow);
1529         wq->name = name;
1530         lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
1531         INIT_LIST_HEAD(&wq->list);
1532
1533         cpu_maps_update_begin();
1534         /*
1535          * We must initialize cwqs for each possible cpu even if we
1536          * are going to call destroy_workqueue() finally. Otherwise
1537          * cpu_up() can hit the uninitialized cwq once we drop the
1538          * lock.
1539          */
1540         for_each_possible_cpu(cpu) {
1541                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1542
1543                 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
1544                 cwq->cpu = cpu;
1545                 cwq->wq = wq;
1546                 cwq->flush_color = -1;
1547                 spin_lock_init(&cwq->lock);
1548                 INIT_LIST_HEAD(&cwq->worklist);
1549                 init_waitqueue_head(&cwq->more_work);
1550
1551                 if (failed)
1552                         continue;
1553                 cwq->worker = create_worker(cwq,
1554                                             cpu_online(cpu) && !singlethread);
1555                 if (cwq->worker)
1556                         start_worker(cwq->worker);
1557                 else
1558                         failed = true;
1559         }
1560
1561         spin_lock(&workqueue_lock);
1562         list_add(&wq->list, &workqueues);
1563         spin_unlock(&workqueue_lock);
1564
1565         cpu_maps_update_done();
1566
1567         if (failed) {
1568                 destroy_workqueue(wq);
1569                 wq = NULL;
1570         }
1571         return wq;
1572 err:
1573         if (wq) {
1574                 free_cwqs(wq->cpu_wq);
1575                 kfree(wq);
1576         }
1577         return NULL;
1578 }
1579 EXPORT_SYMBOL_GPL(__create_workqueue_key);
1580
1581 /**
1582  * destroy_workqueue - safely terminate a workqueue
1583  * @wq: target workqueue
1584  *
1585  * Safely destroy a workqueue. All work currently pending will be done first.
1586  */
1587 void destroy_workqueue(struct workqueue_struct *wq)
1588 {
1589         int cpu;
1590
1591         cpu_maps_update_begin();
1592         spin_lock(&workqueue_lock);
1593         list_del(&wq->list);
1594         spin_unlock(&workqueue_lock);
1595         cpu_maps_update_done();
1596
1597         flush_workqueue(wq);
1598
1599         for_each_possible_cpu(cpu) {
1600                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1601                 int i;
1602
1603                 if (cwq->worker) {
1604                         destroy_worker(cwq->worker);
1605                         cwq->worker = NULL;
1606                 }
1607
1608                 for (i = 0; i < WORK_NR_COLORS; i++)
1609                         BUG_ON(cwq->nr_in_flight[i]);
1610         }
1611
1612         free_cwqs(wq->cpu_wq);
1613         kfree(wq);
1614 }
1615 EXPORT_SYMBOL_GPL(destroy_workqueue);
1616
1617 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
1618                                                 unsigned long action,
1619                                                 void *hcpu)
1620 {
1621         unsigned int cpu = (unsigned long)hcpu;
1622         struct cpu_workqueue_struct *cwq;
1623         struct workqueue_struct *wq;
1624
1625         action &= ~CPU_TASKS_FROZEN;
1626
1627         list_for_each_entry(wq, &workqueues, list) {
1628                 if (wq->flags & WQ_SINGLE_THREAD)
1629                         continue;
1630
1631                 cwq = get_cwq(cpu, wq);
1632
1633                 switch (action) {
1634                 case CPU_POST_DEAD:
1635                         flush_workqueue(wq);
1636                         break;
1637                 }
1638         }
1639
1640         return notifier_from_errno(0);
1641 }
1642
1643 #ifdef CONFIG_SMP
1644
1645 struct work_for_cpu {
1646         struct completion completion;
1647         long (*fn)(void *);
1648         void *arg;
1649         long ret;
1650 };
1651
1652 static int do_work_for_cpu(void *_wfc)
1653 {
1654         struct work_for_cpu *wfc = _wfc;
1655         wfc->ret = wfc->fn(wfc->arg);
1656         complete(&wfc->completion);
1657         return 0;
1658 }
1659
1660 /**
1661  * work_on_cpu - run a function in user context on a particular cpu
1662  * @cpu: the cpu to run on
1663  * @fn: the function to run
1664  * @arg: the function arg
1665  *
1666  * This will return the value @fn returns.
1667  * It is up to the caller to ensure that the cpu doesn't go offline.
1668  * The caller must not hold any locks which would prevent @fn from completing.
1669  */
1670 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
1671 {
1672         struct task_struct *sub_thread;
1673         struct work_for_cpu wfc = {
1674                 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
1675                 .fn = fn,
1676                 .arg = arg,
1677         };
1678
1679         sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
1680         if (IS_ERR(sub_thread))
1681                 return PTR_ERR(sub_thread);
1682         kthread_bind(sub_thread, cpu);
1683         wake_up_process(sub_thread);
1684         wait_for_completion(&wfc.completion);
1685         return wfc.ret;
1686 }
1687 EXPORT_SYMBOL_GPL(work_on_cpu);
1688 #endif /* CONFIG_SMP */
1689
1690 void __init init_workqueues(void)
1691 {
1692         unsigned int cpu;
1693
1694         for_each_possible_cpu(cpu)
1695                 ida_init(&per_cpu(worker_ida, cpu));
1696
1697         singlethread_cpu = cpumask_first(cpu_possible_mask);
1698         hotcpu_notifier(workqueue_cpu_callback, 0);
1699         keventd_wq = create_workqueue("events");
1700         BUG_ON(!keventd_wq);
1701 }