ARM: tegra: pm375 : Enable utmi1
[linux-3.10.git] / fs / aio.c
index 55c4c76..ebd06fd 100644 (file)
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -8,6 +8,8 @@
  *
  *     See ../COPYING for licensing terms.
  */
+#define pr_fmt(fmt) "%s: " fmt, __func__
+
 #include <linux/kernel.h>
 #include <linux/init.h>
 #include <linux/errno.h>
@@ -18,8 +20,6 @@
 #include <linux/backing-dev.h>
 #include <linux/uio.h>
 
-#define DEBUG 0
-
 #include <linux/sched.h>
 #include <linux/fs.h>
 #include <linux/file.h>
 #include <asm/kmap_types.h>
 #include <asm/uaccess.h>
 
-#if DEBUG > 1
-#define dprintk                printk
-#else
-#define dprintk(x...)  do { ; } while (0)
-#endif
+#define AIO_RING_MAGIC                 0xa10a10a1
+#define AIO_RING_COMPAT_FEATURES       1
+#define AIO_RING_INCOMPAT_FEATURES     0
+struct aio_ring {
+       unsigned        id;     /* kernel internal index number */
+       unsigned        nr;     /* number of io_events */
+       unsigned        head;
+       unsigned        tail;
+
+       unsigned        magic;
+       unsigned        compat_features;
+       unsigned        incompat_features;
+       unsigned        header_length;  /* size of aio_ring */
+
+
+       struct io_event         io_events[0];
+}; /* 128 bytes + ring size */
+
+#define AIO_RING_PAGES 8
+
+struct kioctx {
+       atomic_t                users;
+       atomic_t                dead;
+
+       /* This needs improving */
+       unsigned long           user_id;
+       struct hlist_node       list;
+
+       /*
+        * This is what userspace passed to io_setup(), it's not used for
+        * anything but counting against the global max_reqs quota.
+        *
+        * The real limit is nr_events - 1, which will be larger (see
+        * aio_setup_ring())
+        */
+       unsigned                max_reqs;
+
+       /* Size of ringbuffer, in units of struct io_event */
+       unsigned                nr_events;
+
+       unsigned long           mmap_base;
+       unsigned long           mmap_size;
+
+       struct page             **ring_pages;
+       long                    nr_pages;
+
+       struct rcu_head         rcu_head;
+       struct work_struct      rcu_work;
+
+       struct {
+               atomic_t        reqs_active;
+       } ____cacheline_aligned_in_smp;
+
+       struct {
+               spinlock_t      ctx_lock;
+               struct list_head active_reqs;   /* used for cancellation */
+       } ____cacheline_aligned_in_smp;
+
+       struct {
+               struct mutex    ring_lock;
+               wait_queue_head_t wait;
+       } ____cacheline_aligned_in_smp;
+
+       struct {
+               unsigned        tail;
+               spinlock_t      completion_lock;
+       } ____cacheline_aligned_in_smp;
+
+       struct page             *internal_pages[AIO_RING_PAGES];
+};
 
 /*------ sysctl variables----*/
 static DEFINE_SPINLOCK(aio_nr_lock);
@@ -54,18 +119,6 @@ unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio request
 static struct kmem_cache       *kiocb_cachep;
 static struct kmem_cache       *kioctx_cachep;
 
-static struct workqueue_struct *aio_wq;
-
-/* Used for rare fput completion. */
-static void aio_fput_routine(struct work_struct *);
-static DECLARE_WORK(fput_work, aio_fput_routine);
-
-static DEFINE_SPINLOCK(fput_lock);
-static LIST_HEAD(fput_head);
-
-static void aio_kick_handler(struct work_struct *);
-static void aio_queue_work(struct kioctx *);
-
 /* aio_setup
  *     Creates the slab caches used by the aio routines, panic on
  *     failure as this is done early during the boot sequence.
@@ -75,10 +128,7 @@ static int __init aio_setup(void)
        kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
        kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 
-       aio_wq = alloc_workqueue("aio", 0, 1);  /* used to limit concurrency */
-       BUG_ON(!aio_wq);
-
-       pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
+       pr_debug("sizeof(struct page) = %zu\n", sizeof(struct page));
 
        return 0;
 }
@@ -86,29 +136,21 @@ __initcall(aio_setup);
 
 static void aio_free_ring(struct kioctx *ctx)
 {
-       struct aio_ring_info *info = &ctx->ring_info;
        long i;
 
-       for (i=0; i<info->nr_pages; i++)
-               put_page(info->ring_pages[i]);
+       for (i = 0; i < ctx->nr_pages; i++)
+               put_page(ctx->ring_pages[i]);
 
-       if (info->mmap_size) {
-               BUG_ON(ctx->mm != current->mm);
-               vm_munmap(info->mmap_base, info->mmap_size);
-       }
-
-       if (info->ring_pages && info->ring_pages != info->internal_pages)
-               kfree(info->ring_pages);
-       info->ring_pages = NULL;
-       info->nr = 0;
+       if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages)
+               kfree(ctx->ring_pages);
 }
 
 static int aio_setup_ring(struct kioctx *ctx)
 {
        struct aio_ring *ring;
-       struct aio_ring_info *info = &ctx->ring_info;
        unsigned nr_events = ctx->max_reqs;
-       unsigned long size;
+       struct mm_struct *mm = current->mm;
+       unsigned long size, populate;
        int nr_pages;
 
        /* Compensate for the ring buffer's head/tail overlap entry */
@@ -123,43 +165,44 @@ static int aio_setup_ring(struct kioctx *ctx)
 
        nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
 
-       info->nr = 0;
-       info->ring_pages = info->internal_pages;
+       ctx->nr_events = 0;
+       ctx->ring_pages = ctx->internal_pages;
        if (nr_pages > AIO_RING_PAGES) {
-               info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
-               if (!info->ring_pages)
+               ctx->ring_pages = kcalloc(nr_pages, sizeof(struct page *),
+                                         GFP_KERNEL);
+               if (!ctx->ring_pages)
                        return -ENOMEM;
        }
 
-       info->mmap_size = nr_pages * PAGE_SIZE;
-       dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
-       down_write(&ctx->mm->mmap_sem);
-       info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size, 
-                                       PROT_READ|PROT_WRITE,
-                                       MAP_ANONYMOUS|MAP_PRIVATE, 0);
-       if (IS_ERR((void *)info->mmap_base)) {
-               up_write(&ctx->mm->mmap_sem);
-               info->mmap_size = 0;
+       ctx->mmap_size = nr_pages * PAGE_SIZE;
+       pr_debug("attempting mmap of %lu bytes\n", ctx->mmap_size);
+       down_write(&mm->mmap_sem);
+       ctx->mmap_base = do_mmap_pgoff(NULL, 0, ctx->mmap_size,
+                                      PROT_READ|PROT_WRITE,
+                                      MAP_ANONYMOUS|MAP_PRIVATE, 0, &populate);
+       if (IS_ERR((void *)ctx->mmap_base)) {
+               up_write(&mm->mmap_sem);
+               ctx->mmap_size = 0;
                aio_free_ring(ctx);
                return -EAGAIN;
        }
 
-       dprintk("mmap address: 0x%08lx\n", info->mmap_base);
-       info->nr_pages = get_user_pages(current, ctx->mm,
-                                       info->mmap_base, nr_pages, 
-                                       1, 0, info->ring_pages, NULL);
-       up_write(&ctx->mm->mmap_sem);
+       pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base);
+       ctx->nr_pages = get_user_pages(current, mm, ctx->mmap_base, nr_pages,
+                                      1, 0, ctx->ring_pages, NULL);
+       up_write(&mm->mmap_sem);
 
-       if (unlikely(info->nr_pages != nr_pages)) {
+       if (unlikely(ctx->nr_pages != nr_pages)) {
                aio_free_ring(ctx);
                return -EAGAIN;
        }
+       if (populate)
+               mm_populate(ctx->mmap_base, populate);
 
-       ctx->user_id = info->mmap_base;
-
-       info->nr = nr_events;           /* trusted copy */
+       ctx->user_id = ctx->mmap_base;
+       ctx->nr_events = nr_events; /* trusted copy */
 
-       ring = kmap_atomic(info->ring_pages[0]);
+       ring = kmap_atomic(ctx->ring_pages[0]);
        ring->nr = nr_events;   /* user copy */
        ring->id = ctx->user_id;
        ring->head = ring->tail = 0;
@@ -168,72 +211,130 @@ static int aio_setup_ring(struct kioctx *ctx)
        ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
        ring->header_length = sizeof(struct aio_ring);
        kunmap_atomic(ring);
+       flush_dcache_page(ctx->ring_pages[0]);
 
        return 0;
 }
 
-
-/* aio_ring_event: returns a pointer to the event at the given index from
- * kmap_atomic().  Release the pointer with put_aio_ring_event();
- */
 #define AIO_EVENTS_PER_PAGE    (PAGE_SIZE / sizeof(struct io_event))
 #define AIO_EVENTS_FIRST_PAGE  ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
 #define AIO_EVENTS_OFFSET      (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
 
