ring-buffer benchmark: Run producer/consumer threads at nice +19
[linux-2.6.git] / kernel / trace / ring_buffer_benchmark.c
1 /*
2  * ring buffer tester and benchmark
3  *
4  * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/completion.h>
8 #include <linux/kthread.h>
9 #include <linux/module.h>
10 #include <linux/time.h>
11
12 struct rb_page {
13         u64             ts;
14         local_t         commit;
15         char            data[4080];
16 };
17
18 /* run time and sleep time in seconds */
19 #define RUN_TIME        10
20 #define SLEEP_TIME      10
21
22 /* number of events for writer to wake up the reader */
23 static int wakeup_interval = 100;
24
25 static int reader_finish;
26 static struct completion read_start;
27 static struct completion read_done;
28
29 static struct ring_buffer *buffer;
30 static struct task_struct *producer;
31 static struct task_struct *consumer;
32 static unsigned long read;
33
34 static int disable_reader;
35 module_param(disable_reader, uint, 0644);
36 MODULE_PARM_DESC(disable_reader, "only run producer");
37
38 static int write_iteration = 50;
39 module_param(write_iteration, uint, 0644);
40 MODULE_PARM_DESC(write_iteration, "# of writes between timestamp readings");
41
42 static int read_events;
43
44 static int kill_test;
45
46 #define KILL_TEST()                             \
47         do {                                    \
48                 if (!kill_test) {               \
49                         kill_test = 1;          \
50                         WARN_ON(1);             \
51                 }                               \
52         } while (0)
53
54 enum event_status {
55         EVENT_FOUND,
56         EVENT_DROPPED,
57 };
58
59 static enum event_status read_event(int cpu)
60 {
61         struct ring_buffer_event *event;
62         int *entry;
63         u64 ts;
64
65         event = ring_buffer_consume(buffer, cpu, &ts);
66         if (!event)
67                 return EVENT_DROPPED;
68
69         entry = ring_buffer_event_data(event);
70         if (*entry != cpu) {
71                 KILL_TEST();
72                 return EVENT_DROPPED;
73         }
74
75         read++;
76         return EVENT_FOUND;
77 }
78
79 static enum event_status read_page(int cpu)
80 {
81         struct ring_buffer_event *event;
82         struct rb_page *rpage;
83         unsigned long commit;
84         void *bpage;
85         int *entry;
86         int ret;
87         int inc;
88         int i;
89
90         bpage = ring_buffer_alloc_read_page(buffer);
91         if (!bpage)
92                 return EVENT_DROPPED;
93
94         ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1);
95         if (ret >= 0) {
96                 rpage = bpage;
97                 commit = local_read(&rpage->commit);
98                 for (i = 0; i < commit && !kill_test; i += inc) {
99
100                         if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) {
101                                 KILL_TEST();
102                                 break;
103                         }
104
105                         inc = -1;
106                         event = (void *)&rpage->data[i];
107                         switch (event->type_len) {
108                         case RINGBUF_TYPE_PADDING:
109                                 /* failed writes may be discarded events */
110                                 if (!event->time_delta)
111                                         KILL_TEST();
112                                 inc = event->array[0] + 4;
113                                 break;
114                         case RINGBUF_TYPE_TIME_EXTEND:
115                                 inc = 8;
116                                 break;
117                         case 0:
118                                 entry = ring_buffer_event_data(event);
119                                 if (*entry != cpu) {
120                                         KILL_TEST();
121                                         break;
122                                 }
123                                 read++;
124                                 if (!event->array[0]) {
125                                         KILL_TEST();
126                                         break;
127                                 }
128                                 inc = event->array[0] + 4;
129                                 break;
130                         default:
131                                 entry = ring_buffer_event_data(event);
132                                 if (*entry != cpu) {
133                                         KILL_TEST();
134                                         break;
135                                 }
136                                 read++;
137                                 inc = ((event->type_len + 1) * 4);
138                         }
139                         if (kill_test)
140                                 break;
141
142                         if (inc <= 0) {
143                                 KILL_TEST();
144                                 break;
145                         }
146                 }
147         }
148         ring_buffer_free_read_page(buffer, bpage);
149
150         if (ret < 0)
151                 return EVENT_DROPPED;
152         return EVENT_FOUND;
153 }
154
155 static void ring_buffer_consumer(void)
156 {
157         /* toggle between reading pages and events */
158         read_events ^= 1;
159
160         read = 0;
161         while (!reader_finish && !kill_test) {
162                 int found;
163
164                 do {
165                         int cpu;
166
167                         found = 0;
168                         for_each_online_cpu(cpu) {
169                                 enum event_status stat;
170
171                                 if (read_events)
172                                         stat = read_event(cpu);
173                                 else
174                                         stat = read_page(cpu);
175
176                                 if (kill_test)
177                                         break;
178                                 if (stat == EVENT_FOUND)
179                                         found = 1;
180                         }
181                 } while (found && !kill_test);
182
183                 set_current_state(TASK_INTERRUPTIBLE);
184                 if (reader_finish)
185                         break;
186
187                 schedule();
188                 __set_current_state(TASK_RUNNING);
189         }
190         reader_finish = 0;
191         complete(&read_done);
192 }
193
194 static void ring_buffer_producer(void)
195 {
196         struct timeval start_tv;
197         struct timeval end_tv;
198         unsigned long long time;
199         unsigned long long entries;
200         unsigned long long overruns;
201         unsigned long missed = 0;
202         unsigned long hit = 0;
203         unsigned long avg;
204         int cnt = 0;
205
206         /*
207          * Hammer the buffer for 10 secs (this may
208          * make the system stall)
209          */
210         trace_printk("Starting ring buffer hammer\n");
211         do_gettimeofday(&start_tv);
212         do {
213                 struct ring_buffer_event *event;
214                 int *entry;
215                 int i;
216
217                 for (i = 0; i < write_iteration; i++) {
218                         event = ring_buffer_lock_reserve(buffer, 10);
219                         if (!event) {
220                                 missed++;
221                         } else {
222                                 hit++;
223                                 entry = ring_buffer_event_data(event);
224                                 *entry = smp_processor_id();
225                                 ring_buffer_unlock_commit(buffer, event);
226                         }
227                 }
228                 do_gettimeofday(&end_tv);
229
230                 cnt++;
231                 if (consumer && !(cnt % wakeup_interval))
232                         wake_up_process(consumer);
233
234 #ifndef CONFIG_PREEMPT
235                 /*
236                  * If we are a non preempt kernel, the 10 second run will
237                  * stop everything while it runs. Instead, we will call
238                  * cond_resched and also add any time that was lost by a
239                  * rescedule.
240                  *
241                  * Do a cond resched at the same frequency we would wake up
242                  * the reader.
243                  */
244                 if (cnt % wakeup_interval)
245                         cond_resched();
246 #endif
247
248         } while (end_tv.tv_sec < (start_tv.tv_sec + RUN_TIME) && !kill_test);
249         trace_printk("End ring buffer hammer\n");
250
251         if (consumer) {
252                 /* Init both completions here to avoid races */
253                 init_completion(&read_start);
254                 init_completion(&read_done);
255                 /* the completions must be visible before the finish var */
256                 smp_wmb();
257                 reader_finish = 1;
258                 /* finish var visible before waking up the consumer */
259                 smp_wmb();
260                 wake_up_process(consumer);
261                 wait_for_completion(&read_done);
262         }
263
264         time = end_tv.tv_sec - start_tv.tv_sec;
265         time *= USEC_PER_SEC;
266         time += (long long)((long)end_tv.tv_usec - (long)start_tv.tv_usec);
267
268         entries = ring_buffer_entries(buffer);
269         overruns = ring_buffer_overruns(buffer);
270
271         if (kill_test)
272                 trace_printk("ERROR!\n");
273         trace_printk("Time:     %lld (usecs)\n", time);
274         trace_printk("Overruns: %lld\n", overruns);
275         if (disable_reader)
276                 trace_printk("Read:     (reader disabled)\n");
277         else
278                 trace_printk("Read:     %ld  (by %s)\n", read,
279                         read_events ? "events" : "pages");
280         trace_printk("Entries:  %lld\n", entries);
281         trace_printk("Total:    %lld\n", entries + overruns + read);
282         trace_printk("Missed:   %ld\n", missed);
283         trace_printk("Hit:      %ld\n", hit);
284
285         /* Convert time from usecs to millisecs */
286         do_div(time, USEC_PER_MSEC);
287         if (time)
288                 hit /= (long)time;
289         else
290                 trace_printk("TIME IS ZERO??\n");
291
292         trace_printk("Entries per millisec: %ld\n", hit);
293
294         if (hit) {
295                 /* Calculate the average time in nanosecs */
296                 avg = NSEC_PER_MSEC / hit;
297                 trace_printk("%ld ns per entry\n", avg);
298         }
299
300         if (missed) {
301                 if (time)
302                         missed /= (long)time;
303
304                 trace_printk("Total iterations per millisec: %ld\n",
305                              hit + missed);
306
307                 /* it is possible that hit + missed will overflow and be zero */
308                 if (!(hit + missed)) {
309                         trace_printk("hit + missed overflowed and totalled zero!\n");
310                         hit--; /* make it non zero */
311                 }
312
313                 /* Caculate the average time in nanosecs */
314                 avg = NSEC_PER_MSEC / (hit + missed);
315                 trace_printk("%ld ns per entry\n", avg);
316         }
317 }
318
319 static void wait_to_die(void)
320 {
321         set_current_state(TASK_INTERRUPTIBLE);
322         while (!kthread_should_stop()) {
323                 schedule();
324                 set_current_state(TASK_INTERRUPTIBLE);
325         }
326         __set_current_state(TASK_RUNNING);
327 }
328
329 static int ring_buffer_consumer_thread(void *arg)
330 {
331         while (!kthread_should_stop() && !kill_test) {
332                 complete(&read_start);
333
334                 ring_buffer_consumer();
335
336                 set_current_state(TASK_INTERRUPTIBLE);
337                 if (kthread_should_stop() || kill_test)
338                         break;
339
340                 schedule();
341                 __set_current_state(TASK_RUNNING);
342         }
343         __set_current_state(TASK_RUNNING);
344
345         if (kill_test)
346                 wait_to_die();
347
348         return 0;
349 }
350
351 static int ring_buffer_producer_thread(void *arg)
352 {
353         init_completion(&read_start);
354
355         while (!kthread_should_stop() && !kill_test) {
356                 ring_buffer_reset(buffer);
357
358                 if (consumer) {
359                         smp_wmb();
360                         wake_up_process(consumer);
361                         wait_for_completion(&read_start);
362                 }
363
364                 ring_buffer_producer();
365
366                 trace_printk("Sleeping for 10 secs\n");
367                 set_current_state(TASK_INTERRUPTIBLE);
368                 schedule_timeout(HZ * SLEEP_TIME);
369                 __set_current_state(TASK_RUNNING);
370         }
371
372         if (kill_test)
373                 wait_to_die();
374
375         return 0;
376 }
377
378 static int __init ring_buffer_benchmark_init(void)
379 {
380         int ret;
381
382         /* make a one meg buffer in overwite mode */
383         buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE);
384         if (!buffer)
385                 return -ENOMEM;
386
387         if (!disable_reader) {
388                 consumer = kthread_create(ring_buffer_consumer_thread,
389                                           NULL, "rb_consumer");
390                 ret = PTR_ERR(consumer);
391                 if (IS_ERR(consumer))
392                         goto out_fail;
393         }
394
395         producer = kthread_run(ring_buffer_producer_thread,
396                                NULL, "rb_producer");
397         ret = PTR_ERR(producer);
398
399         if (IS_ERR(producer))
400                 goto out_kill;
401
402         /*
403          * Run them as low-prio background tasks by default:
404          */
405         set_user_nice(consumer, 19);
406         set_user_nice(producer, 19);
407
408         return 0;
409
410  out_kill:
411         if (consumer)
412                 kthread_stop(consumer);
413
414  out_fail:
415         ring_buffer_free(buffer);
416         return ret;
417 }
418
419 static void __exit ring_buffer_benchmark_exit(void)
420 {
421         kthread_stop(producer);
422         if (consumer)
423                 kthread_stop(consumer);
424         ring_buffer_free(buffer);
425 }
426
427 module_init(ring_buffer_benchmark_init);
428 module_exit(ring_buffer_benchmark_exit);
429
430 MODULE_AUTHOR("Steven Rostedt");
431 MODULE_DESCRIPTION("ring_buffer_benchmark");
432 MODULE_LICENSE("GPL");