slub: Fix full list corruption if debugging is on
[linux-2.6.git] / fs / direct-io.c
index 7dde0df..44a360c 100644 (file)
@@ -35,7 +35,7 @@
 #include <linux/buffer_head.h>
 #include <linux/rwsem.h>
 #include <linux/uio.h>
-#include <asm/atomic.h>
+#include <linux/atomic.h>
 
 /*
  * How many user pages to map in one call to get_user_pages().  This determines
@@ -82,6 +82,8 @@ struct dio {
        int reap_counter;               /* rate limit reaping */
        get_block_t *get_block;         /* block mapping function */
        dio_iodone_t *end_io;           /* IO completion function */
+       dio_submit_t *submit_io;        /* IO submition function */
+       loff_t logical_offset_in_bio;   /* current first logical block in bio */
        sector_t final_block_in_bio;    /* current final block in bio + 1 */
        sector_t next_block_for_io;     /* next block to be put under IO,
                                           in dio_blocks units */
@@ -96,6 +98,19 @@ struct dio {
        unsigned cur_page_offset;       /* Offset into it, in bytes */
        unsigned cur_page_len;          /* Nr of bytes at cur_page_offset */
        sector_t cur_page_block;        /* Where it starts */
+       loff_t cur_page_fs_offset;      /* Offset in file */
+
+       /* BIO completion state */
+       spinlock_t bio_lock;            /* protects BIO fields below */
+       unsigned long refcount;         /* direct_io_worker() and bios */
+       struct bio *bio_list;           /* singly linked via bi_private */
+       struct task_struct *waiter;     /* waiting task (NULL if none) */
+
+       /* AIO related stuff */
+       struct kiocb *iocb;             /* kiocb */
+       int is_async;                   /* is IO async ? */
+       int io_error;                   /* IO error in completion path */
+       ssize_t result;                 /* IO result */
 
        /*
         * Page fetching state. These variables belong to dio_refill_pages().
@@ -108,24 +123,62 @@ struct dio {
         * Page queue.  These variables belong to dio_refill_pages() and
         * dio_get_page().
         */
-       struct page *pages[DIO_PAGES];  /* page buffer */
        unsigned head;                  /* next page to process */
        unsigned tail;                  /* last valid page + 1 */
        int page_errors;                /* errno from get_user_pages() */
 
-       /* BIO completion state */
-       spinlock_t bio_lock;            /* protects BIO fields below */
-       unsigned long refcount;         /* direct_io_worker() and bios */
-       struct bio *bio_list;           /* singly linked via bi_private */
-       struct task_struct *waiter;     /* waiting task (NULL if none) */
-
-       /* AIO related stuff */
-       struct kiocb *iocb;             /* kiocb */
-       int is_async;                   /* is IO async ? */
-       int io_error;                   /* IO error in completion path */
-       ssize_t result;                 /* IO result */
+       /*
+        * pages[] (and any fields placed after it) are not zeroed out at
+        * allocation time.  Don't add new fields after pages[] unless you
+        * wish that they not be zeroed.
+        */
+       struct page *pages[DIO_PAGES];  /* page buffer */
 };
 
+static void __inode_dio_wait(struct inode *inode)
+{
+       wait_queue_head_t *wq = bit_waitqueue(&inode->i_state, __I_DIO_WAKEUP);
+       DEFINE_WAIT_BIT(q, &inode->i_state, __I_DIO_WAKEUP);
+
+       do {
+               prepare_to_wait(wq, &q.wait, TASK_UNINTERRUPTIBLE);
+               if (atomic_read(&inode->i_dio_count))
+                       schedule();
+       } while (atomic_read(&inode->i_dio_count));
+       finish_wait(wq, &q.wait);
+}
+
+/**
+ * inode_dio_wait - wait for outstanding DIO requests to finish
+ * @inode: inode to wait for
+ *
+ * Waits for all pending direct I/O requests to finish so that we can
+ * proceed with a truncate or equivalent operation.
+ *
+ * Must be called under a lock that serializes taking new references
+ * to i_dio_count, usually by inode->i_mutex.
+ */
+void inode_dio_wait(struct inode *inode)
+{
+       if (atomic_read(&inode->i_dio_count))
+               __inode_dio_wait(inode);
+}
+EXPORT_SYMBOL_GPL(inode_dio_wait);
+
+/*
+ * inode_dio_done - signal finish of a direct I/O requests
+ * @inode: inode the direct I/O happens on
+ *
+ * This is called once we've finished processing a direct I/O request,
+ * and is used to wake up callers waiting for direct I/O to be quiesced.
+ */
+void inode_dio_done(struct inode *inode)
+{
+       if (atomic_dec_and_test(&inode->i_dio_count))
+               wake_up_bit(&inode->i_state, __I_DIO_WAKEUP);
+}
+EXPORT_SYMBOL_GPL(inode_dio_done);
+
 /*
  * How many pages are in the queue?
  */