-#define aio_ring_event(info, nr) ({                                    \
-       unsigned pos = (nr) + AIO_EVENTS_OFFSET;                        \
-       struct io_event *__event;                                       \
-       __event = kmap_atomic(                                          \
-                       (info)->ring_pages[pos / AIO_EVENTS_PER_PAGE]); \
-       __event += pos % AIO_EVENTS_PER_PAGE;                           \
-       __event;                                                        \
-})
-
-#define put_aio_ring_event(event) do {         \
-       struct io_event *__event = (event);     \
-       (void)__event;                          \
-       kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
-} while(0)
-
-static void ctx_rcu_free(struct rcu_head *head)
+void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
+{
+       struct kioctx *ctx = req->ki_ctx;
+       unsigned long flags;
+
+       spin_lock_irqsave(&ctx->ctx_lock, flags);
+
+       if (!req->ki_list.next)
+               list_add(&req->ki_list, &ctx->active_reqs);
+
+       req->ki_cancel = cancel;
+
+       spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+}
+EXPORT_SYMBOL(kiocb_set_cancel_fn);
+
+static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
+                       struct io_event *res)
+{
+       kiocb_cancel_fn *old, *cancel;
+       int ret = -EINVAL;
+
+       /*
+        * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
+        * actually has a cancel function, hence the cmpxchg()
+        */
+
+       cancel = ACCESS_ONCE(kiocb->ki_cancel);
+       do {
+               if (!cancel || cancel == KIOCB_CANCELLED)
+                       return ret;
+
+               old = cancel;
+               cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
+       } while (cancel != old);
+
+       atomic_inc(&kiocb->ki_users);
+       spin_unlock_irq(&ctx->ctx_lock);
+
+       memset(res, 0, sizeof(*res));
+       res->obj = (u64)(unsigned long)kiocb->ki_obj.user;
+       res->data = kiocb->ki_user_data;
+       ret = cancel(kiocb, res);
+
+       spin_lock_irq(&ctx->ctx_lock);
+
+       return ret;
+}
+
+static void free_ioctx_rcu(struct rcu_head *head)
 {
        struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
        kmem_cache_free(kioctx_cachep, ctx);
 }
 
-/* __put_ioctx
- *     Called when the last user of an aio context has gone away,
- *     and the struct needs to be freed.
+/*
+ * When this function runs, the kioctx has been removed from the "hash table"
+ * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
+ * now it's safe to cancel any that need to be.
  */
-static void __put_ioctx(struct kioctx *ctx)
+static void free_ioctx(struct kioctx *ctx)
 {
-       unsigned nr_events = ctx->max_reqs;
-       BUG_ON(ctx->reqs_active);
+       struct aio_ring *ring;
+       struct io_event res;
+       struct kiocb *req;
+       unsigned head, avail;
 
-       cancel_delayed_work_sync(&ctx->wq);
-       aio_free_ring(ctx);
-       mmdrop(ctx->mm);
-       ctx->mm = NULL;
-       if (nr_events) {
-               spin_lock(&aio_nr_lock);
-               BUG_ON(aio_nr - nr_events > aio_nr);
-               aio_nr -= nr_events;
-               spin_unlock(&aio_nr_lock);
+       spin_lock_irq(&ctx->ctx_lock);
+
+       while (!list_empty(&ctx->active_reqs)) {
+               req = list_first_entry(&ctx->active_reqs,
+                                      struct kiocb, ki_list);
+
+               list_del_init(&req->ki_list);
+               kiocb_cancel(ctx, req, &res);
        }
-       pr_debug("__put_ioctx: freeing %p\n", ctx);
-       call_rcu(&ctx->rcu_head, ctx_rcu_free);
-}
 
-static inline int try_get_ioctx(struct kioctx *kioctx)
-{
-       return atomic_inc_not_zero(&kioctx->users);
+       spin_unlock_irq(&ctx->ctx_lock);
+
+       ring = kmap_atomic(ctx->ring_pages[0]);
+       head = ring->head;
+       kunmap_atomic(ring);
+
+       while (atomic_read(&ctx->reqs_active) > 0) {
+               wait_event(ctx->wait,
+                               head != ctx->tail ||
+                               atomic_read(&ctx->reqs_active) <= 0);
+
+               avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head;
+
+               atomic_sub(avail, &ctx->reqs_active);
+               head += avail;
+               head %= ctx->nr_events;
+       }
+
+       WARN_ON(atomic_read(&ctx->reqs_active) < 0);
+
+       aio_free_ring(ctx);
+
+       pr_debug("freeing %p\n", ctx);
+
+       /*
+        * Here the call_rcu() is between the wait_event() for reqs_active to
+        * hit 0, and freeing the ioctx.
+        *
+        * aio_complete() decrements reqs_active, but it has to touch the ioctx
+        * after to issue a wakeup so we use rcu.
+        */
+       call_rcu(&ctx->rcu_head, free_ioctx_rcu);
 }
 
-static inline void put_ioctx(struct kioctx *kioctx)
+static void put_ioctx(struct kioctx *ctx)
 {
-       BUG_ON(atomic_read(&kioctx->users) <= 0);
-       if (unlikely(atomic_dec_and_test(&kioctx->users)))
-               __put_ioctx(kioctx);
+       if (unlikely(atomic_dec_and_test(&ctx->users)))
+               free_ioctx(ctx);
 }
 
 /* ioctx_alloc
@@ -241,7 +342,7 @@ static inline void put_ioctx(struct kioctx *kioctx)
  */
 static struct kioctx *ioctx_alloc(unsigned nr_events)
 {
-       struct mm_struct *mm;
+       struct mm_struct *mm = current->mm;
        struct kioctx *ctx;
        int err = -ENOMEM;
 
@@ -260,17 +361,15 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
                return ERR_PTR(-ENOMEM);
 
        ctx->max_reqs = nr_events;
-       mm = ctx->mm = current->mm;
-       atomic_inc(&mm->mm_count);
 
        atomic_set(&ctx->users, 2);
+       atomic_set(&ctx->dead, 0);
        spin_lock_init(&ctx->ctx_lock);
-       spin_lock_init(&ctx->ring_info.ring_lock);
+       spin_lock_init(&ctx->completion_lock);
+       mutex_init(&ctx->ring_lock);
        init_waitqueue_head(&ctx->wait);
 
        INIT_LIST_HEAD(&ctx->active_reqs);
-       INIT_LIST_HEAD(&ctx->run_list);
-       INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
 
        if (aio_setup_ring(ctx) < 0)
                goto out_freectx;
@@ -290,64 +389,65 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
        hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
        spin_unlock(&mm->ioctx_lock);
 
-       dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
-               ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
+       pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
+                ctx, ctx->user_id, mm, ctx->nr_events);
        return ctx;
 
 out_cleanup:
        err = -EAGAIN;
        aio_free_ring(ctx);
 out_freectx:
-       mmdrop(mm);
        kmem_cache_free(kioctx_cachep, ctx);
-       dprintk("aio: error allocating ioctx %d\n", err);
+       pr_debug("error allocating ioctx %d\n", err);
        return ERR_PTR(err);
 }
 
-/* kill_ctx
- *     Cancels all outstanding aio requests on an aio context.  Used 
- *     when the processes owning a context have all exited to encourage 
+static void kill_ioctx_work(struct work_struct *work)
+{
+       struct kioctx *ctx = container_of(work, struct kioctx, rcu_work);
+
+       wake_up_all(&ctx->wait);
+       put_ioctx(ctx);
+}
+
+static void kill_ioctx_rcu(struct rcu_head *head)
+{
+       struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+
+       INIT_WORK(&ctx->rcu_work, kill_ioctx_work);
+       schedule_work(&ctx->rcu_work);
+}
+
+/* kill_ioctx
+ *     Cancels all outstanding aio requests on an aio context.  Used
+ *     when the processes owning a context have all exited to encourage
  *     the rapid destruction of the kioctx.
  */
