mm: correctly synchronize rss-counters at exit/exec
[linux-2.6.git] / kernel / relay.c
index 951f29b..ab56a17 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Public API and common code for kernel->userspace relay file support.
  *
- * See Documentation/filesystems/relayfs.txt for an overview of relayfs.
+ * See Documentation/filesystems/relay.txt for an overview.
  *
  * Copyright (C) 2002-2005 - Tom Zanussi (zanussi@us.ibm.com), IBM Corp
  * Copyright (C) 1999-2005 - Karim Yaghmour (karim@opersys.com)
@@ -15,7 +15,7 @@
 #include <linux/errno.h>
 #include <linux/stddef.h>
 #include <linux/slab.h>
-#include <linux/module.h>
+#include <linux/export.h>
 #include <linux/string.h>
 #include <linux/relay.h>
 #include <linux/vmalloc.h>
@@ -37,40 +37,56 @@ static void relay_file_mmap_close(struct vm_area_struct *vma)
 }
 
 /*
- * nopage() vm_op implementation for relay file mapping.
+ * fault() vm_op implementation for relay file mapping.
  */
-static struct page *relay_buf_nopage(struct vm_area_struct *vma,
-                                    unsigned long address,
-                                    int *type)
+static int relay_buf_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
 {
        struct page *page;
        struct rchan_buf *buf = vma->vm_private_data;
-       unsigned long offset = address - vma->vm_start;
+       pgoff_t pgoff = vmf->pgoff;
 
-       if (address > vma->vm_end)
-               return NOPAGE_SIGBUS; /* Disallow mremap */
        if (!buf)
-               return NOPAGE_OOM;
+               return VM_FAULT_OOM;
 
-       page = vmalloc_to_page(buf->start + offset);
+       page = vmalloc_to_page(buf->start + (pgoff << PAGE_SHIFT));
        if (!page)
-               return NOPAGE_OOM;
+               return VM_FAULT_SIGBUS;
        get_page(page);
+       vmf->page = page;
 
-       if (type)
-               *type = VM_FAULT_MINOR;
-
-       return page;
+       return 0;
 }
 
 /*
  * vm_ops for relay file mappings.
  */
