[PATCH] Support for freezeable workqueues
[linux-2.6.git] / kernel / workqueue.c
1 /*
2  * linux/kernel/workqueue.c
3  *
4  * Generic mechanism for defining kernel helper threads for running
5  * arbitrary tasks in process context.
6  *
7  * Started by Ingo Molnar, Copyright (C) 2002
8  *
9  * Derived from the taskqueue/keventd code by:
10  *
11  *   David Woodhouse <dwmw2@infradead.org>
12  *   Andrew Morton <andrewm@uow.edu.au>
13  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
14  *   Theodore Ts'o <tytso@mit.edu>
15  *
16  * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
17  */
18
19 #include <linux/module.h>
20 #include <linux/kernel.h>
21 #include <linux/sched.h>
22 #include <linux/init.h>
23 #include <linux/signal.h>
24 #include <linux/completion.h>
25 #include <linux/workqueue.h>
26 #include <linux/slab.h>
27 #include <linux/cpu.h>
28 #include <linux/notifier.h>
29 #include <linux/kthread.h>
30 #include <linux/hardirq.h>
31 #include <linux/mempolicy.h>
32 #include <linux/freezer.h>
33
34 /*
35  * The per-CPU workqueue (if single thread, we always use the first
36  * possible cpu).
37  *
38  * The sequence counters are for flush_scheduled_work().  It wants to wait
39  * until all currently-scheduled works are completed, but it doesn't
40  * want to be livelocked by new, incoming ones.  So it waits until
41  * remove_sequence is >= the insert_sequence which pertained when
42  * flush_scheduled_work() was called.
43  */
44 struct cpu_workqueue_struct {
45
46         spinlock_t lock;
47
48         long remove_sequence;   /* Least-recently added (next to run) */
49         long insert_sequence;   /* Next to add */
50
51         struct list_head worklist;
52         wait_queue_head_t more_work;
53         wait_queue_head_t work_done;
54
55         struct workqueue_struct *wq;
56         struct task_struct *thread;
57
58         int run_depth;          /* Detect run_workqueue() recursion depth */
59
60         int freezeable;         /* Freeze the thread during suspend */
61 } ____cacheline_aligned;
62
63 /*
64  * The externally visible workqueue abstraction is an array of
65  * per-CPU workqueues:
66  */
67 struct workqueue_struct {
68         struct cpu_workqueue_struct *cpu_wq;
69         const char *name;
70         struct list_head list;  /* Empty if single thread */
71 };
72
/*
 * Every multi-threaded workqueue in the system is linked on 'workqueues'
 * so that the CPU hotplug callback can add or remove a worker thread on
 * each of them as CPUs come and go.  workqueue_mutex is held while that
 * list is modified or walked.
 */
static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues);

