Merge branch 'devel-for-2.6.31' into for-2.6.31
[linux-2.6.git] / net / sunrpc / xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20
21 #include <linux/types.h>
22 #include <linux/slab.h>
23 #include <linux/module.h>
24 #include <linux/capability.h>
25 #include <linux/pagemap.h>
26 #include <linux/errno.h>
27 #include <linux/socket.h>
28 #include <linux/in.h>
29 #include <linux/net.h>
30 #include <linux/mm.h>
31 #include <linux/udp.h>
32 #include <linux/tcp.h>
33 #include <linux/sunrpc/clnt.h>
34 #include <linux/sunrpc/sched.h>
35 #include <linux/sunrpc/xprtsock.h>
36 #include <linux/file.h>
37 #ifdef CONFIG_NFS_V4_1
38 #include <linux/sunrpc/bc_xprt.h>
39 #endif
40
41 #include <net/sock.h>
42 #include <net/checksum.h>
43 #include <net/udp.h>
44 #include <net/tcp.h>
45
/*
 * xprtsock tunables
 *
 * When RPC_DEBUG is defined, each variable below is the .data target of
 * an entry in xs_tunables_table[] further down in this file.
 */

/* Backing storage for the "udp_slot_table_entries" and
 * "tcp_slot_table_entries" sysctl entries. */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

/* Backing storage for the "min_resvport" and "max_resvport" sysctl entries. */
unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/* Initial value of xs_tcp_fin_timeout (15 * HZ); the variable backs the
 * "tcp_fin_timeout" sysctl entry. */
#define XS_TCP_LINGER_TO        (15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
57
58 /*
59  * We can register our own files under /proc/sys/sunrpc by
60  * calling register_sysctl_table() again.  The files in that
61  * directory become the union of all files registered there.
62  *
63  * We simply need to make sure that we don't collide with
64  * someone else's file names!
65  */
66
67 #ifdef RPC_DEBUG
68
69 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
70 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
71 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
72 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
73
74 static struct ctl_table_header *sunrpc_table_header;
75
/*
 * Leaf sysctl entries for the socket transport tunables.  This table is
 * the .child of the "sunrpc" directory entry in sunrpc_table[].
 *
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
        /* udp_slot_table_entries: range-checked against
         * min_slot_table_size / max_slot_table_size */
        {
                .ctl_name       = CTL_SLOTTABLE_UDP,
                .procname       = "udp_slot_table_entries",
                .data           = &xprt_udp_slot_table_entries,
                .maxlen         = sizeof(unsigned int),
                .mode           = 0644,
                .proc_handler   = &proc_dointvec_minmax,
                .strategy       = &sysctl_intvec,
                .extra1         = &min_slot_table_size,
                .extra2         = &max_slot_table_size
        },
        /* tcp_slot_table_entries: same bounds as the UDP entry */
        {
                .ctl_name       = CTL_SLOTTABLE_TCP,
                .procname       = "tcp_slot_table_entries",
                .data           = &xprt_tcp_slot_table_entries,
                .maxlen         = sizeof(unsigned int),
                .mode           = 0644,
                .proc_handler   = &proc_dointvec_minmax,
                .strategy       = &sysctl_intvec,
                .extra1         = &min_slot_table_size,
                .extra2         = &max_slot_table_size
        },
        /* min_resvport: range-checked against
         * xprt_min_resvport_limit / xprt_max_resvport_limit */
        {
                .ctl_name       = CTL_MIN_RESVPORT,
                .procname       = "min_resvport",
                .data           = &xprt_min_resvport,
                .maxlen         = sizeof(unsigned int),
                .mode           = 0644,
                .proc_handler   = &proc_dointvec_minmax,
                .strategy       = &sysctl_intvec,
                .extra1         = &xprt_min_resvport_limit,
                .extra2         = &xprt_max_resvport_limit
        },
        /* max_resvport: same bounds as min_resvport */
        {
                .ctl_name       = CTL_MAX_RESVPORT,
                .procname       = "max_resvport",
                .data           = &xprt_max_resvport,
                .maxlen         = sizeof(unsigned int),
                .mode           = 0644,
                .proc_handler   = &proc_dointvec_minmax,
                .strategy       = &sysctl_intvec,
                .extra1         = &xprt_min_resvport_limit,
                .extra2         = &xprt_max_resvport_limit
        },
        /* tcp_fin_timeout: uses the jiffies handlers; this entry sets
         * neither .ctl_name nor extra1/extra2 bounds */
        {
                .procname       = "tcp_fin_timeout",
                .data           = &xs_tcp_fin_timeout,
                .maxlen         = sizeof(xs_tcp_fin_timeout),
                .mode           = 0644,
                .proc_handler   = &proc_dointvec_jiffies,
                .strategy       = sysctl_jiffies
        },
        /* table terminator */
        {
                .ctl_name = 0,
        },
};
137
/*
 * Top-level sysctl table: a single read-only (0555) "sunrpc" directory
 * entry whose children are the entries of xs_tunables_table[].
 */
