Merge git://git.linux-nfs.org/pub/linux/nfs-2.6
Linus Torvalds [Sat, 25 Mar 2006 17:18:27 +0000 (09:18 -0800)]
* git://git.linux-nfs.org/pub/linux/nfs-2.6: (103 commits)
  SUNRPC,RPCSEC_GSS: spkm3--fix config dependencies
  SUNRPC,RPCSEC_GSS: spkm3: import contexts using NID_cast5_cbc
  LOCKD: Make nlmsvc_traverse_shares return void
  LOCKD: nlmsvc_traverse_blocks return is unused
  SUNRPC,RPCSEC_GSS: fix krb5 sequence numbers.
  NFSv4: Dont list system.nfs4_acl for filesystems that don't support it.
  SUNRPC,RPCSEC_GSS: remove unnecessary kmalloc of a checksum
  SUNRPC: Ensure rpc_call_async() always calls tk_ops->rpc_release()
  SUNRPC: Fix memory barriers for req->rq_received
  NFS: Fix a race in nfs_sync_inode()
  NFS: Clean up nfs_flush_list()
  NFS: Fix a race with PG_private and nfs_release_page()
  NFSv4: Ensure the callback daemon flushes signals
  SUNRPC: Fix a 'Busy inodes' error in rpc_pipefs
  NFS, NLM: Allow blocking locks to respect signals
  NFS: Make nfs_fhget() return appropriate error values
  NFSv4: Fix an oops in nfs4_fill_super
  lockd: blocks should hold a reference to the nlm_file
  NFSv4: SETCLIENTID_CONFIRM should handle NFS4ERR_DELAY/NFS4ERR_RESOURCE
  NFSv4: Send the delegation stateid for SETATTR calls
  ...

1  2 
fs/lockd/mon.c
fs/lockd/xdr.c
fs/nfs/direct.c
fs/nfs/inode.c
fs/nfs/mount_clnt.c
fs/nfs/nfs2xdr.c
fs/nfs/nfs3xdr.c
fs/nfs/nfs4xdr.c
fs/nfsd/nfs4callback.c
include/linux/fs.h
net/sunrpc/rpc_pipe.c

diff --combined fs/lockd/mon.c
@@@ -35,6 -35,10 +35,10 @@@ nsm_mon_unmon(struct nlm_host *host, u3
        struct rpc_clnt *clnt;
        int             status;
        struct nsm_args args;
+       struct rpc_message msg = {
+               .rpc_argp       = &args,
+               .rpc_resp       = res,
+       };
  
        clnt = nsm_create();
        if (IS_ERR(clnt)) {
@@@ -49,7 -53,8 +53,8 @@@
        args.proc = NLMPROC_NSM_NOTIFY;
        memset(res, 0, sizeof(*res));
  
-       status = rpc_call(clnt, proc, &args, res, 0);
+       msg.rpc_proc = &clnt->cl_procinfo[proc];
+       status = rpc_call_sync(clnt, &msg, 0);
        if (status < 0)
                printk(KERN_DEBUG "nsm_mon_unmon: rpc failed, status=%d\n",
                        status);
@@@ -214,18 -219,22 +219,22 @@@ static struct rpc_procinfo      nsm_procedur
                .p_encode       = (kxdrproc_t) xdr_encode_mon,
                .p_decode       = (kxdrproc_t) xdr_decode_stat_res,
                .p_bufsiz       = MAX(SM_mon_sz, SM_monres_sz) << 2,
+               .p_statidx      = SM_MON,
+               .p_name         = "MONITOR",
        },
  [SM_UNMON] = {
                .p_proc         = SM_UNMON,
                .p_encode       = (kxdrproc_t) xdr_encode_unmon,
                .p_decode       = (kxdrproc_t) xdr_decode_stat,
                .p_bufsiz       = MAX(SM_mon_id_sz, SM_unmonres_sz) << 2,
+               .p_statidx      = SM_UNMON,
+               .p_name         = "UNMONITOR",
        },
  };
  
  static struct rpc_version     nsm_version1 = {
 -              .number         = 1, 
 -              .nrprocs        = sizeof(nsm_procedures)/sizeof(nsm_procedures[0]),
 +              .number         = 1,
 +              .nrprocs        = ARRAY_SIZE(nsm_procedures),
                .procs          = nsm_procedures
  };
  
@@@ -238,7 -247,7 +247,7 @@@ static struct rpc_stat             nsm_stats
  static struct rpc_program     nsm_program = {
                .name           = "statd",
                .number         = SM_PROGRAM,
 -              .nrvers         = sizeof(nsm_version)/sizeof(nsm_version[0]),
 +              .nrvers         = ARRAY_SIZE(nsm_version),
                .version        = nsm_version,
                .stats          = &nsm_stats
  };
diff --combined fs/lockd/xdr.c
@@@ -131,10 -131,11 +131,11 @@@ nlm_decode_lock(u32 *p, struct nlm_loc
         || !(p = nlm_decode_fh(p, &lock->fh))
         || !(p = nlm_decode_oh(p, &lock->oh)))
                return NULL;
+       lock->svid  = ntohl(*p++);
  
        locks_init_lock(fl);
        fl->fl_owner = current->files;
-       fl->fl_pid   = ntohl(*p++);
+       fl->fl_pid   = (pid_t)lock->svid;
        fl->fl_flags = FL_POSIX;
        fl->fl_type  = F_RDLCK;         /* as good as anything else */
        start = ntohl(*p++);