-static struct vm_operations_struct relay_file_mmap_ops = {
-       .nopage = relay_buf_nopage,
+static const struct vm_operations_struct relay_file_mmap_ops = {
+       .fault = relay_buf_fault,
        .close = relay_file_mmap_close,
 };
 
+/*
+ * allocate an array of pointers of struct page
+ */
+static struct page **relay_alloc_page_array(unsigned int n_pages)
+{
+       const size_t pa_size = n_pages * sizeof(struct page *);
+       if (pa_size > PAGE_SIZE)
+               return vzalloc(pa_size);
+       return kzalloc(pa_size, GFP_KERNEL);
+}
+
+/*
+ * free an array of pointers of struct page
+ */
+static void relay_free_page_array(struct page **array)
+{
+       if (is_vmalloc_addr(array))
+               vfree(array);
+       else
+               kfree(array);
+}
+
 /**
  *     relay_mmap_buf: - mmap channel buffer to process address space
  *     @buf: relay channel buffer
@@ -80,7 +96,7 @@ static struct vm_operations_struct relay_file_mmap_ops = {
  *
  *     Caller should already have grabbed mmap_sem.
  */
-int relay_mmap_buf(struct rchan_buf *buf, struct vm_area_struct *vma)
+static int relay_mmap_buf(struct rchan_buf *buf, struct vm_area_struct *vma)
 {
        unsigned long length = vma->vm_end - vma->vm_start;
        struct file *filp = vma->vm_file;
@@ -92,6 +108,7 @@ int relay_mmap_buf(struct rchan_buf *buf, struct vm_area_struct *vma)
                return -EINVAL;
 
        vma->vm_ops = &relay_file_mmap_ops;
+       vma->vm_flags |= VM_DONTEXPAND;
        vma->vm_private_data = buf;
        buf->chan->cb->buf_mapped(buf, filp);
 
@@ -114,7 +131,7 @@ static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
        *size = PAGE_ALIGN(*size);
        n_pages = *size >> PAGE_SHIFT;
 
-       buf->page_array = kcalloc(n_pages, sizeof(struct page *), GFP_KERNEL);
+       buf->page_array = relay_alloc_page_array(n_pages);
        if (!buf->page_array)
                return NULL;
 
@@ -135,7 +152,7 @@ static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
 depopulate:
        for (j = 0; j < i; j++)
                __free_page(buf->page_array[j]);
-       kfree(buf->page_array);
+       relay_free_page_array(buf->page_array);
        return NULL;
 }
 
@@ -145,12 +162,16 @@ depopulate:
  *
  *     Returns channel buffer if successful, %NULL otherwise.
  */
-struct rchan_buf *relay_create_buf(struct rchan *chan)
+static struct rchan_buf *relay_create_buf(struct rchan *chan)
 {
-       struct rchan_buf *buf = kzalloc(sizeof(struct rchan_buf), GFP_KERNEL);
-       if (!buf)
+       struct rchan_buf *buf;
+
+       if (chan->n_subbufs > UINT_MAX / sizeof(size_t *))
                return NULL;
 
+       buf = kzalloc(sizeof(struct rchan_buf), GFP_KERNEL);
+       if (!buf)
+               return NULL;
        buf->padding = kmalloc(chan->n_subbufs * sizeof(size_t *), GFP_KERNEL);
        if (!buf->padding)
                goto free_buf;
@@ -175,7 +196,7 @@ free_buf:
  *
  *     Should only be called from kref_put().
  */
-void relay_destroy_channel(struct kref *kref)
+static void relay_destroy_channel(struct kref *kref)
 {
        struct rchan *chan = container_of(kref, struct rchan, kref);
        kfree(chan);
@@ -185,7 +206,7 @@ void relay_destroy_channel(struct kref *kref)
  *     relay_destroy_buf - destroy an rchan_buf struct and associated buffer
  *     @buf: the buffer struct
  */
-void relay_destroy_buf(struct rchan_buf *buf)
+static void relay_destroy_buf(struct rchan_buf *buf)
 {
        struct rchan *chan = buf->chan;
        unsigned int i;
@@ -194,7 +215,7 @@ void relay_destroy_buf(struct rchan_buf *buf)
                vunmap(buf->start);
                for (i = 0; i < buf->page_count; i++)
                        __free_page(buf->page_array[i]);
-               kfree(buf->page_array);
+               relay_free_page_array(buf->page_array);
        }
        chan->buf[buf->cpu] = NULL;
        kfree(buf->padding);
@@ -210,7 +231,7 @@ void relay_destroy_buf(struct rchan_buf *buf)
  *     rchan_buf_struct and the channel buffer.  Should only be called from
  *     kref_put().
  */
-void relay_remove_buf(struct kref *kref)
+static void relay_remove_buf(struct kref *kref)
 {
        struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
        buf->chan->cb->remove_buf_file(buf->dentry);
@@ -223,11 +244,10 @@ void relay_remove_buf(struct kref *kref)
  *
  *     Returns 1 if the buffer is empty, 0 otherwise.
  */
-int relay_buf_empty(struct rchan_buf *buf)
+static int relay_buf_empty(struct rchan_buf *buf)
 {
        return (buf->subbufs_produced - buf->subbufs_consumed) ? 0 : 1;
 }
-EXPORT_SYMBOL_GPL(relay_buf_empty);
 
 /**
  *     relay_buf_full - boolean, is the channel buffer full?
@@ -286,7 +306,7 @@ static void buf_unmapped_default_callback(struct rchan_buf *buf,
  */
 static struct dentry *create_buf_file_default_callback(const char *filename,
                                                       struct dentry *parent,
-                                                      int mode,
+                                                      umode_t mode,
                                                       struct rchan_buf *buf,
                                                       int *is_global)
 {
@@ -371,19 +391,48 @@ void relay_reset(struct rchan *chan)
        if (!chan)
                return;
 
-       if (chan->is_global && chan->buf[0]) {
+       if (chan->is_global && chan->buf[0]) {
                __relay_reset(chan->buf[0], 0);
                return;
        }
 
        mutex_lock(&relay_channels_mutex);
-       for_each_online_cpu(i)
+       for_each_possible_cpu(i)
                if (chan->buf[i])
                        __relay_reset(chan->buf[i], 0);
        mutex_unlock(&relay_channels_mutex);
 }
 EXPORT_SYMBOL_GPL(relay_reset);
 
+static inline void relay_set_buf_dentry(struct rchan_buf *buf,
+                                       struct dentry *dentry)
+{
+       buf->dentry = dentry;
+       buf->dentry->d_inode->i_size = buf->early_bytes;
+}
+
+static struct dentry *relay_create_buf_file(struct rchan *chan,
+                                           struct rchan_buf *buf,
+                                           unsigned int cpu)
+{
+       struct dentry *dentry;
+       char *tmpname;
+
+       tmpname = kzalloc(NAME_MAX + 1, GFP_KERNEL);
+       if (!tmpname)
+               return NULL;
+       snprintf(tmpname, NAME_MAX, "%s%d", chan->base_filename, cpu);
+
+       /* Create file in fs */
+       dentry = chan->cb->create_buf_file(tmpname, chan->parent,
+                                          S_IRUSR, buf,
+                                          &chan->is_global);
+
+       kfree(tmpname);
+
+       return dentry;
+}
+
 /*
  *     relay_open_buf - create a new relay channel buffer
  *
@@ -393,44 +442,34 @@ static struct rchan_buf *relay_open_buf(struct rchan *chan, unsigned int cpu)
 {
        struct rchan_buf *buf = NULL;
        struct dentry *dentry;
-       char *tmpname;
 
        if (chan->is_global)
                return chan->buf[0];
 
-       tmpname = kzalloc(NAME_MAX + 1, GFP_KERNEL);
-       if (!tmpname)
-               goto end;
-       snprintf(tmpname, NAME_MAX, "%s%d", chan->base_filename, cpu);
-
        buf = relay_create_buf(chan);
        if (!buf)
-               goto free_name;
+               return NULL;
+
+       if (chan->has_base_filename) {
+               dentry = relay_create_buf_file(chan, buf, cpu);
+               if (!dentry)
+                       goto free_buf;
+               relay_set_buf_dentry(buf, dentry);
+       }
 
        buf->cpu = cpu;
        __relay_reset(buf, 1);
 
-       /* Create file in fs */
-       dentry = chan->cb->create_buf_file(tmpname, chan->parent, S_IRUSR,
-                                          buf, &chan->is_global);
-       if (!dentry)
-               goto free_buf;
-
-       buf->dentry = dentry;
-
        if(chan->is_global) {
                chan->buf[0] = buf;
                buf->cpu = 0;
        }
 
-       goto free_name;
+       return buf;
 
 free_buf:
        relay_destroy_buf(buf);
-free_name:
-       kfree(tmpname);
-end:
-       return buf;
+       return NULL;
 }
 
 /**
@@ -497,7 +536,7 @@ static int __cpuinit relay_hotcpu_callback(struct notifier_block *nb,
                                        "relay_hotcpu_callback: cpu %d buffer "
                                        "creation failed\n", hotcpu);
                                mutex_unlock(&relay_channels_mutex);
-                               return NOTIFY_BAD;
+                               return notifier_from_errno(-ENOMEM);
                        }
                }
                mutex_unlock(&relay_channels_mutex);
@@ -513,8 +552,8 @@ static int __cpuinit relay_hotcpu_callback(struct notifier_block *nb,
 
 /**
  *     relay_open - create a new relay channel
- *     @base_filename: base name of files to create
- *     @parent: dentry of parent directory, %NULL for root directory
+ *     @base_filename: base name of files to create, %NULL for buffering only
+ *     @parent: dentry of parent directory, %NULL for root directory or buffer
  *     @subbuf_size: size of sub-buffers
  *     @n_subbufs: number of sub-buffers
  *     @cb: client callback functions
@@ -536,11 +575,11 @@ struct rchan *relay_open(const char *base_filename,
 {
        unsigned int i;
        struct rchan *chan;
-       if (!base_filename)
-               return NULL;
 
        if (!(subbuf_size && n_subbufs))
                return NULL;
+       if (subbuf_size > UINT_MAX / n_subbufs)
+               return NULL;
 
        chan = kzalloc(sizeof(struct rchan), GFP_KERNEL);
        if (!chan)
@@ -552,7 +591,10 @@ struct rchan *relay_open(const char *base_filename,
        chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);
        chan->parent = parent;
        chan->private_data = private_data;
-       strlcpy(chan->base_filename, base_filename, NAME_MAX);
+       if (base_filename) {
+               chan->has_base_filename = 1;
+               strlcpy(chan->base_filename, base_filename, NAME_MAX);
+       }
        setup_callbacks(chan, cb);
        kref_init(&chan->kref);
 
@@ -568,10 +610,9 @@ struct rchan *relay_open(const char *base_filename,
        return chan;
 
 free_bufs:
-       for_each_online_cpu(i) {
-               if (!chan->buf[i])
-                       break;
-               relay_close_buf(chan->buf[i]);
+       for_each_possible_cpu(i) {
+               if (chan->buf[i])
+                       relay_close_buf(chan->buf[i]);
        }
 
        kref_put(&chan->kref, relay_destroy_channel);
@@ -580,6 +621,94 @@ free_bufs:
 }
 EXPORT_SYMBOL_GPL(relay_open);
 
+struct rchan_percpu_buf_dispatcher {
+       struct rchan_buf *buf;
+       struct dentry *dentry;
+};
+
+/* Called in atomic context. */
+static void __relay_set_buf_dentry(void *info)
+{
+       struct rchan_percpu_buf_dispatcher *p = info;
+
+       relay_set_buf_dentry(p->buf, p->dentry);
+}
+
+/**
+ *     relay_late_setup_files - triggers file creation
+ *     @chan: channel to operate on
+ *     @base_filename: base name of files to create
+ *     @parent: dentry of parent directory, %NULL for root directory
+ *
+ *     Returns 0 if successful, non-zero otherwise.
+ *
+ *     Use to setup files for a previously buffer-only channel.
+ *     Useful to do early tracing in kernel, before VFS is up, for example.
+ */
+int relay_late_setup_files(struct rchan *chan,
+                          const char *base_filename,
+                          struct dentry *parent)
+{
+       int err = 0;
+       unsigned int i, curr_cpu;
+       unsigned long flags;
+       struct dentry *dentry;
+       struct rchan_percpu_buf_dispatcher disp;
+
+       if (!chan || !base_filename)
+               return -EINVAL;
+
+       strlcpy(chan->base_filename, base_filename, NAME_MAX);
+
+       mutex_lock(&relay_channels_mutex);
+       /* Is chan already set up? */
+       if (unlikely(chan->has_base_filename)) {
+               mutex_unlock(&relay_channels_mutex);
+               return -EEXIST;
+       }
+       chan->has_base_filename = 1;
+       chan->parent = parent;
+       curr_cpu = get_cpu();
+       /*
+        * The CPU hotplug notifier ran before us and created buffers with
+        * no files associated. So it's safe to call relay_setup_buf_file()
+        * on all currently online CPUs.
+        */
+       for_each_online_cpu(i) {
+               if (unlikely(!chan->buf[i])) {
+                       WARN_ONCE(1, KERN_ERR "CPU has no buffer!\n");
+                       err = -EINVAL;
+                       break;
+               }
+
+               dentry = relay_create_buf_file(chan, chan->buf[i], i);
+               if (unlikely(!dentry)) {
+                       err = -EINVAL;
+                       break;
+               }
+
+               if (curr_cpu == i) {
+                       local_irq_save(flags);
+                       relay_set_buf_dentry(chan->buf[i], dentry);
+                       local_irq_restore(flags);
+               } else {
+                       disp.buf = chan->buf[i];
+                       disp.dentry = dentry;
+                       smp_mb();
+                       /* relay_channels_mutex must be held, so wait. */
+                       err = smp_call_function_single(i,
+                                                      __relay_set_buf_dentry,
+                                                      &disp, 1);
+               }
+               if (unlikely(err))
+                       break;
+       }
+       put_cpu();
+       mutex_unlock(&relay_channels_mutex);
+
+       return err;
+}
+
 /**
  *     relay_switch_subbuf - switch to a new sub-buffer
  *     @buf: channel buffer
@@ -603,8 +732,13 @@ size_t relay_switch_subbuf(struct rchan_buf *buf, size_t length)
                old_subbuf = buf->subbufs_produced % buf->chan->n_subbufs;
                buf->padding[old_subbuf] = buf->prev_padding;
                buf->subbufs_produced++;
-               buf->dentry->d_inode->i_size += buf->chan->subbuf_size -
-                       buf->padding[old_subbuf];
+               if (buf->dentry)
+                       buf->dentry->d_inode->i_size +=
+                               buf->chan->subbuf_size -
+                               buf->padding[old_subbuf];
+               else
+                       buf->early_bytes += buf->chan->subbuf_size -
+                                           buf->padding[old_subbuf];
                smp_mb();
                if (waitqueue_active(&buf->read_wait))
                        /*
@@ -613,7 +747,7 @@ size_t relay_switch_subbuf(struct rchan_buf *buf, size_t length)
                         * from the scheduler (trying to re-grab
                         * rq->lock), so defer it.
                         */
-                       __mod_timer(&buf->timer, jiffies + 1);
+                       mod_timer(&buf->timer, jiffies + 1);
        }
 
        old = buf->data;
@@ -660,13 +794,15 @@ void relay_subbufs_consumed(struct rchan *chan,
        if (!chan)
                return;
 
-       if (cpu >= NR_CPUS || !chan->buf[cpu])
+       if (cpu >= NR_CPUS || !chan->buf[cpu] ||
+                                       subbufs_consumed > chan->n_subbufs)
                return;
 
        buf = chan->buf[cpu];
-       buf->subbufs_consumed += subbufs_consumed;
-       if (buf->subbufs_consumed > buf->subbufs_produced)
+       if (subbufs_consumed > buf->subbufs_produced - buf->subbufs_consumed)
                buf->subbufs_consumed = buf->subbufs_produced;
+       else
+               buf->subbufs_consumed += subbufs_consumed;
 }
 EXPORT_SYMBOL_GPL(relay_subbufs_consumed);
 
@@ -741,7 +877,7 @@ static int relay_file_open(struct inode *inode, struct file *filp)
        kref_get(&buf->kref);
        filp->private_data = buf;
 
-       return 0;
+       return nonseekable_open(inode, filp);
 }
 
 /**
@@ -808,6 +944,10 @@ static void relay_file_read_consume(struct rchan_buf *buf,
        size_t n_subbufs = buf->chan->n_subbufs;
        size_t read_subbuf;
 
+       if (buf->subbufs_produced == buf->subbufs_consumed &&
+           buf->offset == buf->bytes_consumed)
+               return;
+
        if (buf->bytes_consumed + bytes_consumed > subbuf_size) {
                relay_subbufs_consumed(buf->chan, buf->cpu, 1);
                buf->bytes_consumed = 0;
@@ -839,6 +979,8 @@ static int relay_file_read_avail(struct rchan_buf *buf, size_t read_pos)
 
        relay_file_read_consume(buf, read_pos, 0);
 
+       consumed = buf->subbufs_consumed;
+
        if (unlikely(buf->offset > subbuf_size)) {
                if (produced == consumed)
                        return 0;
@@ -850,15 +992,19 @@ static int relay_file_read_avail(struct rchan_buf *buf, size_t read_pos)
                buf->subbufs_consumed = consumed;
                buf->bytes_consumed = 0;
        }
-       
+
        produced = (produced % n_subbufs) * subbuf_size + buf->offset;
        consumed = (consumed % n_subbufs) * subbuf_size + buf->bytes_consumed;
 
        if (consumed > produced)
                produced += n_subbufs * subbuf_size;
-       
-       if (consumed == produced)
+
+       if (consumed == produced) {
+               if (buf->offset == subbuf_size &&
+                   buf->subbufs_produced > buf->subbufs_consumed)
+                       return 1;
                return 0;
+       }
 
        return 1;
 }
@@ -1032,153 +1178,132 @@ static ssize_t relay_file_read(struct file *filp,
                                       NULL, &desc);
 }
 
+static void relay_consume_bytes(struct rchan_buf *rbuf, int bytes_consumed)
+{
+       rbuf->bytes_consumed += bytes_consumed;
+
+       if (rbuf->bytes_consumed >= rbuf->chan->subbuf_size) {
+               relay_subbufs_consumed(rbuf->chan, rbuf->cpu, 1);
+               rbuf->bytes_consumed %= rbuf->chan->subbuf_size;
+       }
+}
+
 static void relay_pipe_buf_release(struct pipe_inode_info *pipe,
                                   struct pipe_buffer *buf)
 {
        struct rchan_buf *rbuf;
 
        rbuf = (struct rchan_buf *)page_private(buf->page);
-
-       rbuf->bytes_consumed += PAGE_SIZE;
-
-       if (rbuf->bytes_consumed == rbuf->chan->subbuf_size) {
-               relay_subbufs_consumed(rbuf->chan, rbuf->cpu, 1);
-               rbuf->bytes_consumed = 0;
-       }
+       relay_consume_bytes(rbuf, buf->private);
 }
 
-static struct pipe_buf_operations relay_pipe_buf_ops = {
+static const struct pipe_buf_operations relay_pipe_buf_ops = {
        .can_merge = 0,
        .map = generic_pipe_buf_map,
        .unmap = generic_pipe_buf_unmap,
-       .pin = generic_pipe_buf_pin,
+       .confirm = generic_pipe_buf_confirm,
        .release = relay_pipe_buf_release,
        .steal = generic_pipe_buf_steal,
        .get = generic_pipe_buf_get,
 };
 
-/**
+static void relay_page_release(struct splice_pipe_desc *spd, unsigned int i)
+{
+}
+
+/*
  *     subbuf_splice_actor - splice up to one subbuf's worth of data
  */
-static int subbuf_splice_actor(struct file *in,
+static ssize_t subbuf_splice_actor(struct file *in,
                               loff_t *ppos,
                               struct pipe_inode_info *pipe,
                               size_t len,
                               unsigned int flags,
                               int *nonpad_ret)
 {
-       unsigned int pidx, poff;
-       unsigned int subbuf_pages;
-       int ret = 0;
-       int do_wakeup = 0;
+       unsigned int pidx, poff, total_len, subbuf_pages, nr_pages;
        struct rchan_buf *rbuf = in->private_data;
        unsigned int subbuf_size = rbuf->chan->subbuf_size;
-       size_t read_start = ((size_t)*ppos) % rbuf->chan->alloc_size;
-       size_t avail = subbuf_size - read_start % subbuf_size;
+       uint64_t pos = (uint64_t) *ppos;
+       uint32_t alloc_size = (uint32_t) rbuf->chan->alloc_size;
+       size_t read_start = (size_t) do_div(pos, alloc_size);
        size_t read_subbuf = read_start / subbuf_size;
        size_t padding = rbuf->padding[read_subbuf];
        size_t nonpad_end = read_subbuf * subbuf_size + subbuf_size - padding;
+       struct page *pages[PIPE_DEF_BUFFERS];
+       struct partial_page partial[PIPE_DEF_BUFFERS];
+       struct splice_pipe_desc spd = {
+               .pages = pages,
+               .nr_pages = 0,
+               .partial = partial,
+               .flags = flags,
+               .ops = &relay_pipe_buf_ops,
+               .spd_release = relay_page_release,
+       };
+       ssize_t ret;
 
        if (rbuf->subbufs_produced == rbuf->subbufs_consumed)
                return 0;
+       if (splice_grow_spd(pipe, &spd))
+               return -ENOMEM;
 
-       if (len > avail)
-               len = avail;
-
-       if (pipe->inode)
-               mutex_lock(&pipe->inode->i_mutex);
+       /*
+        * Adjust read len, if longer than what is available
+        */
+       if (len > (subbuf_size - read_start % subbuf_size))
+               len = subbuf_size - read_start % subbuf_size;
 
        subbuf_pages = rbuf->chan->alloc_size >> PAGE_SHIFT;
        pidx = (read_start / PAGE_SIZE) % subbuf_pages;
        poff = read_start & ~PAGE_MASK;
+       nr_pages = min_t(unsigned int, subbuf_pages, pipe->buffers);
 
-       for (;;) {
-               unsigned int this_len;
-               unsigned int this_end;
-               int newbuf = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
-               struct pipe_buffer *buf = pipe->bufs + newbuf;
+       for (total_len = 0; spd.nr_pages < nr_pages; spd.nr_pages++) {
+               unsigned int this_len, this_end, private;
+               unsigned int cur_pos = read_start + total_len;
 
-               if (!pipe->readers) {
-                       send_sig(SIGPIPE, current, 0);
-                       if (!ret)
-                               ret = -EPIPE;
+               if (!len)
                        break;
-               }
-
-               if (pipe->nrbufs < PIPE_BUFFERS) {
-                       this_len = PAGE_SIZE - poff;
-                       if (this_len > avail)
-                               this_len = avail;
-
-                       buf->page = rbuf->page_array[pidx];
-                       buf->offset = poff;
-                       this_end = read_start + ret + this_len;
-                       if (this_end > nonpad_end) {
-                               if (read_start + ret >= nonpad_end)
-                                       buf->len = 0;
-                               else
-                                       buf->len = nonpad_end - (read_start + ret);
-                       } else
-                               buf->len = this_len;
-
-                       *nonpad_ret += buf->len;
-
-                       buf->ops = &relay_pipe_buf_ops;
-                       pipe->nrbufs++;
-
-                       avail -= this_len;
-                       ret += this_len;
-                       poff = 0;
-                       pidx = (pidx + 1) % subbuf_pages;
 
-                       if (pipe->inode)
-                               do_wakeup = 1;
+               this_len = min_t(unsigned long, len, PAGE_SIZE - poff);
+               private = this_len;
 
-                       if (!avail)
-                               break;
+               spd.pages[spd.nr_pages] = rbuf->page_array[pidx];
+               spd.partial[spd.nr_pages].offset = poff;
 
-                       if (pipe->nrbufs < PIPE_BUFFERS)
-                               continue;
-
-                       break;
+               this_end = cur_pos + this_len;
+               if (this_end >= nonpad_end) {
+                       this_len = nonpad_end - cur_pos;
+                       private = this_len + padding;
                }
+               spd.partial[spd.nr_pages].len = this_len;
+               spd.partial[spd.nr_pages].private = private;
 
-               if (flags & SPLICE_F_NONBLOCK) {
-                       if (!ret)
-                               ret = -EAGAIN;
-                       break;
-               }
+               len -= this_len;
+               total_len += this_len;
+               poff = 0;
+               pidx = (pidx + 1) % subbuf_pages;
 
-               if (signal_pending(current)) {
-                       if (!ret)
-                               ret = -ERESTARTSYS;
+               if (this_end >= nonpad_end) {
+                       spd.nr_pages++;
                        break;
                }
-
-               if (do_wakeup) {
-                       smp_mb();
-                       if (waitqueue_active(&pipe->wait))
-                               wake_up_interruptible_sync(&pipe->wait);
-                       kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
-                       do_wakeup = 0;
-               }
-
-               pipe->waiting_writers++;
-               pipe_wait(pipe);
-               pipe->waiting_writers--;
        }
 
-       if (pipe->inode)
-               mutex_unlock(&pipe->inode->i_mutex);
+       ret = 0;
+       if (!spd.nr_pages)
+               goto out;
 
-       if (do_wakeup) {
-               smp_mb();
-               if (waitqueue_active(&pipe->wait))
-                       wake_up_interruptible(&pipe->wait);
-               kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
-       }
+       ret = *nonpad_ret = splice_to_pipe(pipe, &spd);
+       if (ret < 0 || ret < total_len)
+               goto out;
 
-       return ret;
+        if (read_start + ret == nonpad_end)
+                ret += padding;
+
+out:
+       splice_shrink_spd(pipe, &spd);
+        return ret;
 }
 
 static ssize_t relay_file_splice_read(struct file *in,
@@ -1194,18 +1319,14 @@ static ssize_t relay_file_splice_read(struct file *in,
        ret = 0;
        spliced = 0;
 
-       while (len) {
+       while (len && !spliced) {
                ret = subbuf_splice_actor(in, ppos, pipe, len, flags, &nonpad_ret);
                if (ret < 0)
                        break;
                else if (!ret) {
-                       break;
-                       if (spliced)
-                               break;
-                       if (flags & SPLICE_F_NONBLOCK) {
+                       if (flags & SPLICE_F_NONBLOCK)
                                ret = -EAGAIN;
-                               break;
-                       }
+                       break;
                }
 
                *ppos += ret;
@@ -1241,4 +1362,4 @@ static __init int relay_init(void)
        return 0;
 }
 
-module_init(relay_init);
+early_initcall(relay_init);