static ctl_table sunrpc_table[] = {
        {
                .ctl_name       = CTL_SUNRPC,
                .procname       = "sunrpc",
                .mode           = 0555,
                .child          = xs_tunables_table
        },
        /* table terminator */
        {
                .ctl_name = 0,
        },
};
149
150 #endif
151
152 /*
153  * Time out for an RPC UDP socket connect.  UDP socket connects are
154  * synchronous, but we set a timeout anyway in case of resource
155  * exhaustion on the local host.
156  */
157 #define XS_UDP_CONN_TO          (5U * HZ)
158
159 /*
160  * Wait duration for an RPC TCP connection to be established.  Solaris
161  * NFS over TCP uses 60 seconds, for example, which is in line with how
162  * long a server takes to reboot.
163  */
164 #define XS_TCP_CONN_TO          (60U * HZ)
165
166 /*
167  * Wait duration for a reply from the RPC portmapper.
168  */
169 #define XS_BIND_TO              (60U * HZ)
170
171 /*
172  * Delay if a UDP socket connect error occurs.  This is most likely some
173  * kind of resource problem on the local host.
174  */
175 #define XS_UDP_REEST_TO         (2U * HZ)
176
177 /*
178  * The reestablish timeout allows clients to delay for a bit before attempting
179  * to reconnect to a server that just dropped our connection.
180  *
181  * We implement an exponential backoff when trying to reestablish a TCP
182  * transport connection with the server.  Some servers like to drop a TCP
183  * connection when they are overworked, so we start with a short timeout and
184  * increase over time if the server is down or not responding.
185  */
186 #define XS_TCP_INIT_REEST_TO    (3U * HZ)
187 #define XS_TCP_MAX_REEST_TO     (5U * 60 * HZ)
188
189 /*
190  * TCP idle timeout; client drops the transport socket if it is idle
191  * for this long.  Note that we also timeout UDP sockets to prevent
192  * holding port numbers when there is no RPC traffic.
193  */
194 #define XS_IDLE_DISC_TO         (5U * 60 * HZ)
195
196 #ifdef RPC_DEBUG
197 # undef  RPC_DEBUG_DATA
198 # define RPCDBG_FACILITY        RPCDBG_TRANS
199 #endif
200
201 #ifdef RPC_DEBUG_DATA
202 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
203 {
204         u8 *buf = (u8 *) packet;
205         int j;
206
207         dprintk("RPC:       %s\n", msg);
208         for (j = 0; j < count && j < 128; j += 4) {
209                 if (!(j & 31)) {
210                         if (j)
211                                 dprintk("\n");
212                         dprintk("0x%04x ", j);
213                 }
214                 dprintk("%02x%02x%02x%02x ",
215                         buf[j], buf[j+1], buf[j+2], buf[j+3]);
216         }
217         dprintk("\n");
218 }
219 #else
220 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
221 {
222         /* NOP */
223 }
224 #endif
225
226 struct sock_xprt {
227         struct rpc_xprt         xprt;
228
229         /*
230          * Network layer
231          */
232         struct socket *         sock;
233         struct sock *           inet;
234
235         /*
236          * State of TCP reply receive
237          */
238         __be32                  tcp_fraghdr,
239                                 tcp_xid;
240
241         u32                     tcp_offset,
242                                 tcp_reclen;
243
244         unsigned long           tcp_copied,
245                                 tcp_flags;
246
247         /*
248          * Connection of transports
249          */
250         struct delayed_work     connect_worker;
251         struct sockaddr_storage addr;
252         unsigned short          port;
253
254         /*
255          * UDP socket buffer size parameters
256          */
257         size_t                  rcvsize,
258                                 sndsize;
259
260         /*
261          * Saved socket callback addresses
262          */
263         void                    (*old_data_ready)(struct sock *, int);
264         void                    (*old_state_change)(struct sock *);
265         void                    (*old_write_space)(struct sock *);
266         void                    (*old_error_report)(struct sock *);
267 };
268
269 /*
270  * TCP receive state flags
271  */
272 #define TCP_RCV_LAST_FRAG       (1UL << 0)
273 #define TCP_RCV_COPY_FRAGHDR    (1UL << 1)
274 #define TCP_RCV_COPY_XID        (1UL << 2)
275 #define TCP_RCV_COPY_DATA       (1UL << 3)
276 #define TCP_RCV_READ_CALLDIR    (1UL << 4)
277 #define TCP_RCV_COPY_CALLDIR    (1UL << 5)
278
279 /*
280  * TCP RPC flags
281  */
282 #define TCP_RPC_REPLY           (1UL << 6)
283
284 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
285 {
286         return (struct sockaddr *) &xprt->addr;
287 }
288
289 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
290 {
291         return (struct sockaddr_in *) &xprt->addr;
292 }
293
294 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
295 {
296         return (struct sockaddr_in6 *) &xprt->addr;
297 }
298
299 static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
300                                           const char *protocol,
301                                           const char *netid)
302 {
303         struct sockaddr_in *addr = xs_addr_in(xprt);
304         char *buf;
305
306         buf = kzalloc(20, GFP_KERNEL);
307         if (buf) {
308                 snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr);
309         }
310         xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
311
312         buf = kzalloc(8, GFP_KERNEL);
313         if (buf) {
314                 snprintf(buf, 8, "%u",
315                                 ntohs(addr->sin_port));
316         }
317         xprt->address_strings[RPC_DISPLAY_PORT] = buf;
318
319         xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
320
321         buf = kzalloc(48, GFP_KERNEL);
322         if (buf) {
323                 snprintf(buf, 48, "addr=%pI4 port=%u proto=%s",
324                         &addr->sin_addr.s_addr,
325                         ntohs(addr->sin_port),
326                         protocol);
327         }
328         xprt->address_strings[RPC_DISPLAY_ALL] = buf;
329
330         buf = kzalloc(10, GFP_KERNEL);
331         if (buf) {
332                 snprintf(buf, 10, "%02x%02x%02x%02x",
333                                 NIPQUAD(addr->sin_addr.s_addr));
334         }
335         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
336
337         buf = kzalloc(8, GFP_KERNEL);
338         if (buf) {
339                 snprintf(buf, 8, "%4hx",
340                                 ntohs(addr->sin_port));
341         }
342         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
343
344         buf = kzalloc(30, GFP_KERNEL);
345         if (buf) {
346                 snprintf(buf, 30, "%pI4.%u.%u",
347                                 &addr->sin_addr.s_addr,
348                                 ntohs(addr->sin_port) >> 8,
349                                 ntohs(addr->sin_port) & 0xff);
350         }
351         xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
352
353         xprt->address_strings[RPC_DISPLAY_NETID] = netid;
354 }
355
356 static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
357                                           const char *protocol,
358                                           const char *netid)
359 {
360         struct sockaddr_in6 *addr = xs_addr_in6(xprt);
361         char *buf;
362
363         buf = kzalloc(40, GFP_KERNEL);
364         if (buf) {
365                 snprintf(buf, 40, "%pI6",&addr->sin6_addr);
366         }
367         xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
368
369         buf = kzalloc(8, GFP_KERNEL);
370         if (buf) {
371                 snprintf(buf, 8, "%u",
372                                 ntohs(addr->sin6_port));
373         }
374         xprt->address_strings[RPC_DISPLAY_PORT] = buf;
375
376         xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
377
378         buf = kzalloc(64, GFP_KERNEL);
379         if (buf) {
380                 snprintf(buf, 64, "addr=%pI6 port=%u proto=%s",
381                                 &addr->sin6_addr,
382                                 ntohs(addr->sin6_port),
383                                 protocol);
384         }
385         xprt->address_strings[RPC_DISPLAY_ALL] = buf;
386
387         buf = kzalloc(36, GFP_KERNEL);
388         if (buf)
389                 snprintf(buf, 36, "%pi6", &addr->sin6_addr);
390
391         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
392
393         buf = kzalloc(8, GFP_KERNEL);
394         if (buf) {
395                 snprintf(buf, 8, "%4hx",
396                                 ntohs(addr->sin6_port));
397         }
398         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
399
400         buf = kzalloc(50, GFP_KERNEL);
401         if (buf) {
402                 snprintf(buf, 50, "%pI6.%u.%u",
403                          &addr->sin6_addr,
404                          ntohs(addr->sin6_port) >> 8,
405                          ntohs(addr->sin6_port) & 0xff);
406         }
407         xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
408
409         xprt->address_strings[RPC_DISPLAY_NETID] = netid;
410 }
411
412 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
413 {
414         unsigned int i;
415
416         for (i = 0; i < RPC_DISPLAY_MAX; i++)
417                 switch (i) {
418                 case RPC_DISPLAY_PROTO:
419                 case RPC_DISPLAY_NETID:
420                         continue;
421                 default:
422                         kfree(xprt->address_strings[i]);
423                 }
424 }
425
426 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
427
428 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
429 {
430         struct msghdr msg = {
431                 .msg_name       = addr,
432                 .msg_namelen    = addrlen,
433                 .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
434         };
435         struct kvec iov = {
436                 .iov_base       = vec->iov_base + base,
437                 .iov_len        = vec->iov_len - base,
438         };
439
440         if (iov.iov_len != 0)
441                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
442         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
443 }
444
445 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
446 {
447         struct page **ppage;
448         unsigned int remainder;
449         int err, sent = 0;
450
451         remainder = xdr->page_len - base;
452         base += xdr->page_base;
453         ppage = xdr->pages + (base >> PAGE_SHIFT);
454         base &= ~PAGE_MASK;
455         for(;;) {
456                 unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
457                 int flags = XS_SENDMSG_FLAGS;
458
459                 remainder -= len;
460                 if (remainder != 0 || more)
461                         flags |= MSG_MORE;
462                 err = sock->ops->sendpage(sock, *ppage, base, len, flags);
463                 if (remainder == 0 || err != len)
464                         break;
465                 sent += err;
466                 ppage++;
467                 base = 0;
468         }
469         if (sent == 0)
470                 return err;
471         if (err > 0)
472                 sent += err;
473         return sent;
474 }
475
476 /**
477  * xs_sendpages - write pages directly to a socket
478  * @sock: socket to send on
479  * @addr: UDP only -- address of destination
480  * @addrlen: UDP only -- length of destination address
481  * @xdr: buffer containing this request
482  * @base: starting position in the buffer
483  *
484  */
485 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
486 {
487         unsigned int remainder = xdr->len - base;
488         int err, sent = 0;
489
490         if (unlikely(!sock))
491                 return -ENOTSOCK;
492
493         clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
494         if (base != 0) {
495                 addr = NULL;
496                 addrlen = 0;
497         }
498
499         if (base < xdr->head[0].iov_len || addr != NULL) {
500                 unsigned int len = xdr->head[0].iov_len - base;
501                 remainder -= len;
502                 err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
503                 if (remainder == 0 || err != len)
504                         goto out;
505                 sent += err;
506                 base = 0;
507         } else
508                 base -= xdr->head[0].iov_len;
509
510         if (base < xdr->page_len) {
511                 unsigned int len = xdr->page_len - base;
512                 remainder -= len;
513                 err = xs_send_pagedata(sock, xdr, base, remainder != 0);
514                 if (remainder == 0 || err != len)
515                         goto out;
516                 sent += err;
517                 base = 0;
518         } else
519                 base -= xdr->page_len;
520
521         if (base >= xdr->tail[0].iov_len)
522                 return sent;
523         err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
524 out:
525         if (sent == 0)
526                 return err;
527         if (err > 0)
528                 sent += err;
529         return sent;
530 }
531
532 static void xs_nospace_callback(struct rpc_task *task)
533 {
534         struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
535
536         transport->inet->sk_write_pending--;
537         clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
538 }
539
540 /**
541  * xs_nospace - place task on wait queue if transmit was incomplete
542  * @task: task to put to sleep
543  *
544  */
545 static int xs_nospace(struct rpc_task *task)
546 {
547         struct rpc_rqst *req = task->tk_rqstp;
548         struct rpc_xprt *xprt = req->rq_xprt;
549         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
550         int ret = 0;
551
552         dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
553                         task->tk_pid, req->rq_slen - req->rq_bytes_sent,
554                         req->rq_slen);
555
556         /* Protect against races with write_space */
557         spin_lock_bh(&xprt->transport_lock);
558
559         /* Don't race with disconnect */
560         if (xprt_connected(xprt)) {
561                 if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
562                         ret = -EAGAIN;
563                         /*
564                          * Notify TCP that we're limited by the application
565                          * window size
566                          */
567                         set_bit(SOCK_NOSPACE, &transport->sock->flags);
568                         transport->inet->sk_write_pending++;
569                         /* ...and wait for more buffer space */
570                         xprt_wait_for_buffer_space(task, xs_nospace_callback);
571                 }
572         } else {
573                 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
574                 ret = -ENOTCONN;
575         }
576
577         spin_unlock_bh(&xprt->transport_lock);
578         return ret;
579 }
580
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;

	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
			      xdr, req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		task->tk_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	if (transport->sock == NULL)
		return status;

	switch (status) {
	case -ENOTSOCK:
		/* Should we call xs_close() here? */
		return -ENOTCONN;
	case -EAGAIN:
		return xs_nospace(task);
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		break;
	}

	/* Every remaining error clears the no-space flag and is passed up */
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	return status;
}
645
646 /**
647  * xs_tcp_shutdown - gracefully shut down a TCP socket
648  * @xprt: transport
649  *
650  * Initiates a graceful shutdown of the TCP socket by calling the
651  * equivalent of shutdown(SHUT_WR);
652  */
653 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
654 {
655         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
656         struct socket *sock = transport->sock;
657
658         if (sock != NULL)
659                 kernel_sock_shutdown(sock, SHUT_WR);
660 }
661
662 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
663 {
664         u32 reclen = buf->len - sizeof(rpc_fraghdr);
665         rpc_fraghdr *base = buf->head[0].iov_base;
666         *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
667 }
668
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	for (;;) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* No progress on this pass: treat it as a blocked socket */
		if (status == 0) {
			status = -EAGAIN;
			break;
		}
	}

	if (transport->sock == NULL)
		return status;

	switch (status) {
	case -ENOTSOCK:
		/* Should we call xs_close() here? */
		return -ENOTCONN;
	case -EAGAIN:
		return xs_nospace(task);
	case -ECONNREFUSED:
	case -ENOTCONN:
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through */
	case -ECONNRESET:
	case -EPIPE:
		xs_tcp_shutdown(xprt);
		break;
	}

	/* Every remaining error clears the no-space flag and is passed up */
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	return status;
}
748
749 /**
750  * xs_tcp_release_xprt - clean up after a tcp transmission
751  * @xprt: transport
752  * @task: rpc task
753  *
754  * This cleans up if an error causes us to abort the transmission of a request.
755  * In this case, the socket may need to be reset in order to avoid confusing
756  * the server.
757  */
758 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
759 {
760         struct rpc_rqst *req;
761
762         if (task != xprt->snd_task)
763                 return;
764         if (task == NULL)
765                 goto out_release;
766         req = task->tk_rqstp;
767         if (req->rq_bytes_sent == 0)
768                 goto out_release;
769         if (req->rq_bytes_sent == req->rq_snd_buf.len)
770                 goto out_release;
771         set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
772 out_release:
773         xprt_release_xprt(xprt, task);
774 }
775
776 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
777 {
778         transport->old_data_ready = sk->sk_data_ready;
779         transport->old_state_change = sk->sk_state_change;
780         transport->old_write_space = sk->sk_write_space;
781         transport->old_error_report = sk->sk_error_report;
782 }
783
784 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
785 {
786         sk->sk_data_ready = transport->old_data_ready;
787         sk->sk_state_change = transport->old_state_change;
788         sk->sk_write_space = transport->old_write_space;
789         sk->sk_error_report = transport->old_error_report;
790 }
791
792 static void xs_reset_transport(struct sock_xprt *transport)
793 {
794         struct socket *sock = transport->sock;
795         struct sock *sk = transport->inet;
796
797         if (sk == NULL)
798                 return;
799
800         write_lock_bh(&sk->sk_callback_lock);
801         transport->inet = NULL;
802         transport->sock = NULL;
803
804         sk->sk_user_data = NULL;
805
806         xs_restore_old_callbacks(transport, sk);
807         write_unlock_bh(&sk->sk_callback_lock);
808
809         sk->sk_no_check = 0;
810
811         sock_release(sock);
812 }
813
814 /**
815  * xs_close - close a socket
816  * @xprt: transport
817  *
818  * This is used when all requests are complete; ie, no DRC state remains
819  * on the server we want to save.
820  *
821  * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
822  * xs_reset_transport() zeroing the socket from underneath a writer.
823  */
824 static void xs_close(struct rpc_xprt *xprt)
825 {
826         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
827
828         dprintk("RPC:       xs_close xprt %p\n", xprt);
829
830         xs_reset_transport(transport);
831
832         smp_mb__before_clear_bit();
833         clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
834         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
835         clear_bit(XPRT_CLOSING, &xprt->state);
836         smp_mb__after_clear_bit();
837         xprt_disconnect_done(xprt);
838 }
839
840 static void xs_tcp_close(struct rpc_xprt *xprt)
841 {
842         if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
843                 xs_close(xprt);
844         else
845                 xs_tcp_shutdown(xprt);
846 }
847
848 /**
849  * xs_destroy - prepare to shutdown a transport
850  * @xprt: doomed transport
851  *
852  */
853 static void xs_destroy(struct rpc_xprt *xprt)
854 {
855         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
856
857         dprintk("RPC:       xs_destroy xprt %p\n", xprt);
858
859         cancel_rearming_delayed_work(&transport->connect_worker);
860
861         xs_close(xprt);
862         xs_free_peer_addresses(xprt);
863         kfree(xprt->slot);
864         kfree(xprt);
865         module_put(THIS_MODULE);
866 }
867
868 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
869 {
870         return (struct rpc_xprt *) sk->sk_user_data;
871 }
872
873 /**
874  * xs_udp_data_ready - "data ready" callback for UDP sockets
875  * @sk: socket with data to read
876  * @len: how much data to read
877  *
878  */
879 static void xs_udp_data_ready(struct sock *sk, int len)
880 {
881         struct rpc_task *task;
882         struct rpc_xprt *xprt;
883         struct rpc_rqst *rovr;
884         struct sk_buff *skb;
885         int err, repsize, copied;
886         u32 _xid;
887         __be32 *xp;
888
889         read_lock(&sk->sk_callback_lock);
890         dprintk("RPC:       xs_udp_data_ready...\n");
891         if (!(xprt = xprt_from_sock(sk)))
892                 goto out;
893
894         if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
895                 goto out;
896
897         if (xprt->shutdown)
898                 goto dropit;
899
900         repsize = skb->len - sizeof(struct udphdr);
901         if (repsize < 4) {
902                 dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
903                 goto dropit;
904         }
905
906         /* Copy the XID from the skb... */
907         xp = skb_header_pointer(skb, sizeof(struct udphdr),
908                                 sizeof(_xid), &_xid);
909         if (xp == NULL)
910                 goto dropit;
911
912         /* Look up and lock the request corresponding to the given XID */
913         spin_lock(&xprt->transport_lock);
914         rovr = xprt_lookup_rqst(xprt, *xp);
915         if (!rovr)
916                 goto out_unlock;
917         task = rovr->rq_task;
918
919         if ((copied = rovr->rq_private_buf.buflen) > repsize)
920                 copied = repsize;
921
922         /* Suck it into the iovec, verify checksum if not done by hw. */
923         if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
924                 UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
925                 goto out_unlock;
926         }
927
928         UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
929
930         /* Something worked... */
931         dst_confirm(skb_dst(skb));
932
933         xprt_adjust_cwnd(task, copied);
934         xprt_update_rtt(task);
935         xprt_complete_rqst(task, copied);
936
937  out_unlock:
938         spin_unlock(&xprt->transport_lock);
939  dropit:
940         skb_free_datagram(sk, skb);
941  out:
942         read_unlock(&sk->sk_callback_lock);
943 }
944
945 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
946 {
947         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
948         size_t len, used;
949         char *p;
950
951         p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
952         len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
953         used = xdr_skb_read_bits(desc, p, len);
954         transport->tcp_offset += used;
955         if (used != len)
956                 return;
957
958         transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
959         if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
960                 transport->tcp_flags |= TCP_RCV_LAST_FRAG;
961         else
962                 transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
963         transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
964
965         transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
966         transport->tcp_offset = 0;
967
968         /* Sanity check of the record length */
969         if (unlikely(transport->tcp_reclen < 8)) {
970                 dprintk("RPC:       invalid TCP record fragment length\n");
971                 xprt_force_disconnect(xprt);
972                 return;
973         }
974         dprintk("RPC:       reading TCP record fragment of length %d\n",
975                         transport->tcp_reclen);
976 }
977
978 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
979 {
980         if (transport->tcp_offset == transport->tcp_reclen) {
981                 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
982                 transport->tcp_offset = 0;
983                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
984                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
985                         transport->tcp_flags |= TCP_RCV_COPY_XID;
986                         transport->tcp_copied = 0;
987                 }
988         }
989 }
990
991 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
992 {
993         size_t len, used;
994         char *p;
995
996         len = sizeof(transport->tcp_xid) - transport->tcp_offset;
997         dprintk("RPC:       reading XID (%Zu bytes)\n", len);
998         p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
999         used = xdr_skb_read_bits(desc, p, len);
1000         transport->tcp_offset += used;
1001         if (used != len)
1002                 return;
1003         transport->tcp_flags &= ~TCP_RCV_COPY_XID;
1004         transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
1005         transport->tcp_copied = 4;
1006         dprintk("RPC:       reading %s XID %08x\n",
1007                         (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
1008                                                               : "request with",
1009                         ntohl(transport->tcp_xid));
1010         xs_tcp_check_fraghdr(transport);
1011 }
1012
1013 static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
1014                                        struct xdr_skb_reader *desc)
1015 {
1016         size_t len, used;
1017         u32 offset;
1018         __be32  calldir;
1019
1020         /*
1021          * We want transport->tcp_offset to be 8 at the end of this routine
1022          * (4 bytes for the xid and 4 bytes for the call/reply flag).
1023          * When this function is called for the first time,
1024          * transport->tcp_offset is 4 (after having already read the xid).
1025          */
1026         offset = transport->tcp_offset - sizeof(transport->tcp_xid);
1027         len = sizeof(calldir) - offset;
1028         dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
1029         used = xdr_skb_read_bits(desc, &calldir, len);
1030         transport->tcp_offset += used;
1031         if (used != len)
1032                 return;
1033         transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
1034         transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1035         transport->tcp_flags |= TCP_RCV_COPY_DATA;
1036         /*
1037          * We don't yet have the XDR buffer, so we will write the calldir
1038          * out after we get the buffer from the 'struct rpc_rqst'
1039          */
1040         if (ntohl(calldir) == RPC_REPLY)
1041                 transport->tcp_flags |= TCP_RPC_REPLY;
1042         else
1043                 transport->tcp_flags &= ~TCP_RPC_REPLY;
1044         dprintk("RPC:       reading %s CALL/REPLY flag %08x\n",
1045                         (transport->tcp_flags & TCP_RPC_REPLY) ?
1046                                 "reply for" : "request with", calldir);
1047         xs_tcp_check_fraghdr(transport);
1048 }
1049
1050 static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
1051                                      struct xdr_skb_reader *desc,
1052                                      struct rpc_rqst *req)
1053 {
1054         struct sock_xprt *transport =
1055                                 container_of(xprt, struct sock_xprt, xprt);
1056         struct xdr_buf *rcvbuf;
1057         size_t len;
1058         ssize_t r;
1059
1060         rcvbuf = &req->rq_private_buf;
1061
1062         if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
1063                 /*
1064                  * Save the RPC direction in the XDR buffer
1065                  */
1066                 __be32  calldir = transport->tcp_flags & TCP_RPC_REPLY ?
1067                                         htonl(RPC_REPLY) : 0;
1068
1069                 memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
1070                         &calldir, sizeof(calldir));
1071                 transport->tcp_copied += sizeof(calldir);
1072                 transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1073         }
1074
1075         len = desc->count;
1076         if (len > transport->tcp_reclen - transport->tcp_offset) {
1077                 struct xdr_skb_reader my_desc;
1078
1079                 len = transport->tcp_reclen - transport->tcp_offset;
1080                 memcpy(&my_desc, desc, sizeof(my_desc));
1081                 my_desc.count = len;
1082                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1083                                           &my_desc, xdr_skb_read_bits);
1084                 desc->count -= r;
1085                 desc->offset += r;
1086         } else
1087                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1088                                           desc, xdr_skb_read_bits);
1089
1090         if (r > 0) {
1091                 transport->tcp_copied += r;
1092                 transport->tcp_offset += r;
1093         }
1094         if (r != len) {
1095                 /* Error when copying to the receive buffer,
1096                  * usually because we weren't able to allocate
1097                  * additional buffer pages. All we can do now
1098                  * is turn off TCP_RCV_COPY_DATA, so the request
1099                  * will not receive any additional updates,
1100                  * and time out.
1101                  * Any remaining data from this record will
1102                  * be discarded.
1103                  */
1104                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1105                 dprintk("RPC:       XID %08x truncated request\n",
1106                                 ntohl(transport->tcp_xid));
1107                 dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1108                                 "tcp_offset = %u, tcp_reclen = %u\n",
1109                                 xprt, transport->tcp_copied,
1110                                 transport->tcp_offset, transport->tcp_reclen);
1111                 return;
1112         }
1113
1114         dprintk("RPC:       XID %08x read %Zd bytes\n",
1115                         ntohl(transport->tcp_xid), r);
1116         dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1117                         "tcp_reclen = %u\n", xprt, transport->tcp_copied,
1118                         transport->tcp_offset, transport->tcp_reclen);
1119
1120         if (transport->tcp_copied == req->rq_private_buf.buflen)
1121                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1122         else if (transport->tcp_offset == transport->tcp_reclen) {
1123                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1124                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1125         }
1126
1127         return;
1128 }
1129
1130 /*
1131  * Finds the request corresponding to the RPC xid and invokes the common
1132  * tcp read code to read the data.
1133  */
1134 static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1135                                     struct xdr_skb_reader *desc)
1136 {
1137         struct sock_xprt *transport =
1138                                 container_of(xprt, struct sock_xprt, xprt);
1139         struct rpc_rqst *req;
1140
1141         dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1142
1143         /* Find and lock the request corresponding to this xid */
1144         spin_lock(&xprt->transport_lock);
1145         req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1146         if (!req) {
1147                 dprintk("RPC:       XID %08x request not found!\n",
1148                                 ntohl(transport->tcp_xid));
1149                 spin_unlock(&xprt->transport_lock);
1150                 return -1;
1151         }
1152
1153         xs_tcp_read_common(xprt, desc, req);
1154
1155         if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1156                 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1157
1158         spin_unlock(&xprt->transport_lock);
1159         return 0;
1160 }
1161
1162 #if defined(CONFIG_NFS_V4_1)
1163 /*
1164  * Obtains an rpc_rqst previously allocated and invokes the common
1165  * tcp read code to read the data.  The result is placed in the callback
1166  * queue.
1167  * If we're unable to obtain the rpc_rqst we schedule the closing of the
1168  * connection and return -1.
1169  */
1170 static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
1171                                        struct xdr_skb_reader *desc)
1172 {
1173         struct sock_xprt *transport =
1174                                 container_of(xprt, struct sock_xprt, xprt);
1175         struct rpc_rqst *req;
1176
1177         req = xprt_alloc_bc_request(xprt);
1178         if (req == NULL) {
1179                 printk(KERN_WARNING "Callback slot table overflowed\n");
1180                 xprt_force_disconnect(xprt);
1181                 return -1;
1182         }
1183
1184         req->rq_xid = transport->tcp_xid;
1185         dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1186         xs_tcp_read_common(xprt, desc, req);
1187
1188         if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
1189                 struct svc_serv *bc_serv = xprt->bc_serv;
1190
1191                 /*
1192                  * Add callback request to callback list.  The callback
1193                  * service sleeps on the sv_cb_waitq waiting for new
1194                  * requests.  Wake it up after adding enqueing the
1195                  * request.
1196                  */
1197                 dprintk("RPC:       add callback request to list\n");
1198                 spin_lock(&bc_serv->sv_cb_lock);
1199                 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1200                 spin_unlock(&bc_serv->sv_cb_lock);
1201                 wake_up(&bc_serv->sv_cb_waitq);
1202         }
1203
1204         req->rq_private_buf.len = transport->tcp_copied;
1205
1206         return 0;
1207 }
1208
1209 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1210                                         struct xdr_skb_reader *desc)
1211 {
1212         struct sock_xprt *transport =
1213                                 container_of(xprt, struct sock_xprt, xprt);
1214
1215         return (transport->tcp_flags & TCP_RPC_REPLY) ?
1216                 xs_tcp_read_reply(xprt, desc) :
1217                 xs_tcp_read_callback(xprt, desc);
1218 }
1219 #else
/* Without CONFIG_NFS_V4_1 all incoming data is handled as a reply. */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
                                        struct xdr_skb_reader *desc)
{
        int status = xs_tcp_read_reply(xprt, desc);

