]> nv-tegra.nvidia Code Review - linux-2.6.git/blobdiff - kernel/workqueue.c
9p: eliminate callback complexity
[linux-2.6.git] / kernel / workqueue.c
index d9a2d65cc63ea38dace4aac9098a8313476644dc..714afad46539616e4bbcab15240160ca4d188226 100644 (file)
@@ -9,7 +9,7 @@
  * Derived from the taskqueue/keventd code by:
  *
  *   David Woodhouse <dwmw2@infradead.org>
- *   Andrew Morton <andrewm@uow.edu.au>
+ *   Andrew Morton
  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
  *   Theodore Ts'o <tytso@mit.edu>
  *
@@ -159,14 +159,11 @@ static void __queue_work(struct cpu_workqueue_struct *cwq,
  */
 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
 {
-       int ret = 0;
+       int ret;
+
+       ret = queue_work_on(get_cpu(), wq, work);
+       put_cpu();
 
-       if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
-               BUG_ON(!list_empty(&work->entry));
-               __queue_work(wq_per_cpu(wq, get_cpu()), work);
-               put_cpu();
-               ret = 1;
-       }
        return ret;
 }
 EXPORT_SYMBOL_GPL(queue_work);
@@ -293,11 +290,11 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
 
                BUG_ON(get_wq_data(work) != cwq);
                work_clear_pending(work);
