ceph: do not use i_wrbuffer_ref as refcount for Fb cap
[linux-2.6.git] / fs / ceph / addr.c
index 92f4821..38b8ab5 100644 (file)
@@ -1,15 +1,17 @@
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
 
 #include <linux/backing-dev.h>
 #include <linux/fs.h>
 #include <linux/mm.h>
 #include <linux/pagemap.h>
 #include <linux/writeback.h>   /* generic_writepages */
+#include <linux/slab.h>
 #include <linux/pagevec.h>
 #include <linux/task_io_accounting_ops.h>
 
 #include "super.h"
-#include "osd_client.h"
+#include "mds_client.h"
+#include <linux/ceph/osd_client.h>
 
 /*
  * Ceph address space ops.
@@ -22,7 +24,7 @@
  * context needs to be associated with the osd write during writeback.
  *
  * Similarly, struct ceph_inode_info maintains a set of counters to
- * count dirty pages on the inode.  In the absense of snapshots,
+ * count dirty pages on the inode.  In the absence of snapshots,
  * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
  *
  * When a snapshot is taken (that is, when the client receives
@@ -86,11 +88,11 @@ static int ceph_set_page_dirty(struct page *page)
 
        /* dirty the head */
        spin_lock(&inode->i_lock);
-       if (ci->i_wrbuffer_ref_head == 0)
+       if (ci->i_head_snapc == NULL)
                ci->i_head_snapc = ceph_get_snap_context(snapc);
        ++ci->i_wrbuffer_ref_head;
        if (ci->i_wrbuffer_ref == 0)
-               igrab(inode);
+               ihold(inode);
        ++ci->i_wrbuffer_ref;
        dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
             "snapc %p seq %lld (%d snaps)\n",