/* CPU whose per-CPU slot is used by every single-threaded workqueue. */
static int singlethread_cpu;
79
80 /* If it's single threaded, it isn't in the list of workqueues. */
81 static inline int is_single_threaded(struct workqueue_struct *wq)
82 {
83         return list_empty(&wq->list);
84 }
85
86 static inline void set_wq_data(struct work_struct *work, void *wq)
87 {
88         unsigned long new, old, res;
89
90         /* assume the pending flag is already set and that the task has already
91          * been queued on this workqueue */
92         new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING);
93         res = work->management;
94         if (res != new) {
95                 do {
96                         old = res;
97                         new = (unsigned long) wq;
98                         new |= (old & WORK_STRUCT_FLAG_MASK);
99                         res = cmpxchg(&work->management, old, new);
100                 } while (res != old);
101         }
102 }
103
104 static inline void *get_wq_data(struct work_struct *work)
105 {
106         return (void *) (work->management & WORK_STRUCT_WQ_DATA_MASK);
107 }
108
109 /* Preempt must be disabled. */
110 static void __queue_work(struct cpu_workqueue_struct *cwq,
111                          struct work_struct *work)
112 {
113         unsigned long flags;
114
115         spin_lock_irqsave(&cwq->lock, flags);
116         set_wq_data(work, cwq);
117         list_add_tail(&work->entry, &cwq->worklist);
118         cwq->insert_sequence++;
119         wake_up(&cwq->more_work);
120         spin_unlock_irqrestore(&cwq->lock, flags);
121 }
122
123 /**
124  * queue_work - queue work on a workqueue
125  * @wq: workqueue to use
126  * @work: work to queue
127  *
128  * Returns 0 if @work was already on a queue, non-zero otherwise.
129  *
130  * We queue the work to the CPU it was submitted, but there is no
131  * guarantee that it will be processed by that CPU.
132  */
133 int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
134 {
135         int ret = 0, cpu = get_cpu();
136
137         if (!test_and_set_bit(WORK_STRUCT_PENDING, &work->management)) {
138                 if (unlikely(is_single_threaded(wq)))
139                         cpu = singlethread_cpu;
140                 BUG_ON(!list_empty(&work->entry));
141                 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
142                 ret = 1;
143         }
144         put_cpu();
145         return ret;
146 }
147 EXPORT_SYMBOL_GPL(queue_work);
148
149 static void delayed_work_timer_fn(unsigned long __data)
150 {
151         struct delayed_work *dwork = (struct delayed_work *)__data;
152         struct workqueue_struct *wq = get_wq_data(&dwork->work);
153         int cpu = smp_processor_id();
154
155         if (unlikely(is_single_threaded(wq)))
156                 cpu = singlethread_cpu;
157
158         __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
159 }
160
161 /**
162  * queue_delayed_work - queue work on a workqueue after delay
163  * @wq: workqueue to use
164  * @work: delayable work to queue
165  * @delay: number of jiffies to wait before queueing
166  *
167  * Returns 0 if @work was already on a queue, non-zero otherwise.
168  */
169 int fastcall queue_delayed_work(struct workqueue_struct *wq,
170                         struct delayed_work *dwork, unsigned long delay)
171 {
172         int ret = 0;
173         struct timer_list *timer = &dwork->timer;
174         struct work_struct *work = &dwork->work;
175
176         if (delay == 0)
177                 return queue_work(wq, work);
178
179         if (!test_and_set_bit(WORK_STRUCT_PENDING, &work->management)) {
180                 BUG_ON(timer_pending(timer));
181                 BUG_ON(!list_empty(&work->entry));
182
183                 /* This stores wq for the moment, for the timer_fn */
184                 set_wq_data(work, wq);
185                 timer->expires = jiffies + delay;
186                 timer->data = (unsigned long)dwork;
187                 timer->function = delayed_work_timer_fn;
188                 add_timer(timer);
189                 ret = 1;
190         }
191         return ret;
192 }
193 EXPORT_SYMBOL_GPL(queue_delayed_work);
194
195 /**
196  * queue_delayed_work_on - queue work on specific CPU after delay
197  * @cpu: CPU number to execute work on
198  * @wq: workqueue to use
199  * @work: work to queue
200  * @delay: number of jiffies to wait before queueing
201  *
202  * Returns 0 if @work was already on a queue, non-zero otherwise.
203  */
204 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
205                         struct delayed_work *dwork, unsigned long delay)
206 {
207         int ret = 0;
208         struct timer_list *timer = &dwork->timer;
209         struct work_struct *work = &dwork->work;
210
211         if (!test_and_set_bit(WORK_STRUCT_PENDING, &work->management)) {
212                 BUG_ON(timer_pending(timer));
213                 BUG_ON(!list_empty(&work->entry));
214
215                 /* This stores wq for the moment, for the timer_fn */
216                 set_wq_data(work, wq);
217                 timer->expires = jiffies + delay;
218                 timer->data = (unsigned long)dwork;
219                 timer->function = delayed_work_timer_fn;
220                 add_timer_on(timer, cpu);
221                 ret = 1;
222         }
223         return ret;
224 }
225 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
226
227 static void run_workqueue(struct cpu_workqueue_struct *cwq)
228 {
229         unsigned long flags;
230
231         /*
232          * Keep taking off work from the queue until
233          * done.
234          */
235         spin_lock_irqsave(&cwq->lock, flags);
236         cwq->run_depth++;
237         if (cwq->run_depth > 3) {
238                 /* morton gets to eat his hat */
239                 printk("%s: recursion depth exceeded: %d\n",
240                         __FUNCTION__, cwq->run_depth);
241                 dump_stack();
242         }
243         while (!list_empty(&cwq->worklist)) {
244                 struct work_struct *work = list_entry(cwq->worklist.next,
245                                                 struct work_struct, entry);
246                 work_func_t f = work->func;
247
248                 list_del_init(cwq->worklist.next);
249                 spin_unlock_irqrestore(&cwq->lock, flags);
250
251                 BUG_ON(get_wq_data(work) != cwq);
252                 if (!test_bit(WORK_STRUCT_NOAUTOREL, &work->management))
253                         work_release(work);
254                 f(work);
255
256                 spin_lock_irqsave(&cwq->lock, flags);
257                 cwq->remove_sequence++;
258                 wake_up(&cwq->work_done);
259         }
260         cwq->run_depth--;
261         spin_unlock_irqrestore(&cwq->lock, flags);
262 }
263
264 static int worker_thread(void *__cwq)
265 {
266         struct cpu_workqueue_struct *cwq = __cwq;
267         DECLARE_WAITQUEUE(wait, current);
268         struct k_sigaction sa;
269         sigset_t blocked;
270
271         if (!cwq->freezeable)
272                 current->flags |= PF_NOFREEZE;
273
274         set_user_nice(current, -5);
275
276         /* Block and flush all signals */
277         sigfillset(&blocked);
278         sigprocmask(SIG_BLOCK, &blocked, NULL);
279         flush_signals(current);
280
281         /*
282          * We inherited MPOL_INTERLEAVE from the booting kernel.
283          * Set MPOL_DEFAULT to insure node local allocations.
284          */
285         numa_default_policy();
286
287         /* SIG_IGN makes children autoreap: see do_notify_parent(). */
288         sa.sa.sa_handler = SIG_IGN;
289         sa.sa.sa_flags = 0;
290         siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
291         do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
292
293         set_current_state(TASK_INTERRUPTIBLE);
294         while (!kthread_should_stop()) {
295                 if (cwq->freezeable)
296                         try_to_freeze();
297
298                 add_wait_queue(&cwq->more_work, &wait);
299                 if (list_empty(&cwq->worklist))
300                         schedule();
301                 else
302                         __set_current_state(TASK_RUNNING);
303                 remove_wait_queue(&cwq->more_work, &wait);
304
305                 if (!list_empty(&cwq->worklist))
306                         run_workqueue(cwq);
307                 set_current_state(TASK_INTERRUPTIBLE);
308         }
309         __set_current_state(TASK_RUNNING);
310         return 0;
311 }
312
313 static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
314 {
315         if (cwq->thread == current) {
316                 /*
317                  * Probably keventd trying to flush its own queue. So simply run
318                  * it by hand rather than deadlocking.
319                  */
320                 run_workqueue(cwq);
321         } else {
322                 DEFINE_WAIT(wait);
323                 long sequence_needed;
324
325                 spin_lock_irq(&cwq->lock);
326                 sequence_needed = cwq->insert_sequence;
327
328                 while (sequence_needed - cwq->remove_sequence > 0) {
329                         prepare_to_wait(&cwq->work_done, &wait,
330                                         TASK_UNINTERRUPTIBLE);
331                         spin_unlock_irq(&cwq->lock);
332                         schedule();
333                         spin_lock_irq(&cwq->lock);
334                 }
335                 finish_wait(&cwq->work_done, &wait);
336                 spin_unlock_irq(&cwq->lock);
337         }
338 }
339
340 /**
341  * flush_workqueue - ensure that any scheduled work has run to completion.
342  * @wq: workqueue to flush
343  *
344  * Forces execution of the workqueue and blocks until its completion.
345  * This is typically used in driver shutdown handlers.
346  *
347  * This function will sample each workqueue's current insert_sequence number and
348  * will sleep until the head sequence is greater than or equal to that.  This
349  * means that we sleep until all works which were queued on entry have been
350  * handled, but we are not livelocked by new incoming ones.
351  *
352  * This function used to run the workqueues itself.  Now we just wait for the
353  * helper threads to do it.
354  */
355 void fastcall flush_workqueue(struct workqueue_struct *wq)
356 {
357         might_sleep();
358
359         if (is_single_threaded(wq)) {
360                 /* Always use first cpu's area. */
361                 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
362         } else {
363                 int cpu;
364
365                 mutex_lock(&workqueue_mutex);
366                 for_each_online_cpu(cpu)
367                         flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
368                 mutex_unlock(&workqueue_mutex);
369         }
370 }
371 EXPORT_SYMBOL_GPL(flush_workqueue);
372
373 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
374                                                    int cpu, int freezeable)
375 {
376         struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
377         struct task_struct *p;
378
379         spin_lock_init(&cwq->lock);
380         cwq->wq = wq;
381         cwq->thread = NULL;
382         cwq->insert_sequence = 0;
383         cwq->remove_sequence = 0;
384         cwq->freezeable = freezeable;
385         INIT_LIST_HEAD(&cwq->worklist);
386         init_waitqueue_head(&cwq->more_work);
387         init_waitqueue_head(&cwq->work_done);
388
389         if (is_single_threaded(wq))
390                 p = kthread_create(worker_thread, cwq, "%s", wq->name);
391         else
392                 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
393         if (IS_ERR(p))
394                 return NULL;
395         cwq->thread = p;
396         return p;
397 }
398
399 struct workqueue_struct *__create_workqueue(const char *name,
400                                             int singlethread, int freezeable)
401 {
402         int cpu, destroy = 0;
403         struct workqueue_struct *wq;
404         struct task_struct *p;
405
406         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
407         if (!wq)
408                 return NULL;
409
410         wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
411         if (!wq->cpu_wq) {
412                 kfree(wq);
413                 return NULL;
414         }
415
416         wq->name = name;
417         mutex_lock(&workqueue_mutex);
418         if (singlethread) {
419                 INIT_LIST_HEAD(&wq->list);
420                 p = create_workqueue_thread(wq, singlethread_cpu, freezeable);
421                 if (!p)
422                         destroy = 1;
423                 else
424                         wake_up_process(p);
425         } else {
426                 list_add(&wq->list, &workqueues);
427                 for_each_online_cpu(cpu) {
428                         p = create_workqueue_thread(wq, cpu, freezeable);
429                         if (p) {
430                                 kthread_bind(p, cpu);
431                                 wake_up_process(p);
432                         } else
433                                 destroy = 1;
434                 }
435         }
436         mutex_unlock(&workqueue_mutex);
437
438         /*
439          * Was there any error during startup? If yes then clean up:
440          */
441         if (destroy) {
442                 destroy_workqueue(wq);
443                 wq = NULL;
444         }
445         return wq;
446 }
447 EXPORT_SYMBOL_GPL(__create_workqueue);
448
449 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
450 {
451         struct cpu_workqueue_struct *cwq;
452         unsigned long flags;
453         struct task_struct *p;
454
455         cwq = per_cpu_ptr(wq->cpu_wq, cpu);
456         spin_lock_irqsave(&cwq->lock, flags);
457         p = cwq->thread;
458         cwq->thread = NULL;
459         spin_unlock_irqrestore(&cwq->lock, flags);
460         if (p)
461                 kthread_stop(p);
462 }
463
464 /**
465  * destroy_workqueue - safely terminate a workqueue
466  * @wq: target workqueue
467  *
468  * Safely destroy a workqueue. All work currently pending will be done first.
469  */
470 void destroy_workqueue(struct workqueue_struct *wq)
471 {
472         int cpu;
473
474         flush_workqueue(wq);
475
476         /* We don't need the distraction of CPUs appearing and vanishing. */
477         mutex_lock(&workqueue_mutex);
478         if (is_single_threaded(wq))
479                 cleanup_workqueue_thread(wq, singlethread_cpu);
480         else {
481                 for_each_online_cpu(cpu)
482                         cleanup_workqueue_thread(wq, cpu);
483                 list_del(&wq->list);
484         }
485         mutex_unlock(&workqueue_mutex);
486         free_percpu(wq->cpu_wq);
487         kfree(wq);
488 }
489 EXPORT_SYMBOL_GPL(destroy_workqueue);
490
/*
 * The kernel-global workqueue used by schedule_work() and friends;
 * NULL until init_workqueues() has created it.
 */
