padata: Make two separate cpumasks
[linux-2.6.git] / kernel / padata.c
1 /*
2  * padata.c - generic interface to process data streams in parallel
3  *
4  * Copyright (C) 2008, 2009 secunet Security Networks AG
5  * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms and conditions of the GNU General Public License,
9  * version 2, as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
14  * more details.
15  *
16  * You should have received a copy of the GNU General Public License along with
17  * this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
19  */
20
21 #include <linux/module.h>
22 #include <linux/cpumask.h>
23 #include <linux/err.h>
24 #include <linux/cpu.h>
25 #include <linux/padata.h>
26 #include <linux/mutex.h>
27 #include <linux/sched.h>
28 #include <linux/slab.h>
29 #include <linux/rcupdate.h>
30
31 #define MAX_SEQ_NR (INT_MAX - NR_CPUS)
32 #define MAX_OBJ_NUM 1000
33
34 static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
35 {
36         int cpu, target_cpu;
37
38         target_cpu = cpumask_first(pd->cpumask.pcpu);
39         for (cpu = 0; cpu < cpu_index; cpu++)
40                 target_cpu = cpumask_next(target_cpu, pd->cpumask.pcpu);
41
42         return target_cpu;
43 }
44
45 static int padata_cpu_hash(struct padata_priv *padata)
46 {
47         int cpu_index;
48         struct parallel_data *pd;
49
50         pd =  padata->pd;
51
52         /*
53          * Hash the sequence numbers to the cpus by taking
54          * seq_nr mod. number of cpus in use.
55          */
56         cpu_index =  padata->seq_nr % cpumask_weight(pd->cpumask.pcpu);
57
58         return padata_index_to_cpu(pd, cpu_index);
59 }
60
61 static void padata_parallel_worker(struct work_struct *parallel_work)
62 {
63         struct padata_parallel_queue *pqueue;
64         struct parallel_data *pd;
65         struct padata_instance *pinst;
66         LIST_HEAD(local_list);
67
68         local_bh_disable();
69         pqueue = container_of(parallel_work,
70                               struct padata_parallel_queue, work);
71         pd = pqueue->pd;
72         pinst = pd->pinst;
73
74         spin_lock(&pqueue->parallel.lock);
75         list_replace_init(&pqueue->parallel.list, &local_list);
76         spin_unlock(&pqueue->parallel.lock);
77
78         while (!list_empty(&local_list)) {
79                 struct padata_priv *padata;
80
81                 padata = list_entry(local_list.next,
82                                     struct padata_priv, list);
83
84                 list_del_init(&padata->list);
85
86                 padata->parallel(padata);
87         }
88
89         local_bh_enable();
90 }
91
92 /**
93  * padata_do_parallel - padata parallelization function
94  *
95  * @pinst: padata instance
96  * @padata: object to be parallelized
97  * @cb_cpu: cpu the serialization callback function will run on,
98  *          must be in the serial cpumask of padata(i.e. cpumask.cbcpu).
99  *
100  * The parallelization callback function will run with BHs off.
101  * Note: Every object which is parallelized by padata_do_parallel
102  * must be seen by padata_do_serial.
103  */
104 int padata_do_parallel(struct padata_instance *pinst,
105                        struct padata_priv *padata, int cb_cpu)
106 {
107         int target_cpu, err;
108         struct padata_parallel_queue *queue;
109         struct parallel_data *pd;
110
111         rcu_read_lock_bh();
112
113         pd = rcu_dereference(pinst->pd);
114
115         err = -EINVAL;
116         if (!(pinst->flags & PADATA_INIT))
117                 goto out;
118
119         if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu))
120                 goto out;
121
122         err =  -EBUSY;
123         if ((pinst->flags & PADATA_RESET))
124                 goto out;
125
126         if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
127                 goto out;
128
129         err = 0;
130         atomic_inc(&pd->refcnt);
131         padata->pd = pd;
132         padata->cb_cpu = cb_cpu;
133
134         if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr))
135                 atomic_set(&pd->seq_nr, -1);
136
137         padata->seq_nr = atomic_inc_return(&pd->seq_nr);
138
139         target_cpu = padata_cpu_hash(padata);
140         queue = per_cpu_ptr(pd->pqueue, target_cpu);
141
142         spin_lock(&queue->parallel.lock);
143         list_add_tail(&padata->list, &queue->parallel.list);
144         spin_unlock(&queue->parallel.lock);
145
146         queue_work_on(target_cpu, pinst->wq, &queue->work);
147
148 out:
149         rcu_read_unlock_bh();
150
151         return err;
152 }
153 EXPORT_SYMBOL(padata_do_parallel);
154
155 /*
156  * padata_get_next - Get the next object that needs serialization.
157  *
158  * Return values are:
159  *
160  * A pointer to the control struct of the next object that needs
161  * serialization, if present in one of the percpu reorder queues.
162  *
163  * NULL, if all percpu reorder queues are empty.
164  *
165  * -EINPROGRESS, if the next object that needs serialization will
166  *  be parallel processed by another cpu and is not yet present in
167  *  the cpu's reorder queue.
168  *
169  * -ENODATA, if this cpu has to do the parallel processing for
170  *  the next object.
171  */
172 static struct padata_priv *padata_get_next(struct parallel_data *pd)
173 {
174         int cpu, num_cpus;
175         int next_nr, next_index;
176         struct padata_parallel_queue *queue, *next_queue;
177         struct padata_priv *padata;
178         struct padata_list *reorder;
179
180         num_cpus = cpumask_weight(pd->cpumask.pcpu);
181
182         /*
183          * Calculate the percpu reorder queue and the sequence
184          * number of the next object.
185          */
186         next_nr = pd->processed;
187         next_index = next_nr % num_cpus;
188         cpu = padata_index_to_cpu(pd, next_index);
189         next_queue = per_cpu_ptr(pd->pqueue, cpu);
190
191         if (unlikely(next_nr > pd->max_seq_nr)) {
192                 next_nr = next_nr - pd->max_seq_nr - 1;
193                 next_index = next_nr % num_cpus;
194                 cpu = padata_index_to_cpu(pd, next_index);
195                 next_queue = per_cpu_ptr(pd->pqueue, cpu);
196                 pd->processed = 0;
197         }
198
199         padata = NULL;
200
201         reorder = &next_queue->reorder;
202
203         if (!list_empty(&reorder->list)) {
204                 padata = list_entry(reorder->list.next,
205                                     struct padata_priv, list);
206
207                 BUG_ON(next_nr != padata->seq_nr);
208
209                 spin_lock(&reorder->lock);
210                 list_del_init(&padata->list);
211                 atomic_dec(&pd->reorder_objects);
212                 spin_unlock(&reorder->lock);
213
214                 pd->processed++;
215
216                 goto out;
217         }
218
219         queue = per_cpu_ptr(pd->pqueue, smp_processor_id());
220         if (queue->cpu_index == next_queue->cpu_index) {
221                 padata = ERR_PTR(-ENODATA);
222                 goto out;
223         }
224
225         padata = ERR_PTR(-EINPROGRESS);
226 out:
227         return padata;
228 }
229
230 static void padata_reorder(struct parallel_data *pd)
231 {
232         struct padata_priv *padata;
233         struct padata_serial_queue *squeue;
234         struct padata_instance *pinst = pd->pinst;
235
236         /*
237          * We need to ensure that only one cpu can work on dequeueing of
238          * the reorder queue the time. Calculating in which percpu reorder
239          * queue the next object will arrive takes some time. A spinlock
240          * would be highly contended. Also it is not clear in which order
241          * the objects arrive to the reorder queues. So a cpu could wait to
242          * get the lock just to notice that there is nothing to do at the
243          * moment. Therefore we use a trylock and let the holder of the lock
244          * care for all the objects enqueued during the holdtime of the lock.
245          */
246         if (!spin_trylock_bh(&pd->lock))
247                 return;
248
249         while (1) {
250                 padata = padata_get_next(pd);
251
252                 /*
253                  * All reorder queues are empty, or the next object that needs
254                  * serialization is parallel processed by another cpu and is
255                  * still on it's way to the cpu's reorder queue, nothing to
256                  * do for now.
257                  */
258                 if (!padata || PTR_ERR(padata) == -EINPROGRESS)
259                         break;
260
261                 /*
262                  * This cpu has to do the parallel processing of the next
263                  * object. It's waiting in the cpu's parallelization queue,
264                  * so exit imediately.
265                  */
266                 if (PTR_ERR(padata) == -ENODATA) {
267                         del_timer(&pd->timer);
268                         spin_unlock_bh(&pd->lock);
269                         return;
270                 }
271
272                 squeue = per_cpu_ptr(pd->squeue, padata->cb_cpu);
273
274                 spin_lock(&squeue->serial.lock);
275                 list_add_tail(&padata->list, &squeue->serial.list);
276                 spin_unlock(&squeue->serial.lock);
277
278                 queue_work_on(padata->cb_cpu, pinst->wq, &squeue->work);
279         }
280
281         spin_unlock_bh(&pd->lock);
282
283         /*
284          * The next object that needs serialization might have arrived to
285          * the reorder queues in the meantime, we will be called again
286          * from the timer function if noone else cares for it.
287          */
288         if (atomic_read(&pd->reorder_objects)
289                         && !(pinst->flags & PADATA_RESET))
290                 mod_timer(&pd->timer, jiffies + HZ);
291         else
292                 del_timer(&pd->timer);
293
294         return;
295 }
296
297 static void padata_reorder_timer(unsigned long arg)
298 {
299         struct parallel_data *pd = (struct parallel_data *)arg;
300
301         padata_reorder(pd);
302 }
303
304 static void padata_serial_worker(struct work_struct *serial_work)
305 {
306         struct padata_serial_queue *squeue;
307         struct parallel_data *pd;
308         LIST_HEAD(local_list);
309
310         local_bh_disable();
311         squeue = container_of(serial_work, struct padata_serial_queue, work);
312         pd = squeue->pd;
313
314         spin_lock(&squeue->serial.lock);
315         list_replace_init(&squeue->serial.list, &local_list);
316         spin_unlock(&squeue->serial.lock);
317
318         while (!list_empty(&local_list)) {
319                 struct padata_priv *padata;
320
321                 padata = list_entry(local_list.next,
322                                     struct padata_priv, list);
323
324                 list_del_init(&padata->list);
325
326                 padata->serial(padata);
327                 atomic_dec(&pd->refcnt);
328         }
329         local_bh_enable();
330 }
331
332 /**
333  * padata_do_serial - padata serialization function
334  *
335  * @padata: object to be serialized.
336  *
337  * padata_do_serial must be called for every parallelized object.
338  * The serialization callback function will run with BHs off.
339  */
340 void padata_do_serial(struct padata_priv *padata)
341 {
342         int cpu;
343         struct padata_parallel_queue *pqueue;
344         struct parallel_data *pd;
345
346         pd = padata->pd;
347
348         cpu = get_cpu();
349         pqueue = per_cpu_ptr(pd->pqueue, cpu);
350
351         spin_lock(&pqueue->reorder.lock);
352         atomic_inc(&pd->reorder_objects);
353         list_add_tail(&padata->list, &pqueue->reorder.list);
354         spin_unlock(&pqueue->reorder.lock);
355
356         put_cpu();
357
358         padata_reorder(pd);
359 }
360 EXPORT_SYMBOL(padata_do_serial);
361
362 static int padata_setup_cpumasks(struct parallel_data *pd,
363                                  const struct cpumask *pcpumask,
364                                  const struct cpumask *cbcpumask)
365 {
366         if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
367                 return -ENOMEM;
368
369         cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_active_mask);
370         if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) {
371                 free_cpumask_var(pd->cpumask.cbcpu);
372                 return -ENOMEM;
373         }
374
375         cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_active_mask);
376         return 0;
377 }
378
379 static void __padata_list_init(struct padata_list *pd_list)
380 {
381         INIT_LIST_HEAD(&pd_list->list);
382         spin_lock_init(&pd_list->lock);
383 }
384
385 /* Initialize all percpu queues used by serial workers */
386 static void padata_init_squeues(struct parallel_data *pd)
387 {
388         int cpu;
389         struct padata_serial_queue *squeue;
390
391         for_each_cpu(cpu, pd->cpumask.cbcpu) {
392                 squeue = per_cpu_ptr(pd->squeue, cpu);
393                 squeue->pd = pd;
394                 __padata_list_init(&squeue->serial);
395                 INIT_WORK(&squeue->work, padata_serial_worker);
396         }
397 }
398
399 /* Initialize all percpu queues used by parallel workers */
400 static void padata_init_pqueues(struct parallel_data *pd)
401 {
402         int cpu_index, num_cpus, cpu;
403         struct padata_parallel_queue *pqueue;
404
405         cpu_index = 0;
406         for_each_cpu(cpu, pd->cpumask.pcpu) {
407                 pqueue = per_cpu_ptr(pd->pqueue, cpu);
408                 pqueue->pd = pd;
409                 pqueue->cpu_index = cpu_index;
410
411                 __padata_list_init(&pqueue->reorder);
412                 __padata_list_init(&pqueue->parallel);
413                 INIT_WORK(&pqueue->work, padata_parallel_worker);
414                 atomic_set(&pqueue->num_obj, 0);
415         }
416
417         num_cpus = cpumask_weight(pd->cpumask.pcpu);
418         pd->max_seq_nr = (MAX_SEQ_NR / num_cpus) * num_cpus - 1;
419 }
420
421 /* Allocate and initialize the internal cpumask dependend resources. */
422 static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
423                                              const struct cpumask *pcpumask,
424                                              const struct cpumask *cbcpumask)
425 {
426         struct parallel_data *pd;
427
428         pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
429         if (!pd)
430                 goto err;
431
432         pd->pqueue = alloc_percpu(struct padata_parallel_queue);
433         if (!pd->pqueue)
434                 goto err_free_pd;
435
436         pd->squeue = alloc_percpu(struct padata_serial_queue);
437         if (!pd->squeue)
438                 goto err_free_pqueue;
439         if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0)
440                 goto err_free_squeue;
441
442         padata_init_pqueues(pd);
443         padata_init_squeues(pd);
444         setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd);
445         atomic_set(&pd->seq_nr, -1);
446         atomic_set(&pd->reorder_objects, 0);
447         atomic_set(&pd->refcnt, 0);
448         pd->pinst = pinst;
449         spin_lock_init(&pd->lock);
450
451         return pd;
452
453 err_free_squeue:
454         free_percpu(pd->squeue);
455 err_free_pqueue:
456         free_percpu(pd->pqueue);
457 err_free_pd:
458         kfree(pd);
459 err:
460         return NULL;
461 }
462
463 static void padata_free_pd(struct parallel_data *pd)
464 {
465         free_cpumask_var(pd->cpumask.pcpu);
466         free_cpumask_var(pd->cpumask.cbcpu);
467         free_percpu(pd->pqueue);
468         free_percpu(pd->squeue);
469         kfree(pd);
470 }
471
472 /* Flush all objects out of the padata queues. */
473 static void padata_flush_queues(struct parallel_data *pd)
474 {
475         int cpu;
476         struct padata_parallel_queue *pqueue;
477         struct padata_serial_queue *squeue;
478
479         for_each_cpu(cpu, pd->cpumask.pcpu) {
480                 pqueue = per_cpu_ptr(pd->pqueue, cpu);
481                 flush_work(&pqueue->work);
482         }
483
484         del_timer_sync(&pd->timer);
485
486         if (atomic_read(&pd->reorder_objects))
487                 padata_reorder(pd);
488
489         for_each_cpu(cpu, pd->cpumask.cbcpu) {
490                 squeue = per_cpu_ptr(pd->squeue, cpu);
491                 flush_work(&squeue->work);
492         }
493
494         BUG_ON(atomic_read(&pd->refcnt) != 0);
495 }
496
497 static void __padata_start(struct padata_instance *pinst)
498 {
499         pinst->flags |= PADATA_INIT;
500 }
501
502 static void __padata_stop(struct padata_instance *pinst)
503 {
504         if (!(pinst->flags & PADATA_INIT))
505                 return;
506
507         pinst->flags &= ~PADATA_INIT;
508
509         synchronize_rcu();
510
511         get_online_cpus();
512         padata_flush_queues(pinst->pd);
513         put_online_cpus();
514 }
515
516 /* Replace the internal control stucture with a new one. */
517 static void padata_replace(struct padata_instance *pinst,
518                            struct parallel_data *pd_new)
519 {
520         struct parallel_data *pd_old = pinst->pd;
521         int notification_mask = 0;
522
523         pinst->flags |= PADATA_RESET;
524
525         rcu_assign_pointer(pinst->pd, pd_new);
526
527         synchronize_rcu();
528         if (!pd_old)
529                 goto out;
530
531         padata_flush_queues(pd_old);
532         if (!cpumask_equal(pd_old->cpumask.pcpu, pd_new->cpumask.pcpu))
533                 notification_mask |= PADATA_CPU_PARALLEL;
534         if (!cpumask_equal(pd_old->cpumask.cbcpu, pd_new->cpumask.cbcpu))
535                 notification_mask |= PADATA_CPU_SERIAL;
536
537         padata_free_pd(pd_old);
538         if (notification_mask)
539                 blocking_notifier_call_chain(&pinst->cpumask_change_notifier,
540                                              notification_mask, pinst);
541
542 out:
543         pinst->flags &= ~PADATA_RESET;
544 }
545
546 /**
547  * padata_register_cpumask_notifier - Registers a notifier that will be called
548  *                             if either pcpu or cbcpu or both cpumasks change.
549  *
550  * @pinst: A poineter to padata instance
551  * @nblock: A pointer to notifier block.
552  */
553 int padata_register_cpumask_notifier(struct padata_instance *pinst,
554                                      struct notifier_block *nblock)
555 {
556         return blocking_notifier_chain_register(&pinst->cpumask_change_notifier,
557                                                 nblock);
558 }
559 EXPORT_SYMBOL(padata_register_cpumask_notifier);
560
561 /**
562  * padata_unregister_cpumask_notifier - Unregisters cpumask notifier
563  *        registered earlier  using padata_register_cpumask_notifier
564  *
565  * @pinst: A pointer to data instance.
566  * @nlock: A pointer to notifier block.
567  */
568 int padata_unregister_cpumask_notifier(struct padata_instance *pinst,
569                                        struct notifier_block *nblock)
570 {
571         return blocking_notifier_chain_unregister(
572                 &pinst->cpumask_change_notifier,
573                 nblock);
574 }
575 EXPORT_SYMBOL(padata_unregister_cpumask_notifier);
576
577
578 /* If cpumask contains no active cpu, we mark the instance as invalid. */
579 static bool padata_validate_cpumask(struct padata_instance *pinst,
580                                     const struct cpumask *cpumask)
581 {
582         if (!cpumask_intersects(cpumask, cpu_active_mask)) {
583                 pinst->flags |= PADATA_INVALID;
584                 return false;
585         }
586
587         pinst->flags &= ~PADATA_INVALID;
588         return true;
589 }
590
591 /**
592  * padata_get_cpumask: Fetch serial or parallel cpumask from the
593  *                     given padata instance and copy it to @out_mask
594  *
595  * @pinst: A pointer to padata instance
596  * @cpumask_type: Specifies which cpumask will be copied.
597  *                Possible values are PADATA_CPU_SERIAL *or* PADATA_CPU_PARALLEL
598  *                corresponding to serial and parallel cpumask respectively.
599  * @out_mask: A pointer to cpumask structure where selected
600  *            cpumask will be copied.
601  */
602 int padata_get_cpumask(struct padata_instance *pinst,
603                        int cpumask_type, struct cpumask *out_mask)
604 {
605         struct parallel_data *pd;
606         int ret = 0;
607
608         rcu_read_lock_bh();
609         pd = rcu_dereference(pinst->pd);
610         switch (cpumask_type) {
611         case PADATA_CPU_SERIAL:
612                 cpumask_copy(out_mask, pd->cpumask.cbcpu);
613                 break;
614         case PADATA_CPU_PARALLEL:
615                 cpumask_copy(out_mask, pd->cpumask.pcpu);
616                 break;
617         default:
618                 ret = -EINVAL;
619         }
620
621         rcu_read_unlock_bh();
622         return ret;
623 }
624 EXPORT_SYMBOL(padata_get_cpumask);
625
626 /**
627  * padata_set_cpumask: Sets specified by @cpumask_type cpumask to the value
628  *                     equivalent to @cpumask.
629  *
630  * @pinst: padata instance
631  * @cpumask_type: PADATA_CPU_SERIAL or PADATA_CPU_PARALLEL corresponding
632  *                to parallel and serial cpumasks respectively.
633  * @cpumask: the cpumask to use
634  */
635 int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
636                        cpumask_var_t cpumask)
637 {
638         struct cpumask *serial_mask, *parallel_mask;
639
640         switch (cpumask_type) {
641         case PADATA_CPU_PARALLEL:
642                 serial_mask = pinst->cpumask.cbcpu;
643                 parallel_mask = cpumask;
644                 break;
645         case PADATA_CPU_SERIAL:
646                 parallel_mask = pinst->cpumask.pcpu;
647                 serial_mask = cpumask;
648                 break;
649         default:
650                 return -EINVAL;
651         }
652
653         return __padata_set_cpumasks(pinst, parallel_mask, serial_mask);
654 }
655 EXPORT_SYMBOL(padata_set_cpumask);
656
657 /**
658  * __padata_set_cpumasks - Set both parallel and serial cpumasks. The first
659  *                         one is used by parallel workers and the second one
660  *                         by the wokers doing serialization.
661  *
662  * @pinst: padata instance
663  * @pcpumask: the cpumask to use for parallel workers
664  * @cbcpumask: the cpumsak to use for serial workers
665  */
666 int __padata_set_cpumasks(struct padata_instance *pinst,
667                           cpumask_var_t pcpumask, cpumask_var_t cbcpumask)
668 {
669         int valid;
670         int err = 0;
671         struct parallel_data *pd = NULL;
672
673         mutex_lock(&pinst->lock);
674
675         valid = padata_validate_cpumask(pinst, pcpumask);
676         if (!valid) {
677                 __padata_stop(pinst);
678                 goto out_replace;
679         }
680
681         valid = padata_validate_cpumask(pinst, cbcpumask);
682         if (!valid) {
683                 __padata_stop(pinst);
684                 goto out_replace;
685         }
686
687         get_online_cpus();
688
689         pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
690         if (!pd) {
691                 err = -ENOMEM;
692                 goto out;
693         }
694
695 out_replace:
696         cpumask_copy(pinst->cpumask.pcpu, pcpumask);
697         cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
698
699         padata_replace(pinst, pd);
700
701         if (valid)
702                 __padata_start(pinst);
703
704 out:
705         put_online_cpus();
706
707         mutex_unlock(&pinst->lock);
708
709         return err;
710
711 }
712 EXPORT_SYMBOL(__padata_set_cpumasks);
713
714 static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
715 {
716         struct parallel_data *pd;
717
718         if (cpumask_test_cpu(cpu, cpu_active_mask)) {
719                 pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
720                                      pinst->cpumask.cbcpu);
721                 if (!pd)
722                         return -ENOMEM;
723
724                 padata_replace(pinst, pd);
725
726                 if (padata_validate_cpumask(pinst, pinst->cpumask.pcpu) &&
727                     padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
728                         __padata_start(pinst);
729         }
730
731         return 0;
732 }
733
734  /**
735  * padata_add_cpu - add a cpu to one or both(parallel and serial)
736  *                  padata cpumasks.
737  *
738  * @pinst: padata instance
739  * @cpu: cpu to add
740  * @mask: bitmask of flags specifying to which cpumask @cpu shuld be added.
741  *        The @mask may be any combination of the following flags:
742  *          PADATA_CPU_SERIAL   - serial cpumask
743  *          PADATA_CPU_PARALLEL - parallel cpumask
744  */
745
746 int padata_add_cpu(struct padata_instance *pinst, int cpu, int mask)
747 {
748         int err;
749
750         if (!(mask & (PADATA_CPU_SERIAL | PADATA_CPU_PARALLEL)))
751                 return -EINVAL;
752
753         mutex_lock(&pinst->lock);
754
755         get_online_cpus();
756         if (mask & PADATA_CPU_SERIAL)
757                 cpumask_set_cpu(cpu, pinst->cpumask.cbcpu);
758         if (mask & PADATA_CPU_PARALLEL)
759                 cpumask_set_cpu(cpu, pinst->cpumask.pcpu);
760
761         err = __padata_add_cpu(pinst, cpu);
762         put_online_cpus();
763
764         mutex_unlock(&pinst->lock);
765
766         return err;
767 }
768 EXPORT_SYMBOL(padata_add_cpu);
769
770 static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
771 {
772         struct parallel_data *pd = NULL;
773
774         if (cpumask_test_cpu(cpu, cpu_online_mask)) {
775
776                 if (!padata_validate_cpumask(pinst, pinst->cpumask.pcpu) ||
777                     !padata_validate_cpumask(pinst, pinst->cpumask.cbcpu)) {
778                         __padata_stop(pinst);
779                         padata_replace(pinst, pd);
780                         goto out;
781                 }
782
783                 pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
784                                      pinst->cpumask.cbcpu);
785                 if (!pd)
786                         return -ENOMEM;
787
788                 padata_replace(pinst, pd);
789         }
790
791 out:
792         return 0;
793 }
794
795  /**
796  * padata_remove_cpu - remove a cpu from the one or both(serial and paralell)
797  *                     padata cpumasks.
798  *
799  * @pinst: padata instance
800  * @cpu: cpu to remove
801  * @mask: bitmask specifying from which cpumask @cpu should be removed
802  *        The @mask may be any combination of the following flags:
803  *          PADATA_CPU_SERIAL   - serial cpumask
804  *          PADATA_CPU_PARALLEL - parallel cpumask
805  */
806 int padata_remove_cpu(struct padata_instance *pinst, int cpu, int mask)
807 {
808         int err;
809
810         if (!(mask & (PADATA_CPU_SERIAL | PADATA_CPU_PARALLEL)))
811                 return -EINVAL;
812
813         mutex_lock(&pinst->lock);
814
815         get_online_cpus();
816         if (mask & PADATA_CPU_SERIAL)
817                 cpumask_clear_cpu(cpu, pinst->cpumask.cbcpu);
818         if (mask & PADATA_CPU_PARALLEL)
819                 cpumask_clear_cpu(cpu, pinst->cpumask.pcpu);
820
821         err = __padata_remove_cpu(pinst, cpu);
822         put_online_cpus();
823
824         mutex_unlock(&pinst->lock);
825
826         return err;
827 }
828 EXPORT_SYMBOL(padata_remove_cpu);
829
830 /**
831  * padata_start - start the parallel processing
832  *
833  * @pinst: padata instance to start
834  */
835 int padata_start(struct padata_instance *pinst)
836 {
837         int err = 0;
838
839         mutex_lock(&pinst->lock);
840
841         if (pinst->flags & PADATA_INVALID)
842                 err =-EINVAL;
843
844          __padata_start(pinst);
845
846         mutex_unlock(&pinst->lock);
847
848         return err;
849 }
850 EXPORT_SYMBOL(padata_start);
851
852 /**
853  * padata_stop - stop the parallel processing
854  *
855  * @pinst: padata instance to stop
856  */
857 void padata_stop(struct padata_instance *pinst)
858 {
859         mutex_lock(&pinst->lock);
860         __padata_stop(pinst);
861         mutex_unlock(&pinst->lock);
862 }
863 EXPORT_SYMBOL(padata_stop);
864
865 #ifdef CONFIG_HOTPLUG_CPU
866
867 static inline int pinst_has_cpu(struct padata_instance *pinst, int cpu)
868 {
869         return cpumask_test_cpu(cpu, pinst->cpumask.pcpu) ||
870                 cpumask_test_cpu(cpu, pinst->cpumask.cbcpu);
871 }
872
873
874 static int padata_cpu_callback(struct notifier_block *nfb,
875                                unsigned long action, void *hcpu)
876 {
877         int err;
878         struct padata_instance *pinst;
879         int cpu = (unsigned long)hcpu;
880
881         pinst = container_of(nfb, struct padata_instance, cpu_notifier);
882
883         switch (action) {
884         case CPU_ONLINE:
885         case CPU_ONLINE_FROZEN:
886                 if (!pinst_has_cpu(pinst, cpu))
887                         break;
888                 mutex_lock(&pinst->lock);
889                 err = __padata_add_cpu(pinst, cpu);
890                 mutex_unlock(&pinst->lock);
891                 if (err)
892                         return NOTIFY_BAD;
893                 break;
894
895         case CPU_DOWN_PREPARE:
896         case CPU_DOWN_PREPARE_FROZEN:
897                 if (!pinst_has_cpu(pinst, cpu))
898                         break;
899                 mutex_lock(&pinst->lock);
900                 err = __padata_remove_cpu(pinst, cpu);
901                 mutex_unlock(&pinst->lock);
902                 if (err)
903                         return NOTIFY_BAD;
904                 break;
905
906         case CPU_UP_CANCELED:
907         case CPU_UP_CANCELED_FROZEN:
908                 if (!pinst_has_cpu(pinst, cpu))
909                         break;
910                 mutex_lock(&pinst->lock);
911                 __padata_remove_cpu(pinst, cpu);
912                 mutex_unlock(&pinst->lock);
913
914         case CPU_DOWN_FAILED:
915         case CPU_DOWN_FAILED_FROZEN:
916                 if (!pinst_has_cpu(pinst, cpu))
917                         break;
918                 mutex_lock(&pinst->lock);
919                 __padata_add_cpu(pinst, cpu);
920                 mutex_unlock(&pinst->lock);
921         }
922
923         return NOTIFY_OK;
924 }
925 #endif
926
927 /**
928  * padata_alloc - Allocate and initialize padata instance.
929  *                Use default cpumask(cpu_possible_mask)
930  *                for serial and parallel workes.
931  *
932  * @wq: workqueue to use for the allocated padata instance
933  */
934 struct padata_instance *padata_alloc(struct workqueue_struct *wq)
935 {
936         return __padata_alloc(wq, cpu_possible_mask, cpu_possible_mask);
937 }
938 EXPORT_SYMBOL(padata_alloc);
939
940 /**
941  * __padata_alloc - allocate and initialize a padata instance
942  *                  and specify cpumasks for serial and parallel workers.
943  *
944  * @wq: workqueue to use for the allocated padata instance
945  * @pcpumask: cpumask that will be used for padata parallelization
946  * @cbcpumask: cpumask that will be used for padata serialization
947  */
948 struct padata_instance *__padata_alloc(struct workqueue_struct *wq,
949                                        const struct cpumask *pcpumask,
950                                        const struct cpumask *cbcpumask)
951 {
952         struct padata_instance *pinst;
953         struct parallel_data *pd = NULL;
954
955         pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
956         if (!pinst)
957                 goto err;
958
959         get_online_cpus();
960         if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
961                 goto err_free_inst;
962         if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {
963                 free_cpumask_var(pinst->cpumask.pcpu);
964                 goto err_free_inst;
965         }
966         if (!padata_validate_cpumask(pinst, pcpumask) ||
967             !padata_validate_cpumask(pinst, cbcpumask))
968                 goto err_free_masks;
969
970         pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
971         if (!pd)
972                 goto err_free_masks;
973
974         rcu_assign_pointer(pinst->pd, pd);
975
976         pinst->wq = wq;
977
978         cpumask_copy(pinst->cpumask.pcpu, pcpumask);
979         cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
980
981         pinst->flags = 0;
982
983 #ifdef CONFIG_HOTPLUG_CPU
984         pinst->cpu_notifier.notifier_call = padata_cpu_callback;
985         pinst->cpu_notifier.priority = 0;
986         register_hotcpu_notifier(&pinst->cpu_notifier);
987 #endif
988
989         put_online_cpus();
990
991         BLOCKING_INIT_NOTIFIER_HEAD(&pinst->cpumask_change_notifier);
992         mutex_init(&pinst->lock);
993
994         return pinst;
995
996 err_free_masks:
997         free_cpumask_var(pinst->cpumask.pcpu);
998         free_cpumask_var(pinst->cpumask.cbcpu);
999 err_free_inst:
1000         kfree(pinst);
1001         put_online_cpus();
1002 err:
1003         return NULL;
1004 }
1005 EXPORT_SYMBOL(__padata_alloc);
1006
1007 /**
1008  * padata_free - free a padata instance
1009  *
1010  * @padata_inst: padata instance to free
1011  */
1012 void padata_free(struct padata_instance *pinst)
1013 {
1014 #ifdef CONFIG_HOTPLUG_CPU
1015         unregister_hotcpu_notifier(&pinst->cpu_notifier);
1016 #endif
1017
1018         padata_stop(pinst);
1019         padata_free_pd(pinst->pd);
1020         free_cpumask_var(pinst->cpumask.pcpu);
1021         free_cpumask_var(pinst->cpumask.cbcpu);
1022         kfree(pinst);
1023 }
1024 EXPORT_SYMBOL(padata_free);