@@ -104,13 +106,7 @@ static int ceph_set_page_dirty(struct page *page)
        spin_lock_irq(&mapping->tree_lock);
        if (page->mapping) {    /* Race with truncate? */
                WARN_ON_ONCE(!PageUptodate(page));
-
-               if (mapping_cap_account_dirty(mapping)) {
-                       __inc_zone_page_state(page, NR_FILE_DIRTY);
-                       __inc_bdi_stat(mapping->backing_dev_info,
-                                       BDI_RECLAIMABLE);
-                       task_io_account_write(PAGE_CACHE_SIZE);
-               }
+               account_page_dirtied(page, page->mapping);
                radix_tree_tag_set(&mapping->page_tree,
                                page_index(page), PAGECACHE_TAG_DIRTY);
 
@@ -144,7 +140,7 @@ static int ceph_set_page_dirty(struct page *page)
  */
 static void ceph_invalidatepage(struct page *page, unsigned long offset)
 {
-       struct inode *inode = page->mapping->host;
+       struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_snap_context *snapc = (void *)page->private;
 
@@ -153,6 +149,8 @@ static void ceph_invalidatepage(struct page *page, unsigned long offset)
        BUG_ON(!PagePrivate(page));
        BUG_ON(!page->mapping);
 
+       inode = page->mapping->host;
+
        /*
         * We can get non-dirty pages here due to races between
         * set_page_dirty and truncate_complete_page; just spit out a
@@ -196,7 +194,8 @@ static int readpage_nounlock(struct file *filp, struct page *page)
 {
        struct inode *inode = filp->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
+       struct ceph_osd_client *osdc = 
+               &ceph_inode_to_client(inode)->client->osdc;
        int err = 0;
        u64 len = PAGE_CACHE_SIZE;
 
@@ -205,7 +204,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
        err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
                                  page->index << PAGE_CACHE_SHIFT, &len,
                                  ci->i_truncate_seq, ci->i_truncate_size,
-                                 &page, 1);
+                                 &page, 1, 0);
        if (err == -ENOENT)
                err = 0;
        if (err < 0) {
@@ -268,10 +267,10 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 {
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
+       struct ceph_osd_client *osdc =
+               &ceph_inode_to_client(inode)->client->osdc;
        int rc = 0;
        struct page **pages;
-       struct pagevec pvec;
        loff_t offset;
        u64 len;
 
@@ -288,14 +287,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
        rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
                                 offset, &len,
                                 ci->i_truncate_seq, ci->i_truncate_size,
-                                pages, nr_pages);
+                                pages, nr_pages, 0);
        if (rc == -ENOENT)
                rc = 0;
        if (rc < 0)
                goto out;
 
-       /* set uptodate and add to lru in pagevec-sized chunks */
-       pagevec_init(&pvec, 0);
        for (; !list_empty(page_list) && len > 0;
             rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
                struct page *page =
@@ -309,7 +306,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
                        zero_user_segment(page, s, PAGE_CACHE_SIZE);
                }
 
-               if (add_to_page_cache(page, mapping, page->index, GFP_NOFS)) {
+               if (add_to_page_cache_lru(page, mapping, page->index,
+                                         GFP_NOFS)) {
                        page_cache_release(page);
                        dout("readpages %p add_to_page_cache failed %p\n",
                             inode, page);
@@ -320,10 +318,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
                flush_dcache_page(page);
                SetPageUptodate(page);
                unlock_page(page);
-               if (pagevec_add(&pvec, page) == 0)
-                       pagevec_lru_add_file(&pvec);   /* add to lru */
+               page_cache_release(page);
        }
-       pagevec_lru_add_file(&pvec);
        rc = 0;
 
 out:
@@ -334,16 +330,15 @@ out:
 /*
  * Get ref for the oldest snapc for an inode with dirty data... that is, the
  * only snap context we are allowed to write back.
- *
- * Caller holds i_lock.
  */
-static struct ceph_snap_context *__get_oldest_context(struct inode *inode,
-                                                     u64 *snap_size)
+static struct ceph_snap_context *get_oldest_context(struct inode *inode,
+                                                   u64 *snap_size)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_snap_context *snapc = NULL;
        struct ceph_cap_snap *capsnap = NULL;
 
+       spin_lock(&inode->i_lock);
        list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
                dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
                     capsnap->context, capsnap->dirty_pages);
@@ -354,21 +349,11 @@ static struct ceph_snap_context *__get_oldest_context(struct inode *inode,
                        break;
                }
        }
-       if (!snapc && ci->i_snap_realm) {
-               snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
+       if (!snapc && ci->i_wrbuffer_ref_head) {
+               snapc = ceph_get_snap_context(ci->i_head_snapc);
                dout(" head snapc %p has %d dirty pages\n",
                     snapc, ci->i_wrbuffer_ref_head);
        }
-       return snapc;
-}
-
-static struct ceph_snap_context *get_oldest_context(struct inode *inode,
-                                                   u64 *snap_size)
-{
-       struct ceph_snap_context *snapc = NULL;
-
-       spin_lock(&inode->i_lock);
-       snapc = __get_oldest_context(inode, snap_size);
        spin_unlock(&inode->i_lock);
        return snapc;
 }
@@ -383,13 +368,13 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 {
        struct inode *inode;
        struct ceph_inode_info *ci;
-       struct ceph_client *client;
+       struct ceph_fs_client *fsc;
        struct ceph_osd_client *osdc;
        loff_t page_off = page->index << PAGE_CACHE_SHIFT;
        int len = PAGE_CACHE_SIZE;
        loff_t i_size;
        int err = 0;
-       struct ceph_snap_context *snapc;
+       struct ceph_snap_context *snapc, *oldest;
        u64 snap_size = 0;
        long writeback_stat;
 
@@ -401,8 +386,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        }
        inode = page->mapping->host;
        ci = ceph_inode(inode);
-       client = ceph_inode_to_client(inode);
-       osdc = &client->osdc;
+       fsc = ceph_inode_to_client(inode);
+       osdc = &fsc->client->osdc;
 
        /* verify this is a writeable snap context */
        snapc = (void *)page->private;
@@ -410,13 +395,16 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
                dout("writepage %p page %p not dirty?\n", inode, page);
                goto out;
        }