@@@ -174,7 -175,7 +175,7 @@@ nlm_encode_lock(u32 *p, struct nlm_loc
        else
                len = loff_t_to_s32(fl->fl_end - fl->fl_start + 1);
  
-       *p++ = htonl(fl->fl_pid);
+       *p++ = htonl(lock->svid);
        *p++ = htonl(start);
        *p++ = htonl(len);
  
@@@ -197,7 -198,7 +198,7 @@@ nlm_encode_testres(u32 *p, struct nlm_r
                struct file_lock        *fl = &resp->lock.fl;
  
                *p++ = (fl->fl_type == F_RDLCK)? xdr_zero : xdr_one;
-               *p++ = htonl(fl->fl_pid);
+               *p++ = htonl(resp->lock.svid);
  
                /* Encode owner handle. */
                if (!(p = xdr_encode_netobj(p, &resp->lock.oh)))
@@@ -298,7 -299,8 +299,8 @@@ nlmsvc_decode_shareargs(struct svc_rqs
  
        memset(lock, 0, sizeof(*lock));
        locks_init_lock(&lock->fl);
-       lock->fl.fl_pid = ~(u32) 0;
+       lock->svid = ~(u32) 0;
+       lock->fl.fl_pid = (pid_t)lock->svid;
  
        if (!(p = nlm_decode_cookie(p, &argp->cookie))
         || !(p = xdr_decode_string_inplace(p, &lock->caller,
@@@ -415,7 -417,8 +417,8 @@@ nlmclt_decode_testres(struct rpc_rqst *
                memset(&resp->lock, 0, sizeof(resp->lock));
                locks_init_lock(fl);
                excl = ntohl(*p++);
-               fl->fl_pid = ntohl(*p++);
+               resp->lock.svid = ntohl(*p++);
+               fl->fl_pid = (pid_t)resp->lock.svid;
                if (!(p = nlm_decode_oh(p, &resp->lock.oh)))
                        return -EIO;
  
@@@ -543,7 -546,9 +546,9 @@@ nlmclt_decode_res(struct rpc_rqst *req
        .p_proc      = NLMPROC_##proc,                                  \
        .p_encode    = (kxdrproc_t) nlmclt_encode_##argtype,            \
        .p_decode    = (kxdrproc_t) nlmclt_decode_##restype,            \
-       .p_bufsiz    = MAX(NLM_##argtype##_sz, NLM_##restype##_sz) << 2 \
+       .p_bufsiz    = MAX(NLM_##argtype##_sz, NLM_##restype##_sz) << 2,        \
+       .p_statidx   = NLMPROC_##proc,                                  \
+       .p_name      = #proc,                                           \
        }
  
  static struct rpc_procinfo    nlm_procedures[] = {
@@@ -599,7 -604,7 +604,7 @@@ static struct rpc_stat             nlm_stats
  struct rpc_program            nlm_program = {
                .name           = "lockd",
                .number         = NLM_PROGRAM,
 -              .nrvers         = sizeof(nlm_versions) / sizeof(nlm_versions[0]),
 +              .nrvers         = ARRAY_SIZE(nlm_versions),
                .version        = nlm_versions,
                .stats          = &nlm_stats,
  };
diff --combined fs/nfs/direct.c
@@@ -7,11 -7,11 +7,11 @@@
   *
   * There are important applications whose performance or correctness
   * depends on uncached access to file data.  Database clusters
-  * (multiple copies of the same instance running on separate hosts) 
+  * (multiple copies of the same instance running on separate hosts)
   * implement their own cache coherency protocol that subsumes file
-  * system cache protocols.  Applications that process datasets 
-  * considerably larger than the client's memory do not always benefit 
-  * from a local cache.  A streaming video server, for instance, has no 
+  * system cache protocols.  Applications that process datasets
+  * considerably larger than the client's memory do not always benefit
+  * from a local cache.  A streaming video server, for instance, has no
   * need to cache the contents of a file.
   *
   * When an application requests uncached I/O, all read and write requests
@@@ -34,6 -34,7 +34,7 @@@
   * 08 Jun 2003        Port to 2.5 APIs  --cel
   * 31 Mar 2004        Handle direct I/O without VFS support  --cel
   * 15 Sep 2004        Parallel async reads  --cel
+  * 04 May 2005        support O_DIRECT with aio  --cel
   *
   */
  
  #include <asm/uaccess.h>
  #include <asm/atomic.h>
  
+ #include "iostat.h"
  #define NFSDBG_FACILITY               NFSDBG_VFS
- #define MAX_DIRECTIO_SIZE     (4096UL << PAGE_SHIFT)
  
- static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty);
  static kmem_cache_t *nfs_direct_cachep;
  
  /*
   */
  struct nfs_direct_req {
        struct kref             kref;           /* release manager */
-       struct list_head        list;           /* nfs_read_data structs */
-       wait_queue_head_t       wait;           /* wait for i/o completion */
+       /* I/O parameters */
+       struct list_head        list,           /* nfs_read/write_data structs */
+                               rewrite_list;   /* saved nfs_write_data structs */
+       struct nfs_open_context *ctx;           /* file open context info */
+       struct kiocb *          iocb;           /* controlling i/o request */
+       struct inode *          inode;          /* target file of i/o */
+       unsigned long           user_addr;      /* location of user's buffer */
+       size_t                  user_count;     /* total bytes to move */
+       loff_t                  pos;            /* starting offset in file */
        struct page **          pages;          /* pages in our buffer */
        unsigned int            npages;         /* count of pages */
-       atomic_t                complete,       /* i/os we're waiting for */
-                               count,          /* bytes actually processed */
+       /* completion state */
+       spinlock_t              lock;           /* protect completion state */
+       int                     outstanding;    /* i/os we're waiting for */
+       ssize_t                 count,          /* bytes actually processed */
                                error;          /* any reported error */
+       struct completion       completion;     /* wait for i/o completion */
+       /* commit state */
+       struct nfs_write_data * commit_data;    /* special write_data for commits */
+       int                     flags;
+ #define NFS_ODIRECT_DO_COMMIT         (1)     /* an unstable reply was received */
+ #define NFS_ODIRECT_RESCHED_WRITES    (2)     /* write verification failed */
+       struct nfs_writeverf    verf;           /* unstable write verifier */
  };
  
+ static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, int sync);
+ static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
  
  /**
-  * nfs_get_user_pages - find and set up pages underlying user's buffer
-  * rw: direction (read or write)
-  * user_addr: starting address of this segment of user's buffer
-  * count: size of this segment
-  * @pages: returned array of page struct pointers underlying user's buffer
+  * nfs_direct_IO - NFS address space operation for direct I/O
+  * @rw: direction (read or write)
+  * @iocb: target I/O control block
+  * @iov: array of vectors that define I/O buffer
+  * @pos: offset in file to begin the operation
+  * @nr_segs: size of iovec array
+  *
+  * The presence of this routine in the address space ops vector means
+  * the NFS client supports direct I/O.  However, we shunt off direct
+  * read and write requests before the VFS gets them, so this method
+  * should never be called.
   */
- static inline int
- nfs_get_user_pages(int rw, unsigned long user_addr, size_t size,
-               struct page ***pages)
+ ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
+ {
+       struct dentry *dentry = iocb->ki_filp->f_dentry;
+       dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
+                       dentry->d_name.name, (long long) pos, nr_segs);
+       return -EINVAL;
+ }
+ static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty)
+ {
+       int i;
+       for (i = 0; i < npages; i++) {
+               struct page *page = pages[i];
+               if (do_dirty && !PageCompound(page))
+                       set_page_dirty_lock(page);
+               page_cache_release(page);
+       }
+       kfree(pages);
+ }
+ static inline int nfs_get_user_pages(int rw, unsigned long user_addr, size_t size, struct page ***pages)
  {
        int result = -ENOMEM;
        unsigned long page_count;
        size_t array_size;
  
-       /* set an arbitrary limit to prevent type overflow */
-       /* XXX: this can probably be as large as INT_MAX */
-       if (size > MAX_DIRECTIO_SIZE) {
-               *pages = NULL;
-               return -EFBIG;
-       }
        page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
        page_count -= user_addr >> PAGE_SHIFT;
  
                                        page_count, (rw == READ), 0,
                                        *pages, NULL);
                up_read(&current->mm->mmap_sem);
-               /*
-                * If we got fewer pages than expected from get_user_pages(),
-                * the user buffer runs off the end of a mapping; return EFAULT.
-                */
-               if (result >= 0 && result < page_count) {
-                       nfs_free_user_pages(*pages, result, 0);
+               if (result != page_count) {
+                       /*
+                        * If we got fewer pages than expected from
+                        * get_user_pages(), the user buffer runs off the
+                        * end of a mapping; return EFAULT.
+                        */
+                       if (result >= 0) {
+                               nfs_free_user_pages(*pages, result, 0);
+                               result = -EFAULT;
+                       } else
+                               kfree(*pages);
                        *pages = NULL;
-                       result = -EFAULT;
                }
        }
        return result;
  }
  
- /**
-  * nfs_free_user_pages - tear down page struct array
-  * @pages: array of page struct pointers underlying target buffer
-  * @npages: number of pages in the array
-  * @do_dirty: dirty the pages as we release them
-  */
- static void
- nfs_free_user_pages(struct page **pages, int npages, int do_dirty)
+ static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
  {
-       int i;
-       for (i = 0; i < npages; i++) {
-               struct page *page = pages[i];
-               if (do_dirty && !PageCompound(page))
-                       set_page_dirty_lock(page);
-               page_cache_release(page);
-       }
-       kfree(pages);
+       struct nfs_direct_req *dreq;
+       dreq = kmem_cache_alloc(nfs_direct_cachep, SLAB_KERNEL);
+       if (!dreq)
+               return NULL;
+       kref_init(&dreq->kref);
+       init_completion(&dreq->completion);
+       INIT_LIST_HEAD(&dreq->list);
+       INIT_LIST_HEAD(&dreq->rewrite_list);
+       dreq->iocb = NULL;
+       dreq->ctx = NULL;
+       spin_lock_init(&dreq->lock);
+       dreq->outstanding = 0;
+       dreq->count = 0;
+       dreq->error = 0;
+       dreq->flags = 0;
+       return dreq;
  }
  
  static void nfs_direct_req_release(struct kref *kref)
  {
        struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
+       if (dreq->ctx != NULL)
+               put_nfs_open_context(dreq->ctx);
        kmem_cache_free(nfs_direct_cachep, dreq);
  }
  
- /**
-  * nfs_direct_read_alloc - allocate nfs_read_data structures for direct read
-  * @count: count of bytes for the read request
-  * @rsize: local rsize setting
+ /*
+  * Collects and returns the final error value/byte-count.
+  */
+ static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
+ {
+       ssize_t result = -EIOCBQUEUED;
+       /* Async requests don't wait here */
+       if (dreq->iocb)
+               goto out;
+       result = wait_for_completion_interruptible(&dreq->completion);
+       if (!result)
+               result = dreq->error;
+       if (!result)
+               result = dreq->count;
+ out:
+       kref_put(&dreq->kref, nfs_direct_req_release);
+       return (ssize_t) result;
+ }
+ /*
+  * We must hold a reference to all the pages in this direct read request
+  * until the RPCs complete.  This could be long *after* we are woken up in
+  * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
   *
+  * In addition, synchronous I/O uses a stack-allocated iocb.  Thus we
+  * can't trust the iocb is still valid here if this is a synchronous
+  * request.  If the waiter is woken prematurely, the iocb is long gone.
+  */
+ static void nfs_direct_complete(struct nfs_direct_req *dreq)
+ {
+       nfs_free_user_pages(dreq->pages, dreq->npages, 1);
+       if (dreq->iocb) {
+               long res = (long) dreq->error;
+               if (!res)
+                       res = (long) dreq->count;
+               aio_complete(dreq->iocb, res, 0);
+       }
+       complete_all(&dreq->completion);
+       kref_put(&dreq->kref, nfs_direct_req_release);
+ }
+ /*
   * Note we also set the number of requests we have in the dreq when we are
   * done.  This prevents races with I/O completion so we will always wait
   * until all requests have been dispatched and completed.
   */
- static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, unsigned int rsize)
+ static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize)
  {
        struct list_head *list;
        struct nfs_direct_req *dreq;
-       unsigned int reads = 0;
        unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
  
-       dreq = kmem_cache_alloc(nfs_direct_cachep, SLAB_KERNEL);
+       dreq = nfs_direct_req_alloc();
        if (!dreq)
                return NULL;
  
-       kref_init(&dreq->kref);
-       init_waitqueue_head(&dreq->wait);
-       INIT_LIST_HEAD(&dreq->list);
-       atomic_set(&dreq->count, 0);
-       atomic_set(&dreq->error, 0);
        list = &dreq->list;
        for(;;) {
                struct nfs_read_data *data = nfs_readdata_alloc(rpages);
                list_add(&data->pages, list);
  
                data->req = (struct nfs_page *) dreq;
-               reads++;
+               dreq->outstanding++;
                if (nbytes <= rsize)
                        break;
                nbytes -= rsize;
        }
        kref_get(&dreq->kref);
        return dreq;
  }
  
- /**
-  * nfs_direct_read_result - handle a read reply for a direct read request
-  * @data: address of NFS READ operation control block
-  * @status: status of this NFS READ operation
-  *
-  * We must hold a reference to all the pages in this direct read request
-  * until the RPCs complete.  This could be long *after* we are woken up in
-  * nfs_direct_read_wait (for instance, if someone hits ^C on a slow server).
-  */
- static void nfs_direct_read_result(struct nfs_read_data *data, int status)
+ static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
  {
+       struct nfs_read_data *data = calldata;
        struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
  
-       if (likely(status >= 0))
-               atomic_add(data->res.count, &dreq->count);
+       if (nfs_readpage_result(task, data) != 0)
+               return;
+       spin_lock(&dreq->lock);
+       if (likely(task->tk_status >= 0))
+               dreq->count += data->res.count;
        else
-               atomic_set(&dreq->error, status);
+               dreq->error = task->tk_status;
  
-       if (unlikely(atomic_dec_and_test(&dreq->complete))) {
-               nfs_free_user_pages(dreq->pages, dreq->npages, 1);
-               wake_up(&dreq->wait);
-               kref_put(&dreq->kref, nfs_direct_req_release);
+       if (--dreq->outstanding) {
+               spin_unlock(&dreq->lock);
+               return;
        }
+       spin_unlock(&dreq->lock);
+       nfs_direct_complete(dreq);
  }
  
- /**
-  * nfs_direct_read_schedule - dispatch NFS READ operations for a direct read
-  * @dreq: address of nfs_direct_req struct for this request
-  * @inode: target inode
-  * @ctx: target file open context
-  * @user_addr: starting address of this segment of user's buffer
-  * @count: size of this segment
-  * @file_offset: offset in file to begin the operation
-  *
+ static const struct rpc_call_ops nfs_read_direct_ops = {
+       .rpc_call_done = nfs_direct_read_result,
+       .rpc_release = nfs_readdata_release,
+ };
+ /*
   * For each nfs_read_data struct that was allocated on the list, dispatch
   * an NFS READ operation
   */
- static void nfs_direct_read_schedule(struct nfs_direct_req *dreq,
-               struct inode *inode, struct nfs_open_context *ctx,
-               unsigned long user_addr, size_t count, loff_t file_offset)
+ static void nfs_direct_read_schedule(struct nfs_direct_req *dreq)
  {
+       struct nfs_open_context *ctx = dreq->ctx;
+       struct inode *inode = ctx->dentry->d_inode;
        struct list_head *list = &dreq->list;
        struct page **pages = dreq->pages;
+       size_t count = dreq->user_count;
+       loff_t pos = dreq->pos;
+       size_t rsize = NFS_SERVER(inode)->rsize;
        unsigned int curpage, pgbase;
-       unsigned int rsize = NFS_SERVER(inode)->rsize;
  
        curpage = 0;
-       pgbase = user_addr & ~PAGE_MASK;
+       pgbase = dreq->user_addr & ~PAGE_MASK;
        do {
                struct nfs_read_data *data;
-               unsigned int bytes;
+               size_t bytes;
  
                bytes = rsize;
                if (count < rsize)
                        bytes = count;
  
+               BUG_ON(list_empty(list));
                data = list_entry(list->next, struct nfs_read_data, pages);
                list_del_init(&data->pages);
  
                data->cred = ctx->cred;
                data->args.fh = NFS_FH(inode);
                data->args.context = ctx;
-               data->args.offset = file_offset;
+               data->args.offset = pos;
                data->args.pgbase = pgbase;
                data->args.pages = &pages[curpage];
                data->args.count = bytes;
                data->res.eof = 0;
                data->res.count = bytes;
  
+               rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
+                               &nfs_read_direct_ops, data);
                NFS_PROTO(inode)->read_setup(data);
  
                data->task.tk_cookie = (unsigned long) inode;
-               data->complete = nfs_direct_read_result;
  
                lock_kernel();
                rpc_execute(&data->task);
                unlock_kernel();
  
-               dfprintk(VFS, "NFS: %4d initiated direct read call (req %s/%Ld, %u bytes @ offset %Lu)\n",
+               dfprintk(VFS, "NFS: %5u initiated direct read call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
                                data->task.tk_pid,
                                inode->i_sb->s_id,
                                (long long)NFS_FILEID(inode),
                                bytes,
                                (unsigned long long)data->args.offset);
  
-               file_offset += bytes;
+               pos += bytes;
                pgbase += bytes;
                curpage += pgbase >> PAGE_SHIFT;
                pgbase &= ~PAGE_MASK;
  
                count -= bytes;
        } while (count != 0);
+       BUG_ON(!list_empty(list));
  }
  
- /**
-  * nfs_direct_read_wait - wait for I/O completion for direct reads
-  * @dreq: request on which we are to wait
-  * @intr: whether or not this wait can be interrupted
-  *
-  * Collects and returns the final error value/byte-count.
-  */
- static ssize_t nfs_direct_read_wait(struct nfs_direct_req *dreq, int intr)
- {
-       int result = 0;
-       if (intr) {
-               result = wait_event_interruptible(dreq->wait,
-                                       (atomic_read(&dreq->complete) == 0));
-       } else {
-               wait_event(dreq->wait, (atomic_read(&dreq->complete) == 0));
-       }
-       if (!result)
-               result = atomic_read(&dreq->error);
-       if (!result)
-               result = atomic_read(&dreq->count);
-       kref_put(&dreq->kref, nfs_direct_req_release);
-       return (ssize_t) result;
- }
- /**
-  * nfs_direct_read_seg - Read in one iov segment.  Generate separate
-  *                        read RPCs for each "rsize" bytes.
-  * @inode: target inode
-  * @ctx: target file open context
-  * @user_addr: starting address of this segment of user's buffer
-  * @count: size of this segment
-  * @file_offset: offset in file to begin the operation
-  * @pages: array of addresses of page structs defining user's buffer
-  * @nr_pages: number of pages in the array
-  *
-  */
- static ssize_t nfs_direct_read_seg(struct inode *inode,
-               struct nfs_open_context *ctx, unsigned long user_addr,
-               size_t count, loff_t file_offset, struct page **pages,
-               unsigned int nr_pages)
+ static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos, struct page **pages, unsigned int nr_pages)
  {
        ssize_t result;
        sigset_t oldset;
+       struct inode *inode = iocb->ki_filp->f_mapping->host;
        struct rpc_clnt *clnt = NFS_CLIENT(inode);
        struct nfs_direct_req *dreq;
  
        if (!dreq)
                return -ENOMEM;
  
+       dreq->user_addr = user_addr;
+       dreq->user_count = count;
+       dreq->pos = pos;
        dreq->pages = pages;
        dreq->npages = nr_pages;
+       dreq->inode = inode;
+       dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
+       if (!is_sync_kiocb(iocb))
+               dreq->iocb = iocb;
  
+       nfs_add_stats(inode, NFSIOS_DIRECTREADBYTES, count);
        rpc_clnt_sigmask(clnt, &oldset);
-       nfs_direct_read_schedule(dreq, inode, ctx, user_addr, count,
-                                file_offset);
-       result = nfs_direct_read_wait(dreq, clnt->cl_intr);
+       nfs_direct_read_schedule(dreq);
+       result = nfs_direct_wait(dreq);
        rpc_clnt_sigunmask(clnt, &oldset);
  
        return result;
  }
  
- /**
-  * nfs_direct_read - For each iov segment, map the user's buffer
-  *                   then generate read RPCs.
-  * @inode: target inode
-  * @ctx: target file open context
-  * @iov: array of vectors that define I/O buffer
-  * file_offset: offset in file to begin the operation
-  * nr_segs: size of iovec array
-  *
-  * We've already pushed out any non-direct writes so that this read
-  * will see them when we read from the server.
-  */
- static ssize_t
- nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx,
-               const struct iovec *iov, loff_t file_offset,
-               unsigned long nr_segs)
+ static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
  {
-       ssize_t tot_bytes = 0;
-       unsigned long seg = 0;
-       while ((seg < nr_segs) && (tot_bytes >= 0)) {
-               ssize_t result;
-               int page_count;
-               struct page **pages;
-               const struct iovec *vec = &iov[seg++];
-               unsigned long user_addr = (unsigned long) vec->iov_base;
-               size_t size = vec->iov_len;
-                 page_count = nfs_get_user_pages(READ, user_addr, size, &pages);
-                 if (page_count < 0) {
-                         nfs_free_user_pages(pages, 0, 0);
-                       if (tot_bytes > 0)
-                               break;
-                         return page_count;
-                 }
+       list_splice_init(&dreq->rewrite_list, &dreq->list);
+       while (!list_empty(&dreq->list)) {
+               struct nfs_write_data *data = list_entry(dreq->list.next, struct nfs_write_data, pages);
+               list_del(&data->pages);
+               nfs_writedata_release(data);
+       }
+ }
  
-               result = nfs_direct_read_seg(inode, ctx, user_addr, size,
-                               file_offset, pages, page_count);
+ #if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
+ static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
+ {
+       struct list_head *pos;
  
-               if (result <= 0) {
-                       if (tot_bytes > 0)
-                               break;
-                       return result;
-               }
-               tot_bytes += result;
-               file_offset += result;
-               if (result < size)
-                       break;
+       list_splice_init(&dreq->rewrite_list, &dreq->list);
+       list_for_each(pos, &dreq->list)
+               dreq->outstanding++;
+       dreq->count = 0;
+       nfs_direct_write_schedule(dreq, FLUSH_STABLE);
+ }
+ static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
+ {
+       struct nfs_write_data *data = calldata;
+       struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
+       /* Call the NFS version-specific code */
+       if (NFS_PROTO(data->inode)->commit_done(task, data) != 0)
+               return;
+       if (unlikely(task->tk_status < 0)) {
+               dreq->error = task->tk_status;
+               dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
+       }
+       if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
+               dprintk("NFS: %5u commit verify failed\n", task->tk_pid);
+               dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
        }
  
-       return tot_bytes;
+       dprintk("NFS: %5u commit returned %d\n", task->tk_pid, task->tk_status);
+       nfs_direct_write_complete(dreq, data->inode);
  }
  
- /**
-  * nfs_direct_write_seg - Write out one iov segment.  Generate separate
-  *                        write RPCs for each "wsize" bytes, then commit.
-  * @inode: target inode
-  * @ctx: target file open context
-  * user_addr: starting address of this segment of user's buffer
-  * count: size of this segment
-  * file_offset: offset in file to begin the operation
-  * @pages: array of addresses of page structs defining user's buffer
-  * nr_pages: size of pages array
-  */
- static ssize_t nfs_direct_write_seg(struct inode *inode,
-               struct nfs_open_context *ctx, unsigned long user_addr,
-               size_t count, loff_t file_offset, struct page **pages,
-               int nr_pages)
+ static const struct rpc_call_ops nfs_commit_direct_ops = {
+       .rpc_call_done = nfs_direct_commit_result,
+       .rpc_release = nfs_commit_release,
+ };
+ static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
  {
-       const unsigned int wsize = NFS_SERVER(inode)->wsize;
-       size_t request;
-       int curpage, need_commit;
-       ssize_t result, tot_bytes;
-       struct nfs_writeverf first_verf;
-       struct nfs_write_data *wdata;
-       wdata = nfs_writedata_alloc(NFS_SERVER(inode)->wpages);
-       if (!wdata)
-               return -ENOMEM;
+       struct nfs_write_data *data = dreq->commit_data;
+       struct rpc_task *task = &data->task;
  
-       wdata->inode = inode;
-       wdata->cred = ctx->cred;
-       wdata->args.fh = NFS_FH(inode);
-       wdata->args.context = ctx;
-       wdata->args.stable = NFS_UNSTABLE;
-       if (IS_SYNC(inode) || NFS_PROTO(inode)->version == 2 || count <= wsize)
-               wdata->args.stable = NFS_FILE_SYNC;
-       wdata->res.fattr = &wdata->fattr;
-       wdata->res.verf = &wdata->verf;
+       data->inode = dreq->inode;
+       data->cred = dreq->ctx->cred;
  
-       nfs_begin_data_update(inode);
- retry:
-       need_commit = 0;
-       tot_bytes = 0;
-       curpage = 0;
-       request = count;
-       wdata->args.pgbase = user_addr & ~PAGE_MASK;
-       wdata->args.offset = file_offset;
-       do {
-               wdata->args.count = request;
-               if (wdata->args.count > wsize)
-                       wdata->args.count = wsize;
-               wdata->args.pages = &pages[curpage];
+       data->args.fh = NFS_FH(data->inode);
+       data->args.offset = dreq->pos;
+       data->args.count = dreq->user_count;
+       data->res.count = 0;
+       data->res.fattr = &data->fattr;
+       data->res.verf = &data->verf;
  
-               dprintk("NFS: direct write: c=%u o=%Ld ua=%lu, pb=%u, cp=%u\n",
-                       wdata->args.count, (long long) wdata->args.offset,
-                       user_addr + tot_bytes, wdata->args.pgbase, curpage);
+       rpc_init_task(&data->task, NFS_CLIENT(dreq->inode), RPC_TASK_ASYNC,
+                               &nfs_commit_direct_ops, data);
+       NFS_PROTO(data->inode)->commit_setup(data, 0);
  
-               lock_kernel();
-               result = NFS_PROTO(inode)->write(wdata);
-               unlock_kernel();
+       data->task.tk_priority = RPC_PRIORITY_NORMAL;
+       data->task.tk_cookie = (unsigned long)data->inode;
+       /* Note: task.tk_ops->rpc_release will free dreq->commit_data */
+       dreq->commit_data = NULL;
  
-               if (result <= 0) {
-                       if (tot_bytes > 0)
-                               break;
-                       goto out;
-               }
+       dprintk("NFS: %5u initiated commit call\n", task->tk_pid);
  
-               if (tot_bytes == 0)
-                       memcpy(&first_verf.verifier, &wdata->verf.verifier,
-                                               sizeof(first_verf.verifier));
-               if (wdata->verf.committed != NFS_FILE_SYNC) {
-                       need_commit = 1;
-                       if (memcmp(&first_verf.verifier, &wdata->verf.verifier,
-                                       sizeof(first_verf.verifier)))
-                               goto sync_retry;
-               }
+       lock_kernel();
+       rpc_execute(&data->task);
+       unlock_kernel();
+ }
  
-               tot_bytes += result;
+ static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
+ {
+       int flags = dreq->flags;
  
-               /* in case of a short write: stop now, let the app recover */
-               if (result < wdata->args.count)
+       dreq->flags = 0;
+       switch (flags) {
+               case NFS_ODIRECT_DO_COMMIT:
+                       nfs_direct_commit_schedule(dreq);
                        break;
+               case NFS_ODIRECT_RESCHED_WRITES:
+                       nfs_direct_write_reschedule(dreq);
+                       break;
+               default:
+                       nfs_end_data_update(inode);
+                       if (dreq->commit_data != NULL)
+                               nfs_commit_free(dreq->commit_data);
+                       nfs_direct_free_writedata(dreq);
+                       nfs_direct_complete(dreq);
+       }
+ }
  
-               wdata->args.offset += result;
-               wdata->args.pgbase += result;
-               curpage += wdata->args.pgbase >> PAGE_SHIFT;
-               wdata->args.pgbase &= ~PAGE_MASK;
-               request -= result;
-       } while (request != 0);
+ static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
+ {
+       dreq->commit_data = nfs_commit_alloc(0);
+       if (dreq->commit_data != NULL)
+               dreq->commit_data->req = (struct nfs_page *) dreq;
+ }
+ #else
+ static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
+ {
+       dreq->commit_data = NULL;
+ }
  
-       /*
-        * Commit data written so far, even in the event of an error
-        */
-       if (need_commit) {
-               wdata->args.count = tot_bytes;
-               wdata->args.offset = file_offset;
+ static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
+ {
+       nfs_end_data_update(inode);
+       nfs_direct_free_writedata(dreq);
+       nfs_direct_complete(dreq);
+ }
+ #endif
  
-               lock_kernel();
-               result = NFS_PROTO(inode)->commit(wdata);
-               unlock_kernel();
+ static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize)
+ {
+       struct list_head *list;
+       struct nfs_direct_req *dreq;
+       unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
+       dreq = nfs_direct_req_alloc();
+       if (!dreq)
+               return NULL;
+       list = &dreq->list;
+       for(;;) {
+               struct nfs_write_data *data = nfs_writedata_alloc(wpages);
  
-               if (result < 0 || memcmp(&first_verf.verifier,
-                                        &wdata->verf.verifier,
-                                        sizeof(first_verf.verifier)) != 0)
-                       goto sync_retry;
+               if (unlikely(!data)) {
+                       while (!list_empty(list)) {
+                               data = list_entry(list->next,
+                                                 struct nfs_write_data, pages);
+                               list_del(&data->pages);
+                               nfs_writedata_free(data);
+                       }
+                       kref_put(&dreq->kref, nfs_direct_req_release);
+                       return NULL;
+               }
+               INIT_LIST_HEAD(&data->pages);
+               list_add(&data->pages, list);
+               data->req = (struct nfs_page *) dreq;
+               dreq->outstanding++;
+               if (nbytes <= wsize)
+                       break;
+               nbytes -= wsize;
        }
-       result = tot_bytes;
  
- out:
-       nfs_end_data_update(inode);
-       nfs_writedata_free(wdata);
-       return result;
+       nfs_alloc_commit_data(dreq);
  
- sync_retry:
-       wdata->args.stable = NFS_FILE_SYNC;
-       goto retry;
+       kref_get(&dreq->kref);
+       return dreq;
  }
  
- /**
-  * nfs_direct_write - For each iov segment, map the user's buffer
-  *                    then generate write and commit RPCs.
-  * @inode: target inode
-  * @ctx: target file open context
-  * @iov: array of vectors that define I/O buffer
-  * file_offset: offset in file to begin the operation
-  * nr_segs: size of iovec array
-  *
-  * Upon return, generic_file_direct_IO invalidates any cached pages
-  * that non-direct readers might access, so they will pick up these
-  * writes immediately.
-  */
- static ssize_t nfs_direct_write(struct inode *inode,
-               struct nfs_open_context *ctx, const struct iovec *iov,
-               loff_t file_offset, unsigned long nr_segs)
+ static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
  {
-       ssize_t tot_bytes = 0;
-       unsigned long seg = 0;
-       while ((seg < nr_segs) && (tot_bytes >= 0)) {
-               ssize_t result;
-               int page_count;
-               struct page **pages;
-               const struct iovec *vec = &iov[seg++];
-               unsigned long user_addr = (unsigned long) vec->iov_base;
-               size_t size = vec->iov_len;
-                 page_count = nfs_get_user_pages(WRITE, user_addr, size, &pages);
-                 if (page_count < 0) {
-                         nfs_free_user_pages(pages, 0, 0);
-                       if (tot_bytes > 0)
-                               break;
-                         return page_count;
-                 }
+       struct nfs_write_data *data = calldata;
+       struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
+       int status = task->tk_status;
+       if (nfs_writeback_done(task, data) != 0)
+               return;
+       spin_lock(&dreq->lock);
  
-               result = nfs_direct_write_seg(inode, ctx, user_addr, size,
-                               file_offset, pages, page_count);
-               nfs_free_user_pages(pages, page_count, 0);
+       if (likely(status >= 0))
+               dreq->count += data->res.count;
+       else
+               dreq->error = task->tk_status;
  
-               if (result <= 0) {
-                       if (tot_bytes > 0)
+       if (data->res.verf->committed != NFS_FILE_SYNC) {
+               switch (dreq->flags) {
+                       case 0:
+                               memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
+                               dreq->flags = NFS_ODIRECT_DO_COMMIT;
                                break;
-                       return result;
+                       case NFS_ODIRECT_DO_COMMIT:
+                               if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
+                                       dprintk("NFS: %5u write verify failed\n", task->tk_pid);
+                                       dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
+                               }
                }
-               tot_bytes += result;
-               file_offset += result;
-               if (result < size)
-                       break;
        }
-       return tot_bytes;
+       /* In case we have to resend */
+       data->args.stable = NFS_FILE_SYNC;
+       spin_unlock(&dreq->lock);
  }
  
- /**
-  * nfs_direct_IO - NFS address space operation for direct I/O
-  * rw: direction (read or write)
-  * @iocb: target I/O control block
-  * @iov: array of vectors that define I/O buffer
-  * file_offset: offset in file to begin the operation
-  * nr_segs: size of iovec array
-  *
+ /*
+  * NB: Return the value of the first error return code.  Subsequent
+  *     errors after the first one are ignored.
   */
- ssize_t
- nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
-               loff_t file_offset, unsigned long nr_segs)
+ static void nfs_direct_write_release(void *calldata)
  {
-       ssize_t result = -EINVAL;
-       struct file *file = iocb->ki_filp;
-       struct nfs_open_context *ctx;
-       struct dentry *dentry = file->f_dentry;
-       struct inode *inode = dentry->d_inode;
+       struct nfs_write_data *data = calldata;
+       struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
  
-       /*
-        * No support for async yet
-        */
-       if (!is_sync_kiocb(iocb))
-               return result;
-       ctx = (struct nfs_open_context *)file->private_data;
-       switch (rw) {
-       case READ:
-               dprintk("NFS: direct_IO(read) (%s) off/no(%Lu/%lu)\n",
-                               dentry->d_name.name, file_offset, nr_segs);
-               result = nfs_direct_read(inode, ctx, iov,
-                                               file_offset, nr_segs);
-               break;
-       case WRITE:
-               dprintk("NFS: direct_IO(write) (%s) off/no(%Lu/%lu)\n",
-                               dentry->d_name.name, file_offset, nr_segs);
-               result = nfs_direct_write(inode, ctx, iov,
-                                               file_offset, nr_segs);
-               break;
-       default:
-               break;
+       spin_lock(&dreq->lock);
+       if (--dreq->outstanding) {
+               spin_unlock(&dreq->lock);
+               return;
        }
+       spin_unlock(&dreq->lock);
+       nfs_direct_write_complete(dreq, data->inode);
+ }
+ static const struct rpc_call_ops nfs_write_direct_ops = {
+       .rpc_call_done = nfs_direct_write_result,
+       .rpc_release = nfs_direct_write_release,
+ };
+ /*
+  * For each nfs_write_data struct that was allocated on the list, dispatch
+  * an NFS WRITE operation
+  */
+ static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, int sync)
+ {
+       struct nfs_open_context *ctx = dreq->ctx;
+       struct inode *inode = ctx->dentry->d_inode;
+       struct list_head *list = &dreq->list;
+       struct page **pages = dreq->pages;
+       size_t count = dreq->user_count;
+       loff_t pos = dreq->pos;
+       size_t wsize = NFS_SERVER(inode)->wsize;
+       unsigned int curpage, pgbase;
+       curpage = 0;
+       pgbase = dreq->user_addr & ~PAGE_MASK;
+       do {
+               struct nfs_write_data *data;
+               size_t bytes;
+               bytes = wsize;
+               if (count < wsize)
+                       bytes = count;
+               BUG_ON(list_empty(list));
+               data = list_entry(list->next, struct nfs_write_data, pages);
+               list_move_tail(&data->pages, &dreq->rewrite_list);
+               data->inode = inode;
+               data->cred = ctx->cred;
+               data->args.fh = NFS_FH(inode);
+               data->args.context = ctx;
+               data->args.offset = pos;
+               data->args.pgbase = pgbase;
+               data->args.pages = &pages[curpage];
+               data->args.count = bytes;
+               data->res.fattr = &data->fattr;
+               data->res.count = bytes;
+               data->res.verf = &data->verf;
+               rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
+                               &nfs_write_direct_ops, data);
+               NFS_PROTO(inode)->write_setup(data, sync);
+               data->task.tk_priority = RPC_PRIORITY_NORMAL;
+               data->task.tk_cookie = (unsigned long) inode;
+               lock_kernel();
+               rpc_execute(&data->task);
+               unlock_kernel();
+               dfprintk(VFS, "NFS: %5u initiated direct write call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
+                               data->task.tk_pid,
+                               inode->i_sb->s_id,
+                               (long long)NFS_FILEID(inode),
+                               bytes,
+                               (unsigned long long)data->args.offset);
+               pos += bytes;
+               pgbase += bytes;
+               curpage += pgbase >> PAGE_SHIFT;
+               pgbase &= ~PAGE_MASK;
+               count -= bytes;
+       } while (count != 0);
+       BUG_ON(!list_empty(list));
+ }
+ static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos, struct page **pages, int nr_pages)
+ {
+       ssize_t result;
+       sigset_t oldset;
+       struct inode *inode = iocb->ki_filp->f_mapping->host;
+       struct rpc_clnt *clnt = NFS_CLIENT(inode);
+       struct nfs_direct_req *dreq;
+       size_t wsize = NFS_SERVER(inode)->wsize;
+       int sync = 0;
+       dreq = nfs_direct_write_alloc(count, wsize);
+       if (!dreq)
+               return -ENOMEM;
+       if (dreq->commit_data == NULL || count < wsize)
+               sync = FLUSH_STABLE;
+       dreq->user_addr = user_addr;
+       dreq->user_count = count;
+       dreq->pos = pos;
+       dreq->pages = pages;
+       dreq->npages = nr_pages;
+       dreq->inode = inode;
+       dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
+       if (!is_sync_kiocb(iocb))
+               dreq->iocb = iocb;
+       nfs_add_stats(inode, NFSIOS_DIRECTWRITTENBYTES, count);
+       nfs_begin_data_update(inode);
+       rpc_clnt_sigmask(clnt, &oldset);
+       nfs_direct_write_schedule(dreq, sync);
+       result = nfs_direct_wait(dreq);
+       rpc_clnt_sigunmask(clnt, &oldset);
        return result;
  }
  
   * nfs_file_direct_read - file direct read operation for NFS files
   * @iocb: target I/O control block
   * @buf: user's buffer into which to read data