-static void kill_ctx(struct kioctx *ctx)
+static void kill_ioctx(struct mm_struct *mm, struct kioctx *ctx)
 {
-       int (*cancel)(struct kiocb *, struct io_event *);
-       struct task_struct *tsk = current;
-       DECLARE_WAITQUEUE(wait, tsk);
-       struct io_event res;
+       if (!atomic_xchg(&ctx->dead, 1)) {
+               spin_lock(&mm->ioctx_lock);
+               hlist_del_rcu(&ctx->list);
+               spin_unlock(&mm->ioctx_lock);
 
-       spin_lock_irq(&ctx->ctx_lock);
-       ctx->dead = 1;
-       while (!list_empty(&ctx->active_reqs)) {
-               struct list_head *pos = ctx->active_reqs.next;
-               struct kiocb *iocb = list_kiocb(pos);
-               list_del_init(&iocb->ki_list);
-               cancel = iocb->ki_cancel;
-               kiocbSetCancelled(iocb);
-               if (cancel) {
-                       iocb->ki_users++;
-                       spin_unlock_irq(&ctx->ctx_lock);
-                       cancel(iocb, &res);
-                       spin_lock_irq(&ctx->ctx_lock);
-               }
-       }
+               /*
+                * It'd be more correct to do this in free_ioctx(), after all
+                * the outstanding kiocbs have finished - but by then io_destroy
+                * has already returned, so io_setup() could potentially return
+                * -EAGAIN with no ioctxs actually in use (as far as userspace
+                *  could tell).
+                */
+               spin_lock(&aio_nr_lock);
+               BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
+               aio_nr -= ctx->max_reqs;
+               spin_unlock(&aio_nr_lock);
 
-       if (!ctx->reqs_active)
-               goto out;
+               if (ctx->mmap_size)
+                       vm_munmap(ctx->mmap_base, ctx->mmap_size);
 
-       add_wait_queue(&ctx->wait, &wait);
-       set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-       while (ctx->reqs_active) {
-               spin_unlock_irq(&ctx->ctx_lock);
-               io_schedule();
-               set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-               spin_lock_irq(&ctx->ctx_lock);
+               /* Between hlist_del_rcu() and dropping the initial ref */
+               call_rcu(&ctx->rcu_head, kill_ioctx_rcu);
        }
-       __set_task_state(tsk, TASK_RUNNING);
-       remove_wait_queue(&ctx->wait, &wait);
-
-out:
-       spin_unlock_irq(&ctx->ctx_lock);
 }
 
 /* wait_on_sync_kiocb:
@@ -355,9 +455,9 @@ out:
  */
 ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
 {
-       while (iocb->ki_users) {
+       while (atomic_read(&iocb->ki_users)) {
                set_current_state(TASK_UNINTERRUPTIBLE);
-               if (!iocb->ki_users)
+               if (!atomic_read(&iocb->ki_users))
                        break;
                io_schedule();
        }
@@ -366,28 +466,26 @@ ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
 }
 EXPORT_SYMBOL(wait_on_sync_kiocb);
 
-/* exit_aio: called when the last user of mm goes away.  At this point, 
- * there is no way for any new requests to be submited or any of the 
- * io_* syscalls to be called on the context.  However, there may be 
- * outstanding requests which hold references to the context; as they 
- * go away, they will call put_ioctx and release any pinned memory
- * associated with the request (held via struct page * references).
+/*
+ * exit_aio: called when the last user of mm goes away.  At this point, there is
+ * no way for any new requests to be submited or any of the io_* syscalls to be
+ * called on the context.
+ *
+ * There may be outstanding kiocbs, but free_ioctx() will explicitly wait on
+ * them.
  */
 void exit_aio(struct mm_struct *mm)
 {
        struct kioctx *ctx;
+       struct hlist_node *n;
 
-       while (!hlist_empty(&mm->ioctx_list)) {
-               ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
-               hlist_del_rcu(&ctx->list);
-
-               kill_ctx(ctx);
-
+       hlist_for_each_entry_safe(ctx, n, &mm->ioctx_list, list) {
                if (1 != atomic_read(&ctx->users))
                        printk(KERN_DEBUG
                                "exit_aio:ioctx still alive: %d %d %d\n",
-                               atomic_read(&ctx->users), ctx->dead,
-                               ctx->reqs_active);
+                               atomic_read(&ctx->users),
+                               atomic_read(&ctx->dead),
+                               atomic_read(&ctx->reqs_active));
                /*
                 * We don't need to bother with munmap() here -
                 * exit_mmap(mm) is coming and it'll unmap everything.
@@ -395,168 +493,50 @@ void exit_aio(struct mm_struct *mm)
                 * as indicator that it needs to unmap the area,
                 * just set it to 0; aio_free_ring() is the only
                 * place that uses ->mmap_size, so it's safe.
-                * That way we get all munmap done to current->mm -
-                * all other callers have ctx->mm == current->mm.
                 */
-               ctx->ring_info.mmap_size = 0;
-               put_ioctx(ctx);
+               ctx->mmap_size = 0;
+
+               kill_ioctx(mm, ctx);
        }
 }
 
 /* aio_get_req
- *     Allocate a slot for an aio request.  Increments the users count
+ *     Allocate a slot for an aio request.  Increments the ki_users count
  * of the kioctx so that the kioctx stays around until all requests are
  * complete.  Returns NULL if no requests are free.
  *
- * Returns with kiocb->users set to 2.  The io submit code path holds
+ * Returns with kiocb->ki_users set to 2.  The io submit code path holds
  * an extra reference while submitting the i/o.
  * This prevents races between the aio code path referencing the
  * req (after submitting it) and aio_complete() freeing the req.
  */
-static struct kiocb *__aio_get_req(struct kioctx *ctx)
+static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 {
-       struct kiocb *req = NULL;
+       struct kiocb *req;
 
-       req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
-       if (unlikely(!req))
+       if (atomic_read(&ctx->reqs_active) >= ctx->nr_events)
                return NULL;
 
-       req->ki_flags = 0;
-       req->ki_users = 2;
-       req->ki_key = 0;
-       req->ki_ctx = ctx;
-       req->ki_cancel = NULL;
-       req->ki_retry = NULL;
-       req->ki_dtor = NULL;
-       req->private = NULL;
-       req->ki_iovec = NULL;
-       INIT_LIST_HEAD(&req->ki_run_list);
-       req->ki_eventfd = NULL;
-
-       return req;
-}
-
-/*
- * struct kiocb's are allocated in batches to reduce the number of
- * times the ctx lock is acquired and released.
- */
-#define KIOCB_BATCH_SIZE       32L
-struct kiocb_batch {
-       struct list_head head;
-       long count; /* number of requests left to allocate */
-};
-
-static void kiocb_batch_init(struct kiocb_batch *batch, long total)
-{
-       INIT_LIST_HEAD(&batch->head);
-       batch->count = total;
-}
-
-static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
-{
-       struct kiocb *req, *n;
-
-       if (list_empty(&batch->head))
-               return;
-
-       spin_lock_irq(&ctx->ctx_lock);
-       list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
-               list_del(&req->ki_batch);
-               list_del(&req->ki_list);
-               kmem_cache_free(kiocb_cachep, req);
-               ctx->reqs_active--;
-       }
-       if (unlikely(!ctx->reqs_active && ctx->dead))
-               wake_up_all(&ctx->wait);
-       spin_unlock_irq(&ctx->ctx_lock);
-}
-
-/*
- * Allocate a batch of kiocbs.  This avoids taking and dropping the
- * context lock a lot during setup.
- */
-static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
-{
-       unsigned short allocated, to_alloc;
-       long avail;
-       bool called_fput = false;
-       struct kiocb *req, *n;
-       struct aio_ring *ring;
-
-       to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
-       for (allocated = 0; allocated < to_alloc; allocated++) {
-               req = __aio_get_req(ctx);
-               if (!req)
-                       /* allocation failed, go with what we've got */
-                       break;
-               list_add(&req->ki_batch, &batch->head);
-       }
-
-       if (allocated == 0)
-               goto out;
-
-retry:
-       spin_lock_irq(&ctx->ctx_lock);
-       ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
-
-       avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
-       BUG_ON(avail < 0);
-       if (avail == 0 && !called_fput) {
-               /*
-                * Handle a potential starvation case.  It is possible that
-                * we hold the last reference on a struct file, causing us
-                * to delay the final fput to non-irq context.  In this case,
-                * ctx->reqs_active is artificially high.  Calling the fput
-                * routine here may free up a slot in the event completion
-                * ring, allowing this allocation to succeed.
-                */
-               kunmap_atomic(ring);
-               spin_unlock_irq(&ctx->ctx_lock);
-               aio_fput_routine(NULL);
-               called_fput = true;
-               goto retry;
-       }
-
-       if (avail < allocated) {
-               /* Trim back the number of requests. */
-               list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
-                       list_del(&req->ki_batch);
-                       kmem_cache_free(kiocb_cachep, req);
-                       if (--allocated <= avail)
-                               break;
-               }
-       }
-
-       batch->count -= allocated;
-       list_for_each_entry(req, &batch->head, ki_batch) {
-               list_add(&req->ki_list, &ctx->active_reqs);
-               ctx->reqs_active++;
-       }
+       if (atomic_inc_return(&ctx->reqs_active) > ctx->nr_events - 1)
+               goto out_put;
 
-       kunmap_atomic(ring);
-       spin_unlock_irq(&ctx->ctx_lock);
-
-out:
-       return allocated;
-}
+       req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
+       if (unlikely(!req))
+               goto out_put;
 
-static inline struct kiocb *aio_get_req(struct kioctx *ctx,
-                                       struct kiocb_batch *batch)
-{
-       struct kiocb *req;
+       atomic_set(&req->ki_users, 2);
+       req->ki_ctx = ctx;
 
-       if (list_empty(&batch->head))
-               if (kiocb_batch_refill(ctx, batch) == 0)
-                       return NULL;
-       req = list_first_entry(&batch->head, struct kiocb, ki_batch);
-       list_del(&req->ki_batch);
        return req;
+out_put:
+       atomic_dec(&ctx->reqs_active);
+       return NULL;
 }
 