        return status;
}
1225 #endif /* CONFIG_NFS_V4_1 */
1226
1227 /*
1228  * Read data off the transport.  This can be either an RPC_CALL or an
1229  * RPC_REPLY.  Relay the processing to helper functions.
1230  */
1231 static void xs_tcp_read_data(struct rpc_xprt *xprt,
1232                                     struct xdr_skb_reader *desc)
1233 {
1234         struct sock_xprt *transport =
1235                                 container_of(xprt, struct sock_xprt, xprt);
1236
1237         if (_xs_tcp_read_data(xprt, desc) == 0)
1238                 xs_tcp_check_fraghdr(transport);
1239         else {
1240                 /*
1241                  * The transport_lock protects the request handling.
1242                  * There's no need to hold it to update the tcp_flags.
1243                  */
1244                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1245         }
1246 }
1247
1248 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1249 {
1250         size_t len;
1251
1252         len = transport->tcp_reclen - transport->tcp_offset;
1253         if (len > desc->count)
1254                 len = desc->count;
1255         desc->count -= len;
1256         desc->offset += len;
1257         transport->tcp_offset += len;
1258         dprintk("RPC:       discarded %Zu bytes\n", len);
1259         xs_tcp_check_fraghdr(transport);
1260 }
1261
1262 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1263 {
1264         struct rpc_xprt *xprt = rd_desc->arg.data;
1265         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1266         struct xdr_skb_reader desc = {
1267                 .skb    = skb,
1268                 .offset = offset,
1269                 .count  = len,
1270         };
1271
1272         dprintk("RPC:       xs_tcp_data_recv started\n");
1273         do {
1274                 /* Read in a new fragment marker if necessary */
1275                 /* Can we ever really expect to get completely empty fragments? */
1276                 if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1277                         xs_tcp_read_fraghdr(xprt, &desc);
1278                         continue;
1279                 }
1280                 /* Read in the xid if necessary */
1281                 if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1282                         xs_tcp_read_xid(transport, &desc);
1283                         continue;
1284                 }
1285                 /* Read in the call/reply flag */
1286                 if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1287                         xs_tcp_read_calldir(transport, &desc);
1288                         continue;
1289                 }
1290                 /* Read in the request data */
1291                 if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1292                         xs_tcp_read_data(xprt, &desc);
1293                         continue;
1294                 }
1295                 /* Skip over any trailing bytes on short reads */
1296                 xs_tcp_read_discard(transport, &desc);
1297         } while (desc.count);
1298         dprintk("RPC:       xs_tcp_data_recv done\n");
1299         return len - desc.count;
1300 }
1301
1302 /**
1303  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1304  * @sk: socket with data to read
1305  * @bytes: how much data to read
1306  *
1307  */
1308 static void xs_tcp_data_ready(struct sock *sk, int bytes)
1309 {
1310         struct rpc_xprt *xprt;
1311         read_descriptor_t rd_desc;
1312         int read;
1313
1314         dprintk("RPC:       xs_tcp_data_ready...\n");
1315
1316         read_lock(&sk->sk_callback_lock);
1317         if (!(xprt = xprt_from_sock(sk)))
1318                 goto out;
1319         if (xprt->shutdown)
1320                 goto out;
1321
1322         /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1323         rd_desc.arg.data = xprt;
1324         do {
1325                 rd_desc.count = 65536;
1326                 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1327         } while (read > 0);
1328 out:
1329         read_unlock(&sk->sk_callback_lock);
1330 }
1331
1332 /*
1333  * Do the equivalent of linger/linger2 handling for dealing with
1334  * broken servers that don't close the socket in a timely
1335  * fashion
1336  */
1337 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1338                 unsigned long timeout)
1339 {
1340         struct sock_xprt *transport;
1341
1342         if (xprt_test_and_set_connecting(xprt))
1343                 return;
1344         set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1345         transport = container_of(xprt, struct sock_xprt, xprt);
1346         queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1347                            timeout);
1348 }
1349
1350 static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1351 {
1352         struct sock_xprt *transport;
1353
1354         transport = container_of(xprt, struct sock_xprt, xprt);
1355
1356         if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1357             !cancel_delayed_work(&transport->connect_worker))
1358                 return;
1359         clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1360         xprt_clear_connecting(xprt);
1361 }
1362
1363 static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1364 {
1365         smp_mb__before_clear_bit();
1366         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1367         clear_bit(XPRT_CLOSING, &xprt->state);
1368         smp_mb__after_clear_bit();
1369         /* Mark transport as closed and wake up all pending tasks */
1370         xprt_disconnect_done(xprt);
1371 }
1372
1373 /**
1374  * xs_tcp_state_change - callback to handle TCP socket state changes
1375  * @sk: socket whose state has changed
1376  *
1377  */
1378 static void xs_tcp_state_change(struct sock *sk)
1379 {
1380         struct rpc_xprt *xprt;
1381
1382         read_lock(&sk->sk_callback_lock);
1383         if (!(xprt = xprt_from_sock(sk)))
1384                 goto out;
1385         dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1386         dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
1387                         sk->sk_state, xprt_connected(xprt),
1388                         sock_flag(sk, SOCK_DEAD),
1389                         sock_flag(sk, SOCK_ZAPPED));
1390
1391         switch (sk->sk_state) {
1392         case TCP_ESTABLISHED:
1393                 spin_lock_bh(&xprt->transport_lock);
1394                 if (!xprt_test_and_set_connected(xprt)) {
1395                         struct sock_xprt *transport = container_of(xprt,
1396                                         struct sock_xprt, xprt);
1397
1398                         /* Reset TCP record info */
1399                         transport->tcp_offset = 0;
1400                         transport->tcp_reclen = 0;
1401                         transport->tcp_copied = 0;
1402                         transport->tcp_flags =
1403                                 TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1404
1405                         xprt_wake_pending_tasks(xprt, -EAGAIN);
1406                 }
1407                 spin_unlock_bh(&xprt->transport_lock);
1408                 break;
1409         case TCP_FIN_WAIT1:
1410                 /* The client initiated a shutdown of the socket */
1411                 xprt->connect_cookie++;
1412                 xprt->reestablish_timeout = 0;
1413                 set_bit(XPRT_CLOSING, &xprt->state);
1414                 smp_mb__before_clear_bit();
1415                 clear_bit(XPRT_CONNECTED, &xprt->state);
1416                 clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1417                 smp_mb__after_clear_bit();
1418                 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1419                 break;
1420         case TCP_CLOSE_WAIT:
1421                 /* The server initiated a shutdown of the socket */
1422                 xprt_force_disconnect(xprt);
1423         case TCP_SYN_SENT:
1424                 xprt->connect_cookie++;
1425         case TCP_CLOSING:
1426                 /*
1427                  * If the server closed down the connection, make sure that
1428                  * we back off before reconnecting
1429                  */
1430                 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1431                         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1432                 break;
1433         case TCP_LAST_ACK:
1434                 set_bit(XPRT_CLOSING, &xprt->state);
1435                 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1436                 smp_mb__before_clear_bit();
1437                 clear_bit(XPRT_CONNECTED, &xprt->state);
1438                 smp_mb__after_clear_bit();
1439                 break;
1440         case TCP_CLOSE:
1441                 xs_tcp_cancel_linger_timeout(xprt);
1442                 xs_sock_mark_closed(xprt);
1443         }
1444  out:
1445         read_unlock(&sk->sk_callback_lock);
1446 }
1447
1448 /**
1449  * xs_error_report - callback mainly for catching socket errors
1450  * @sk: socket
1451  */
1452 static void xs_error_report(struct sock *sk)
1453 {
1454         struct rpc_xprt *xprt;
1455
1456         read_lock(&sk->sk_callback_lock);
1457         if (!(xprt = xprt_from_sock(sk)))
1458                 goto out;
1459         dprintk("RPC:       %s client %p...\n"
1460                         "RPC:       error %d\n",
1461                         __func__, xprt, sk->sk_err);
1462         xprt_wake_pending_tasks(xprt, -EAGAIN);
1463 out:
1464         read_unlock(&sk->sk_callback_lock);
1465 }
1466
1467 static void xs_write_space(struct sock *sk)
1468 {
1469         struct socket *sock;
1470         struct rpc_xprt *xprt;
1471
1472         if (unlikely(!(sock = sk->sk_socket)))
1473                 return;
1474         clear_bit(SOCK_NOSPACE, &sock->flags);
1475
1476         if (unlikely(!(xprt = xprt_from_sock(sk))))
1477                 return;
1478         if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1479                 return;
1480
1481         xprt_write_space(xprt);
1482 }
1483
1484 /**
1485  * xs_udp_write_space - callback invoked when socket buffer space
1486  *                             becomes available
1487  * @sk: socket whose state has changed
1488  *
1489  * Called when more output buffer space is available for this socket.
1490  * We try not to wake our writers until they can make "significant"
1491  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1492  * with a bunch of small requests.
1493  */
1494 static void xs_udp_write_space(struct sock *sk)
1495 {
1496         read_lock(&sk->sk_callback_lock);
1497
1498         /* from net/core/sock.c:sock_def_write_space */
1499         if (sock_writeable(sk))
1500                 xs_write_space(sk);
1501
1502         read_unlock(&sk->sk_callback_lock);
1503 }
1504
1505 /**
1506  * xs_tcp_write_space - callback invoked when socket buffer space
1507  *                             becomes available
1508  * @sk: socket whose state has changed
1509  *
1510  * Called when more output buffer space is available for this socket.
1511  * We try not to wake our writers until they can make "significant"
1512  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1513  * with a bunch of small requests.
1514  */
1515 static void xs_tcp_write_space(struct sock *sk)
1516 {
1517         read_lock(&sk->sk_callback_lock);
1518
1519         /* from net/core/stream.c:sk_stream_write_space */
1520         if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1521                 xs_write_space(sk);
1522
1523         read_unlock(&sk->sk_callback_lock);
1524 }
1525
1526 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1527 {
1528         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1529         struct sock *sk = transport->inet;
1530
1531         if (transport->rcvsize) {
1532                 sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1533                 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1534         }
1535         if (transport->sndsize) {
1536                 sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1537                 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1538                 sk->sk_write_space(sk);
1539         }
1540 }
1541
1542 /**
1543  * xs_udp_set_buffer_size - set send and receive limits
1544  * @xprt: generic transport
1545  * @sndsize: requested size of send buffer, in bytes
1546  * @rcvsize: requested size of receive buffer, in bytes
1547  *
1548  * Set socket send and receive buffer size limits.
1549  */
1550 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1551 {
1552         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1553
1554         transport->sndsize = 0;
1555         if (sndsize)
1556                 transport->sndsize = sndsize + 1024;
1557         transport->rcvsize = 0;
1558         if (rcvsize)
1559                 transport->rcvsize = rcvsize + 1024;
1560
1561         xs_udp_do_set_buffer_size(xprt);
1562 }
1563
1564 /**
1565  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1566  * @task: task that timed out
1567  *
1568  * Adjust the congestion window after a retransmit timeout has occurred.
1569  */
1570 static void xs_udp_timer(struct rpc_task *task)
1571 {
1572         xprt_adjust_cwnd(task, -ETIMEDOUT);
1573 }
1574
1575 static unsigned short xs_get_random_port(void)
1576 {
1577         unsigned short range = xprt_max_resvport - xprt_min_resvport;
1578         unsigned short rand = (unsigned short) net_random() % range;
1579         return rand + xprt_min_resvport;
1580 }
1581
1582 /**
1583  * xs_set_port - reset the port number in the remote endpoint address
1584  * @xprt: generic transport
1585  * @port: new port number
1586  *
1587  */
1588 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1589 {
1590         struct sockaddr *addr = xs_addr(xprt);
1591
1592         dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1593
1594         switch (addr->sa_family) {
1595         case AF_INET:
1596                 ((struct sockaddr_in *)addr)->sin_port = htons(port);
1597                 break;
1598         case AF_INET6:
1599                 ((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
1600                 break;
1601         default:
1602                 BUG();
1603         }
1604 }
1605
1606 static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
1607 {
1608         unsigned short port = transport->port;
1609
1610         if (port == 0 && transport->xprt.resvport)
1611                 port = xs_get_random_port();
1612         return port;
1613 }
1614
1615 static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
1616 {
1617         if (transport->port != 0)
1618                 transport->port = 0;
1619         if (!transport->xprt.resvport)
1620                 return 0;
1621         if (port <= xprt_min_resvport || port > xprt_max_resvport)
1622                 return xprt_max_resvport;
1623         return --port;
1624 }
1625
1626 static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1627 {
1628         struct sockaddr_in myaddr = {
1629                 .sin_family = AF_INET,
1630         };
1631         struct sockaddr_in *sa;
1632         int err, nloop = 0;
1633         unsigned short port = xs_get_srcport(transport, sock);
1634         unsigned short last;
1635
1636         sa = (struct sockaddr_in *)&transport->addr;
1637         myaddr.sin_addr = sa->sin_addr;
1638         do {
1639                 myaddr.sin_port = htons(port);
1640                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1641                                                 sizeof(myaddr));
1642                 if (port == 0)
1643                         break;
1644                 if (err == 0) {
1645                         transport->port = port;
1646                         break;
1647                 }
1648                 last = port;
1649                 port = xs_next_srcport(transport, sock, port);
1650                 if (port > last)
1651                         nloop++;
1652         } while (err == -EADDRINUSE && nloop != 2);
1653         dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
1654                         __func__, &myaddr.sin_addr,
1655                         port, err ? "failed" : "ok", err);
1656         return err;
1657 }
1658
1659 static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1660 {
1661         struct sockaddr_in6 myaddr = {
1662                 .sin6_family = AF_INET6,
1663         };
1664         struct sockaddr_in6 *sa;
1665         int err, nloop = 0;
1666         unsigned short port = xs_get_srcport(transport, sock);
1667         unsigned short last;
1668
1669         sa = (struct sockaddr_in6 *)&transport->addr;
1670         myaddr.sin6_addr = sa->sin6_addr;
1671         do {
1672                 myaddr.sin6_port = htons(port);
1673                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1674                                                 sizeof(myaddr));
1675                 if (port == 0)
1676                         break;
1677                 if (err == 0) {
1678                         transport->port = port;
1679                         break;
1680                 }
1681                 last = port;
1682                 port = xs_next_srcport(transport, sock, port);
1683                 if (port > last)
1684                         nloop++;
1685         } while (err == -EADDRINUSE && nloop != 2);
1686         dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
1687                 &myaddr.sin6_addr, port, err ? "failed" : "ok", err);
1688         return err;
1689 }
1690
#ifndef CONFIG_DEBUG_LOCK_ALLOC
/* Without CONFIG_DEBUG_LOCK_ALLOC these helpers do nothing. */
static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}
#else
/* Index 0 is used for AF_INET sockets, index 1 for AF_INET6 sockets. */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