-  * count: number of bytes to read
-  * pos: byte offset in file where reading starts
+  * @count: number of bytes to read
+  * @pos: byte offset in file where reading starts
   *
   * We use this function for direct reads instead of calling
   * generic_file_aio_read() in order to avoid gfar's check to see if
   * the request starts before the end of the file.  For that check
   * to work, we must generate a GETATTR before each direct read, and
   * even then there is a window between the GETATTR and the subsequent
-  * READ where the file size could change.  So our preference is simply
+  * READ where the file size could change.  Our preference is simply
   * to do all reads the application wants, and the server will take
   * care of managing the end of file boundary.
-  * 
+  *
   * This function also eliminates unnecessarily updating the file's
   * atime locally, as the NFS server sets the file's atime, and this
   * client must read the updated atime from the server back into its
   * cache.
   */
- ssize_t
- nfs_file_direct_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos)
+ ssize_t nfs_file_direct_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos)
  {
        ssize_t retval = -EINVAL;
-       loff_t *ppos = &iocb->ki_pos;
+       int page_count;
+       struct page **pages;
        struct file *file = iocb->ki_filp;
-       struct nfs_open_context *ctx =
-                       (struct nfs_open_context *) file->private_data;
        struct address_space *mapping = file->f_mapping;
-       struct inode *inode = mapping->host;
-       struct iovec iov = {
-               .iov_base = buf,
-               .iov_len = count,
-       };
  
        dprintk("nfs: direct read(%s/%s, %lu@%Ld)\n",
                file->f_dentry->d_parent->d_name.name,
                file->f_dentry->d_name.name,
                (unsigned long) count, (long long) pos);
  
        if (count < 0)
                goto out;
        retval = -EFAULT;
-       if (!access_ok(VERIFY_WRITE, iov.iov_base, iov.iov_len))
+       if (!access_ok(VERIFY_WRITE, buf, count))
                goto out;
        retval = 0;
        if (!count)
        if (retval)
                goto out;
  
-       retval = nfs_direct_read(inode, ctx, &iov, pos, 1);
+       retval = nfs_get_user_pages(READ, (unsigned long) buf,
+                                               count, &pages);
+       if (retval < 0)
+               goto out;
+       page_count = retval;
+       retval = nfs_direct_read(iocb, (unsigned long) buf, count, pos,
+                                               pages, page_count);
        if (retval > 0)
-               *ppos = pos + retval;
+               iocb->ki_pos = pos + retval;
  
  out:
        return retval;
   * nfs_file_direct_write - file direct write operation for NFS files
   * @iocb: target I/O control block
   * @buf: user's buffer from which to write data
-  * count: number of bytes to write
-  * pos: byte offset in file where writing starts
+  * @count: number of bytes to write
+  * @pos: byte offset in file where writing starts
   *
   * We use this function for direct writes instead of calling
   * generic_file_aio_write() in order to avoid taking the inode
   * Note that O_APPEND is not supported for NFS direct writes, as there
   * is no atomic O_APPEND write facility in the NFS protocol.
   */
- ssize_t
- nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos)
+ ssize_t nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos)
  {
        ssize_t retval;
+       int page_count;
+       struct page **pages;
        struct file *file = iocb->ki_filp;
-       struct nfs_open_context *ctx =
-                       (struct nfs_open_context *) file->private_data;
        struct address_space *mapping = file->f_mapping;
-       struct inode *inode = mapping->host;
-       struct iovec iov = {
-               .iov_base = (char __user *)buf,
-       };
  
        dfprintk(VFS, "nfs: direct write(%s/%s, %lu@%Ld)\n",
                file->f_dentry->d_parent->d_name.name,
                file->f_dentry->d_name.name,
                (unsigned long) count, (long long) pos);
  
-       retval = -EINVAL;
-       if (!is_sync_kiocb(iocb))
-               goto out;
        retval = generic_write_checks(file, &pos, &count, 0);
        if (retval)
                goto out;
        retval = 0;
        if (!count)
                goto out;
-       iov.iov_len = count,
  
        retval = -EFAULT;
-       if (!access_ok(VERIFY_READ, iov.iov_base, iov.iov_len))
+       if (!access_ok(VERIFY_READ, buf, count))
                goto out;
  
        retval = nfs_sync_mapping(mapping);
        if (retval)
                goto out;
  
-       retval = nfs_direct_write(inode, ctx, &iov, pos, 1);
+       retval = nfs_get_user_pages(WRITE, (unsigned long) buf,
+                                               count, &pages);
+       if (retval < 0)
+               goto out;
+       page_count = retval;
+       retval = nfs_direct_write(iocb, (unsigned long) buf, count,
+                                       pos, pages, page_count);
+       /*
+        * XXX: nfs_end_data_update() already ensures this file's
+        *      cached data is subsequently invalidated.  Do we really
+        *      need to call invalidate_inode_pages2() again here?
+        *
+        *      For aio writes, this invalidation will almost certainly
+        *      occur before the writes complete.  Kind of racy.
+        */
        if (mapping->nrpages)
                invalidate_inode_pages2(mapping);
        if (retval > 0)
                iocb->ki_pos = pos + retval;
  
@@@ -777,12 -890,15 +890,16 @@@ out
        return retval;
  }
  
