non-ext4 portions of "direct-io: Implement generic deferred AIO completions"
Theodore Ts'o [Thu, 22 Jan 2015 20:20:55 +0000 (15:20 -0500)]
Originally from 7b7a8665edd8db73

Signed-off-by: Theodore Ts'o <tytso@mit.edu>

fs/direct-io.c
fs/ocfs2/aops.c
fs/super.c
fs/xfs/xfs_aops.c
fs/xfs/xfs_aops.h
include/linux/buffer_head.h
include/linux/fs.h

index 7ab90f5..8b31b9f 100644 (file)
@@ -127,6 +127,7 @@ struct dio {
        spinlock_t bio_lock;            /* protects BIO fields below */
        int page_errors;                /* errno from get_user_pages() */
        int is_async;                   /* is IO async ? */
+       bool defer_completion;          /* defer AIO completion to workqueue? */
        int io_error;                   /* IO error in completion path */
        unsigned long refcount;         /* direct_io_worker() and bios */
        struct bio *bio_list;           /* singly linked via bi_private */
@@ -141,7 +142,10 @@ struct dio {
         * allocation time.  Don't add new fields after pages[] unless you
         * wish that they not be zeroed.
         */
-       struct page *pages[DIO_PAGES];  /* page buffer */
+       union {
+               struct page *pages[DIO_PAGES];  /* page buffer */
+               struct work_struct complete_work;/* deferred AIO completion */
+       };
 } ____cacheline_aligned_in_smp;
 
 static struct kmem_cache *dio_cache __read_mostly;
@@ -221,16 +225,16 @@ static inline struct page *dio_get_page(struct dio *dio,
  * dio_complete() - called when all DIO BIO I/O has been completed
  * @offset: the byte offset in the file of the completed operation
  *
- * This releases locks as dictated by the locking type, lets interested parties
- * know that a DIO operation has completed, and calculates the resulting return
- * code for the operation.
+ * This drops i_dio_count, lets interested parties know that a DIO operation
+ * has completed, and calculates the resulting return code for the operation.
  *
  * It lets the filesystem know if it registered an interest earlier via
  * get_block.  Pass the private field of the map buffer_head so that
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret,
+               bool is_async)
 {
        ssize_t transferred = 0;
 
@@ -258,19 +262,26 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is
        if (ret == 0)
                ret = transferred;
 
-       if (dio->end_io && dio->result) {
-               dio->end_io(dio->iocb, offset, transferred,
-                           dio->private, ret, is_async);
-       } else {
-               inode_dio_done(dio->inode);
-               if (is_async)
-                       aio_complete(dio->iocb, ret, 0);
-       }
+       if (dio->end_io && dio->result)
+               dio->end_io(dio->iocb, offset, transferred, dio->private);
+
+       inode_dio_done(dio->inode);
+       if (is_async)
+               aio_complete(dio->iocb, ret, 0);
 
+       kmem_cache_free(dio_cache, dio);
        return ret;
 }
 
+static void dio_aio_complete_work(struct work_struct *work)
+{
+       struct dio *dio = container_of(work, struct dio, complete_work);
+
+       dio_complete(dio, dio->iocb->ki_pos, 0, true);
+}
+
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
+
 /*
  * Asynchronous IO callback. 
  */
@@ -290,8 +301,13 @@ static void dio_bio_end_aio(struct bio *bio, int error)
        spin_unlock_irqrestore(&dio->bio_lock, flags);
 
        if (remaining == 0) {
-               dio_complete(dio, dio->iocb->ki_pos, 0, true);
-               kmem_cache_free(dio_cache, dio);
+               if (dio->result && dio->defer_completion) {
+                       INIT_WORK(&dio->complete_work, dio_aio_complete_work);
+                       queue_work(dio->inode->i_sb->s_dio_done_wq,
+                                  &dio->complete_work);
+               } else {
+                       dio_complete(dio, dio->iocb->ki_pos, 0, true);
+               }
        }
 }
 