-       if (snapc != get_oldest_context(inode, &snap_size)) {
+       oldest = get_oldest_context(inode, &snap_size);
+       if (snapc->seq > oldest->seq) {
                dout("writepage %p page %p snapc %p not writeable - noop\n",
                     inode, page, (void *)page->private);
                /* we should only noop if called by kswapd */
                WARN_ON((current->flags & PF_MEMALLOC) == 0);
+               ceph_put_snap_context(oldest);
                goto out;
        }
+       ceph_put_snap_context(oldest);
 
        /* is this a partial page at end of file? */
        if (snap_size)
@@ -426,13 +414,13 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        if (i_size < page_off + len)
                len = i_size - page_off;
 
-       dout("writepage %p page %p index %lu on %llu~%u\n",
-            inode, page, page->index, page_off, len);
+       dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
+            inode, page, page->index, page_off, len, snapc);
 
-       writeback_stat = atomic_long_inc_return(&client->writeback_count);
+       writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
        if (writeback_stat >
-           CONGESTION_ON_THRESH(client->mount_args->congestion_kb))
-               set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC);
+           CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
+               set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
 
        set_page_writeback(page);
        err = ceph_osdc_writepages(osdc, ceph_vino(inode),
@@ -455,7 +443,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        ClearPagePrivate(page);
        end_page_writeback(page);
        ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
-       ceph_put_snap_context(snapc);
+       ceph_put_snap_context(snapc);  /* page's reference */
 out:
        return err;
 }