-static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
+static void kiocb_free(struct kiocb *req)
 {
-       assert_spin_locked(&ctx->ctx_lock);
-
+       if (req->ki_filp)
+               fput(req->ki_filp);
        if (req->ki_eventfd != NULL)
                eventfd_ctx_put(req->ki_eventfd);
        if (req->ki_dtor)
@@ -564,90 +544,12 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
        if (req->ki_iovec != &req->ki_inline_vec)
                kfree(req->ki_iovec);
        kmem_cache_free(kiocb_cachep, req);
-       ctx->reqs_active--;
-
-       if (unlikely(!ctx->reqs_active && ctx->dead))
-               wake_up_all(&ctx->wait);
-}
-
-static void aio_fput_routine(struct work_struct *data)
-{
-       spin_lock_irq(&fput_lock);
-       while (likely(!list_empty(&fput_head))) {
-               struct kiocb *req = list_kiocb(fput_head.next);
-               struct kioctx *ctx = req->ki_ctx;
-
-               list_del(&req->ki_list);
-               spin_unlock_irq(&fput_lock);
-
-               /* Complete the fput(s) */
-               if (req->ki_filp != NULL)
-                       fput(req->ki_filp);
-
-               /* Link the iocb into the context's free list */
-               rcu_read_lock();
-               spin_lock_irq(&ctx->ctx_lock);
-               really_put_req(ctx, req);
-               /*
-                * at that point ctx might've been killed, but actual
-                * freeing is RCU'd
-                */
-               spin_unlock_irq(&ctx->ctx_lock);
-               rcu_read_unlock();
-
-               spin_lock_irq(&fput_lock);
-       }
-       spin_unlock_irq(&fput_lock);
-}
-
-/* __aio_put_req
- *     Returns true if this put was the last user of the request.
- */
-static int __aio_put_req(struct kioctx *ctx, struct kiocb *req)
-{
-       dprintk(KERN_DEBUG "aio_put(%p): f_count=%ld\n",
-               req, atomic_long_read(&req->ki_filp->f_count));
-
-       assert_spin_locked(&ctx->ctx_lock);
-
-       req->ki_users--;
-       BUG_ON(req->ki_users < 0);
-       if (likely(req->ki_users))
-               return 0;
-       list_del(&req->ki_list);                /* remove from active_reqs */
-       req->ki_cancel = NULL;
-       req->ki_retry = NULL;
-
-       /*
-        * Try to optimize the aio and eventfd file* puts, by avoiding to
-        * schedule work in case it is not final fput() time. In normal cases,
-        * we would not be holding the last reference to the file*, so
-        * this function will be executed w/out any aio kthread wakeup.
-        */
-       if (unlikely(!fput_atomic(req->ki_filp))) {
-               spin_lock(&fput_lock);
-               list_add(&req->ki_list, &fput_head);
-               spin_unlock(&fput_lock);
-               schedule_work(&fput_work);
-       } else {
-               req->ki_filp = NULL;
-               really_put_req(ctx, req);
-       }
-       return 1;
 }
 
-/* aio_put_req
- *     Returns true if this put was the last user of the kiocb,
- *     false if the request is still in use.
- */
-int aio_put_req(struct kiocb *req)
+void aio_put_req(struct kiocb *req)
 {
-       struct kioctx *ctx = req->ki_ctx;
-       int ret;
-       spin_lock_irq(&ctx->ctx_lock);
-       ret = __aio_put_req(ctx, req);
-       spin_unlock_irq(&ctx->ctx_lock);
-       return ret;
+       if (atomic_dec_and_test(&req->ki_users))
+               kiocb_free(req);
 }
 EXPORT_SYMBOL(aio_put_req);
 
@@ -655,18 +557,12 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 {
        struct mm_struct *mm = current->mm;
        struct kioctx *ctx, *ret = NULL;
-       struct hlist_node *n;
 
        rcu_read_lock();
 
-       hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
-               /*
-                * RCU protects us against accessing freed memory but
-                * we have to be careful not to get a reference when the
-                * reference count already dropped to 0 (ctx->dead test
-                * is unreliable because of races).
-                */
-               if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
+       hlist_for_each_entry_rcu(ctx, &mm->ioctx_list, list) {
+               if (ctx->user_id == ctx_id) {
+                       atomic_inc(&ctx->users);
                        ret = ctx;
                        break;
                }
@@ -676,295 +572,16 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
        return ret;
 }
 
-/*
- * Queue up a kiocb to be retried. Assumes that the kiocb
- * has already been marked as kicked, and places it on
- * the retry run list for the corresponding ioctx, if it
- * isn't already queued. Returns 1 if it actually queued
- * the kiocb (to tell the caller to activate the work
- * queue to process it), or 0, if it found that it was
- * already queued.
- */
-static inline int __queue_kicked_iocb(struct kiocb *iocb)
-{
-       struct kioctx *ctx = iocb->ki_ctx;
-
-       assert_spin_locked(&ctx->ctx_lock);
-
-       if (list_empty(&iocb->ki_run_list)) {
-               list_add_tail(&iocb->ki_run_list,
-                       &ctx->run_list);
-               return 1;
-       }
-       return 0;
-}
-
-/* aio_run_iocb
- *     This is the core aio execution routine. It is
- *     invoked both for initial i/o submission and
- *     subsequent retries via the aio_kick_handler.
- *     Expects to be invoked with iocb->ki_ctx->lock
- *     already held. The lock is released and reacquired
- *     as needed during processing.
- *
- * Calls the iocb retry method (already setup for the
- * iocb on initial submission) for operation specific
- * handling, but takes care of most of common retry
- * execution details for a given iocb. The retry method
- * needs to be non-blocking as far as possible, to avoid
- * holding up other iocbs waiting to be serviced by the
- * retry kernel thread.
- *
- * The trickier parts in this code have to do with
- * ensuring that only one retry instance is in progress
- * for a given iocb at any time. Providing that guarantee
- * simplifies the coding of individual aio operations as
- * it avoids various potential races.
- */
-static ssize_t aio_run_iocb(struct kiocb *iocb)
-{
-       struct kioctx   *ctx = iocb->ki_ctx;
-       ssize_t (*retry)(struct kiocb *);
-       ssize_t ret;
-
-       if (!(retry = iocb->ki_retry)) {
-               printk("aio_run_iocb: iocb->ki_retry = NULL\n");
-               return 0;
-       }
-
-       /*
-        * We don't want the next retry iteration for this
-        * operation to start until this one has returned and
-        * updated the iocb state. However, wait_queue functions
-        * can trigger a kick_iocb from interrupt context in the
-        * meantime, indicating that data is available for the next
-        * iteration. We want to remember that and enable the
-        * next retry iteration _after_ we are through with
-        * this one.
-        *
-        * So, in order to be able to register a "kick", but
-        * prevent it from being queued now, we clear the kick
-        * flag, but make the kick code *think* that the iocb is
-        * still on the run list until we are actually done.
-        * When we are done with this iteration, we check if
-        * the iocb was kicked in the meantime and if so, queue
-        * it up afresh.
-        */
-
-       kiocbClearKicked(iocb);
-
-       /*
-        * This is so that aio_complete knows it doesn't need to
-        * pull the iocb off the run list (We can't just call
-        * INIT_LIST_HEAD because we don't want a kick_iocb to
-        * queue this on the run list yet)
-        */
-       iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
-       spin_unlock_irq(&ctx->ctx_lock);
-
-       /* Quit retrying if the i/o has been cancelled */
-       if (kiocbIsCancelled(iocb)) {
-               ret = -EINTR;
-               aio_complete(iocb, ret, 0);
-               /* must not access the iocb after this */
-               goto out;
-       }
-
-       /*
-        * Now we are all set to call the retry method in async
-        * context.
-        */
-       ret = retry(iocb);
-
-       if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
-               /*
-                * There's no easy way to restart the syscall since other AIO's
-                * may be already running. Just fail this IO with EINTR.
-                */
-               if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
-                            ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
-                       ret = -EINTR;
-               aio_complete(iocb, ret, 0);
-       }
-out:
-       spin_lock_irq(&ctx->ctx_lock);
-
-       if (-EIOCBRETRY == ret) {
-               /*
-                * OK, now that we are done with this iteration
-                * and know that there is more left to go,
-                * this is where we let go so that a subsequent
-                * "kick" can start the next iteration
-                */
-
-               /* will make __queue_kicked_iocb succeed from here on */
-               INIT_LIST_HEAD(&iocb->ki_run_list);
-               /* we must queue the next iteration ourselves, if it
-                * has already been kicked */
-               if (kiocbIsKicked(iocb)) {
-                       __queue_kicked_iocb(iocb);
-
-                       /*
-                        * __queue_kicked_iocb will always return 1 here, because
-                        * iocb->ki_run_list is empty at this point so it should
-                        * be safe to unconditionally queue the context into the
-                        * work queue.
-                        */
-                       aio_queue_work(ctx);
-               }
-       }
-       return ret;
-}
-
-/*
- * __aio_run_iocbs:
- *     Process all pending retries queued on the ioctx
- *     run list.
- * Assumes it is operating within the aio issuer's mm
- * context.
- */
-static int __aio_run_iocbs(struct kioctx *ctx)
-{
-       struct kiocb *iocb;
-       struct list_head run_list;
-
-       assert_spin_locked(&ctx->ctx_lock);
-
-       list_replace_init(&ctx->run_list, &run_list);
-       while (!list_empty(&run_list)) {
-               iocb = list_entry(run_list.next, struct kiocb,
-                       ki_run_list);
-               list_del(&iocb->ki_run_list);
-               /*
-                * Hold an extra reference while retrying i/o.
-                */
-               iocb->ki_users++;       /* grab extra reference */
-               aio_run_iocb(iocb);
-               __aio_put_req(ctx, iocb);
-       }
-       if (!list_empty(&ctx->run_list))
-               return 1;
-       return 0;
-}
-
-static void aio_queue_work(struct kioctx * ctx)
-{
-       unsigned long timeout;
-       /*
-        * if someone is waiting, get the work started right
-        * away, otherwise, use a longer delay
-        */
-       smp_mb();
-       if (waitqueue_active(&ctx->wait))
-               timeout = 1;
-       else
-               timeout = HZ/10;
-       queue_delayed_work(aio_wq, &ctx->wq, timeout);
-}
-
-/*
- * aio_run_all_iocbs:
- *     Process all pending retries queued on the ioctx
- *     run list, and keep running them until the list
- *     stays empty.
- * Assumes it is operating within the aio issuer's mm context.
- */
-static inline void aio_run_all_iocbs(struct kioctx *ctx)
-{
-       spin_lock_irq(&ctx->ctx_lock);
-       while (__aio_run_iocbs(ctx))
-               ;
-       spin_unlock_irq(&ctx->ctx_lock);
-}
-
-/*
- * aio_kick_handler:
- *     Work queue handler triggered to process pending
- *     retries on an ioctx. Takes on the aio issuer's
- *     mm context before running the iocbs, so that
- *     copy_xxx_user operates on the issuer's address
- *      space.
- * Run on aiod's context.
- */
-static void aio_kick_handler(struct work_struct *work)
-{
-       struct kioctx *ctx = container_of(work, struct kioctx, wq.work);
-       mm_segment_t oldfs = get_fs();
-       struct mm_struct *mm;
-       int requeue;
-
-       set_fs(USER_DS);
-       use_mm(ctx->mm);
-       spin_lock_irq(&ctx->ctx_lock);
-       requeue =__aio_run_iocbs(ctx);
-       mm = ctx->mm;
-       spin_unlock_irq(&ctx->ctx_lock);
-       unuse_mm(mm);
-       set_fs(oldfs);
-       /*
-        * we're in a worker thread already; no point using non-zero delay
-        */
-       if (requeue)
-               queue_delayed_work(aio_wq, &ctx->wq, 0);
-}
-
-
-/*
- * Called by kick_iocb to queue the kiocb for retry
- * and if required activate the aio work queue to process
- * it
- */
-static void try_queue_kicked_iocb(struct kiocb *iocb)
-{
-       struct kioctx   *ctx = iocb->ki_ctx;
-       unsigned long flags;
-       int run = 0;
-
-       spin_lock_irqsave(&ctx->ctx_lock, flags);
-       /* set this inside the lock so that we can't race with aio_run_iocb()
-        * testing it and putting the iocb on the run list under the lock */
-       if (!kiocbTryKick(iocb))
-               run = __queue_kicked_iocb(iocb);
-       spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-       if (run)
-               aio_queue_work(ctx);
-}
-
-/*
- * kick_iocb:
- *      Called typically from a wait queue callback context
- *      to trigger a retry of the iocb.
- *      The retry is usually executed by aio workqueue
- *      threads (See aio_kick_handler).
- */
-void kick_iocb(struct kiocb *iocb)
-{
-       /* sync iocbs are easy: they can only ever be executing from a 
-        * single context. */
-       if (is_sync_kiocb(iocb)) {
-               kiocbSetKicked(iocb);
-               wake_up_process(iocb->ki_obj.tsk);
-               return;
-       }
-
-       try_queue_kicked_iocb(iocb);
-}
-EXPORT_SYMBOL(kick_iocb);
-
 /* aio_complete
  *     Called when the io request on the given iocb is complete.
- *     Returns true if this is the last user of the request.  The 
- *     only other user of the request can be the cancellation code.
  */