@@ -209,7 +262,7 @@ static struct page *dio_get_page(struct dio *dio)
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static int dio_complete(struct dio *dio, loff_t offset, int ret)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
 {
        ssize_t transferred = 0;
 
@@ -230,14 +283,6 @@ static int dio_complete(struct dio *dio, loff_t offset, int ret)
                        transferred = dio->i_size - offset;
        }
 
-       if (dio->end_io && dio->result)
-               dio->end_io(dio->iocb, offset, transferred,
-                           dio->map_bh.b_private);
-
-       if (dio->flags & DIO_LOCKING)
-               /* lockdep: non-owner release */
-               up_read_non_owner(&dio->inode->i_alloc_sem);
-
        if (ret == 0)
                ret = dio->page_errors;
        if (ret == 0)
@@ -245,6 +290,15 @@ static int dio_complete(struct dio *dio, loff_t offset, int ret)
        if (ret == 0)
                ret = transferred;
 
+       if (dio->end_io && dio->result) {
+               dio->end_io(dio->iocb, offset, transferred,
+                           dio->map_bh.b_private, ret, is_async);
+       } else {
+               if (is_async)
+                       aio_complete(dio->iocb, ret, 0);
+               inode_dio_done(dio->inode);
+       }
+
        return ret;
 }
 
@@ -268,8 +322,7 @@ static void dio_bio_end_aio(struct bio *bio, int error)
        spin_unlock_irqrestore(&dio->bio_lock, flags);
 
        if (remaining == 0) {
-               int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
-               aio_complete(dio->iocb, ret, 0);
+               dio_complete(dio, dio->iocb->ki_pos, 0, true);
                kfree(dio);
        }
 }
@@ -294,12 +347,36 @@ static void dio_bio_end_io(struct bio *bio, int error)
        spin_unlock_irqrestore(&dio->bio_lock, flags);
 }
 