/* Re-initialise the socket lock of an AF_INET socket with RPC-specific names */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk,
			"slock-AF_INET-RPC", &xs_slock_key[0],
			"sk_lock-AF_INET-RPC", &xs_key[0]);
}

/* Re-initialise the socket lock of an AF_INET6 socket with RPC-specific names */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk,
			"slock-AF_INET6-RPC", &xs_slock_key[1],
			"sk_lock-AF_INET6-RPC", &xs_key[1]);
}
#endif /* CONFIG_DEBUG_LOCK_ALLOC */
1721
1722 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1723 {
1724         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1725
1726         if (!transport->inet) {
1727                 struct sock *sk = sock->sk;
1728
1729                 write_lock_bh(&sk->sk_callback_lock);
1730
1731                 xs_save_old_callbacks(transport, sk);
1732
1733                 sk->sk_user_data = xprt;
1734                 sk->sk_data_ready = xs_udp_data_ready;
1735                 sk->sk_write_space = xs_udp_write_space;
1736                 sk->sk_error_report = xs_error_report;
1737                 sk->sk_no_check = UDP_CSUM_NORCV;
1738                 sk->sk_allocation = GFP_ATOMIC;
1739
1740                 xprt_set_connected(xprt);
1741
1742                 /* Reset to new socket */
1743                 transport->sock = sock;
1744                 transport->inet = sk;
1745
1746                 write_unlock_bh(&sk->sk_callback_lock);
1747         }
1748         xs_udp_do_set_buffer_size(xprt);
1749 }
1750
1751 /**
1752  * xs_udp_connect_worker4 - set up a UDP socket
1753  * @work: RPC transport to connect
1754  *
1755  * Invoked by a work queue tasklet.
1756  */
1757 static void xs_udp_connect_worker4(struct work_struct *work)
1758 {
1759         struct sock_xprt *transport =
1760                 container_of(work, struct sock_xprt, connect_worker.work);
1761         struct rpc_xprt *xprt = &transport->xprt;
1762         struct socket *sock = transport->sock;
1763         int err, status = -EIO;
1764
1765         if (xprt->shutdown)
1766                 goto out;
1767
1768         /* Start by resetting any existing state */
1769         xs_reset_transport(transport);
1770
1771         err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
1772         if (err < 0) {
1773                 dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1774                 goto out;
1775         }
1776         xs_reclassify_socket4(sock);
1777
1778         if (xs_bind4(transport, sock)) {
1779                 sock_release(sock);
1780                 goto out;
1781         }
1782
1783         dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1784                         xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1785
1786         xs_udp_finish_connecting(xprt, sock);
1787         status = 0;
1788 out:
1789         xprt_clear_connecting(xprt);
1790         xprt_wake_pending_tasks(xprt, status);
1791 }
1792
1793 /**
1794  * xs_udp_connect_worker6 - set up a UDP socket
1795  * @work: RPC transport to connect
1796  *
1797  * Invoked by a work queue tasklet.
1798  */
1799 static void xs_udp_connect_worker6(struct work_struct *work)
1800 {
1801         struct sock_xprt *transport =
1802                 container_of(work, struct sock_xprt, connect_worker.work);
1803         struct rpc_xprt *xprt = &transport->xprt;
1804         struct socket *sock = transport->sock;
1805         int err, status = -EIO;
1806
1807         if (xprt->shutdown)
1808                 goto out;
1809
1810         /* Start by resetting any existing state */
1811         xs_reset_transport(transport);
1812
1813         err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
1814         if (err < 0) {
1815                 dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1816                 goto out;
1817         }
1818         xs_reclassify_socket6(sock);
1819
1820         if (xs_bind6(transport, sock) < 0) {
1821                 sock_release(sock);
1822                 goto out;
1823         }
1824
1825         dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1826                         xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1827
1828         xs_udp_finish_connecting(xprt, sock);
1829         status = 0;
1830 out:
1831         xprt_clear_connecting(xprt);
1832         xprt_wake_pending_tasks(xprt, status);
1833 }
1834
1835 /*
1836  * We need to preserve the port number so the reply cache on the server can
1837  * find our cached RPC replies when we get around to reconnecting.
1838  */
1839 static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1840 {
1841         int result;
1842         struct sockaddr any;
1843
1844         dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1845
1846         /*
1847          * Disconnect the transport socket by doing a connect operation
1848          * with AF_UNSPEC.  This should return immediately...
1849          */
1850         memset(&any, 0, sizeof(any));
1851         any.sa_family = AF_UNSPEC;
1852         result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1853         if (!result)
1854                 xs_sock_mark_closed(xprt);
1855         else
1856                 dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1857                                 result);
1858 }
1859
1860 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1861 {
1862         unsigned int state = transport->inet->sk_state;
1863
1864         if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
1865                 return;
1866         if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
1867                 return;
1868         xs_abort_connection(xprt, transport);
1869 }
1870
1871 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1872 {
1873         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1874
1875         if (!transport->inet) {
1876                 struct sock *sk = sock->sk;
1877
1878                 write_lock_bh(&sk->sk_callback_lock);
1879
1880                 xs_save_old_callbacks(transport, sk);
1881
1882                 sk->sk_user_data = xprt;
1883                 sk->sk_data_ready = xs_tcp_data_ready;
1884                 sk->sk_state_change = xs_tcp_state_change;
1885                 sk->sk_write_space = xs_tcp_write_space;
1886                 sk->sk_error_report = xs_error_report;
1887                 sk->sk_allocation = GFP_ATOMIC;
1888
1889                 /* socket options */
1890                 sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1891                 sock_reset_flag(sk, SOCK_LINGER);
1892                 tcp_sk(sk)->linger2 = 0;
1893                 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1894
1895                 xprt_clear_connected(xprt);
1896
1897                 /* Reset to new socket */
1898                 transport->sock = sock;
1899                 transport->inet = sk;
1900
1901                 write_unlock_bh(&sk->sk_callback_lock);
1902         }
1903
1904         if (!xprt_bound(xprt))
1905                 return -ENOTCONN;
1906
1907         /* Tell the socket layer to start connecting... */
1908         xprt->stat.connect_count++;
1909         xprt->stat.connect_start = jiffies;
1910         return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1911 }
1912
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @xprt: RPC transport to connect
 * @transport: socket transport to connect
 * @create_sock: function to create a socket of the correct type
 *
 * Called from the address-family specific connect workers.  When the
 * transport already has a socket it is reused (after aborting the old
 * connection if necessary); otherwise @create_sock provides a new one.
 */