-int aio_complete(struct kiocb *iocb, long res, long res2)
+void aio_complete(struct kiocb *iocb, long res, long res2)
 {
        struct kioctx   *ctx = iocb->ki_ctx;
-       struct aio_ring_info    *info;
        struct aio_ring *ring;
-       struct io_event *event;
+       struct io_event *ev_page, *event;
        unsigned long   flags;
-       unsigned long   tail;
-       int             ret;
+       unsigned tail, pos;
 
        /*
         * Special case handling for sync iocbs:
@@ -974,61 +591,81 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
         *  - the sync task helpfully left a reference to itself in the iocb
         */
        if (is_sync_kiocb(iocb)) {
-               BUG_ON(iocb->ki_users != 1);
+               BUG_ON(atomic_read(&iocb->ki_users) != 1);
                iocb->ki_user_data = res;
-               iocb->ki_users = 0;
+               atomic_set(&iocb->ki_users, 0);
                wake_up_process(iocb->ki_obj.tsk);
-               return 1;
+               return;
        }
 
-       info = &ctx->ring_info;
-
-       /* add a completion event to the ring buffer.
-        * must be done holding ctx->ctx_lock to prevent
-        * other code from messing with the tail
-        * pointer since we might be called from irq
-        * context.
+       /*
+        * Take rcu_read_lock() in case the kioctx is being destroyed, as we
+        * need to issue a wakeup after decrementing reqs_active.
         */
-       spin_lock_irqsave(&ctx->ctx_lock, flags);
+       rcu_read_lock();
+
+       if (iocb->ki_list.next) {
+               unsigned long flags;
 
-       if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
-               list_del_init(&iocb->ki_run_list);
+               spin_lock_irqsave(&ctx->ctx_lock, flags);
+               list_del(&iocb->ki_list);
+               spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+       }
 
        /*
         * cancelled requests don't get events, userland was given one
         * when the event got cancelled.
         */
-       if (kiocbIsCancelled(iocb))
+       if (unlikely(xchg(&iocb->ki_cancel,
+                         KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
+               atomic_dec(&ctx->reqs_active);
+               /* Still need the wake_up in case free_ioctx is waiting */
                goto put_rq;
+       }
+
+       /*
+        * Add a completion event to the ring buffer. Must be done holding
+        * ctx->ctx_lock to prevent other code from messing with the tail
+        * pointer since we might be called from irq context.
+        */
+       spin_lock_irqsave(&ctx->completion_lock, flags);
 
-       ring = kmap_atomic(info->ring_pages[0]);
+       tail = ctx->tail;
+       pos = tail + AIO_EVENTS_OFFSET;
 
-       tail = info->tail;
-       event = aio_ring_event(info, tail);
-       if (++tail >= info->nr)
+       if (++tail >= ctx->nr_events)
                tail = 0;
 
+       ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
+       event = ev_page + pos % AIO_EVENTS_PER_PAGE;
+
        event->obj = (u64)(unsigned long)iocb->ki_obj.user;
        event->data = iocb->ki_user_data;
        event->res = res;
        event->res2 = res2;
 
-       dprintk("aio_complete: %p[%lu]: %p: %p %Lx %lx %lx\n",
-               ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
-               res, res2);
+       kunmap_atomic(ev_page);
+       flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
+
+       pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n",
+                ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
+                res, res2);
 
        /* after flagging the request as done, we
         * must never even look at it again
         */
        smp_wmb();      /* make event visible before updating tail */
 
-       info->tail = tail;
-       ring->tail = tail;
+       ctx->tail = tail;
 
-       put_aio_ring_event(event);
+       ring = kmap_atomic(ctx->ring_pages[0]);
+       ring->tail = tail;
        kunmap_atomic(ring);
+       flush_dcache_page(ctx->ring_pages[0]);
+
+       spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
-       pr_debug("added to ring %p at [%lu]\n", iocb, tail);
+       pr_debug("added to ring %p at [%u]\n", iocb, tail);
 
        /*
         * Check if the user asked us to deliver the result through an
@@ -1040,7 +677,7 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
 
 put_rq:
        /* everything turned out well, dispose of the aiocb. */
-       ret = __aio_put_req(ctx, iocb);
+       aio_put_req(iocb);
 
        /*
         * We have to order our ring_info tail store above and test
@@ -1053,233 +690,133 @@ put_rq:
        if (waitqueue_active(&ctx->wait))
                wake_up(&ctx->wait);
 
-       spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-       return ret;
+       rcu_read_unlock();
 }
 EXPORT_SYMBOL(aio_complete);
 
-/* aio_read_evt
- *     Pull an event off of the ioctx's event ring.  Returns the number of 
- *     events fetched (0 or 1 ;-)
- *     FIXME: make this use cmpxchg.
- *     TODO: make the ringbuffer user mmap()able (requires FIXME).
+/* aio_read_events
+ *     Pull an event off of the ioctx's event ring.  Returns the number of
+ *     events fetched
  */
-static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
+static long aio_read_events_ring(struct kioctx *ctx,
+                                struct io_event __user *event, long nr)
 {
-       struct aio_ring_info *info = &ioctx->ring_info;
        struct aio_ring *ring;
-       unsigned long head;
-       int ret = 0;
-
-       ring = kmap_atomic(info->ring_pages[0]);
-       dprintk("in aio_read_evt h%lu t%lu m%lu\n",
-                (unsigned long)ring->head, (unsigned long)ring->tail,
-                (unsigned long)ring->nr);
-
-       if (ring->head == ring->tail)
-               goto out;
+       unsigned head, pos;
+       long ret = 0;
+       int copy_ret;
 
-       spin_lock(&info->ring_lock);
-
-       head = ring->head % info->nr;
-       if (head != ring->tail) {
-               struct io_event *evp = aio_ring_event(info, head);
-               *ent = *evp;
-               head = (head + 1) % info->nr;
-               smp_mb(); /* finish reading the event before updatng the head */
-               ring->head = head;
-               ret = 1;
-               put_aio_ring_event(evp);
-       }
-       spin_unlock(&info->ring_lock);
+       mutex_lock(&ctx->ring_lock);
 
-out:
+       ring = kmap_atomic(ctx->ring_pages[0]);
+       head = ring->head;
        kunmap_atomic(ring);
-       dprintk("leaving aio_read_evt: %d  h%lu t%lu\n", ret,
-                (unsigned long)ring->head, (unsigned long)ring->tail);
-       return ret;
-}
-
-struct aio_timeout {
-       struct timer_list       timer;
-       int                     timed_out;
-       struct task_struct      *p;
-};
 
-static void timeout_func(unsigned long data)
-{
-       struct aio_timeout *to = (struct aio_timeout *)data;
+       pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr_events);
 
-       to->timed_out = 1;
-       wake_up_process(to->p);
-}
+       if (head == ctx->tail)
+               goto out;
 
-static inline void init_timeout(struct aio_timeout *to)
-{
-       setup_timer_on_stack(&to->timer, timeout_func, (unsigned long) to);
-       to->timed_out = 0;
-       to->p = current;
-}
+       while (ret < nr) {
+               long avail;
+               struct io_event *ev;
+               struct page *page;
 
-static inline void set_timeout(long start_jiffies, struct aio_timeout *to,
-                              const struct timespec *ts)
-{
-       to->timer.expires = start_jiffies + timespec_to_jiffies(ts);
-       if (time_after(to->timer.expires, jiffies))
-               add_timer(&to->timer);
-       else
-               to->timed_out = 1;
-}
+               avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head;
+               if (head == ctx->tail)
+                       break;
 
-static inline void clear_timeout(struct aio_timeout *to)
-{
-       del_singleshot_timer_sync(&to->timer);
-}
+               avail = min(avail, nr - ret);
+               avail = min_t(long, avail, AIO_EVENTS_PER_PAGE -
+                           ((head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));
 
-static int read_events(struct kioctx *ctx,
-                       long min_nr, long nr,
-                       struct io_event __user *event,
-                       struct timespec __user *timeout)
-{
-       long                    start_jiffies = jiffies;
-       struct task_struct      *tsk = current;
-       DECLARE_WAITQUEUE(wait, tsk);
-       int                     ret;
-       int                     i = 0;
-       struct io_event         ent;
-       struct aio_timeout      to;
-       int                     retry = 0;
-
-       /* needed to zero any padding within an entry (there shouldn't be 
-        * any, but C is fun!
-        */
-       memset(&ent, 0, sizeof(ent));
-retry:
-       ret = 0;
-       while (likely(i < nr)) {
-               ret = aio_read_evt(ctx, &ent);
-               if (unlikely(ret <= 0))
-                       break;
+               pos = head + AIO_EVENTS_OFFSET;
+               page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
+               pos %= AIO_EVENTS_PER_PAGE;
 
-               dprintk("read event: %Lx %Lx %Lx %Lx\n",
-                       ent.data, ent.obj, ent.res, ent.res2);
+               ev = kmap(page);
+               copy_ret = copy_to_user(event + ret, ev + pos,
+                                       sizeof(*ev) * avail);
+               kunmap(page);
 
-               /* Could we split the check in two? */
-               ret = -EFAULT;
-               if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
-                       dprintk("aio: lost an event due to EFAULT.\n");
-                       break;
+               if (unlikely(copy_ret)) {
+                       ret = -EFAULT;
+                       goto out;
                }
-               ret = 0;
 
-               /* Good, event copied to userland, update counts. */
-               event ++;
-               i ++;
+               ret += avail;
+               head += avail;
+               head %= ctx->nr_events;
        }
 
-       if (min_nr <= i)
-               return i;
-       if (ret)
-               return ret;
+       ring = kmap_atomic(ctx->ring_pages[0]);
+       ring->head = head;
+       kunmap_atomic(ring);
+       flush_dcache_page(ctx->ring_pages[0]);
 
-       /* End fast path */
+       pr_debug("%li  h%u t%u\n", ret, head, ctx->tail);
 
-       /* racey check, but it gets redone */
-       if (!retry && unlikely(!list_empty(&ctx->run_list))) {
-               retry = 1;
-               aio_run_all_iocbs(ctx);
-               goto retry;
-       }
+       atomic_sub(ret, &ctx->reqs_active);
+out:
+       mutex_unlock(&ctx->ring_lock);
 
-       init_timeout(&to);
-       if (timeout) {
-               struct timespec ts;
-               ret = -EFAULT;
-               if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
-                       goto out;
+       return ret;
+}
 
-               set_timeout(start_jiffies, &to, &ts);
-       }
+static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
+                           struct io_event __user *event, long *i)
+{
+       long ret = aio_read_events_ring(ctx, event + *i, nr - *i);
 
-       while (likely(i < nr)) {
-               add_wait_queue_exclusive(&ctx->wait, &wait);
-               do {
-                       set_task_state(tsk, TASK_INTERRUPTIBLE);
-                       ret = aio_read_evt(ctx, &ent);
-                       if (ret)
-                               break;
-                       if (min_nr <= i)
-                               break;
-                       if (unlikely(ctx->dead)) {
-                               ret = -EINVAL;
-                               break;
-                       }
-                       if (to.timed_out)       /* Only check after read evt */
-                               break;
-                       /* Try to only show up in io wait if there are ops
-                        *  in flight */
-                       if (ctx->reqs_active)
-                               io_schedule();
-                       else
-                               schedule();
-                       if (signal_pending(tsk)) {
-                               ret = -EINTR;
-                               break;
-                       }
-                       /*ret = aio_read_evt(ctx, &ent);*/
-               } while (1) ;
-
-               set_task_state(tsk, TASK_RUNNING);
-               remove_wait_queue(&ctx->wait, &wait);
-
-               if (unlikely(ret <= 0))
-                       break;
+       if (ret > 0)
+               *i += ret;
 
-               ret = -EFAULT;
-               if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
-                       dprintk("aio: lost an event due to EFAULT.\n");
-                       break;
-               }
+       if (unlikely(atomic_read(&ctx->dead)))
+               ret = -EINVAL;
 
-               /* Good, event copied to userland, update counts. */
-               event ++;
-               i ++;
-       }
+       if (!*i)
+               *i = ret;
 
-       if (timeout)
-               clear_timeout(&to);
-out:
-       destroy_timer_on_stack(&to.timer);
-       return i ? i : ret;
+       return ret < 0 || *i >= min_nr;
 }
 
-/* Take an ioctx and remove it from the list of ioctx's.  Protects 
- * against races with itself via ->dead.
- */
-static void io_destroy(struct kioctx *ioctx)
+static long read_events(struct kioctx *ctx, long min_nr, long nr,
+                       struct io_event __user *event,
+                       struct timespec __user *timeout)
 {
-       struct mm_struct *mm = current->mm;
-       int was_dead;
+       ktime_t until = { .tv64 = KTIME_MAX };
+       long ret = 0;
 
-       /* delete the entry from the list is someone else hasn't already */
-       spin_lock(&mm->ioctx_lock);
-       was_dead = ioctx->dead;
-       ioctx->dead = 1;
-       hlist_del_rcu(&ioctx->list);
-       spin_unlock(&mm->ioctx_lock);
+       if (timeout) {
+               struct timespec ts;
 
-       dprintk("aio_release(%p)\n", ioctx);
-       if (likely(!was_dead))
-               put_ioctx(ioctx);       /* twice for the list */
+               if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
+                       return -EFAULT;
 
-       kill_ctx(ioctx);
+               until = timespec_to_ktime(ts);
+       }
 
        /*
-        * Wake up any waiters.  The setting of ctx->dead must be seen
-        * by other CPUs at this point.  Right now, we rely on the
-        * locking done by the above calls to ensure this consistency.
+        * Note that aio_read_events() is being called as the conditional - i.e.
+        * we're calling it after prepare_to_wait() has set task state to
+        * TASK_INTERRUPTIBLE.
+        *
+        * But aio_read_events() can block, and if it blocks it's going to flip
+        * the task state back to TASK_RUNNING.
+        *
+        * This should be ok, provided it doesn't flip the state back to
+        * TASK_RUNNING and return 0 too much - that causes us to spin. That
+        * will only happen if the mutex_lock() call blocks, and we then find
+        * the ringbuffer empty. So in practice we should be ok, but it's
+        * something to be aware of when touching this code.
         */
-       wake_up_all(&ioctx->wait);
+       wait_event_interruptible_hrtimeout(ctx->wait,
+                       aio_read_events(ctx, min_nr, nr, event, &ret), until);
+
+       if (!ret && signal_pending(current))
+               ret = -EINTR;
+
+       return ret;
 }
 
 /* sys_io_setup:
@@ -1317,7 +854,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
        if (!IS_ERR(ioctx)) {
                ret = put_user(ioctx->user_id, ctxp);
                if (ret)
-                       io_destroy(ioctx);
+                       kill_ioctx(current->mm, ioctx);
                put_ioctx(ioctx);
        }
 
@@ -1335,7 +872,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
 {
        struct kioctx *ioctx = lookup_ioctx(ctx);
        if (likely(NULL != ioctx)) {
-               io_destroy(ioctx);
+               kill_ioctx(current->mm, ioctx);
                put_ioctx(ioctx);
                return 0;
        }
@@ -1366,29 +903,22 @@ static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
        BUG_ON(ret > 0 && iocb->ki_left == 0);
 }
 
-static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
+typedef ssize_t (aio_rw_op)(struct kiocb *, const struct iovec *,
+                           unsigned long, loff_t);
+
+static ssize_t aio_rw_vect_retry(struct kiocb *iocb, int rw, aio_rw_op *rw_op)
 {
        struct file *file = iocb->ki_filp;
        struct address_space *mapping = file->f_mapping;
        struct inode *inode = mapping->host;
-       ssize_t (*rw_op)(struct kiocb *, const struct iovec *,
-                        unsigned long, loff_t);
        ssize_t ret = 0;
-       unsigned short opcode;
-
-       if ((iocb->ki_opcode == IOCB_CMD_PREADV) ||
-               (iocb->ki_opcode == IOCB_CMD_PREAD)) {
-               rw_op = file->f_op->aio_read;
-               opcode = IOCB_CMD_PREADV;
-       } else {
-               rw_op = file->f_op->aio_write;
-               opcode = IOCB_CMD_PWRITEV;
-       }
 
        /* This matches the pread()/pwrite() logic */
        if (iocb->ki_pos < 0)
                return -EINVAL;
 
+       if (rw == WRITE)
+               file_start_write(file);
        do {
                ret = rw_op(iocb, &iocb->ki_iovec[iocb->ki_cur_seg],
                            iocb->ki_nr_segs - iocb->ki_cur_seg,
@@ -1399,8 +929,10 @@ static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
        /* retry all partial writes.  retry partial reads as long as its a
         * regular file. */
        } while (ret > 0 && iocb->ki_left > 0 &&
-                (opcode == IOCB_CMD_PWRITEV ||
+                (rw == WRITE ||
                  (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))));
+       if (rw == WRITE)
+               file_end_write(file);
 
        /* This means we must have transferred all that we could */
        /* No need to retry anymore */
@@ -1409,81 +941,49 @@ static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
 
        /* If we managed to write some out we return that, rather than
         * the eventual error. */
-       if (opcode == IOCB_CMD_PWRITEV
-           && ret < 0 && ret != -EIOCBQUEUED && ret != -EIOCBRETRY
+       if (rw == WRITE
+           && ret < 0 && ret != -EIOCBQUEUED
            && iocb->ki_nbytes - iocb->ki_left)
                ret = iocb->ki_nbytes - iocb->ki_left;
 
        return ret;
 }
 
-static ssize_t aio_fdsync(struct kiocb *iocb)
-{
-       struct file *file = iocb->ki_filp;
-       ssize_t ret = -EINVAL;
-
-       if (file->f_op->aio_fsync)
-               ret = file->f_op->aio_fsync(iocb, 1);
-       return ret;
-}
-
-static ssize_t aio_fsync(struct kiocb *iocb)
-{
-       struct file *file = iocb->ki_filp;
-       ssize_t ret = -EINVAL;
-
-       if (file->f_op->aio_fsync)
-               ret = file->f_op->aio_fsync(iocb, 0);
-       return ret;
-}
-
-static ssize_t aio_setup_vectored_rw(int type, struct kiocb *kiocb, bool compat)
+static ssize_t aio_setup_vectored_rw(int rw, struct kiocb *kiocb, bool compat)
 {
        ssize_t ret;
 
+       kiocb->ki_nr_segs = kiocb->ki_nbytes;
+
 #ifdef CONFIG_COMPAT
        if (compat)
-               ret = compat_rw_copy_check_uvector(type,
+               ret = compat_rw_copy_check_uvector(rw,
                                (struct compat_iovec __user *)kiocb->ki_buf,
-                               kiocb->ki_nbytes, 1, &kiocb->ki_inline_vec,
+                               kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
                                &kiocb->ki_iovec);
        else
 #endif
-               ret = rw_copy_check_uvector(type,
+               ret = rw_copy_check_uvector(rw,
                                (struct iovec __user *)kiocb->ki_buf,
-                               kiocb->ki_nbytes, 1, &kiocb->ki_inline_vec,
+                               kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
                                &kiocb->ki_iovec);
        if (ret < 0)
-               goto out;
-
-       ret = rw_verify_area(type, kiocb->ki_filp, &kiocb->ki_pos, ret);
-       if (ret < 0)
-               goto out;
+               return ret;
 
-       kiocb->ki_nr_segs = kiocb->ki_nbytes;
-       kiocb->ki_cur_seg = 0;
-       /* ki_nbytes/left now reflect bytes instead of segs */
+       /* ki_nbytes now reflect bytes instead of segs */
        kiocb->ki_nbytes = ret;
-       kiocb->ki_left = ret;
-
-       ret = 0;
-out:
-       return ret;
+       return 0;
 }
 
-static ssize_t aio_setup_single_vector(int type, struct file * file, struct kiocb *kiocb)
+static ssize_t aio_setup_single_vector(int rw, struct kiocb *kiocb)
 {
-       int bytes;
-
-       bytes = rw_verify_area(type, file, &kiocb->ki_pos, kiocb->ki_left);
-       if (bytes < 0)
-               return bytes;
+       if (unlikely(!access_ok(!rw, kiocb->ki_buf, kiocb->ki_nbytes)))
+               return -EFAULT;
 
        kiocb->ki_iovec = &kiocb->ki_inline_vec;
        kiocb->ki_iovec->iov_base = kiocb->ki_buf;
-       kiocb->ki_iovec->iov_len = bytes;
+       kiocb->ki_iovec->iov_len = kiocb->ki_nbytes;
        kiocb->ki_nr_segs = 1;
-       kiocb->ki_cur_seg = 0;
        return 0;
 }
 
@@ -1492,96 +992,95 @@ static ssize_t aio_setup_single_vector(int type, struct file * file, struct kioc
  *     Performs the initial checks and aio retry method
  *     setup for the kiocb at the time of io submission.
  */
-static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
+static ssize_t aio_run_iocb(struct kiocb *req, bool compat)
 {
-       struct file *file = kiocb->ki_filp;
-       ssize_t ret = 0;
+       struct file *file = req->ki_filp;
+       ssize_t ret;
+       int rw;
+       fmode_t mode;
+       aio_rw_op *rw_op;
 
-       switch (kiocb->ki_opcode) {
+       switch (req->ki_opcode) {
        case IOCB_CMD_PREAD:
-               ret = -EBADF;
-               if (unlikely(!(file->f_mode & FMODE_READ)))
-                       break;
-               ret = -EFAULT;
-               if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf,
-                       kiocb->ki_left)))
-                       break;
-               ret = aio_setup_single_vector(READ, file, kiocb);
-               if (ret)
-                       break;
-               ret = -EINVAL;
-               if (file->f_op->aio_read)
-                       kiocb->ki_retry = aio_rw_vect_retry;
-               break;
-       case IOCB_CMD_PWRITE:
-               ret = -EBADF;
-               if (unlikely(!(file->f_mode & FMODE_WRITE)))
-                       break;
-               ret = -EFAULT;
-               if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf,
-                       kiocb->ki_left)))
-                       break;
-               ret = aio_setup_single_vector(WRITE, file, kiocb);
-               if (ret)
-                       break;
-               ret = -EINVAL;
-               if (file->f_op->aio_write)
-                       kiocb->ki_retry = aio_rw_vect_retry;
-               break;
        case IOCB_CMD_PREADV:
-               ret = -EBADF;
-               if (unlikely(!(file->f_mode & FMODE_READ)))
-                       break;
-               ret = aio_setup_vectored_rw(READ, kiocb, compat);
-               if (ret)
-                       break;
-               ret = -EINVAL;
-               if (file->f_op->aio_read)
-                       kiocb->ki_retry = aio_rw_vect_retry;
-               break;
+               mode    = FMODE_READ;
+               rw      = READ;
+               rw_op   = file->f_op->aio_read;
+               goto rw_common;
+
+       case IOCB_CMD_PWRITE:
        case IOCB_CMD_PWRITEV:
-               ret = -EBADF;
-               if (unlikely(!(file->f_mode & FMODE_WRITE)))
-                       break;
-               ret = aio_setup_vectored_rw(WRITE, kiocb, compat);
+               mode    = FMODE_WRITE;
+               rw      = WRITE;
+               rw_op   = file->f_op->aio_write;
+               goto rw_common;
+rw_common:
+               if (unlikely(!(file->f_mode & mode)))
+                       return -EBADF;
+
+               if (!rw_op)
+                       return -EINVAL;
+
+               ret = (req->ki_opcode == IOCB_CMD_PREADV ||
+                      req->ki_opcode == IOCB_CMD_PWRITEV)
+                       ? aio_setup_vectored_rw(rw, req, compat)
+                       : aio_setup_single_vector(rw, req);
                if (ret)
-                       break;
-               ret = -EINVAL;
-               if (file->f_op->aio_write)
-                       kiocb->ki_retry = aio_rw_vect_retry;
+                       return ret;
+
+               ret = rw_verify_area(rw, file, &req->ki_pos, req->ki_nbytes);
+               if (ret < 0)
+                       return ret;
+
+               req->ki_nbytes = ret;
+               req->ki_left = ret;
+
+               ret = aio_rw_vect_retry(req, rw, rw_op);
                break;
+
        case IOCB_CMD_FDSYNC:
-               ret = -EINVAL;
-               if (file->f_op->aio_fsync)
-                       kiocb->ki_retry = aio_fdsync;
+               if (!file->f_op->aio_fsync)
+                       return -EINVAL;
+
+               ret = file->f_op->aio_fsync(req, 1);
                break;
+
        case IOCB_CMD_FSYNC:
-               ret = -EINVAL;
-               if (file->f_op->aio_fsync)
-                       kiocb->ki_retry = aio_fsync;
+               if (!file->f_op->aio_fsync)
+                       return -EINVAL;
+
+               ret = file->f_op->aio_fsync(req, 0);
                break;
+
        default:
-               dprintk("EINVAL: io_submit: no operation provided\n");
-               ret = -EINVAL;
+               pr_debug("EINVAL: no operation provided\n");
+               return -EINVAL;
        }
 
-       if (!kiocb->ki_retry)
-               return ret;
+       if (ret != -EIOCBQUEUED) {
+               /*
+                * There's no easy way to restart the syscall since other AIO's
+                * may be already running. Just fail this IO with EINTR.
+                */
+               if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
+                            ret == -ERESTARTNOHAND ||
+                            ret == -ERESTART_RESTARTBLOCK))
+                       ret = -EINTR;
+               aio_complete(req, ret, 0);
+       }
 
        return 0;
 }
 
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
-                        struct iocb *iocb, struct kiocb_batch *batch,
-                        bool compat)
+                        struct iocb *iocb, bool compat)
 {
        struct kiocb *req;
-       struct file *file;
        ssize_t ret;
 
        /* enforce forwards compatibility on users */
        if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
-               pr_debug("EINVAL: io_submit: reserve field set\n");
+               pr_debug("EINVAL: reserve field set\n");
                return -EINVAL;
        }
 