-static int
+/**
+ * dio_end_io - handle the end io action for the given bio
+ * @bio: The direct io bio thats being completed
+ * @error: Error if there was one
+ *
+ * This is meant to be called by any filesystem that uses their own dio_submit_t
+ * so that the DIO specific endio actions are dealt with after the filesystem
+ * has done it's completion work.
+ */
+void dio_end_io(struct bio *bio, int error)
+{
+       struct dio *dio = bio->bi_private;
+
+       if (dio->is_async)
+               dio_bio_end_aio(bio, error);
+       else
+               dio_bio_end_io(bio, error);
+}
+EXPORT_SYMBOL_GPL(dio_end_io);
+
+static void
 dio_bio_alloc(struct dio *dio, struct block_device *bdev,
                sector_t first_sector, int nr_vecs)
 {
        struct bio *bio;
 
+       /*
+        * bio_alloc() is guaranteed to return a bio when called with
+        * __GFP_WAIT and we request a valid number of vectors.
+        */
        bio = bio_alloc(GFP_KERNEL, nr_vecs);
 
        bio->bi_bdev = bdev;
@@ -310,7 +387,7 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
                bio->bi_end_io = dio_bio_end_io;
 
        dio->bio = bio;
-       return 0;
+       dio->logical_offset_in_bio = dio->cur_page_fs_offset;
 }
 
 /*
@@ -334,10 +411,15 @@ static void dio_bio_submit(struct dio *dio)
        if (dio->is_async && dio->rw == READ)
                bio_set_pages_dirty(bio);
 
-       submit_bio(dio->rw, bio);
+       if (dio->submit_io)
+               dio->submit_io(dio->rw, bio, dio->inode,
+                              dio->logical_offset_in_bio);
+       else
+               submit_bio(dio->rw, bio);
 
        dio->bio = NULL;
        dio->boundary = 0;
+       dio->logical_offset_in_bio = 0;
 }
 
 /*
@@ -546,8 +628,9 @@ static int dio_new_bio(struct dio *dio, sector_t start_sector)
                goto out;
        sector = start_sector << (dio->blkbits - 9);
        nr_pages = min(dio->pages_in_io, bio_get_nr_vecs(dio->map_bh.b_bdev));
+       nr_pages = min(nr_pages, BIO_MAX_PAGES);
        BUG_ON(nr_pages <= 0);
-       ret = dio_bio_alloc(dio, dio->map_bh.b_bdev, sector, nr_pages);
+       dio_bio_alloc(dio, dio->map_bh.b_bdev, sector, nr_pages);
        dio->boundary = 0;
 out:
        return ret;
@@ -597,16 +680,32 @@ static int dio_send_cur_page(struct dio *dio)
        int ret = 0;
 
        if (dio->bio) {
+               loff_t cur_offset = dio->cur_page_fs_offset;
+               loff_t bio_next_offset = dio->logical_offset_in_bio +
+                       dio->bio->bi_size;
+
                /*
-                * See whether this new request is contiguous with the old
+                * See whether this new request is contiguous with the old.
+                *
+                * Btrfs cannot handle having logically non-contiguous requests
+                * submitted.  For example if you have
+                *
+                * Logical:  [0-4095][HOLE][8192-12287]
+                * Physical: [0-4095]      [4096-8191]
+                *
+                * We cannot submit those pages together as one BIO.  So if our
+                * current logical offset in the file does not equal what would
+                * be the next logical offset in the bio, submit the bio we
+                * have.
                 */
-               if (dio->final_block_in_bio != dio->cur_page_block)
+               if (dio->final_block_in_bio != dio->cur_page_block ||
+                   cur_offset != bio_next_offset)
                        dio_bio_submit(dio);
                /*
                 * Submit now if the underlying fs is about to perform a
                 * metadata read
                 */
-               if (dio->boundary)
+               else if (dio->boundary)
                        dio_bio_submit(dio);
        }
 
@@ -695,6 +794,7 @@ submit_page_section(struct dio *dio, struct page *page,
        dio->cur_page_offset = offset;
        dio->cur_page_len = len;
        dio->cur_page_block = blocknr;
+       dio->cur_page_fs_offset = dio->block_in_file << dio->blkbits;
 out:
        return ret;
 }
@@ -922,14 +1022,11 @@ out:
        return ret;
 }
 
-/*
- * Releases both i_mutex and i_alloc_sem
- */
 static ssize_t
 direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, 
        const struct iovec *iov, loff_t offset, unsigned long nr_segs, 
        unsigned blkbits, get_block_t get_block, dio_iodone_t end_io,
-       struct dio *dio)
+       dio_submit_t submit_io, struct dio *dio)
 {
        unsigned long user_addr; 
        unsigned long flags;
@@ -946,6 +1043,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 
        dio->get_block = get_block;
        dio->end_io = end_io;
+       dio->submit_io = submit_io;
        dio->final_block_in_bio = -1;
        dio->next_block_for_io = -1;
 
@@ -1002,7 +1100,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
                }
        } /* end iovec loop */
 