@@ -511,6 +527,41 @@ static inline int dio_bio_reap(struct dio *dio, struct dio_submit *sdio)
 }
 
 /*
+ * Create workqueue for deferred direct IO completions. We allocate the
+ * workqueue when it's first needed. This avoids creating workqueue for
+ * filesystems that don't need it and also allows us to create the workqueue
+ * late enough so the we can include s_id in the name of the workqueue.
+ */
+static int sb_init_dio_done_wq(struct super_block *sb)
+{
+       struct workqueue_struct *wq = alloc_workqueue("dio/%s",
+                                                     WQ_MEM_RECLAIM, 0,
+                                                     sb->s_id);
+       if (!wq)
+               return -ENOMEM;
+       /*
+        * This has to be atomic as more DIOs can race to create the workqueue
+        */
+       cmpxchg(&sb->s_dio_done_wq, NULL, wq);
+       /* Someone created workqueue before us? Free ours... */
+       if (wq != sb->s_dio_done_wq)
+               destroy_workqueue(wq);
+       return 0;
+}
+
+static int dio_set_defer_completion(struct dio *dio)
+{
+       struct super_block *sb = dio->inode->i_sb;
+
+       if (dio->defer_completion)
+               return 0;
+       dio->defer_completion = true;
+       if (!sb->s_dio_done_wq)
+               return sb_init_dio_done_wq(sb);
+       return 0;
+}
+
+/*
  * Call into the fs to map some more disk blocks.  We record the current number
  * of available blocks at sdio->blocks_available.  These are in units of the
  * fs blocksize, (1 << inode->i_blkbits).
@@ -581,6 +632,9 @@ static int get_more_blocks(struct dio *dio, struct dio_submit *sdio,
 
                /* Store for completion */
                dio->private = map_bh->b_private;
+
+               if (ret == 0 && buffer_defer_completion(map_bh))
+                       ret = dio_set_defer_completion(dio);
        }
        return ret;
 }
@@ -1269,7 +1323,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 
        if (drop_refcount(dio) == 0) {
                retval = dio_complete(dio, offset, retval, false);
-               kmem_cache_free(dio_cache, dio);
        } else
                BUG_ON(retval != -EIOCBQUEUED);
 
index f998c60..c92061e 100644 (file)
@@ -565,9 +565,7 @@ bail:
 static void ocfs2_dio_end_io(struct kiocb *iocb,
                             loff_t offset,
                             ssize_t bytes,
-                            void *private,
-                            int ret,
-                            bool is_async)
+                            void *private)
 {
        struct inode *inode = file_inode(iocb->ki_filp);
        int level;
@@ -592,10 +590,6 @@ static void ocfs2_dio_end_io(struct kiocb *iocb,
 
        level = ocfs2_iocb_rw_locked_level(iocb);
        ocfs2_rw_unlock(inode, level);
-
-       inode_dio_done(inode);
-       if (is_async)
-               aio_complete(iocb, ret, 0);
 }
 
 /*
index 97280e7..eb03b48 100644 (file)
@@ -154,15 +154,9 @@ static struct super_block *alloc_super(struct file_system_type *type, int flags)
        static const struct super_operations default_op;
 
        if (s) {
-               if (security_sb_alloc(s)) {
-                       /*
-                        * We cannot call security_sb_free() without
-                        * security_sb_alloc() succeeding. So bail out manually
-                        */
-                       kfree(s);
-                       s = NULL;
-                       goto out;
-               }
+               if (security_sb_alloc(s))
+                       goto out_free_sb;
+
                if (init_sb_writers(s, type))
                        goto err_out;
                s->s_flags = flags;
@@ -213,6 +207,7 @@ out:
 err_out:
        security_sb_free(s);
        destroy_sb_writers(s);
+out_free_sb:
        kfree(s);
        s = NULL;
        goto out;
@@ -396,6 +391,11 @@ void generic_shutdown_super(struct super_block *sb)
 
                evict_inodes(sb);
 
+               if (sb->s_dio_done_wq) {
+                       destroy_workqueue(sb->s_dio_done_wq);
+                       sb->s_dio_done_wq = NULL;
+               }
+
                if (sop->put_super)
                        sop->put_super(sb);
 
index cfbb4c1..f572325 100644 (file)
@@ -86,14 +86,6 @@ xfs_destroy_ioend(
                bh->b_end_io(bh, !ioend->io_error);
        }
 
-       if (ioend->io_iocb) {
-               inode_dio_done(ioend->io_inode);
-               if (ioend->io_isasync) {
-                       aio_complete(ioend->io_iocb, ioend->io_error ?
-                                       ioend->io_error : ioend->io_result, 0);
-               }
-       }
-
        mempool_free(ioend, xfs_ioend_pool);
 }
 