@@ -1595,16 +1094,16 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
                return -EINVAL;
        }
 
-       file = fget(iocb->aio_fildes);
-       if (unlikely(!file))
-               return -EBADF;
-
-       req = aio_get_req(ctx, batch);  /* returns with 2 references to req */
-       if (unlikely(!req)) {
-               fput(file);
+       req = aio_get_req(ctx);
+       if (unlikely(!req))
                return -EAGAIN;
+
+       req->ki_filp = fget(iocb->aio_fildes);
+       if (unlikely(!req->ki_filp)) {
+               ret = -EBADF;
+               goto out_put_req;
        }
-       req->ki_filp = file;
+
        if (iocb->aio_flags & IOCB_FLAG_RESFD) {
                /*
                 * If the IOCB_FLAG_RESFD flag of aio_flags is set, get an
@@ -1620,9 +1119,9 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
                }
        }
 
-       ret = put_user(req->ki_key, &user_iocb->aio_key);
+       ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
        if (unlikely(ret)) {
-               dprintk("EFAULT: aio_key\n");
+               pr_debug("EFAULT: aio_key\n");
                goto out_put_req;
        }
 
@@ -1634,41 +1133,14 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
        req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
        req->ki_opcode = iocb->aio_lio_opcode;
 
-       ret = aio_setup_iocb(req, compat);
-
+       ret = aio_run_iocb(req, compat);
        if (ret)
                goto out_put_req;
 
-       spin_lock_irq(&ctx->ctx_lock);
-       /*
-        * We could have raced with io_destroy() and are currently holding a
-        * reference to ctx which should be destroyed. We cannot submit IO
-        * since ctx gets freed as soon as io_submit() puts its reference.  The
-        * check here is reliable: io_destroy() sets ctx->dead before waiting
-        * for outstanding IO and the barrier between these two is realized by
-        * unlock of mm->ioctx_lock and lock of ctx->ctx_lock.  Analogously we
-        * increment ctx->reqs_active before checking for ctx->dead and the
-        * barrier is realized by unlock and lock of ctx->ctx_lock. Thus if we
-        * don't see ctx->dead set here, io_destroy() waits for our IO to
-        * finish.
-        */
-       if (ctx->dead) {
-               spin_unlock_irq(&ctx->ctx_lock);
-               ret = -EINVAL;
-               goto out_put_req;
-       }
-       aio_run_iocb(req);
-       if (!list_empty(&ctx->run_list)) {
-               /* drain the run list */
-               while (__aio_run_iocbs(ctx))
-                       ;
-       }
-       spin_unlock_irq(&ctx->ctx_lock);
-
        aio_put_req(req);       /* drop extra ref to req */
        return 0;
-
 out_put_req:
+       atomic_dec(&ctx->reqs_active);
        aio_put_req(req);       /* drop extra ref to req */
        aio_put_req(req);       /* drop i/o ref to req */
        return ret;
@@ -1681,7 +1153,6 @@ long do_io_submit(aio_context_t ctx_id, long nr,
        long ret = 0;
        int i = 0;
        struct blk_plug plug;
-       struct kiocb_batch batch;
 
        if (unlikely(nr < 0))
                return -EINVAL;
@@ -1694,12 +1165,10 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 
        ctx = lookup_ioctx(ctx_id);
        if (unlikely(!ctx)) {
-               pr_debug("EINVAL: io_submit: invalid context id\n");
+               pr_debug("EINVAL: invalid context id\n");
                return -EINVAL;
        }
 
-       kiocb_batch_init(&batch, nr);
-
        blk_start_plug(&plug);
 
        /*
@@ -1720,13 +1189,12 @@ long do_io_submit(aio_context_t ctx_id, long nr,
                        break;
                }
 
-               ret = io_submit_one(ctx, user_iocb, &tmp, &batch, compat);
+               ret = io_submit_one(ctx, user_iocb, &tmp, compat);
                if (ret)
                        break;
        }
        blk_finish_plug(&plug);
 
-       kiocb_batch_free(ctx, &batch);
        put_ioctx(ctx);
        return i ? i : ret;
 }
@@ -1759,10 +1227,13 @@ static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
 
        assert_spin_locked(&ctx->ctx_lock);
 
+       if (key != KIOCB_KEY)
+               return NULL;
+
        /* TODO: use a hash or array, this sucks. */
        list_for_each(pos, &ctx->active_reqs) {
                struct kiocb *kiocb = list_kiocb(pos);
-               if (kiocb->ki_obj.user == iocb && kiocb->ki_key == key)
+               if (kiocb->ki_obj.user == iocb)
                        return kiocb;
        }
        return NULL;
@@ -1781,7 +1252,7 @@ static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
 SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
                struct io_event __user *, result)
 {
-       int (*cancel)(struct kiocb *iocb, struct io_event *res);
+       struct io_event res;
        struct kioctx *ctx;
        struct kiocb *kiocb;
        u32 key;
@@ -1796,32 +1267,22 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
                return -EINVAL;
 
        spin_lock_irq(&ctx->ctx_lock);
-       ret = -EAGAIN;
+
        kiocb = lookup_kiocb(ctx, iocb, key);
-       if (kiocb && kiocb->ki_cancel) {
-               cancel = kiocb->ki_cancel;
-               kiocb->ki_users ++;
-               kiocbSetCancelled(kiocb);
-       } else
-               cancel = NULL;
+       if (kiocb)
+               ret = kiocb_cancel(ctx, kiocb, &res);
+       else
+               ret = -EINVAL;
+
        spin_unlock_irq(&ctx->ctx_lock);
 
-       if (NULL != cancel) {
-               struct io_event tmp;
-               pr_debug("calling cancel\n");
-               memset(&tmp, 0, sizeof(tmp));
-               tmp.obj = (u64)(unsigned long)kiocb->ki_obj.user;
-               tmp.data = kiocb->ki_user_data;
-               ret = cancel(kiocb, &tmp);
-               if (!ret) {
-                       /* Cancellation succeeded -- copy the result
-                        * into the user's buffer.
-                        */
-                       if (copy_to_user(result, &tmp, sizeof(tmp)))
-                               ret = -EFAULT;
-               }
-       } else
-               ret = -EINVAL;
+       if (!ret) {
+               /* Cancellation succeeded -- copy the result
+                * into the user's buffer.
+                */
+               if (copy_to_user(result, &res, sizeof(res)))
+                       ret = -EFAULT;
+       }
 
        put_ioctx(ctx);
 
@@ -1838,8 +1299,7 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
  *     < min_nr if the timeout specified by timeout has elapsed
  *     before sufficient events are available, where timeout == NULL
  *     specifies an infinite timeout. Note that the timeout pointed to by
- *     timeout is relative and will be updated if not NULL and the
- *     operation blocks. Will fail with -ENOSYS if not implemented.
+ *     timeout is relative.  Will fail with -ENOSYS if not implemented.
  */
 SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
                long, min_nr,
@@ -1855,7 +1315,5 @@ SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
                        ret = read_events(ioctx, min_nr, nr, events, timeout);
                put_ioctx(ioctx);
        }
-
-       asmlinkage_protect(5, ret, ctx_id, min_nr, nr, events, timeout);
        return ret;
 }