-       if (ret == -ENOTBLK && (rw & WRITE)) {
+       if (ret == -ENOTBLK) {
                /*
                 * The remaining part of the request will be
                 * be handled by buffered I/O when we return
@@ -1051,11 +1149,8 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
            ((rw & READ) || (dio->result == dio->size)))
                ret = -EIOCBQUEUED;
 
-       if (ret != -EIOCBQUEUED) {
-               /* All IO is now issued, send it on its way */
-               blk_run_address_space(inode->i_mapping);
+       if (ret != -EIOCBQUEUED)
                dio_await_completion(dio);
-       }
 
        /*
         * Sync will always be dropping the final ref and completing the
@@ -1073,7 +1168,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
        spin_unlock_irqrestore(&dio->bio_lock, flags);
 
        if (ret2 == 0) {
-               ret = dio_complete(dio, offset, ret);
+               ret = dio_complete(dio, offset, ret, false);
                kfree(dio);
        } else
                BUG_ON(ret != -EIOCBQUEUED);
@@ -1090,21 +1185,22 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
  *    For writes this function is called under i_mutex and returns with
  *    i_mutex held, for reads, i_mutex is not held on entry, but it is
  *    taken and dropped again before returning.
- *    For reads and writes i_alloc_sem is taken in shared mode and released
- *    on I/O completion (which may happen asynchronously after returning to
- *    the caller).
- *
  *  - if the flags value does NOT contain DIO_LOCKING we don't use any
  *    internal locking but rather rely on the filesystem to synchronize
  *    direct I/O reads/writes versus each other and truncate.
- *    For reads and writes both i_mutex and i_alloc_sem are not held on
- *    entry and are never taken.
+ *
+ * To help with locking against truncate we incremented the i_dio_count
+ * counter before starting direct I/O, and decrement it once we are done.
+ * Truncate can wait for it to reach zero to provide exclusion.  It is
+ * expected that filesystem provide exclusion between new direct I/O
+ * and truncates.  For DIO_LOCKING filesystems this is done by i_mutex,
+ * but other filesystems need to take care of this on their own.
  */
 ssize_t
 __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
        struct block_device *bdev, const struct iovec *iov, loff_t offset, 
        unsigned long nr_segs, get_block_t get_block, dio_iodone_t end_io,
-       int flags)
+       dio_submit_t submit_io, int flags)
 {
        int seg;
        size_t size;
@@ -1117,7 +1213,7 @@ __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
        struct dio *dio;
 
        if (rw & WRITE)
-               rw = WRITE_ODIRECT_PLUG;
+               rw = WRITE_ODIRECT;
 
        if (bdev)
                bdev_blkbits = blksize_bits(bdev_logical_block_size(bdev));
@@ -1144,15 +1240,24 @@ __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
                }
        }
 
-       dio = kzalloc(sizeof(*dio), GFP_KERNEL);
+       /* watch out for a 0 len io from a tricksy fs */
+       if (rw == READ && end == offset)
+               return 0;
+
+       dio = kmalloc(sizeof(*dio), GFP_KERNEL);
        retval = -ENOMEM;
        if (!dio)
                goto out;
+       /*
+        * Believe it or not, zeroing out the page array caused a .5%
+        * performance regression in a database benchmark.  So, we take
+        * care to only zero out what's needed.
+        */
+       memset(dio, 0, offsetof(struct dio, pages));
 
        dio->flags = flags;
        if (dio->flags & DIO_LOCKING) {
-               /* watch out for a 0 len io from a tricksy fs */
-               if (rw == READ && end > offset) {
+               if (rw == READ) {
                        struct address_space *mapping =
                                        iocb->ki_filp->f_mapping;
 
@@ -1167,15 +1272,14 @@ __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
                                goto out;
                        }
                }
-
-               /*
-                * Will be released at I/O completion, possibly in a
-                * different thread.
-                */
-               down_read_non_owner(&inode->i_alloc_sem);
        }
 
        /*
+        * Will be decremented at I/O completion time.
+        */
+       atomic_inc(&inode->i_dio_count);
+
+       /*
         * For file extending writes updating i_size before data
         * writeouts complete can expose uninitialized blocks. So
         * even for AIO, we need to wait for i/o to complete before
@@ -1185,22 +1289,8 @@ __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
                (end > i_size_read(inode)));
 
        retval = direct_io_worker(rw, iocb, inode, iov, offset,
-                               nr_segs, blkbits, get_block, end_io, dio);
-
-       /*
-        * In case of error extending write may have instantiated a few
-        * blocks outside i_size. Trim these off again for DIO_LOCKING.
-        *
-        * NOTE: filesystems with their own locking have to handle this
-        * on their own.
-        */
-       if (dio->flags & DIO_LOCKING) {
-               if (unlikely((rw & WRITE) && retval < 0)) {
-                       loff_t isize = i_size_read(inode);
-                       if (end > isize )
-                               vmtruncate(inode, isize);
-               }
-       }
+                               nr_segs, blkbits, get_block, end_io,
+                               submit_io, dio);
 
 out:
        return retval;