+ /**
+  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
+  *
+  */
  int nfs_init_directcache(void)
  {
        nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
                                                sizeof(struct nfs_direct_req),
 -                                              0, SLAB_RECLAIM_ACCOUNT,
 +                                              0, (SLAB_RECLAIM_ACCOUNT|
 +                                                      SLAB_MEM_SPREAD),
                                                NULL, NULL);
        if (nfs_direct_cachep == NULL)
                return -ENOMEM;
        return 0;
  }
  
+ /**
+  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
+  *
+  */
  void nfs_destroy_directcache(void)
  {
        if (kmem_cache_destroy(nfs_direct_cachep))
diff --combined fs/nfs/inode.c
@@@ -26,6 -26,7 +26,7 @@@
  #include <linux/unistd.h>
  #include <linux/sunrpc/clnt.h>
  #include <linux/sunrpc/stats.h>
+ #include <linux/sunrpc/metrics.h>
  #include <linux/nfs_fs.h>
  #include <linux/nfs_mount.h>
  #include <linux/nfs4_mount.h>
@@@ -42,6 -43,7 +43,7 @@@
  #include "nfs4_fs.h"
  #include "callback.h"
  #include "delegation.h"
+ #include "iostat.h"
  
  #define NFSDBG_FACILITY               NFSDBG_VFS
  #define NFS_PARANOIA 1
@@@ -65,6 -67,7 +67,7 @@@ static void nfs_clear_inode(struct inod
  static void nfs_umount_begin(struct super_block *);
  static int  nfs_statfs(struct super_block *, struct kstatfs *);
  static int  nfs_show_options(struct seq_file *, struct vfsmount *);
+ static int  nfs_show_stats(struct seq_file *, struct vfsmount *);
  static void nfs_zap_acl_cache(struct inode *);
  
  static struct rpc_program     nfs_program;
@@@ -78,6 -81,7 +81,7 @@@ static struct super_operations nfs_sop
        .clear_inode    = nfs_clear_inode,
        .umount_begin   = nfs_umount_begin,
        .show_options   = nfs_show_options,
+       .show_stats     = nfs_show_stats,
  };
  
  /*
@@@ -103,7 -107,7 +107,7 @@@ static struct rpc_version *        nfs_version
  static struct rpc_program     nfs_program = {
        .name                   = "nfs",
        .number                 = NFS_PROGRAM,
 -      .nrvers                 = sizeof(nfs_version) / sizeof(nfs_version[0]),
 +      .nrvers                 = ARRAY_SIZE(nfs_version),
        .version                = nfs_version,
        .stats                  = &nfs_rpcstat,
        .pipe_dir_name          = "/nfs",
@@@ -118,7 -122,7 +122,7 @@@ static struct rpc_version *        nfsacl_vers
  struct rpc_program            nfsacl_program = {
        .name =                 "nfsacl",
        .number =               NFS_ACL_PROGRAM,
 -      .nrvers =               sizeof(nfsacl_version) / sizeof(nfsacl_version[0]),
 +      .nrvers =               ARRAY_SIZE(nfsacl_version),
        .version =              nfsacl_version,
        .stats =                &nfsacl_rpcstat,
  };
@@@ -133,7 -137,7 +137,7 @@@ nfs_fattr_to_ino_t(struct nfs_fattr *fa
  static int
  nfs_write_inode(struct inode *inode, int sync)
  {
-       int flags = sync ? FLUSH_WAIT : 0;
+       int flags = sync ? FLUSH_SYNC : 0;
        int ret;
  
        ret = nfs_commit_inode(inode, flags);
@@@ -237,7 -241,6 +241,6 @@@ static struct inode 
  nfs_get_root(struct super_block *sb, struct nfs_fh *rootfh, struct nfs_fsinfo *fsinfo)
  {
        struct nfs_server       *server = NFS_SB(sb);
-       struct inode *rooti;
        int                     error;
  
        error = server->rpc_ops->getroot(server, rootfh, fsinfo);
                return ERR_PTR(error);
        }
  
-       rooti = nfs_fhget(sb, rootfh, fsinfo->fattr);
-       if (!rooti)
-               return ERR_PTR(-ENOMEM);
-       return rooti;
+       return nfs_fhget(sb, rootfh, fsinfo->fattr);
  }
  
  /*
@@@ -277,6 -277,10 +277,10 @@@ nfs_sb_init(struct super_block *sb, rpc
  
        sb->s_magic      = NFS_SUPER_MAGIC;
  
+       server->io_stats = nfs_alloc_iostats();
+       if (server->io_stats == NULL)
+               return -ENOMEM;
        root_inode = nfs_get_root(sb, &server->fh, &fsinfo);
        /* Did getting the root inode fail? */
        if (IS_ERR(root_inode)) {
        }
        sb->s_root->d_op = server->rpc_ops->dentry_ops;
  
+       /* mount time stamp, in jiffies */
+       server->mount_time = jiffies;
        /* Get some general file system info */
        if (server->namelen == 0 &&
            server->rpc_ops->pathconf(server, &server->fh, &pathinfo) >= 0)
@@@ -396,6 -403,9 +403,9 @@@ nfs_create_client(struct nfs_server *se
  
        nfs_init_timeout_values(&timeparms, proto, data->timeo, data->retrans);
  
+       server->retrans_timeo = timeparms.to_initval;
+       server->retrans_count = timeparms.to_retries;
        /* create transport and client */
        xprt = xprt_create_proto(proto, &server->addr, &timeparms);
        if (IS_ERR(xprt)) {
@@@ -579,7 -589,7 +589,7 @@@ nfs_statfs(struct super_block *sb, stru
  
  }
  
- static int nfs_show_options(struct seq_file *m, struct vfsmount *mnt)
+ static void nfs_show_mount_options(struct seq_file *m, struct nfs_server *nfss, int showdefaults)
  {
        static struct proc_nfs_info {
                int flag;
        } nfs_info[] = {
                { NFS_MOUNT_SOFT, ",soft", ",hard" },
                { NFS_MOUNT_INTR, ",intr", "" },
-               { NFS_MOUNT_POSIX, ",posix", "" },
                { NFS_MOUNT_NOCTO, ",nocto", "" },
                { NFS_MOUNT_NOAC, ",noac", "" },
-               { NFS_MOUNT_NONLM, ",nolock", ",lock" },
+               { NFS_MOUNT_NONLM, ",nolock", "" },
                { NFS_MOUNT_NOACL, ",noacl", "" },
                { 0, NULL, NULL }
        };
        struct proc_nfs_info *nfs_infop;
        char buf[12];
        char *proto;
  
-       seq_printf(m, ",v%d", nfss->rpc_ops->version);
+       seq_printf(m, ",vers=%d", nfss->rpc_ops->version);
        seq_printf(m, ",rsize=%d", nfss->rsize);
        seq_printf(m, ",wsize=%d", nfss->wsize);
-       if (nfss->acregmin != 3*HZ)
+       if (nfss->acregmin != 3*HZ || showdefaults)
                seq_printf(m, ",acregmin=%d", nfss->acregmin/HZ);
-       if (nfss->acregmax != 60*HZ)
+       if (nfss->acregmax != 60*HZ || showdefaults)
                seq_printf(m, ",acregmax=%d", nfss->acregmax/HZ);
-       if (nfss->acdirmin != 30*HZ)
+       if (nfss->acdirmin != 30*HZ || showdefaults)
                seq_printf(m, ",acdirmin=%d", nfss->acdirmin/HZ);
-       if (nfss->acdirmax != 60*HZ)
+       if (nfss->acdirmax != 60*HZ || showdefaults)
                seq_printf(m, ",acdirmax=%d", nfss->acdirmax/HZ);
        for (nfs_infop = nfs_info; nfs_infop->flag; nfs_infop++) {
                if (nfss->flags & nfs_infop->flag)
                        proto = buf;
        }
        seq_printf(m, ",proto=%s", proto);
+       seq_printf(m, ",timeo=%lu", 10U * nfss->retrans_timeo / HZ);
+       seq_printf(m, ",retrans=%u", nfss->retrans_count);
+ }
+ static int nfs_show_options(struct seq_file *m, struct vfsmount *mnt)
+ {
+       struct nfs_server *nfss = NFS_SB(mnt->mnt_sb);
+       nfs_show_mount_options(m, nfss, 0);
        seq_puts(m, ",addr=");
        seq_escape(m, nfss->hostname, " \t\n\\");
+       return 0;
+ }
+ static int nfs_show_stats(struct seq_file *m, struct vfsmount *mnt)
+ {
+       int i, cpu;
+       struct nfs_server *nfss = NFS_SB(mnt->mnt_sb);
+       struct rpc_auth *auth = nfss->client->cl_auth;
+       struct nfs_iostats totals = { };
+       seq_printf(m, "statvers=%s", NFS_IOSTAT_VERS);
+       /*
+        * Display all mount option settings
+        */
+       seq_printf(m, "\n\topts:\t");
+       seq_puts(m, mnt->mnt_sb->s_flags & MS_RDONLY ? "ro" : "rw");
+       seq_puts(m, mnt->mnt_sb->s_flags & MS_SYNCHRONOUS ? ",sync" : "");
+       seq_puts(m, mnt->mnt_sb->s_flags & MS_NOATIME ? ",noatime" : "");
+       seq_puts(m, mnt->mnt_sb->s_flags & MS_NODIRATIME ? ",nodiratime" : "");
+       nfs_show_mount_options(m, nfss, 1);
+       seq_printf(m, "\n\tage:\t%lu", (jiffies - nfss->mount_time) / HZ);
+       seq_printf(m, "\n\tcaps:\t");
+       seq_printf(m, "caps=0x%x", nfss->caps);
+       seq_printf(m, ",wtmult=%d", nfss->wtmult);
+       seq_printf(m, ",dtsize=%d", nfss->dtsize);
+       seq_printf(m, ",bsize=%d", nfss->bsize);
+       seq_printf(m, ",namelen=%d", nfss->namelen);
+ #ifdef CONFIG_NFS_V4
+       if (nfss->rpc_ops->version == 4) {
+               seq_printf(m, "\n\tnfsv4:\t");
+               seq_printf(m, "bm0=0x%x", nfss->attr_bitmask[0]);
+               seq_printf(m, ",bm1=0x%x", nfss->attr_bitmask[1]);
+               seq_printf(m, ",acl=0x%x", nfss->acl_bitmask);
+       }
+ #endif
+       /*
+        * Display security flavor in effect for this mount
+        */
+       seq_printf(m, "\n\tsec:\tflavor=%d", auth->au_ops->au_flavor);
+       if (auth->au_flavor)
+               seq_printf(m, ",pseudoflavor=%d", auth->au_flavor);
+       /*
+        * Display superblock I/O counters
+        */
+       for (cpu = 0; cpu < NR_CPUS; cpu++) {
+               struct nfs_iostats *stats;
+               if (!cpu_possible(cpu))
+                       continue;
+               preempt_disable();
+               stats = per_cpu_ptr(nfss->io_stats, cpu);
+               for (i = 0; i < __NFSIOS_COUNTSMAX; i++)
+                       totals.events[i] += stats->events[i];
+               for (i = 0; i < __NFSIOS_BYTESMAX; i++)
+                       totals.bytes[i] += stats->bytes[i];
+               preempt_enable();
+       }
+       seq_printf(m, "\n\tevents:\t");
+       for (i = 0; i < __NFSIOS_COUNTSMAX; i++)
+               seq_printf(m, "%lu ", totals.events[i]);
+       seq_printf(m, "\n\tbytes:\t");
+       for (i = 0; i < __NFSIOS_BYTESMAX; i++)
+               seq_printf(m, "%Lu ", totals.bytes[i]);
+       seq_printf(m, "\n");
+       rpc_print_iostats(m, nfss->client);
        return 0;
  }
  
@@@ -660,6 -756,8 +756,8 @@@ static void nfs_zap_caches_locked(struc
        struct nfs_inode *nfsi = NFS_I(inode);
        int mode = inode->i_mode;
  
+       nfs_inc_stats(inode, NFSIOS_ATTRINVALIDATE);
        NFS_ATTRTIMEO(inode) = NFS_MINATTRTIMEO(inode);
        NFS_ATTRTIMEO_UPDATE(inode) = jiffies;
  
@@@ -751,7 -849,7 +849,7 @@@ nfs_fhget(struct super_block *sb, struc
                .fh     = fh,
                .fattr  = fattr
        };
-       struct inode *inode = NULL;
+       struct inode *inode = ERR_PTR(-ENOENT);
        unsigned long hash;
  
        if ((fattr->valid & NFS_ATTR_FATTR) == 0)
  
        hash = nfs_fattr_to_ino_t(fattr);
  
-       if (!(inode = iget5_locked(sb, hash, nfs_find_actor, nfs_init_locked, &desc)))
+       inode = iget5_locked(sb, hash, nfs_find_actor, nfs_init_locked, &desc);
+       if (inode == NULL) {
+               inode = ERR_PTR(-ENOMEM);
                goto out_no_inode;
+       }
  
        if (inode->i_state & I_NEW) {
                struct nfs_inode *nfsi = NFS_I(inode);
@@@ -834,7 -935,7 +935,7 @@@ out
        return inode;
  
  out_no_inode:
-       printk("nfs_fhget: iget failed\n");
+       dprintk("nfs_fhget: iget failed with error %ld\n", PTR_ERR(inode));
        goto out;
  }
  
@@@ -847,6 -948,8 +948,8 @@@ nfs_setattr(struct dentry *dentry, stru
        struct nfs_fattr fattr;
        int error;
  
+       nfs_inc_stats(inode, NFSIOS_VFSSETATTR);
        if (attr->ia_valid & ATTR_SIZE) {
                if (!S_ISREG(inode->i_mode) || attr->ia_size == i_size_read(inode))
                        attr->ia_valid &= ~ATTR_SIZE;
  
        lock_kernel();
        nfs_begin_data_update(inode);
-       /* Write all dirty data if we're changing file permissions or size */
-       if ((attr->ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID|ATTR_SIZE)) != 0) {
-               filemap_write_and_wait(inode->i_mapping);
-               nfs_wb_all(inode);
-       }
+       /* Write all dirty data */
+       filemap_write_and_wait(inode->i_mapping);
+       nfs_wb_all(inode);
        /*
         * Return any delegations if we're going to change ACLs
         */
@@@ -902,6 -1003,7 +1003,7 @@@ void nfs_setattr_update_inode(struct in
                spin_unlock(&inode->i_lock);
        }
        if ((attr->ia_valid & ATTR_SIZE) != 0) {
+               nfs_inc_stats(inode, NFSIOS_SETATTRTRUNC);
                inode->i_size = attr->ia_size;
                vmtruncate(inode, attr->ia_size);
        }
@@@ -949,7 -1051,7 +1051,7 @@@ int nfs_getattr(struct vfsmount *mnt, s
        int err;
  
        /* Flush out writes to the server in order to update c/mtime */
-       nfs_sync_inode(inode, 0, 0, FLUSH_WAIT|FLUSH_NOCOMMIT);
+       nfs_sync_inode_wait(inode, 0, 0, FLUSH_NOCOMMIT);
  
        /*
         * We may force a getattr if the user cares about atime.
        return err;
  }
  
- struct nfs_open_context *alloc_nfs_open_context(struct dentry *dentry, struct rpc_cred *cred)
+ static struct nfs_open_context *alloc_nfs_open_context(struct vfsmount *mnt, struct dentry *dentry, struct rpc_cred *cred)
  {
        struct nfs_open_context *ctx;
  
        if (ctx != NULL) {
                atomic_set(&ctx->count, 1);
                ctx->dentry = dget(dentry);
+               ctx->vfsmnt = mntget(mnt);
                ctx->cred = get_rpccred(cred);
                ctx->state = NULL;
                ctx->lockowner = current->files;
@@@ -1011,6 -1114,7 +1114,7 @@@ void put_nfs_open_context(struct nfs_op
                if (ctx->cred != NULL)
                        put_rpccred(ctx->cred);
                dput(ctx->dentry);
+               mntput(ctx->vfsmnt);
                kfree(ctx);
        }
  }
   * Ensure that mmap has a recent RPC credential for use when writing out
   * shared pages
   */
- void nfs_file_set_open_context(struct file *filp, struct nfs_open_context *ctx)
+ static void nfs_file_set_open_context(struct file *filp, struct nfs_open_context *ctx)
  {
        struct inode *inode = filp->f_dentry->d_inode;
        struct nfs_inode *nfsi = NFS_I(inode);
@@@ -1051,7 -1155,7 +1155,7 @@@ struct nfs_open_context *nfs_find_open_
        return ctx;
  }
  
- void nfs_file_clear_open_context(struct file *filp)
+ static void nfs_file_clear_open_context(struct file *filp)
  {
        struct inode *inode = filp->f_dentry->d_inode;
        struct nfs_open_context *ctx = (struct nfs_open_context *)filp->private_data;
@@@ -1076,7 -1180,7 +1180,7 @@@ int nfs_open(struct inode *inode, struc
        cred = rpcauth_lookupcred(NFS_CLIENT(inode)->cl_auth, 0);
        if (IS_ERR(cred))
                return PTR_ERR(cred);
-       ctx = alloc_nfs_open_context(filp->f_dentry, cred);
+       ctx = alloc_nfs_open_context(filp->f_vfsmnt, filp->f_dentry, cred);
        put_rpccred(cred);
        if (ctx == NULL)
                return -ENOMEM;
@@@ -1185,6 -1289,7 +1289,7 @@@ int nfs_attribute_timeout(struct inode 
   */
  int nfs_revalidate_inode(struct nfs_server *server, struct inode *inode)
  {
+       nfs_inc_stats(inode, NFSIOS_INODEREVALIDATE);
        if (!(NFS_I(inode)->cache_validity & (NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA))
                        && !nfs_attribute_timeout(inode))
                return NFS_STALE(inode) ? -ESTALE : 0;
@@@ -1201,6 -1306,7 +1306,7 @@@ void nfs_revalidate_mapping(struct inod
        struct nfs_inode *nfsi = NFS_I(inode);
  
        if (nfsi->cache_validity & NFS_INO_INVALID_DATA) {
+               nfs_inc_stats(inode, NFSIOS_DATAINVALIDATE);
                if (S_ISREG(inode->i_mode))
                        nfs_sync_mapping(mapping);
                invalidate_inode_pages2(mapping);
@@@ -1299,39 -1405,37 +1405,37 @@@ static int nfs_check_inode_attributes(s
        if ((fattr->valid & NFS_ATTR_FATTR) == 0)
                return 0;
  
+       /* Has the inode gone and changed behind our back? */
+       if (nfsi->fileid != fattr->fileid
+                       || (inode->i_mode & S_IFMT) != (fattr->mode & S_IFMT)) {
+               return -EIO;
+       }
        /* Are we in the process of updating data on the server? */
        data_unstable = nfs_caches_unstable(inode);
  
        /* Do atomic weak cache consistency updates */
        nfs_wcc_update_inode(inode, fattr);
  
-       if ((fattr->valid & NFS_ATTR_FATTR_V4) != 0 &&
-                       nfsi->change_attr != fattr->change_attr) {
+       if ((fattr->valid & NFS_ATTR_FATTR_V4) != 0) {
+               if (nfsi->change_attr == fattr->change_attr)
+                       goto out;
                nfsi->cache_validity |= NFS_INO_INVALID_ATTR;
                if (!data_unstable)
                        nfsi->cache_validity |= NFS_INO_REVAL_PAGECACHE;
        }
  
-       /* Has the inode gone and changed behind our back? */
-       if (nfsi->fileid != fattr->fileid
-                       || (inode->i_mode & S_IFMT) != (fattr->mode & S_IFMT)) {
-               return -EIO;
-       }
-       cur_size = i_size_read(inode);
-       new_isize = nfs_size_to_loff_t(fattr->size);
        /* Verify a few of the more important attributes */
        if (!timespec_equal(&inode->i_mtime, &fattr->mtime)) {
                nfsi->cache_validity |= NFS_INO_INVALID_ATTR;
                if (!data_unstable)
                        nfsi->cache_validity |= NFS_INO_REVAL_PAGECACHE;
        }
-       if (cur_size != new_isize) {
-               nfsi->cache_validity |= NFS_INO_INVALID_ATTR;
-               if (nfsi->npages == 0)
-                       nfsi->cache_validity |= NFS_INO_REVAL_PAGECACHE;
-       }
+       cur_size = i_size_read(inode);
+       new_isize = nfs_size_to_loff_t(fattr->size);
+       if (cur_size != new_isize && nfsi->npages == 0)
+               nfsi->cache_validity |= NFS_INO_INVALID_ATTR|NFS_INO_REVAL_PAGECACHE;
  
        /* Have any file permissions changed? */
        if ((inode->i_mode & S_IALLUGO) != (fattr->mode & S_IALLUGO)
        if (inode->i_nlink != fattr->nlink)
                nfsi->cache_validity |= NFS_INO_INVALID_ATTR;
  
+ out:
        if (!timespec_equal(&inode->i_atime, &fattr->atime))
                nfsi->cache_validity |= NFS_INO_INVALID_ATIME;
  
@@@ -1481,15 -1586,6 +1586,6 @@@ static int nfs_update_inode(struct inod
                nfsi->cache_change_attribute = jiffies;
        }
  
-       if ((fattr->valid & NFS_ATTR_FATTR_V4)
-           && nfsi->change_attr != fattr->change_attr) {
-               dprintk("NFS: change_attr change on server for file %s/%ld\n",
-                      inode->i_sb->s_id, inode->i_ino);
-               nfsi->change_attr = fattr->change_attr;
-               invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA|NFS_INO_INVALID_ACCESS|NFS_INO_INVALID_ACL;
-               nfsi->cache_change_attribute = jiffies;
-       }
        /* If ctime has changed we should definitely clear access+acl caches */
        if (!timespec_equal(&inode->i_ctime, &fattr->ctime)) {
                invalid |= NFS_INO_INVALID_ACCESS|NFS_INO_INVALID_ACL;
                inode->i_blksize = fattr->du.nfs2.blocksize;
        }
  
+       if ((fattr->valid & NFS_ATTR_FATTR_V4)) {
+               if (nfsi->change_attr != fattr->change_attr) {
+                       dprintk("NFS: change_attr change on server for file %s/%ld\n",
+                                       inode->i_sb->s_id, inode->i_ino);
+                       nfsi->change_attr = fattr->change_attr;
+                       invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA|NFS_INO_INVALID_ACCESS|NFS_INO_INVALID_ACL;
+                       nfsi->cache_change_attribute = jiffies;
+               } else
+                       invalid &= ~(NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA);
+       }
        /* Update attrtimeo value if we're out of the unstable period */
        if (invalid & NFS_INO_INVALID_ATTR) {
+               nfs_inc_stats(inode, NFSIOS_ATTRINVALIDATE);
                nfsi->attrtimeo = NFS_MINATTRTIMEO(inode);
                nfsi->attrtimeo_timestamp = jiffies;
        } else if (time_after(jiffies, nfsi->attrtimeo_timestamp+nfsi->attrtimeo)) {
@@@ -1637,10 -1745,9 +1745,9 @@@ static struct super_block *nfs_get_sb(s
  #endif /* CONFIG_NFS_V3 */
  
        s = ERR_PTR(-ENOMEM);
-       server = kmalloc(sizeof(struct nfs_server), GFP_KERNEL);
+       server = kzalloc(sizeof(struct nfs_server), GFP_KERNEL);
        if (!server)
                goto out_err;
-       memset(server, 0, sizeof(struct nfs_server));
        /* Zero out the NFS state stuff */
        init_nfsv4_state(server);
        server->client = server->client_sys = server->client_acl = ERR_PTR(-EINVAL);
  
        s->s_flags = flags;
  
 -      error = nfs_fill_super(s, data, flags & MS_VERBOSE ? 1 : 0);
 +      error = nfs_fill_super(s, data, flags & MS_SILENT ? 1 : 0);
        if (error) {
                up_write(&s->s_umount);
                deactivate_super(s);
@@@ -1712,6 -1819,7 +1819,7 @@@ static void nfs_kill_super(struct super
  
        rpciod_down();          /* release rpciod */
  
+       nfs_free_iostats(server->io_stats);
        kfree(server->hostname);
        kfree(server);
  }
@@@ -1738,6 -1846,7 +1846,7 @@@ static struct super_operations nfs4_sop
        .clear_inode    = nfs4_clear_inode,
        .umount_begin   = nfs_umount_begin,
        .show_options   = nfs_show_options,
+       .show_stats     = nfs_show_stats,
  };
  
  /*
@@@ -1800,6 -1909,9 +1909,9 @@@ static int nfs4_fill_super(struct super
  
        nfs_init_timeout_values(&timeparms, data->proto, data->timeo, data->retrans);
  
+       server->retrans_timeo = timeparms.to_initval;
+       server->retrans_count = timeparms.to_retries;
        clp = nfs4_get_client(&server->addr.sin_addr);
        if (!clp) {
                dprintk("%s: failed to create NFS4 client.\n", __FUNCTION__);
@@@ -1941,10 -2053,9 +2053,9 @@@ static struct super_block *nfs4_get_sb(
                return ERR_PTR(-EINVAL);
        }
  
-       server = kmalloc(sizeof(struct nfs_server), GFP_KERNEL);
+       server = kzalloc(sizeof(struct nfs_server), GFP_KERNEL);
        if (!server)
                return ERR_PTR(-ENOMEM);
-       memset(server, 0, sizeof(struct nfs_server));
        /* Zero out the NFS state stuff */
        init_nfsv4_state(server);
        server->client = server->client_sys = server->client_acl = ERR_PTR(-EINVAL);
  
        s->s_flags = flags;
  
 -      error = nfs4_fill_super(s, data, flags & MS_VERBOSE ? 1 : 0);
 +      error = nfs4_fill_super(s, data, flags & MS_SILENT ? 1 : 0);
        if (error) {
                up_write(&s->s_umount);
                deactivate_super(s);
@@@ -2024,10 -2135,12 +2135,12 @@@ static void nfs4_kill_super(struct supe
  
        if (server->client != NULL && !IS_ERR(server->client))
                rpc_shutdown_client(server->client);
  
        destroy_nfsv4_state(server);
  
+       rpciod_down();
+       nfs_free_iostats(server->io_stats);
        kfree(server->hostname);
        kfree(server);
  }
@@@ -2163,8 -2276,7 +2276,8 @@@ static int nfs_init_inodecache(void
  {
        nfs_inode_cachep = kmem_cache_create("nfs_inode_cache",
                                             sizeof(struct nfs_inode),
 -                                           0, SLAB_RECLAIM_ACCOUNT,
 +                                           0, (SLAB_RECLAIM_ACCOUNT|
 +                                              SLAB_MEM_SPREAD),
                                             init_once, NULL);
        if (nfs_inode_cachep == NULL)
                return -ENOMEM;
diff --combined fs/nfs/mount_clnt.c
@@@ -49,9 -49,12 +49,12 @@@ nfsroot_mount(struct sockaddr_in *addr
        struct mnt_fhstatus     result = {
                .fh             = fh
        };
+       struct rpc_message msg  = {
+               .rpc_argp       = path,
+               .rpc_resp       = &result,
+       };
        char                    hostname[32];
        int                     status;
-       int                     call;
  
        dprintk("NFS:      nfs_mount(%08x:%s)\n",
                        (unsigned)ntohl(addr->sin_addr.s_addr), path);
        if (IS_ERR(mnt_clnt))
                return PTR_ERR(mnt_clnt);
  
-       call = (version == NFS_MNT3_VERSION) ? MOUNTPROC3_MNT : MNTPROC_MNT;
-       status = rpc_call(mnt_clnt, call, path, &result, 0);
+       if (version == NFS_MNT3_VERSION)
+               msg.rpc_proc = &mnt_clnt->cl_procinfo[MOUNTPROC3_MNT];
+       else
+               msg.rpc_proc = &mnt_clnt->cl_procinfo[MNTPROC_MNT];
+       status = rpc_call_sync(mnt_clnt, &msg, 0);
        return status < 0? status : (result.status? -EACCES : 0);
  }
  
@@@ -137,6 -144,8 +144,8 @@@ static struct rpc_procinfo mnt_procedur
          .p_encode             = (kxdrproc_t) xdr_encode_dirpath,      
          .p_decode             = (kxdrproc_t) xdr_decode_fhstatus,
          .p_bufsiz             = MNT_dirpath_sz << 2,
+         .p_statidx            = MNTPROC_MNT,
+         .p_name               = "MOUNT",
        },
  };
  
@@@ -146,6 -155,8 +155,8 @@@ static struct rpc_procinfo mnt3_procedu
          .p_encode             = (kxdrproc_t) xdr_encode_dirpath,
          .p_decode             = (kxdrproc_t) xdr_decode_fhstatus3,
          .p_bufsiz             = MNT_dirpath_sz << 2,
+         .p_statidx            = MOUNTPROC3_MNT,
+         .p_name               = "MOUNT",
        },
  };
  
@@@ -174,7 -185,7 +185,7 @@@ static struct rpc_stat             mnt_stats
  static struct rpc_program     mnt_program = {
        .name           = "mount",
        .number         = NFS_MNT_PROGRAM,
 -      .nrvers         = sizeof(mnt_version)/sizeof(mnt_version[0]),
 +      .nrvers         = ARRAY_SIZE(mnt_version),
        .version        = mnt_version,
        .stats          = &mnt_stats,
  };
diff --combined fs/nfs/nfs2xdr.c
@@@ -682,7 -682,9 +682,9 @@@ nfs_stat_to_errno(int stat
        .p_encode   =  (kxdrproc_t) nfs_xdr_##argtype,                  \
        .p_decode   =  (kxdrproc_t) nfs_xdr_##restype,                  \
        .p_bufsiz   =  MAX(NFS_##argtype##_sz,NFS_##restype##_sz) << 2, \
-       .p_timer    =  timer                                            \
+       .p_timer    =  timer,                                           \
+       .p_statidx  =  NFSPROC_##proc,                                  \
+       .p_name     =  #proc,                                           \
        }
  struct rpc_procinfo   nfs_procedures[] = {
      PROC(GETATTR,     fhandle,        attrstat, 1),
  
  struct rpc_version            nfs_version2 = {
        .number                 = 2,
 -      .nrprocs                = sizeof(nfs_procedures)/sizeof(nfs_procedures[0]),
 +      .nrprocs                = ARRAY_SIZE(nfs_procedures),
        .procs                  = nfs_procedures
  };
diff --combined fs/nfs/nfs3xdr.c
@@@ -1109,7 -1109,9 +1109,9 @@@ nfs3_xdr_setaclres(struct rpc_rqst *req
        .p_encode    = (kxdrproc_t) nfs3_xdr_##argtype,                 \
        .p_decode    = (kxdrproc_t) nfs3_xdr_##restype,                 \
        .p_bufsiz    = MAX(NFS3_##argtype##_sz,NFS3_##restype##_sz) << 2,       \
-       .p_timer     = timer                                            \
+       .p_timer     = timer,                                           \
+       .p_statidx   = NFS3PROC_##proc,                                 \
+       .p_name      = #proc,                                           \
        }
  
  struct rpc_procinfo   nfs3_procedures[] = {
  
  struct rpc_version            nfs_version3 = {
        .number                 = 3,
 -      .nrprocs                = sizeof(nfs3_procedures)/sizeof(nfs3_procedures[0]),
 +      .nrprocs                = ARRAY_SIZE(nfs3_procedures),
        .procs                  = nfs3_procedures
  };
  
@@@ -1150,6 -1152,7 +1152,7 @@@ static struct rpc_procinfo      nfs3_acl_pro
                .p_decode = (kxdrproc_t) nfs3_xdr_getaclres,
                .p_bufsiz = MAX(ACL3_getaclargs_sz, ACL3_getaclres_sz) << 2,
                .p_timer = 1,
+               .p_name = "GETACL",
        },
        [ACLPROC3_SETACL] = {
                .p_proc = ACLPROC3_SETACL,
                .p_decode = (kxdrproc_t) nfs3_xdr_setaclres,
                .p_bufsiz = MAX(ACL3_setaclargs_sz, ACL3_setaclres_sz) << 2,
                .p_timer = 0,
+               .p_name = "SETACL",
        },
  };
  
diff --combined fs/nfs/nfs4xdr.c
@@@ -4344,6 -4344,8 +4344,8 @@@ nfs_stat_to_errno(int stat
        .p_encode = (kxdrproc_t) nfs4_xdr_##argtype,            \
        .p_decode = (kxdrproc_t) nfs4_xdr_##restype,            \
        .p_bufsiz = MAX(NFS4_##argtype##_sz,NFS4_##restype##_sz) << 2,  \
+       .p_statidx = NFSPROC4_CLNT_##proc,                      \
+       .p_name   = #proc,                                      \
      }
  
  struct rpc_procinfo   nfs4_procedures[] = {
  
  struct rpc_version            nfs_version4 = {
        .number                 = 4,
 -      .nrprocs                = sizeof(nfs4_procedures)/sizeof(nfs4_procedures[0]),
 +      .nrprocs                = ARRAY_SIZE(nfs4_procedures),
        .procs                  = nfs4_procedures
  };
  
diff --combined fs/nfsd/nfs4callback.c
@@@ -326,6 -326,8 +326,8 @@@ out
          .p_encode = (kxdrproc_t) nfs4_xdr_##argtype,                    \
          .p_decode = (kxdrproc_t) nfs4_xdr_##restype,                    \
          .p_bufsiz = MAX(NFS4_##argtype##_sz,NFS4_##restype##_sz) << 2,  \
+         .p_statidx = NFSPROC4_CB_##call,                              \
+       .p_name   = #proc,                                              \
  }
  
  static struct rpc_procinfo     nfs4_cb_procedures[] = {
  
  static struct rpc_version       nfs_cb_version4 = {
          .number                 = 1,
 -        .nrprocs                = sizeof(nfs4_cb_procedures)/sizeof(nfs4_cb_procedures[0]),
 +        .nrprocs                = ARRAY_SIZE(nfs4_cb_procedures),
          .procs                  = nfs4_cb_procedures
  };
  
@@@ -411,7 -413,7 +413,7 @@@ nfsd4_probe_callback(struct nfs4_clien
        /* Initialize rpc_program */
        program->name = "nfs4_cb";
        program->number = cb->cb_prog;
 -      program->nrvers = sizeof(nfs_cb_version)/sizeof(nfs_cb_version[0]);
 +      program->nrvers = ARRAY_SIZE(nfs_cb_version);
        program->version = nfs_cb_version;
        program->stats = stat;
  
diff --combined include/linux/fs.h
@@@ -65,11 -65,6 +65,11 @@@ extern int dir_notify_enable
  #define FMODE_PREAD   8
  #define FMODE_PWRITE  FMODE_PREAD     /* These go hand in hand */
  
 +/* File is being opened for execution. Primary users of this flag are
 +   distributed filesystems that can use it to achieve correct ETXTBSY
 +   behavior for cross-node execution/opening_for_writing of files */
 +#define FMODE_EXEC    16
 +
  #define RW_MASK               1
  #define RWA_MASK      2
  #define READ 0
  #define MS_BIND               4096
  #define MS_MOVE               8192
  #define MS_REC                16384
 -#define MS_VERBOSE    32768
 +#define MS_VERBOSE    32768   /* War is peace. Verbosity is silence.
 +                                 MS_VERBOSE is deprecated. */
 +#define MS_SILENT     32768
  #define MS_POSIXACL   (1<<16) /* VFS does not apply the umask */
  #define MS_UNBINDABLE (1<<17) /* change to unbindable */
  #define MS_PRIVATE    (1<<18) /* change to private */
@@@ -355,7 -348,7 +355,7 @@@ struct address_space_operations 
        /* Write back some dirty pages from this mapping. */
        int (*writepages)(struct address_space *, struct writeback_control *);
  
 -      /* Set a page dirty */
 +      /* Set a page dirty.  Return true if this dirtied it */
        int (*set_page_dirty)(struct page *page);
  
        int (*readpages)(struct file *filp, struct address_space *mapping,
@@@ -678,7 -671,6 +678,6 @@@ extern spinlock_t files_lock
  #define FL_POSIX      1
  #define FL_FLOCK      2
  #define FL_ACCESS     8       /* not trying to lock, just looking */
- #define FL_LOCKD      16      /* lock held by rpc.lockd */
  #define FL_LEASE      32      /* lease held on this file */
  #define FL_SLEEP      128     /* A blocking lock */
  
@@@ -742,8 -734,6 +741,6 @@@ struct file_lock 
  #define OFFT_OFFSET_MAX       INT_LIMIT(off_t)
  #endif
  
- extern struct list_head file_lock_list;
  #include <linux/fcntl.h>
  
  extern int fcntl_getlk(struct file *, struct flock __user *);
@@@ -765,10 -755,9 +762,9 @@@ extern void locks_init_lock(struct file
  extern void locks_copy_lock(struct file_lock *, struct file_lock *);
  extern void locks_remove_posix(struct file *, fl_owner_t);
  extern void locks_remove_flock(struct file *);
- extern struct file_lock *posix_test_lock(struct file *, struct file_lock *);
+ extern int posix_test_lock(struct file *, struct file_lock *, struct file_lock *);
  extern int posix_lock_file(struct file *, struct file_lock *);
  extern int posix_lock_file_wait(struct file *, struct file_lock *);
- extern void posix_block_lock(struct file_lock *, struct file_lock *);
  extern int posix_unblock_lock(struct file *, struct file_lock *);
  extern int posix_locks_deadlock(struct file_lock *, struct file_lock *);
  extern int flock_lock_file_wait(struct file *filp, struct file_lock *fl);
@@@ -1097,6 -1086,7 +1093,7 @@@ struct super_operations 
        void (*umount_begin) (struct super_block *);
  
        int (*show_options)(struct seq_file *, struct vfsmount *);
+       int (*show_stats)(struct seq_file *, struct vfsmount *);
  
        ssize_t (*quota_read)(struct super_block *, int, char *, size_t, loff_t);
        ssize_t (*quota_write)(struct super_block *, int, const char *, size_t, loff_t);
@@@ -1478,12 -1468,6 +1475,12 @@@ extern int filemap_fdatawait(struct add
  extern int filemap_write_and_wait(struct address_space *mapping);
  extern int filemap_write_and_wait_range(struct address_space *mapping,
                                        loff_t lstart, loff_t lend);
 +extern int wait_on_page_writeback_range(struct address_space *mapping,
 +                              pgoff_t start, pgoff_t end);
 +extern int __filemap_fdatawrite_range(struct address_space *mapping,
 +                              loff_t start, loff_t end, int sync_mode);
 +
 +extern long do_fsync(struct file *file, int datasync);
  extern void sync_supers(void);
  extern void sync_filesystems(int wait);
  extern void emergency_sync(void);
@@@ -1563,6 -1547,7 +1560,6 @@@ extern void destroy_inode(struct inode 
  extern struct inode *new_inode(struct super_block *);
  extern int remove_suid(struct dentry *);
  extern void remove_dquot_ref(struct super_block *, int, struct list_head *);
 -extern struct mutex iprune_mutex;
  
  extern void __insert_inode_hash(struct inode *, unsigned long hashval);
  extern void remove_inode_hash(struct inode *);
diff --combined net/sunrpc/rpc_pipe.c
@@@ -91,7 -91,8 +91,8 @@@ rpc_queue_upcall(struct inode *inode, s
                res = 0;
        } else if (rpci->flags & RPC_PIPE_WAIT_FOR_OPEN) {
                if (list_empty(&rpci->pipe))
-                       schedule_delayed_work(&rpci->queue_timeout,
+                       queue_delayed_work(rpciod_workqueue,
+                                       &rpci->queue_timeout,
                                        RPC_UPCALL_TIMEOUT);
                list_add_tail(&msg->list, &rpci->pipe);
                rpci->pipelen += msg->len;
@@@ -132,7 -133,7 +133,7 @@@ rpc_close_pipes(struct inode *inode
                if (ops->release_pipe)
                        ops->release_pipe(inode);
                cancel_delayed_work(&rpci->queue_timeout);
-               flush_scheduled_work();
+               flush_workqueue(rpciod_workqueue);
        }
        rpc_inode_setowner(inode, NULL);
        mutex_unlock(&inode->i_mutex);
@@@ -434,14 -435,17 +435,17 @@@ static struct rpc_filelist authfiles[] 
        },
  };
  
- static int
- rpc_get_mount(void)
+ struct vfsmount *rpc_get_mount(void)
  {
-       return simple_pin_fs("rpc_pipefs", &rpc_mount, &rpc_mount_count);
+       int err;
+       err = simple_pin_fs("rpc_pipefs", &rpc_mount, &rpc_mount_count);
+       if (err != 0)
+               return ERR_PTR(err);
+       return rpc_mount;
  }
  
- static void
- rpc_put_mount(void)
+ void rpc_put_mount(void)
  {
        simple_release_fs(&rpc_mount, &rpc_mount_count);
  }
@@@ -451,12 -455,13 +455,13 @@@ rpc_lookup_parent(char *path, struct na
  {
        if (path[0] == '\0')
                return -ENOENT;
-       if (rpc_get_mount()) {
+       nd->mnt = rpc_get_mount();
+       if (IS_ERR(nd->mnt)) {
                printk(KERN_WARNING "%s: %s failed to mount "
                               "pseudofilesystem \n", __FILE__, __FUNCTION__);
-               return -ENODEV;
+               return PTR_ERR(nd->mnt);
        }
-       nd->mnt = mntget(rpc_mount);
+       mntget(nd->mnt);
        nd->dentry = dget(rpc_mount->mnt_root);
        nd->last_type = LAST_ROOT;
        nd->flags = LOOKUP_PARENT;
@@@ -593,7 -598,6 +598,6 @@@ __rpc_mkdir(struct inode *dir, struct d
        d_instantiate(dentry, inode);
        dir->i_nlink++;
        inode_dir_notify(dir, DN_CREATE);
-       rpc_get_mount();
        return 0;
  out_err:
        printk(KERN_WARNING "%s: %s failed to allocate inode for dentry %s\n",
@@@ -614,7 -618,6 +618,6 @@@ __rpc_rmdir(struct inode *dir, struct d
        if (!error) {
                inode_dir_notify(dir, DN_DELETE);
                d_drop(dentry);
-               rpc_put_mount();
        }
        return 0;
  }
@@@ -668,7 -671,7 +671,7 @@@ rpc_mkdir(char *path, struct rpc_clnt *
  out:
        mutex_unlock(&dir->i_mutex);
        rpc_release_path(&nd);
-       return dentry;
+       return dget(dentry);
  err_depopulate:
        rpc_depopulate(dentry);
        __rpc_rmdir(dir, dentry);
@@@ -732,7 -735,7 +735,7 @@@ rpc_mkpipe(char *path, void *private, s
  out:
        mutex_unlock(&dir->i_mutex);
        rpc_release_path(&nd);
-       return dentry;
+       return dget(dentry);
  err_dput:
        dput(dentry);
        dentry = ERR_PTR(-ENOMEM);
@@@ -849,10 -852,9 +852,10 @@@ init_once(void * foo, kmem_cache_t * ca
  int register_rpc_pipefs(void)
  {
        rpc_inode_cachep = kmem_cache_create("rpc_inode_cache",
 -                                             sizeof(struct rpc_inode),
 -                                             0, SLAB_HWCACHE_ALIGN|SLAB_RECLAIM_ACCOUNT,
 -                                             init_once, NULL);
 +                              sizeof(struct rpc_inode),
 +                              0, (SLAB_HWCACHE_ALIGN|SLAB_RECLAIM_ACCOUNT|
 +                                              SLAB_MEM_SPREAD),
 +                              init_once, NULL);
        if (!rpc_inode_cachep)
                return -ENOMEM;
        register_filesystem(&rpc_pipe_fs_type);