padata: Dont scale the parallel objects with the cpus
[linux-2.6.git] / kernel / padata.c
1 /*
2  * padata.c - generic interface to process data streams in parallel
3  *
4  * Copyright (C) 2008, 2009 secunet Security Networks AG
5  * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms and conditions of the GNU General Public License,
9  * version 2, as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
14  * more details.
15  *
16  * You should have received a copy of the GNU General Public License along with
17  * this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
19  */
20
21 #include <linux/module.h>
22 #include <linux/cpumask.h>
23 #include <linux/err.h>
24 #include <linux/cpu.h>
25 #include <linux/padata.h>
26 #include <linux/mutex.h>
27 #include <linux/sched.h>
28 #include <linux/rcupdate.h>
29
30 #define MAX_SEQ_NR INT_MAX - NR_CPUS
31 #define MAX_OBJ_NUM 1000
32
33 static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
34 {
35         int cpu, target_cpu;
36
37         target_cpu = cpumask_first(pd->cpumask);
38         for (cpu = 0; cpu < cpu_index; cpu++)
39                 target_cpu = cpumask_next(target_cpu, pd->cpumask);
40
41         return target_cpu;
42 }
43
44 static int padata_cpu_hash(struct padata_priv *padata)
45 {
46         int cpu_index;
47         struct parallel_data *pd;
48
49         pd =  padata->pd;
50
51         /*
52          * Hash the sequence numbers to the cpus by taking
53          * seq_nr mod. number of cpus in use.
54          */
55         cpu_index =  padata->seq_nr % cpumask_weight(pd->cpumask);
56
57         return padata_index_to_cpu(pd, cpu_index);
58 }
59
60 static void padata_parallel_worker(struct work_struct *work)
61 {
62         struct padata_queue *queue;
63         struct parallel_data *pd;
64         struct padata_instance *pinst;
65         LIST_HEAD(local_list);
66
67         local_bh_disable();
68         queue = container_of(work, struct padata_queue, pwork);
69         pd = queue->pd;
70         pinst = pd->pinst;
71
72         spin_lock(&queue->parallel.lock);
73         list_replace_init(&queue->parallel.list, &local_list);
74         spin_unlock(&queue->parallel.lock);
75
76         while (!list_empty(&local_list)) {
77                 struct padata_priv *padata;
78
79                 padata = list_entry(local_list.next,
80                                     struct padata_priv, list);
81
82                 list_del_init(&padata->list);
83
84                 padata->parallel(padata);
85         }
86
87         local_bh_enable();
88 }
89
90 /*
91  * padata_do_parallel - padata parallelization function
92  *
93  * @pinst: padata instance
94  * @padata: object to be parallelized
95  * @cb_cpu: cpu the serialization callback function will run on,
96  *          must be in the cpumask of padata.
97  *
98  * The parallelization callback function will run with BHs off.
99  * Note: Every object which is parallelized by padata_do_parallel
100  * must be seen by padata_do_serial.
101  */
102 int padata_do_parallel(struct padata_instance *pinst,
103                        struct padata_priv *padata, int cb_cpu)
104 {
105         int target_cpu, err;
106         struct padata_queue *queue;
107         struct parallel_data *pd;
108
109         rcu_read_lock_bh();
110
111         pd = rcu_dereference(pinst->pd);
112
113         err = 0;
114         if (!(pinst->flags & PADATA_INIT))
115                 goto out;
116
117         err =  -EBUSY;
118         if ((pinst->flags & PADATA_RESET))
119                 goto out;
120
121         if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
122                 goto out;
123
124         err = -EINVAL;
125         if (!cpumask_test_cpu(cb_cpu, pd->cpumask))
126                 goto out;
127
128         err = -EINPROGRESS;
129         atomic_inc(&pd->refcnt);
130         padata->pd = pd;
131         padata->cb_cpu = cb_cpu;
132
133         if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr))
134                 atomic_set(&pd->seq_nr, -1);
135
136         padata->seq_nr = atomic_inc_return(&pd->seq_nr);
137
138         target_cpu = padata_cpu_hash(padata);
139         queue = per_cpu_ptr(pd->queue, target_cpu);
140
141         spin_lock(&queue->parallel.lock);
142         list_add_tail(&padata->list, &queue->parallel.list);
143         spin_unlock(&queue->parallel.lock);
144
145         queue_work_on(target_cpu, pinst->wq, &queue->pwork);
146
147 out:
148         rcu_read_unlock_bh();
149
150         return err;
151 }
152 EXPORT_SYMBOL(padata_do_parallel);
153
154 static struct padata_priv *padata_get_next(struct parallel_data *pd)
155 {
156         int cpu, num_cpus, empty, calc_seq_nr;
157         int seq_nr, next_nr, overrun, next_overrun;
158         struct padata_queue *queue, *next_queue;
159         struct padata_priv *padata;
160         struct padata_list *reorder;
161
162         empty = 0;
163         next_nr = -1;
164         next_overrun = 0;
165         next_queue = NULL;
166
167         num_cpus = cpumask_weight(pd->cpumask);
168
169         for_each_cpu(cpu, pd->cpumask) {
170                 queue = per_cpu_ptr(pd->queue, cpu);
171                 reorder = &queue->reorder;
172
173                 /*
174                  * Calculate the seq_nr of the object that should be
175                  * next in this queue.
176                  */
177                 overrun = 0;
178                 calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus)
179                                + queue->cpu_index;
180
181                 if (unlikely(calc_seq_nr > pd->max_seq_nr)) {
182                         calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1;
183                         overrun = 1;
184                 }
185
186                 if (!list_empty(&reorder->list)) {
187                         padata = list_entry(reorder->list.next,
188                                             struct padata_priv, list);
189
190                         seq_nr  = padata->seq_nr;
191                         BUG_ON(calc_seq_nr != seq_nr);
192                 } else {
193                         seq_nr = calc_seq_nr;
194                         empty++;
195                 }
196
197                 if (next_nr < 0 || seq_nr < next_nr
198                     || (next_overrun && !overrun)) {
199                         next_nr = seq_nr;
200                         next_overrun = overrun;
201                         next_queue = queue;
202                 }
203         }
204
205         padata = NULL;
206
207         if (empty == num_cpus)
208                 goto out;
209
210         reorder = &next_queue->reorder;
211
212         if (!list_empty(&reorder->list)) {
213                 padata = list_entry(reorder->list.next,
214                                     struct padata_priv, list);
215
216                 if (unlikely(next_overrun)) {
217                         for_each_cpu(cpu, pd->cpumask) {
218                                 queue = per_cpu_ptr(pd->queue, cpu);
219                                 atomic_set(&queue->num_obj, 0);
220                         }
221                 }
222
223                 spin_lock(&reorder->lock);
224                 list_del_init(&padata->list);
225                 atomic_dec(&pd->reorder_objects);
226                 spin_unlock(&reorder->lock);
227
228                 atomic_inc(&next_queue->num_obj);
229
230                 goto out;
231         }
232
233         if (next_nr % num_cpus == next_queue->cpu_index) {
234                 padata = ERR_PTR(-ENODATA);
235                 goto out;
236         }
237
238         padata = ERR_PTR(-EINPROGRESS);
239 out:
240         return padata;
241 }
242
243 static void padata_reorder(struct parallel_data *pd)
244 {
245         struct padata_priv *padata;
246         struct padata_queue *queue;
247         struct padata_instance *pinst = pd->pinst;
248
249 try_again:
250         if (!spin_trylock_bh(&pd->lock))
251                 goto out;
252
253         while (1) {
254                 padata = padata_get_next(pd);
255
256                 if (!padata || PTR_ERR(padata) == -EINPROGRESS)
257                         break;
258
259                 if (PTR_ERR(padata) == -ENODATA) {
260                         spin_unlock_bh(&pd->lock);
261                         goto out;
262                 }
263
264                 queue = per_cpu_ptr(pd->queue, padata->cb_cpu);
265
266                 spin_lock(&queue->serial.lock);
267                 list_add_tail(&padata->list, &queue->serial.list);
268                 spin_unlock(&queue->serial.lock);
269
270                 queue_work_on(padata->cb_cpu, pinst->wq, &queue->swork);
271         }
272
273         spin_unlock_bh(&pd->lock);
274
275         if (atomic_read(&pd->reorder_objects))
276                 goto try_again;
277
278 out:
279         return;
280 }
281
282 static void padata_serial_worker(struct work_struct *work)
283 {
284         struct padata_queue *queue;
285         struct parallel_data *pd;
286         LIST_HEAD(local_list);
287
288         local_bh_disable();
289         queue = container_of(work, struct padata_queue, swork);
290         pd = queue->pd;
291
292         spin_lock(&queue->serial.lock);
293         list_replace_init(&queue->serial.list, &local_list);
294         spin_unlock(&queue->serial.lock);
295
296         while (!list_empty(&local_list)) {
297                 struct padata_priv *padata;
298
299                 padata = list_entry(local_list.next,
300                                     struct padata_priv, list);
301
302                 list_del_init(&padata->list);
303
304                 padata->serial(padata);
305                 atomic_dec(&pd->refcnt);
306         }
307         local_bh_enable();
308 }
309
310 /*
311  * padata_do_serial - padata serialization function
312  *
313  * @padata: object to be serialized.
314  *
315  * padata_do_serial must be called for every parallelized object.
316  * The serialization callback function will run with BHs off.
317  */
318 void padata_do_serial(struct padata_priv *padata)
319 {
320         int cpu;
321         struct padata_queue *queue;
322         struct parallel_data *pd;
323
324         pd = padata->pd;
325
326         cpu = get_cpu();
327         queue = per_cpu_ptr(pd->queue, cpu);
328
329         spin_lock(&queue->reorder.lock);
330         atomic_inc(&pd->reorder_objects);
331         list_add_tail(&padata->list, &queue->reorder.list);
332         spin_unlock(&queue->reorder.lock);
333
334         put_cpu();
335
336         padata_reorder(pd);
337 }
338 EXPORT_SYMBOL(padata_do_serial);
339
340 static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
341                                              const struct cpumask *cpumask)
342 {
343         int cpu, cpu_index, num_cpus;
344         struct padata_queue *queue;
345         struct parallel_data *pd;
346
347         cpu_index = 0;
348
349         pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
350         if (!pd)
351                 goto err;
352
353         pd->queue = alloc_percpu(struct padata_queue);
354         if (!pd->queue)
355                 goto err_free_pd;
356
357         if (!alloc_cpumask_var(&pd->cpumask, GFP_KERNEL))
358                 goto err_free_queue;
359
360         for_each_possible_cpu(cpu) {
361                 queue = per_cpu_ptr(pd->queue, cpu);
362
363                 queue->pd = pd;
364
365                 if (cpumask_test_cpu(cpu, cpumask)
366                     && cpumask_test_cpu(cpu, cpu_active_mask)) {
367                         queue->cpu_index = cpu_index;
368                         cpu_index++;
369                 } else
370                         queue->cpu_index = -1;
371
372                 INIT_LIST_HEAD(&queue->reorder.list);
373                 INIT_LIST_HEAD(&queue->parallel.list);
374                 INIT_LIST_HEAD(&queue->serial.list);
375                 spin_lock_init(&queue->reorder.lock);
376                 spin_lock_init(&queue->parallel.lock);
377                 spin_lock_init(&queue->serial.lock);
378
379                 INIT_WORK(&queue->pwork, padata_parallel_worker);
380                 INIT_WORK(&queue->swork, padata_serial_worker);
381                 atomic_set(&queue->num_obj, 0);
382         }
383
384         cpumask_and(pd->cpumask, cpumask, cpu_active_mask);
385
386         num_cpus = cpumask_weight(pd->cpumask);
387         pd->max_seq_nr = (MAX_SEQ_NR / num_cpus) * num_cpus - 1;
388
389         atomic_set(&pd->seq_nr, -1);
390         atomic_set(&pd->reorder_objects, 0);
391         atomic_set(&pd->refcnt, 0);
392         pd->pinst = pinst;
393         spin_lock_init(&pd->lock);
394
395         return pd;
396
397 err_free_queue:
398         free_percpu(pd->queue);
399 err_free_pd:
400         kfree(pd);
401 err:
402         return NULL;
403 }
404
405 static void padata_free_pd(struct parallel_data *pd)
406 {
407         free_cpumask_var(pd->cpumask);
408         free_percpu(pd->queue);
409         kfree(pd);
410 }
411
412 static void padata_replace(struct padata_instance *pinst,
413                            struct parallel_data *pd_new)
414 {
415         struct parallel_data *pd_old = pinst->pd;
416
417         pinst->flags |= PADATA_RESET;
418
419         rcu_assign_pointer(pinst->pd, pd_new);
420
421         synchronize_rcu();
422
423         while (atomic_read(&pd_old->refcnt) != 0)
424                 yield();
425
426         flush_workqueue(pinst->wq);
427
428         padata_free_pd(pd_old);
429
430         pinst->flags &= ~PADATA_RESET;
431 }
432
433 /*
434  * padata_set_cpumask - set the cpumask that padata should use
435  *
436  * @pinst: padata instance
437  * @cpumask: the cpumask to use
438  */
439 int padata_set_cpumask(struct padata_instance *pinst,
440                         cpumask_var_t cpumask)
441 {
442         struct parallel_data *pd;
443         int err = 0;
444
445         might_sleep();
446
447         mutex_lock(&pinst->lock);
448
449         pd = padata_alloc_pd(pinst, cpumask);
450         if (!pd) {
451                 err = -ENOMEM;
452                 goto out;
453         }
454
455         cpumask_copy(pinst->cpumask, cpumask);
456
457         padata_replace(pinst, pd);
458
459 out:
460         mutex_unlock(&pinst->lock);
461
462         return err;
463 }
464 EXPORT_SYMBOL(padata_set_cpumask);
465
466 static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
467 {
468         struct parallel_data *pd;
469
470         if (cpumask_test_cpu(cpu, cpu_active_mask)) {
471                 pd = padata_alloc_pd(pinst, pinst->cpumask);
472                 if (!pd)
473                         return -ENOMEM;
474
475                 padata_replace(pinst, pd);
476         }
477
478         return 0;
479 }
480
481 /*
482  * padata_add_cpu - add a cpu to the padata cpumask
483  *
484  * @pinst: padata instance
485  * @cpu: cpu to add
486  */
487 int padata_add_cpu(struct padata_instance *pinst, int cpu)
488 {
489         int err;
490
491         might_sleep();
492
493         mutex_lock(&pinst->lock);
494
495         cpumask_set_cpu(cpu, pinst->cpumask);
496         err = __padata_add_cpu(pinst, cpu);
497
498         mutex_unlock(&pinst->lock);
499
500         return err;
501 }
502 EXPORT_SYMBOL(padata_add_cpu);
503
504 static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
505 {
506         struct parallel_data *pd;
507
508         if (cpumask_test_cpu(cpu, cpu_online_mask)) {
509                 pd = padata_alloc_pd(pinst, pinst->cpumask);
510                 if (!pd)
511                         return -ENOMEM;
512
513                 padata_replace(pinst, pd);
514         }
515
516         return 0;
517 }
518
519 /*
520  * padata_remove_cpu - remove a cpu from the padata cpumask
521  *
522  * @pinst: padata instance
523  * @cpu: cpu to remove
524  */
525 int padata_remove_cpu(struct padata_instance *pinst, int cpu)
526 {
527         int err;
528
529         might_sleep();
530
531         mutex_lock(&pinst->lock);
532
533         cpumask_clear_cpu(cpu, pinst->cpumask);
534         err = __padata_remove_cpu(pinst, cpu);
535
536         mutex_unlock(&pinst->lock);
537
538         return err;
539 }
540 EXPORT_SYMBOL(padata_remove_cpu);
541
542 /*
543  * padata_start - start the parallel processing
544  *
545  * @pinst: padata instance to start
546  */
547 void padata_start(struct padata_instance *pinst)
548 {
549         might_sleep();
550
551         mutex_lock(&pinst->lock);
552         pinst->flags |= PADATA_INIT;
553         mutex_unlock(&pinst->lock);
554 }
555 EXPORT_SYMBOL(padata_start);
556
557 /*
558  * padata_stop - stop the parallel processing
559  *
560  * @pinst: padata instance to stop
561  */
562 void padata_stop(struct padata_instance *pinst)
563 {
564         might_sleep();
565
566         mutex_lock(&pinst->lock);
567         pinst->flags &= ~PADATA_INIT;
568         mutex_unlock(&pinst->lock);
569 }
570 EXPORT_SYMBOL(padata_stop);
571
572 static int padata_cpu_callback(struct notifier_block *nfb,
573                                unsigned long action, void *hcpu)
574 {
575         int err;
576         struct padata_instance *pinst;
577         int cpu = (unsigned long)hcpu;
578
579         pinst = container_of(nfb, struct padata_instance, cpu_notifier);
580
581         switch (action) {
582         case CPU_ONLINE:
583         case CPU_ONLINE_FROZEN:
584                 if (!cpumask_test_cpu(cpu, pinst->cpumask))
585                         break;
586                 mutex_lock(&pinst->lock);
587                 err = __padata_add_cpu(pinst, cpu);
588                 mutex_unlock(&pinst->lock);
589                 if (err)
590                         return NOTIFY_BAD;
591                 break;
592
593         case CPU_DOWN_PREPARE:
594         case CPU_DOWN_PREPARE_FROZEN:
595                 if (!cpumask_test_cpu(cpu, pinst->cpumask))
596                         break;
597                 mutex_lock(&pinst->lock);
598                 err = __padata_remove_cpu(pinst, cpu);
599                 mutex_unlock(&pinst->lock);
600                 if (err)
601                         return NOTIFY_BAD;
602                 break;
603
604         case CPU_UP_CANCELED:
605         case CPU_UP_CANCELED_FROZEN:
606                 if (!cpumask_test_cpu(cpu, pinst->cpumask))
607                         break;
608                 mutex_lock(&pinst->lock);
609                 __padata_remove_cpu(pinst, cpu);
610                 mutex_unlock(&pinst->lock);
611
612         case CPU_DOWN_FAILED:
613         case CPU_DOWN_FAILED_FROZEN:
614                 if (!cpumask_test_cpu(cpu, pinst->cpumask))
615                         break;
616                 mutex_lock(&pinst->lock);
617                 __padata_add_cpu(pinst, cpu);
618                 mutex_unlock(&pinst->lock);
619         }
620
621         return NOTIFY_OK;
622 }
623
624 /*
625  * padata_alloc - allocate and initialize a padata instance
626  *
627  * @cpumask: cpumask that padata uses for parallelization
628  * @wq: workqueue to use for the allocated padata instance
629  */
630 struct padata_instance *padata_alloc(const struct cpumask *cpumask,
631                                      struct workqueue_struct *wq)
632 {
633         int err;
634         struct padata_instance *pinst;
635         struct parallel_data *pd;
636
637         pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
638         if (!pinst)
639                 goto err;
640
641         pd = padata_alloc_pd(pinst, cpumask);
642         if (!pd)
643                 goto err_free_inst;
644
645         rcu_assign_pointer(pinst->pd, pd);
646
647         pinst->wq = wq;
648
649         cpumask_copy(pinst->cpumask, cpumask);
650
651         pinst->flags = 0;
652
653         pinst->cpu_notifier.notifier_call = padata_cpu_callback;
654         pinst->cpu_notifier.priority = 0;
655         err = register_hotcpu_notifier(&pinst->cpu_notifier);
656         if (err)
657                 goto err_free_pd;
658
659         mutex_init(&pinst->lock);
660
661         return pinst;
662
663 err_free_pd:
664         padata_free_pd(pd);
665 err_free_inst:
666         kfree(pinst);
667 err:
668         return NULL;
669 }
670 EXPORT_SYMBOL(padata_alloc);
671
672 /*
673  * padata_free - free a padata instance
674  *
675  * @ padata_inst: padata instance to free
676  */
677 void padata_free(struct padata_instance *pinst)
678 {
679         padata_stop(pinst);
680
681         synchronize_rcu();
682
683         while (atomic_read(&pinst->pd->refcnt) != 0)
684                 yield();
685
686         unregister_hotcpu_notifier(&pinst->cpu_notifier);
687         padata_free_pd(pinst->pd);
688         kfree(pinst);
689 }
690 EXPORT_SYMBOL(padata_free);