static struct workqueue_struct *keventd_wq;
492
493 /**
494  * schedule_work - put work task in global workqueue
495  * @work: job to be done
496  *
497  * This puts a job in the kernel-global workqueue.
498  */
499 int fastcall schedule_work(struct work_struct *work)
500 {
501         return queue_work(keventd_wq, work);
502 }
503 EXPORT_SYMBOL(schedule_work);
504
505 /**
506  * schedule_delayed_work - put work task in global workqueue after delay
507  * @dwork: job to be done
508  * @delay: number of jiffies to wait or 0 for immediate execution
509  *
510  * After waiting for a given time this puts a job in the kernel-global
511  * workqueue.
512  */
513 int fastcall schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)
514 {
515         return queue_delayed_work(keventd_wq, dwork, delay);
516 }
517 EXPORT_SYMBOL(schedule_delayed_work);
518
519 /**
520  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
521  * @cpu: cpu to use
522  * @dwork: job to be done
523  * @delay: number of jiffies to wait
524  *
525  * After waiting for a given time this puts a job in the kernel-global
526  * workqueue on the specified CPU.
527  */
528 int schedule_delayed_work_on(int cpu,
529                         struct delayed_work *dwork, unsigned long delay)
530 {
531         return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
532 }
533 EXPORT_SYMBOL(schedule_delayed_work_on);
534
535 /**
536  * schedule_on_each_cpu - call a function on each online CPU from keventd
537  * @func: the function to call
538  *
539  * Returns zero on success.
540  * Returns -ve errno on failure.
541  *
542  * Appears to be racy against CPU hotplug.
543  *
544  * schedule_on_each_cpu() is very slow.
545  */
546 int schedule_on_each_cpu(work_func_t func)
547 {
548         int cpu;
549         struct work_struct *works;
550
551         works = alloc_percpu(struct work_struct);
552         if (!works)
553                 return -ENOMEM;
554
555         mutex_lock(&workqueue_mutex);
556         for_each_online_cpu(cpu) {
557                 INIT_WORK(per_cpu_ptr(works, cpu), func);
558                 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu),
559                                 per_cpu_ptr(works, cpu));
560         }
561         mutex_unlock(&workqueue_mutex);
562         flush_workqueue(keventd_wq);
563         free_percpu(works);
564         return 0;
565 }
566
567 void flush_scheduled_work(void)
568 {
569         flush_workqueue(keventd_wq);
570 }
571 EXPORT_SYMBOL(flush_scheduled_work);
572
/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed
 *			work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @dwork: the delayed work struct
 *
 * Keeps flushing @wq until cancel_delayed_work() reports success.
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
				       struct delayed_work *dwork)
{
	for (;;) {
		if (cancel_delayed_work(dwork))
			break;
		flush_workqueue(wq);
	}
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);
586
587 /**
588  * cancel_rearming_delayed_work - reliably kill off a delayed keventd
589  *                      work whose handler rearms the delayed work.
590  * @dwork: the delayed work struct
591  */
592 void cancel_rearming_delayed_work(struct delayed_work *dwork)
593 {
594         cancel_rearming_delayed_workqueue(keventd_wq, dwork);
595 }
596 EXPORT_SYMBOL(cancel_rearming_delayed_work);
597
598 /**
599  * execute_in_process_context - reliably execute the routine with user context
600  * @fn:         the function to execute
601  * @ew:         guaranteed storage for the execute work structure (must
602  *              be available when the work executes)
603  *
604  * Executes the function immediately if process context is available,
605  * otherwise schedules the function for delayed execution.
606  *
607  * Returns:     0 - function was executed
608  *              1 - function was scheduled for execution
609  */
610 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
611 {
612         if (!in_interrupt()) {
613                 fn(&ew->work);
614                 return 0;
615         }
616
617         INIT_WORK(&ew->work, fn);
618         schedule_work(&ew->work);
619
620         return 1;
621 }
622 EXPORT_SYMBOL_GPL(execute_in_process_context);
623
624 int keventd_up(void)
625 {
626         return keventd_wq != NULL;
627 }
628
629 int current_is_keventd(void)
630 {
631         struct cpu_workqueue_struct *cwq;
632         int cpu = smp_processor_id();   /* preempt-safe: keventd is per-cpu */
633         int ret = 0;
634
635         BUG_ON(!keventd_wq);
636
637         cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
638         if (current == cwq->thread)
639                 ret = 1;
640
641         return ret;
642
643 }
644
645 #ifdef CONFIG_HOTPLUG_CPU
646 /* Take the work from this (downed) CPU. */
647 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
648 {
649         struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
650         struct list_head list;
651         struct work_struct *work;
652
653         spin_lock_irq(&cwq->lock);
654         list_replace_init(&cwq->worklist, &list);
655
656         while (!list_empty(&list)) {
657                 printk("Taking work for %s\n", wq->name);
658                 work = list_entry(list.next,struct work_struct,entry);
659                 list_del(&work->entry);
660                 __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
661         }
662         spin_unlock_irq(&cwq->lock);
663 }
664
665 /* We're holding the cpucontrol mutex here */
666 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
667                                   unsigned long action,
668                                   void *hcpu)
669 {
670         unsigned int hotcpu = (unsigned long)hcpu;
671         struct workqueue_struct *wq;
672
673         switch (action) {
674         case CPU_UP_PREPARE:
675                 mutex_lock(&workqueue_mutex);
676                 /* Create a new workqueue thread for it. */
677                 list_for_each_entry(wq, &workqueues, list) {
678                         if (!create_workqueue_thread(wq, hotcpu, 0)) {
679                                 printk("workqueue for %i failed\n", hotcpu);
680                                 return NOTIFY_BAD;
681                         }
682                 }
683                 break;
684
685         case CPU_ONLINE:
686                 /* Kick off worker threads. */
687                 list_for_each_entry(wq, &workqueues, list) {
688                         struct cpu_workqueue_struct *cwq;
689
690                         cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
691                         kthread_bind(cwq->thread, hotcpu);
692                         wake_up_process(cwq->thread);
693                 }
694                 mutex_unlock(&workqueue_mutex);
695                 break;
696
697         case CPU_UP_CANCELED:
698                 list_for_each_entry(wq, &workqueues, list) {
699                         if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
700                                 continue;
701                         /* Unbind so it can run. */
702                         kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
703                                      any_online_cpu(cpu_online_map));
704                         cleanup_workqueue_thread(wq, hotcpu);
705                 }
706                 mutex_unlock(&workqueue_mutex);
707                 break;
708
709         case CPU_DOWN_PREPARE:
710                 mutex_lock(&workqueue_mutex);
711                 break;
712
713         case CPU_DOWN_FAILED:
714                 mutex_unlock(&workqueue_mutex);
715                 break;
716
717         case CPU_DEAD:
718                 list_for_each_entry(wq, &workqueues, list)
719                         cleanup_workqueue_thread(wq, hotcpu);
720                 list_for_each_entry(wq, &workqueues, list)
721                         take_over_work(wq, hotcpu);
722                 mutex_unlock(&workqueue_mutex);
723                 break;
724         }
725
726         return NOTIFY_OK;
727 }
728 #endif
729
730 void init_workqueues(void)
731 {
732         singlethread_cpu = first_cpu(cpu_possible_map);
733         hotcpu_notifier(workqueue_cpu_callback, 0);
734         keventd_wq = create_workqueue("events");
735         BUG_ON(!keventd_wq);
736 }
737