@@ -281,7 +273,6 @@ xfs_alloc_ioend(
         * all the I/O from calling the completion routine too early.
         */
        atomic_set(&ioend->io_remaining, 1);
-       ioend->io_isasync = 0;
        ioend->io_isdirect = 0;
        ioend->io_error = 0;
        ioend->io_list = NULL;
@@ -291,8 +282,6 @@ xfs_alloc_ioend(
        ioend->io_buffer_tail = NULL;
        ioend->io_offset = 0;
        ioend->io_size = 0;
-       ioend->io_iocb = NULL;
-       ioend->io_result = 0;
        ioend->io_append_trans = NULL;
 
        INIT_WORK(&ioend->io_work, xfs_end_io);
@@ -1290,8 +1279,10 @@ __xfs_get_blocks(
                if (create || !ISUNWRITTEN(&imap))
                        xfs_map_buffer(inode, bh_result, &imap, offset);
                if (create && ISUNWRITTEN(&imap)) {
-                       if (direct)
+                       if (direct) {
                                bh_result->b_private = inode;
+                               set_buffer_defer_completion(bh_result);
+                       }
                        set_buffer_unwritten(bh_result);
                }
        }
@@ -1388,9 +1379,7 @@ xfs_end_io_direct_write(
        struct kiocb            *iocb,
        loff_t                  offset,
        ssize_t                 size,
-       void                    *private,
-       int                     ret,
-       bool                    is_async)
+       void                    *private)
 {
        struct xfs_ioend        *ioend = iocb->private;
 
@@ -1412,17 +1401,10 @@ xfs_end_io_direct_write(
 
        ioend->io_offset = offset;
        ioend->io_size = size;
-       ioend->io_iocb = iocb;
-       ioend->io_result = ret;
        if (private && size > 0)
                ioend->io_type = XFS_IO_UNWRITTEN;
 
-       if (is_async) {
-               ioend->io_isasync = 1;
-               xfs_finish_ioend(ioend);
-       } else {
-               xfs_finish_ioend_sync(ioend);
-       }
+       xfs_finish_ioend_sync(ioend);
 }
 
 STATIC ssize_t
index c325abb..f94dd45 100644 (file)
@@ -45,7 +45,6 @@ typedef struct xfs_ioend {
        unsigned int            io_type;        /* delalloc / unwritten */
        int                     io_error;       /* I/O error code */
        atomic_t                io_remaining;   /* hold count */
-       unsigned int            io_isasync : 1; /* needs aio_complete */
        unsigned int            io_isdirect : 1;/* direct I/O */
        struct inode            *io_inode;      /* file being written to */
        struct buffer_head      *io_buffer_head;/* buffer linked list head */
@@ -54,8 +53,6 @@ typedef struct xfs_ioend {
        xfs_off_t               io_offset;      /* offset in the file */
        struct work_struct      io_work;        /* xfsdatad work queue */
        struct xfs_trans        *io_append_trans;/* xact. for size update */
-       struct kiocb            *io_iocb;
-       int                     io_result;
 } xfs_ioend_t;
 
 extern const struct address_space_operations xfs_address_space_operations;
index d6a8a8d..7200ca4 100644 (file)
@@ -36,6 +36,7 @@ enum bh_state_bits {
        BH_Quiet,       /* Buffer Error Prinks to be quiet */
        BH_Meta,        /* Buffer contains metadata */
        BH_Prio,        /* Buffer should be submitted with REQ_PRIO */
+       BH_Defer_Completion, /* Defer AIO completion to workqueue */
 
        BH_PrivateStart,/* not a state bit, but the first bit available
                         * for private allocation by other entities
@@ -128,6 +129,7 @@ BUFFER_FNS(Write_EIO, write_io_error)
 BUFFER_FNS(Unwritten, unwritten)
 BUFFER_FNS(Meta, meta)
 BUFFER_FNS(Prio, prio)
+BUFFER_FNS(Defer_Completion, defer_completion)
 
 #define bh_offset(bh)          ((unsigned long)(bh)->b_data & ~PAGE_MASK)
 
index 184f485..3d43ffe 100644 (file)
@@ -46,6 +46,7 @@ struct vfsmount;
 struct cred;
 struct swap_info_struct;
 struct seq_file;
+struct workqueue_struct;
 
 extern void __init inode_init(void);
 extern void __init inode_init_early(void);
@@ -63,8 +64,7 @@ struct buffer_head;
 typedef int (get_block_t)(struct inode *inode, sector_t iblock,
                        struct buffer_head *bh_result, int create);
 typedef void (dio_iodone_t)(struct kiocb *iocb, loff_t offset,
-                       ssize_t bytes, void *private, int ret,
-                       bool is_async);
+                       ssize_t bytes, void *private);
 
 #define MAY_EXEC               0x00000001
 #define MAY_WRITE              0x00000002
@@ -1316,6 +1316,9 @@ struct super_block {
 
        /* Being remounted read-only */
        int s_readonly_remount;
+
+       /* AIO completions deferred from interrupt context */
+       struct workqueue_struct *s_dio_done_wq;
 };
 
 /* superblock cache pruning functions */