static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
		struct sock_xprt *transport,
		struct socket *(*create_sock)(struct rpc_xprt *,
			struct sock_xprt *))
{
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (sock != NULL) {
		int abort_requested;

		abort_requested = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt, transport);

		if (abort_requested)
			goto out_eagain;
	} else {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = create_sock(xprt, transport);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	}

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);

	switch (status) {
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
		/* connect completed or is under way */
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
		xprt_clear_connecting(xprt);
		return;
	case -EADDRNOTAVAIL:
		break;
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		break;
	}

	/*
	 * We're probably in TIME_WAIT. Get rid of existing socket,
	 * and retry.  Unhandled errors are treated the same way.
	 */
	set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
	xprt_force_disconnect(xprt);

out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
1985
1986 static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
1987                 struct sock_xprt *transport)
1988 {
1989         struct socket *sock;
1990         int err;
1991
1992         /* start from scratch */
1993         err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1994         if (err < 0) {
1995                 dprintk("RPC:       can't create TCP transport socket (%d).\n",
1996                                 -err);
1997                 goto out_err;
1998         }
1999         xs_reclassify_socket4(sock);
2000
2001         if (xs_bind4(transport, sock) < 0) {
2002                 sock_release(sock);
2003                 goto out_err;
2004         }
2005         return sock;
2006 out_err:
2007         return ERR_PTR(-EIO);
2008 }
2009
2010 /**
2011  * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
2012  * @work: RPC transport to connect
2013  *
2014  * Invoked by a work queue tasklet.
2015  */
2016 static void xs_tcp_connect_worker4(struct work_struct *work)
2017 {
2018         struct sock_xprt *transport =
2019                 container_of(work, struct sock_xprt, connect_worker.work);
2020         struct rpc_xprt *xprt = &transport->xprt;
2021
2022         xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
2023 }
2024
2025 static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
2026                 struct sock_xprt *transport)
2027 {
2028         struct socket *sock;
2029         int err;
2030
2031         /* start from scratch */
2032         err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
2033         if (err < 0) {
2034                 dprintk("RPC:       can't create TCP transport socket (%d).\n",
2035                                 -err);
2036                 goto out_err;
2037         }
2038         xs_reclassify_socket6(sock);
2039
2040         if (xs_bind6(transport, sock) < 0) {
2041                 sock_release(sock);
2042                 goto out_err;
2043         }
2044         return sock;
2045 out_err:
2046         return ERR_PTR(-EIO);
2047 }
2048
2049 /**
2050  * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
2051  * @work: RPC transport to connect
2052  *
2053  * Invoked by a work queue tasklet.
2054  */
2055 static void xs_tcp_connect_worker6(struct work_struct *work)
2056 {
2057         struct sock_xprt *transport =
2058                 container_of(work, struct sock_xprt, connect_worker.work);
2059         struct rpc_xprt *xprt = &transport->xprt;
2060
2061         xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
2062 }
2063
2064 /**
2065  * xs_connect - connect a socket to a remote endpoint
2066  * @task: address of RPC task that manages state of connect request
2067  *
2068  * TCP: If the remote end dropped the connection, delay reconnecting.
2069  *
2070  * UDP socket connects are synchronous, but we use a work queue anyway
2071  * to guarantee that even unprivileged user processes can set up a
2072  * socket on a privileged port.
2073  *
2074  * If a UDP socket connect fails, the delay behavior here prevents
2075  * retry floods (hard mounts).
2076  */
2077 static void xs_connect(struct rpc_task *task)
2078 {
2079         struct rpc_xprt *xprt = task->tk_xprt;
2080         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2081
2082         if (xprt_test_and_set_connecting(xprt))
2083                 return;
2084
2085         if (transport->sock != NULL) {
2086                 dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2087                                 "seconds\n",
2088                                 xprt, xprt->reestablish_timeout / HZ);
2089                 queue_delayed_work(rpciod_workqueue,
2090                                    &transport->connect_worker,
2091                                    xprt->reestablish_timeout);
2092                 xprt->reestablish_timeout <<= 1;
2093                 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2094                         xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2095         } else {
2096                 dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2097                 queue_delayed_work(rpciod_workqueue,
2098                                    &transport->connect_worker, 0);
2099         }
2100 }
2101
2102 static void xs_tcp_connect(struct rpc_task *task)
2103 {
2104         struct rpc_xprt *xprt = task->tk_xprt;
2105
2106         /* Exit if we need to wait for socket shutdown to complete */
2107         if (test_bit(XPRT_CLOSING, &xprt->state))
2108                 return;
2109         xs_connect(task);
2110 }
2111
2112 /**
2113  * xs_udp_print_stats - display UDP socket-specifc stats
2114  * @xprt: rpc_xprt struct containing statistics
2115  * @seq: output file
2116  *
2117  */
2118 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2119 {
2120         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2121
2122         seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
2123                         transport->port,
2124                         xprt->stat.bind_count,
2125                         xprt->stat.sends,
2126                         xprt->stat.recvs,
2127                         xprt->stat.bad_xids,
2128                         xprt->stat.req_u,
2129                         xprt->stat.bklog_u);
2130 }
2131
2132 /**
2133  * xs_tcp_print_stats - display TCP socket-specifc stats
2134  * @xprt: rpc_xprt struct containing statistics
2135  * @seq: output file
2136  *
2137  */
2138 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2139 {
2140         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2141         long idle_time = 0;
2142
2143         if (xprt_connected(xprt))
2144                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
2145
2146         seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
2147                         transport->port,
2148                         xprt->stat.bind_count,
2149                         xprt->stat.connect_count,
2150                         xprt->stat.connect_time,
2151                         idle_time,
2152                         xprt->stat.sends,
2153                         xprt->stat.recvs,
2154                         xprt->stat.bad_xids,
2155                         xprt->stat.req_u,
2156                         xprt->stat.bklog_u);
2157 }
2158
2159 static struct rpc_xprt_ops xs_udp_ops = {
2160         .set_buffer_size        = xs_udp_set_buffer_size,
2161         .reserve_xprt           = xprt_reserve_xprt_cong,
2162         .release_xprt           = xprt_release_xprt_cong,
2163         .rpcbind                = rpcb_getport_async,
2164         .set_port               = xs_set_port,
2165         .connect                = xs_connect,
2166         .buf_alloc              = rpc_malloc,
2167         .buf_free               = rpc_free,
2168         .send_request           = xs_udp_send_request,
2169         .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
2170         .timer                  = xs_udp_timer,
2171         .release_request        = xprt_release_rqst_cong,
2172         .close                  = xs_close,
2173         .destroy                = xs_destroy,
2174         .print_stats            = xs_udp_print_stats,
2175 };
2176
2177 static struct rpc_xprt_ops xs_tcp_ops = {
2178         .reserve_xprt           = xprt_reserve_xprt,
2179         .release_xprt           = xs_tcp_release_xprt,
2180         .rpcbind                = rpcb_getport_async,
2181         .set_port               = xs_set_port,
2182         .connect                = xs_tcp_connect,
2183         .buf_alloc              = rpc_malloc,
2184         .buf_free               = rpc_free,
2185         .send_request           = xs_tcp_send_request,
2186         .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2187 #if defined(CONFIG_NFS_V4_1)
2188         .release_request        = bc_release_request,
2189 #endif /* CONFIG_NFS_V4_1 */
2190         .close                  = xs_tcp_close,
2191         .destroy                = xs_destroy,
2192         .print_stats            = xs_tcp_print_stats,
2193 };
2194
2195 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2196                                       unsigned int slot_table_size)
2197 {
2198         struct rpc_xprt *xprt;
2199         struct sock_xprt *new;
2200
2201         if (args->addrlen > sizeof(xprt->addr)) {
2202                 dprintk("RPC:       xs_setup_xprt: address too large\n");
2203                 return ERR_PTR(-EBADF);
2204         }
2205
2206         new = kzalloc(sizeof(*new), GFP_KERNEL);
2207         if (new == NULL) {
2208                 dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2209                                 "rpc_xprt\n");
2210                 return ERR_PTR(-ENOMEM);
2211         }
2212         xprt = &new->xprt;
2213
2214         xprt->max_reqs = slot_table_size;
2215         xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
2216         if (xprt->slot == NULL) {
2217                 kfree(xprt);
2218                 dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
2219                                 "table\n");
2220                 return ERR_PTR(-ENOMEM);
2221         }
2222
2223         memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2224         xprt->addrlen = args->addrlen;
2225         if (args->srcaddr)
2226                 memcpy(&new->addr, args->srcaddr, args->addrlen);
2227
2228         return xprt;
2229 }
2230
2231 static const struct rpc_timeout xs_udp_default_timeout = {
2232         .to_initval = 5 * HZ,
2233         .to_maxval = 30 * HZ,
2234         .to_increment = 5 * HZ,
2235         .to_retries = 5,
2236 };
2237
2238 /**
2239  * xs_setup_udp - Set up transport to use a UDP socket
2240  * @args: rpc transport creation arguments
2241  *
2242  */
2243 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2244 {
2245         struct sockaddr *addr = args->dstaddr;
2246         struct rpc_xprt *xprt;
2247         struct sock_xprt *transport;
2248
2249         xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
2250         if (IS_ERR(xprt))
2251                 return xprt;
2252         transport = container_of(xprt, struct sock_xprt, xprt);
2253
2254         xprt->prot = IPPROTO_UDP;
2255         xprt->tsh_size = 0;
2256         /* XXX: header size can vary due to auth type, IPv6, etc. */
2257         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2258
2259         xprt->bind_timeout = XS_BIND_TO;
2260         xprt->connect_timeout = XS_UDP_CONN_TO;
2261         xprt->reestablish_timeout = XS_UDP_REEST_TO;
2262         xprt->idle_timeout = XS_IDLE_DISC_TO;
2263
2264         xprt->ops = &xs_udp_ops;
2265
2266         xprt->timeout = &xs_udp_default_timeout;
2267
2268         switch (addr->sa_family) {
2269         case AF_INET:
2270                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2271                         xprt_set_bound(xprt);
2272
2273                 INIT_DELAYED_WORK(&transport->connect_worker,
2274                                         xs_udp_connect_worker4);
2275                 xs_format_ipv4_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2276                 break;
2277         case AF_INET6:
2278                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2279                         xprt_set_bound(xprt);
2280
2281                 INIT_DELAYED_WORK(&transport->connect_worker,
2282                                         xs_udp_connect_worker6);
2283                 xs_format_ipv6_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2284                 break;
2285         default:
2286                 kfree(xprt);
2287                 return ERR_PTR(-EAFNOSUPPORT);
2288         }
2289
2290         dprintk("RPC:       set up transport to address %s\n",
2291                         xprt->address_strings[RPC_DISPLAY_ALL]);
2292
2293         if (try_module_get(THIS_MODULE))
2294                 return xprt;
2295
2296         kfree(xprt->slot);
2297         kfree(xprt);
2298         return ERR_PTR(-EINVAL);
2299 }
2300
2301 static const struct rpc_timeout xs_tcp_default_timeout = {
2302         .to_initval = 60 * HZ,
2303         .to_maxval = 60 * HZ,
2304         .to_retries = 2,
2305 };
2306
2307 /**
2308  * xs_setup_tcp - Set up transport to use a TCP socket
2309  * @args: rpc transport creation arguments
2310  *
2311  */
2312 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2313 {
2314         struct sockaddr *addr = args->dstaddr;
2315         struct rpc_xprt *xprt;
2316         struct sock_xprt *transport;
2317
2318         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2319         if (IS_ERR(xprt))
2320                 return xprt;
2321         transport = container_of(xprt, struct sock_xprt, xprt);
2322
2323         xprt->prot = IPPROTO_TCP;
2324         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2325         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2326
2327         xprt->bind_timeout = XS_BIND_TO;
2328         xprt->connect_timeout = XS_TCP_CONN_TO;
2329         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2330         xprt->idle_timeout = XS_IDLE_DISC_TO;
2331
2332         xprt->ops = &xs_tcp_ops;
2333         xprt->timeout = &xs_tcp_default_timeout;
2334
2335         switch (addr->sa_family) {
2336         case AF_INET:
2337                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2338                         xprt_set_bound(xprt);
2339
2340                 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
2341                 xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2342                 break;
2343         case AF_INET6:
2344                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2345                         xprt_set_bound(xprt);
2346
2347                 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
2348                 xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2349                 break;
2350         default:
2351                 kfree(xprt);
2352                 return ERR_PTR(-EAFNOSUPPORT);
2353         }
2354
2355         dprintk("RPC:       set up transport to address %s\n",
2356                         xprt->address_strings[RPC_DISPLAY_ALL]);
2357
2358         if (try_module_get(THIS_MODULE))
2359                 return xprt;
2360
2361         kfree(xprt->slot);
2362         kfree(xprt);
2363         return ERR_PTR(-EINVAL);
2364 }
2365
2366 static struct xprt_class        xs_udp_transport = {
2367         .list           = LIST_HEAD_INIT(xs_udp_transport.list),
2368         .name           = "udp",
2369         .owner          = THIS_MODULE,
2370         .ident          = IPPROTO_UDP,
2371         .setup          = xs_setup_udp,
2372 };
2373
2374 static struct xprt_class        xs_tcp_transport = {
2375         .list           = LIST_HEAD_INIT(xs_tcp_transport.list),
2376         .name           = "tcp",
2377         .owner          = THIS_MODULE,
2378         .ident          = IPPROTO_TCP,
2379         .setup          = xs_setup_tcp,
2380 };
2381
2382 /**
2383  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2384  *
2385  */
2386 int init_socket_xprt(void)
2387 {
2388 #ifdef RPC_DEBUG
2389         if (!sunrpc_table_header)
2390                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
2391 #endif
2392
2393         xprt_register_transport(&xs_udp_transport);
2394         xprt_register_transport(&xs_tcp_transport);
2395
2396         return 0;
2397 }
2398
2399 /**
2400  * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2401  *
2402  */
2403 void cleanup_socket_xprt(void)
2404 {
2405 #ifdef RPC_DEBUG
2406         if (sunrpc_table_header) {
2407                 unregister_sysctl_table(sunrpc_table_header);
2408                 sunrpc_table_header = NULL;
2409         }
2410 #endif
2411
2412         xprt_unregister_transport(&xs_udp_transport);
2413         xprt_unregister_transport(&xs_tcp_transport);
2414 }