-               lock_acquire(&cwq->wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
-               lock_acquire(&lockdep_map, 0, 0, 0, 2, _THIS_IP_);
+               lock_map_acquire(&cwq->wq->lockdep_map);
+               lock_map_acquire(&lockdep_map);
                f(work);
-               lock_release(&lockdep_map, 1, _THIS_IP_);
-               lock_release(&cwq->wq->lockdep_map, 1, _THIS_IP_);
+               lock_map_release(&lockdep_map);
+               lock_map_release(&cwq->wq->lockdep_map);
 
                if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
                        printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
@@ -416,13 +413,64 @@ void flush_workqueue(struct workqueue_struct *wq)
        int cpu;
 
        might_sleep();
-       lock_acquire(&wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
-       lock_release(&wq->lockdep_map, 1, _THIS_IP_);
+       lock_map_acquire(&wq->lockdep_map);
+       lock_map_release(&wq->lockdep_map);
        for_each_cpu_mask_nr(cpu, *cpu_map)
                flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
 }
 EXPORT_SYMBOL_GPL(flush_workqueue);
 
+/**
+ * flush_work - block until a work_struct's callback has terminated
+ * @work: the work which is to be flushed
+ *
+ * Returns false if @work has already terminated.
+ *
+ * It is expected that, prior to calling flush_work(), the caller has
+ * arranged for the work to not be requeued, otherwise it doesn't make
+ * sense to use this function.
+ */
+int flush_work(struct work_struct *work)
+{
+       struct cpu_workqueue_struct *cwq;
+       struct list_head *prev;
+       struct wq_barrier barr;
+
+       might_sleep();
+       cwq = get_wq_data(work);
+       if (!cwq)
+               return 0;
+
+       lock_map_acquire(&cwq->wq->lockdep_map);
+       lock_map_release(&cwq->wq->lockdep_map);
+
+       prev = NULL;
+       spin_lock_irq(&cwq->lock);
+       if (!list_empty(&work->entry)) {
+               /*
+                * See the comment near try_to_grab_pending()->smp_rmb().
+                * If it was re-queued under us we are not going to wait.
+                */
+               smp_rmb();
+               if (unlikely(cwq != get_wq_data(work)))
+                       goto out;
+               prev = &work->entry;
+       } else {
+               if (cwq->current_work != work)
+                       goto out;
+               prev = &cwq->worklist;
+       }
+       insert_wq_barrier(cwq, &barr, prev->next);
+out:
+       spin_unlock_irq(&cwq->lock);
+       if (!prev)
+               return 0;
+
+       wait_for_completion(&barr.done);
+       return 1;
+}
+EXPORT_SYMBOL_GPL(flush_work);
+
 /*
  * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
  * so this work can't be re-armed in any way.
@@ -488,8 +536,8 @@ static void wait_on_work(struct work_struct *work)
 
        might_sleep();
 
-       lock_acquire(&work->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
-       lock_release(&work->lockdep_map, 1, _THIS_IP_);
+       lock_map_acquire(&work->lockdep_map);
+       lock_map_release(&work->lockdep_map);
 
        cwq = get_wq_data(work);
        if (!cwq)
@@ -641,10 +689,10 @@ int schedule_on_each_cpu(work_func_t func)
                struct work_struct *work = per_cpu_ptr(works, cpu);
 
                INIT_WORK(work, func);
-               set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
-               __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
+               schedule_work_on(cpu, work);
        }
-       flush_workqueue(keventd_wq);
+       for_each_online_cpu(cpu)
+               flush_work(per_cpu_ptr(works, cpu));
        put_online_cpus();
        free_percpu(works);
        return 0;
@@ -781,11 +829,22 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
                err = create_workqueue_thread(cwq, singlethread_cpu);
                start_workqueue_thread(cwq, -1);
        } else {
-               get_online_cpus();
+               cpu_maps_update_begin();
+               /*
+                * We must place this wq on list even if the code below fails.
+                * cpu_down(cpu) can remove cpu from cpu_populated_map before
+                * destroy_workqueue() takes the lock, in that case we leak
+                * cwq[cpu]->thread.
+                */
                spin_lock(&workqueue_lock);
                list_add(&wq->list, &workqueues);
                spin_unlock(&workqueue_lock);
-
+               /*
+                * We must initialize cwqs for each possible cpu even if we
+                * are going to call destroy_workqueue() finally. Otherwise
+                * cpu_up() can hit the uninitialized cwq once we drop the
+                * lock.
+                */
                for_each_possible_cpu(cpu) {
                        cwq = init_cpu_workqueue(wq, cpu);
                        if (err || !cpu_online(cpu))
@@ -793,7 +852,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
                        err = create_workqueue_thread(cwq, cpu);
                        start_workqueue_thread(cwq, cpu);
                }
-               put_online_cpus();
+               cpu_maps_update_done();
        }
 
        if (err) {
@@ -807,18 +866,18 @@ EXPORT_SYMBOL_GPL(__create_workqueue_key);
 static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
 {
        /*
-        * Our caller is either destroy_workqueue() or CPU_DEAD,
-        * get_online_cpus() protects cwq->thread.
+        * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
+        * cpu_add_remove_lock protects cwq->thread.
         */
        if (cwq->thread == NULL)
                return;
 
-       lock_acquire(&cwq->wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
-       lock_release(&cwq->wq->lockdep_map, 1, _THIS_IP_);
+       lock_map_acquire(&cwq->wq->lockdep_map);
+       lock_map_release(&cwq->wq->lockdep_map);
 
        flush_cpu_workqueue(cwq);
        /*
-        * If the caller is CPU_DEAD and cwq->worklist was not empty,
+        * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
         * a concurrent flush_workqueue() can insert a barrier after us.
         * However, in that case run_workqueue() won't return and check
         * kthread_should_stop() until it flushes all work_struct's.
@@ -842,14 +901,14 @@ void destroy_workqueue(struct workqueue_struct *wq)
        const cpumask_t *cpu_map = wq_cpu_map(wq);
        int cpu;
 
-       get_online_cpus();
+       cpu_maps_update_begin();
        spin_lock(&workqueue_lock);
        list_del(&wq->list);
        spin_unlock(&workqueue_lock);
 
        for_each_cpu_mask_nr(cpu, *cpu_map)
                cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
-       put_online_cpus();
+       cpu_maps_update_done();
 
        free_percpu(wq->cpu_wq);
        kfree(wq);
@@ -863,6 +922,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
        unsigned int cpu = (unsigned long)hcpu;
        struct cpu_workqueue_struct *cwq;
        struct workqueue_struct *wq;
+       int ret = NOTIFY_OK;
 
        action &= ~CPU_TASKS_FROZEN;
 
@@ -870,7 +930,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
        case CPU_UP_PREPARE:
                cpu_set(cpu, cpu_populated_map);
        }
-
+undo:
        list_for_each_entry(wq, &workqueues, list) {
                cwq = per_cpu_ptr(wq->cpu_wq, cpu);
 
@@ -880,7 +940,9 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                                break;
                        printk(KERN_ERR "workqueue [%s] for %i failed\n",
                                wq->name, cpu);
-                       return NOTIFY_BAD;
+                       action = CPU_UP_CANCELED;
+                       ret = NOTIFY_BAD;
+                       goto undo;
 
                case CPU_ONLINE:
                        start_workqueue_thread(cwq, cpu);
@@ -888,7 +950,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 
                case CPU_UP_CANCELED:
                        start_workqueue_thread(cwq, -1);
-               case CPU_DEAD:
+               case CPU_POST_DEAD:
                        cleanup_workqueue_thread(cwq);
                        break;
                }
@@ -896,11 +958,11 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 
        switch (action) {
        case CPU_UP_CANCELED:
-       case CPU_DEAD:
+       case CPU_POST_DEAD:
                cpu_clear(cpu, cpu_populated_map);
        }
 
-       return NOTIFY_OK;
+       return ret;
 }
 
 void __init init_workqueues(void)