@@ -509,11 +497,11 @@ static void writepages_finish(struct ceph_osd_request *req,
        int i;
        struct ceph_snap_context *snapc = req->r_snapc;
        struct address_space *mapping = inode->i_mapping;
-       struct writeback_control *wbc = req->r_wbc;
        __s32 rc = -EIO;
        u64 bytes = 0;
-       struct ceph_client *client = ceph_inode_to_client(inode);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        long writeback_stat;
+       unsigned issued = ceph_caps_issued(ci);
 
        /* parse reply */
        replyhead = msg->front.iov_base;
@@ -544,21 +532,27 @@ static void writepages_finish(struct ceph_osd_request *req,
                WARN_ON(!PageUptodate(page));
 
                writeback_stat =
-                       atomic_long_dec_return(&client->writeback_count);
+                       atomic_long_dec_return(&fsc->writeback_count);
                if (writeback_stat <
-                   CONGESTION_OFF_THRESH(client->mount_args->congestion_kb))
-                       clear_bdi_congested(&client->backing_dev_info,
+                   CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
+                       clear_bdi_congested(&fsc->backing_dev_info,
                                            BLK_RW_ASYNC);
 
-               if (i >= wrote) {
-                       dout("inode %p skipping page %p\n", inode, page);
-                       wbc->pages_skipped++;
-               }
+               ceph_put_snap_context((void *)page->private);
                page->private = 0;
                ClearPagePrivate(page);
-               ceph_put_snap_context(snapc);
                dout("unlocking %d %p\n", i, page);
                end_page_writeback(page);
+
+               /*
+                * We lost the cache cap, need to truncate the page before
+                * it is unlocked, otherwise we'd truncate it later in the
+                * page truncation thread, possibly losing some data that
+                * raced its way in
+                */
+               if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
+                       generic_error_remove_page(inode->i_mapping, page);
+
                unlock_page(page);
        }
        dout("%p wrote+cleaned %d pages\n", inode, wrote);
@@ -567,7 +561,7 @@ static void writepages_finish(struct ceph_osd_request *req,
        ceph_release_pages(req->r_pages, req->r_num_pages);
        if (req->r_pages_from_pool)
                mempool_free(req->r_pages,
-                            ceph_client(inode->i_sb)->wb_pagevec_pool);
+                            ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
        else
                kfree(req->r_pages);
        ceph_osdc_put_request(req);
@@ -578,13 +572,13 @@ static void writepages_finish(struct ceph_osd_request *req,
  * mempool.  we avoid the mempool if we can because req->r_num_pages
  * may be less than the maximum write size.
  */
-static void alloc_page_vec(struct ceph_client *client,
+static void alloc_page_vec(struct ceph_fs_client *fsc,
                           struct ceph_osd_request *req)
 {
        req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages,
                               GFP_NOFS);
        if (!req->r_pages) {
-               req->r_pages = mempool_alloc(client->wb_pagevec_pool, GFP_NOFS);
+               req->r_pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS);
                req->r_pages_from_pool = 1;
                WARN_ON(!req->r_pages);
        }
@@ -597,14 +591,13 @@ static int ceph_writepages_start(struct address_space *mapping,
                                 struct writeback_control *wbc)
 {
        struct inode *inode = mapping->host;
-       struct backing_dev_info *bdi = mapping->backing_dev_info;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_client *client;
+       struct ceph_fs_client *fsc;
        pgoff_t index, start, end;
        int range_whole = 0;
        int should_loop = 1;
        pgoff_t max_pages = 0, max_pages_ever = 0;
-       struct ceph_snap_context *snapc = NULL, *last_snapc = NULL;
+       struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
        struct pagevec pvec;
        int done = 0;
        int rc = 0;
@@ -626,26 +619,19 @@ static int ceph_writepages_start(struct address_space *mapping,
             wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
             (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-       client = ceph_inode_to_client(inode);
-       if (client->mount_state == CEPH_MOUNT_SHUTDOWN) {
+       fsc = ceph_inode_to_client(inode);
+       if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
                pr_warning("writepage_start %p on forced umount\n", inode);
                return -EIO; /* we're in a forced umount, don't write! */
        }
-       if (client->mount_args->wsize && client->mount_args->wsize < wsize)
-               wsize = client->mount_args->wsize;
+       if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
+               wsize = fsc->mount_options->wsize;
        if (wsize < PAGE_CACHE_SIZE)
                wsize = PAGE_CACHE_SIZE;
        max_pages_ever = wsize >> PAGE_CACHE_SHIFT;
 
        pagevec_init(&pvec, 0);
 
-       /* ?? */
-       if (wbc->nonblocking && bdi_write_congested(bdi)) {
-               dout(" writepages congested\n");
-               wbc->encountered_congestion = 1;
-               goto out_final;
-       }
-
        /* where to start/end? */
        if (wbc->range_cyclic) {
                start = mapping->writeback_index; /* Start from prev offset */
@@ -756,9 +742,10 @@ get_more_pages:
                        }
 
                        /* only if matching snap context */
-                       if (snapc != (void *)page->private) {
-                               dout("page snapc %p != oldest %p\n",
-                                    (void *)page->private, snapc);
+                       pgsnapc = (void *)page->private;
+                       if (pgsnapc->seq > snapc->seq) {
+                               dout("page snapc %p %lld > oldest %p %lld\n",
+                                    pgsnapc, pgsnapc->seq, snapc, snapc->seq);
                                unlock_page(page);
                                if (!locked_pages)
                                        continue; /* keep looking for snap */
@@ -774,9 +761,10 @@ get_more_pages:
                        /* ok */
                        if (locked_pages == 0) {
                                /* prepare async write request */
-                               offset = page->index << PAGE_CACHE_SHIFT;
+                               offset = (unsigned long long)page->index
+                                       << PAGE_CACHE_SHIFT;
                                len = wsize;
-                               req = ceph_osdc_new_request(&client->osdc,
+                               req = ceph_osdc_new_request(&fsc->client->osdc,
                                            &ci->i_layout,
                                            ceph_vino(inode),
                                            offset, &len,
@@ -786,13 +774,19 @@ get_more_pages:
                                            snapc, do_sync,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
-                                           &inode->i_mtime, true, 1);
+                                           &inode->i_mtime, true, 1, 0);
+
+                               if (!req) {
+                                       rc = -ENOMEM;
+                                       unlock_page(page);
+                                       break;
+                               }
+
                                max_pages = req->r_num_pages;
 
-                               alloc_page_vec(client, req);
+                               alloc_page_vec(fsc, req);
                                req->r_callback = writepages_finish;
                                req->r_inode = inode;
-                               req->r_wbc = wbc;
                        }
 
                        /* note position of first page in pvec */
@@ -801,9 +795,12 @@ get_more_pages:
                        dout("%p will write page %p idx %lu\n",
                             inode, page, page->index);
 
-                       writeback_stat = atomic_long_inc_return(&client->writeback_count);
-                       if (writeback_stat > CONGESTION_ON_THRESH(client->mount_args->congestion_kb)) {
-                               set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC);
+                       writeback_stat =
+                              atomic_long_inc_return(&fsc->writeback_count);
+                       if (writeback_stat > CONGESTION_ON_THRESH(
+                                   fsc->mount_options->congestion_kb)) {
+                               set_bdi_congested(&fsc->backing_dev_info,
+                                                 BLK_RW_ASYNC);
                        }
 
                        set_page_writeback(page);
@@ -851,7 +848,7 @@ get_more_pages:
                op->payload_len = cpu_to_le32(len);
                req->r_request->hdr.data_len = cpu_to_le32(len);
 
-               ceph_osdc_start_request(&client->osdc, req, true);
+               ceph_osdc_start_request(&fsc->client->osdc, req, true);
                req = NULL;
 
                /* continue? */
@@ -887,7 +884,6 @@ out:
                rc = 0;  /* vfs expects us to return 0 */
        ceph_put_snap_context(snapc);
        dout("writepages done, rc = %d\n", rc);
-out_final:
        return rc;
 }
 
@@ -900,38 +896,33 @@ static int context_is_writeable_or_written(struct inode *inode,
                                           struct ceph_snap_context *snapc)
 {
        struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
-       return !oldest || snapc->seq <= oldest->seq;
+       int ret = !oldest || snapc->seq <= oldest->seq;
+
+       ceph_put_snap_context(oldest);
+       return ret;
 }
 
 /*
  * We are only allowed to write into/dirty the page if the page is
  * clean, or already dirty within the same snap context.
+ *
+ * called with page locked.
+ * return success with page locked,
+ * or any failure (incl -EAGAIN) with page unlocked.
  */
-static int ceph_write_begin(struct file *file, struct address_space *mapping,
-                           loff_t pos, unsigned len, unsigned flags,
-                           struct page **pagep, void **fsdata)
+static int ceph_update_writeable_page(struct file *file,
+                           loff_t pos, unsigned len,
+                           struct page *page)
 {
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
-       struct page *page;
-       pgoff_t index = pos >> PAGE_CACHE_SHIFT;
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        loff_t page_off = pos & PAGE_CACHE_MASK;
        int pos_in_page = pos & ~PAGE_CACHE_MASK;
        int end_in_page = pos_in_page + len;
        loff_t i_size;
-       struct ceph_snap_context *snapc;
        int r;
-
-       /* get a page*/
-retry:
-       page = grab_cache_page_write_begin(mapping, index, 0);
-       if (!page)
-               return -ENOMEM;
-       *pagep = page;
-
-       dout("write_begin file %p inode %p page %p %d~%d\n", file,
-            inode, page, (int)pos, (int)len);
+       struct ceph_snap_context *snapc, *oldest;
 
 retry_locked:
        /* writepages currently holds page lock, but if we change that later, */
@@ -941,31 +932,34 @@ retry_locked:
        BUG_ON(!ci->i_snap_realm);
        down_read(&mdsc->snap_rwsem);
        BUG_ON(!ci->i_snap_realm->cached_context);
-       if (page->private &&
-           (void *)page->private != ci->i_snap_realm->cached_context) {
+       snapc = (void *)page->private;
+       if (snapc && snapc != ci->i_head_snapc) {
                /*
                 * this page is already dirty in another (older) snap
                 * context!  is it writeable now?
                 */
-               snapc = get_oldest_context(inode, NULL);
+               oldest = get_oldest_context(inode, NULL);
                up_read(&mdsc->snap_rwsem);
 
-               if (snapc != (void *)page->private) {
+               if (snapc->seq > oldest->seq) {
+                       ceph_put_snap_context(oldest);
                        dout(" page %p snapc %p not current or oldest\n",
-                            page, (void *)page->private);
+                            page, snapc);
                        /*
                         * queue for writeback, and wait for snapc to
                         * be writeable or written
                         */
-                       snapc = ceph_get_snap_context((void *)page->private);
+                       snapc = ceph_get_snap_context(snapc);
                        unlock_page(page);
-                       if (ceph_queue_writeback(inode))
-                               igrab(inode);
-                       wait_event_interruptible(ci->i_cap_wq,
+                       ceph_queue_writeback(inode);
+                       r = wait_event_interruptible(ci->i_cap_wq,
                               context_is_writeable_or_written(inode, snapc));
                        ceph_put_snap_context(snapc);
-                       goto retry;
+                       if (r == -ERESTARTSYS)
+                               return r;
+                       return -EAGAIN;
                }
+               ceph_put_snap_context(oldest);
 
                /* yay, writeable, do it now (without dropping page lock) */
                dout(" page %p snapc %p not current, but oldest\n",
@@ -1022,6 +1016,35 @@ fail_nosnap:
 }
 
 /*
+ * We are only allowed to write into/dirty the page if the page is
+ * clean, or already dirty within the same snap context.
+ */
+static int ceph_write_begin(struct file *file, struct address_space *mapping,
+                           loff_t pos, unsigned len, unsigned flags,
+                           struct page **pagep, void **fsdata)
+{
+       struct inode *inode = file->f_dentry->d_inode;
+       struct page *page;
+       pgoff_t index = pos >> PAGE_CACHE_SHIFT;
+       int r;
+
+       do {
+               /* get a page */
+               page = grab_cache_page_write_begin(mapping, index, 0);
+               if (!page)
+                       return -ENOMEM;
+               *pagep = page;
+
+               dout("write_begin file %p inode %p page %p %d~%d\n", file,
+                    inode, page, (int)pos, (int)len);
+
+               r = ceph_update_writeable_page(file, pos, len, page);
+       } while (r == -EAGAIN);
+
+       return r;
+}
+
+/*
  * we don't do anything in here that simple_write_end doesn't do
  * except adjust dirty page accounting and drop read lock on
  * mdsc->snap_rwsem.
@@ -1031,8 +1054,8 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
                          struct page *page, void *fsdata)
 {
        struct inode *inode = file->f_dentry->d_inode;
-       struct ceph_client *client = ceph_inode_to_client(inode);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        unsigned from = pos & (PAGE_CACHE_SIZE - 1);
        int check_cap = 0;
 
@@ -1101,11 +1124,9 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 {
        struct inode *inode = vma->vm_file->f_dentry->d_inode;
        struct page *page = vmf->page;
-       struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        loff_t off = page->index << PAGE_CACHE_SHIFT;
        loff_t size, len;
-       struct page *locked_page = NULL;
-       void *fsdata = NULL;
        int ret;
 
        size = i_size_read(inode);
@@ -1116,23 +1137,30 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 
        dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode,
             off, len, page, page->index);
-       ret = ceph_write_begin(vma->vm_file, inode->i_mapping, off, len, 0,
-                              &locked_page, &fsdata);
-       WARN_ON(page != locked_page);
-       if (!ret) {
-               /*
-                * doing the following, instead of calling
-                * ceph_write_end. Note that we keep the
-                * page locked
-                */
+
+       lock_page(page);
+
+       ret = VM_FAULT_NOPAGE;
+       if ((off > size) ||
+           (page->mapping != inode->i_mapping))
+               goto out;
+
+       ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
+       if (ret == 0) {
+               /* success.  we'll keep the page locked. */
                set_page_dirty(page);
                up_read(&mdsc->snap_rwsem);
-               page_cache_release(page);
                ret = VM_FAULT_LOCKED;
        } else {
-               ret = VM_FAULT_SIGBUS;
+               if (ret == -ENOMEM)
+                       ret = VM_FAULT_OOM;
+               else
+                       ret = VM_FAULT_SIGBUS;
        }
+out:
        dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret);
+       if (ret != VM_FAULT_LOCKED)
+               unlock_page(page);
        return ret;
 }