Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/sparc-2.6
[linux-2.6.git] / net / sunrpc / xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20
21 #include <linux/types.h>
22 #include <linux/slab.h>
23 #include <linux/module.h>
24 #include <linux/capability.h>
25 #include <linux/pagemap.h>
26 #include <linux/errno.h>
27 #include <linux/socket.h>
28 #include <linux/in.h>
29 #include <linux/net.h>
30 #include <linux/mm.h>
31 #include <linux/udp.h>
32 #include <linux/tcp.h>
33 #include <linux/sunrpc/clnt.h>
34 #include <linux/sunrpc/sched.h>
35 #include <linux/sunrpc/svcsock.h>
36 #include <linux/sunrpc/xprtsock.h>
37 #include <linux/file.h>
38 #ifdef CONFIG_NFS_V4_1
39 #include <linux/sunrpc/bc_xprt.h>
40 #endif
41
42 #include <net/sock.h>
43 #include <net/checksum.h>
44 #include <net/udp.h>
45 #include <net/tcp.h>
46
47 #include "sunrpc.h"
48 /*
49  * xprtsock tunables
50  */
51 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
52 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
53
54 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
55 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
56
57 #define XS_TCP_LINGER_TO        (15U * HZ)
58 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
59
60 /*
61  * We can register our own files under /proc/sys/sunrpc by
62  * calling register_sysctl_table() again.  The files in that
63  * directory become the union of all files registered there.
64  *
65  * We simply need to make sure that we don't collide with
66  * someone else's file names!
67  */
68
#ifdef RPC_DEBUG

/* Lower/upper bounds handed to proc_dointvec_minmax via extra1/extra2 */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * Entries that appear below the "sunrpc" sysctl directory.
 *
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(xprt_udp_slot_table_entries),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size,
	},
	{
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(xprt_tcp_slot_table_entries),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size,
	},
	{
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(xprt_min_resvport),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit,
	},
	{
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(xprt_max_resvport),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit,
	},
	{
		/* Stored in jiffies; no min/max bounds are supplied */
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },
};

/* Parent directory entry: "sunrpc", holding the tunables above */
static ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table,
	},
	{ },
};

#endif
139
/*
 * Timeout for connecting an RPC UDP socket.  Such connects complete
 * synchronously; the timeout only guards against resource exhaustion
 * on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * How long to wait for an RPC TCP connection to come up.  For
 * comparison, Solaris NFS over TCP uses 60 seconds, which is roughly
 * the time a server needs to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * How long to wait for the RPC portmapper to reply.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Back-off applied after a UDP socket connect error, which most likely
 * indicates a resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout lets a client pause briefly before it tries
 * to reconnect to a server that has just dropped the connection.
 *
 * TCP reconnects use exponential backoff.  Some servers drop TCP
 * connections when they are overloaded, so the delay starts out short
 * and grows while the server stays down or unresponsive.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout: the client drops the transport socket once it has
 * been idle this long.  UDP sockets are timed out as well, so that port
 * numbers are not held while there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)
183
184 #ifdef RPC_DEBUG
185 # undef  RPC_DEBUG_DATA
186 # define RPCDBG_FACILITY        RPCDBG_TRANS
187 #endif
188
189 #ifdef RPC_DEBUG_DATA
190 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
191 {
192         u8 *buf = (u8 *) packet;
193         int j;
194
195         dprintk("RPC:       %s\n", msg);
196         for (j = 0; j < count && j < 128; j += 4) {
197                 if (!(j & 31)) {
198                         if (j)
199                                 dprintk("\n");
200                         dprintk("0x%04x ", j);
201                 }
202                 dprintk("%02x%02x%02x%02x ",
203                         buf[j], buf[j+1], buf[j+2], buf[j+3]);
204         }
205         dprintk("\n");
206 }
207 #else
208 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
209 {
210         /* NOP */
211 }
212 #endif
213
214 struct sock_xprt {
215         struct rpc_xprt         xprt;
216
217         /*
218          * Network layer
219          */
220         struct socket *         sock;
221         struct sock *           inet;
222
223         /*
224          * State of TCP reply receive
225          */
226         __be32                  tcp_fraghdr,
227                                 tcp_xid;
228
229         u32                     tcp_offset,
230                                 tcp_reclen;
231
232         unsigned long           tcp_copied,
233                                 tcp_flags;
234
235         /*
236          * Connection of transports
237          */
238         struct delayed_work     connect_worker;
239         struct sockaddr_storage srcaddr;
240         unsigned short          srcport;
241
242         /*
243          * UDP socket buffer size parameters
244          */
245         size_t                  rcvsize,
246                                 sndsize;
247
248         /*
249          * Saved socket callback addresses
250          */
251         void                    (*old_data_ready)(struct sock *, int);
252         void                    (*old_state_change)(struct sock *);
253         void                    (*old_write_space)(struct sock *);
254         void                    (*old_error_report)(struct sock *);
255 };
256
/*
 * TCP receive state flags (bits kept in sock_xprt.tcp_flags)
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
#define TCP_RCV_READ_CALLDIR	(1UL << 4)
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)

/*
 * TCP RPC flags (share the same flag word as the receive state bits)
 */
#define TCP_RPC_REPLY		(1UL << 6)
271
272 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
273 {
274         return (struct sockaddr *) &xprt->addr;
275 }
276
277 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
278 {
279         return (struct sockaddr_in *) &xprt->addr;
280 }
281
282 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
283 {
284         return (struct sockaddr_in6 *) &xprt->addr;
285 }
286
287 static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
288 {
289         struct sockaddr *sap = xs_addr(xprt);
290         struct sockaddr_in6 *sin6;
291         struct sockaddr_in *sin;
292         char buf[128];
293
294         (void)rpc_ntop(sap, buf, sizeof(buf));
295         xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
296
297         switch (sap->sa_family) {
298         case AF_INET:
299                 sin = xs_addr_in(xprt);
300                 snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
301                 break;
302         case AF_INET6:
303                 sin6 = xs_addr_in6(xprt);
304                 snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
305                 break;
306         default:
307                 BUG();
308         }
309         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
310 }
311
312 static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
313 {
314         struct sockaddr *sap = xs_addr(xprt);
315         char buf[128];
316
317         snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
318         xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
319
320         snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
321         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
322 }
323
324 static void xs_format_peer_addresses(struct rpc_xprt *xprt,
325                                      const char *protocol,
326                                      const char *netid)
327 {
328         xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
329         xprt->address_strings[RPC_DISPLAY_NETID] = netid;
330         xs_format_common_peer_addresses(xprt);
331         xs_format_common_peer_ports(xprt);
332 }
333
334 static void xs_update_peer_port(struct rpc_xprt *xprt)
335 {
336         kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
337         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
338
339         xs_format_common_peer_ports(xprt);
340 }
341
342 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
343 {
344         unsigned int i;
345
346         for (i = 0; i < RPC_DISPLAY_MAX; i++)
347                 switch (i) {
348                 case RPC_DISPLAY_PROTO:
349                 case RPC_DISPLAY_NETID:
350                         continue;
351                 default:
352                         kfree(xprt->address_strings[i]);
353                 }
354 }
355
356 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
357
358 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
359 {
360         struct msghdr msg = {
361                 .msg_name       = addr,
362                 .msg_namelen    = addrlen,
363                 .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
364         };
365         struct kvec iov = {
366                 .iov_base       = vec->iov_base + base,
367                 .iov_len        = vec->iov_len - base,
368         };
369
370         if (iov.iov_len != 0)
371                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
372         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
373 }
374
375 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
376 {
377         struct page **ppage;
378         unsigned int remainder;
379         int err, sent = 0;
380
381         remainder = xdr->page_len - base;
382         base += xdr->page_base;
383         ppage = xdr->pages + (base >> PAGE_SHIFT);
384         base &= ~PAGE_MASK;
385         for(;;) {
386                 unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
387                 int flags = XS_SENDMSG_FLAGS;
388
389                 remainder -= len;
390                 if (remainder != 0 || more)
391                         flags |= MSG_MORE;
392                 err = sock->ops->sendpage(sock, *ppage, base, len, flags);
393                 if (remainder == 0 || err != len)
394                         break;
395                 sent += err;
396                 ppage++;
397                 base = 0;
398         }
399         if (sent == 0)
400                 return err;
401         if (err > 0)
402                 sent += err;
403         return sent;
404 }
405
406 /**
407  * xs_sendpages - write pages directly to a socket
408  * @sock: socket to send on
409  * @addr: UDP only -- address of destination
410  * @addrlen: UDP only -- length of destination address
411  * @xdr: buffer containing this request
412  * @base: starting position in the buffer
413  *
414  */
415 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
416 {
417         unsigned int remainder = xdr->len - base;
418         int err, sent = 0;
419
420         if (unlikely(!sock))
421                 return -ENOTSOCK;
422
423         clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
424         if (base != 0) {
425                 addr = NULL;
426                 addrlen = 0;
427         }
428
429         if (base < xdr->head[0].iov_len || addr != NULL) {
430                 unsigned int len = xdr->head[0].iov_len - base;
431                 remainder -= len;
432                 err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
433                 if (remainder == 0 || err != len)
434                         goto out;
435                 sent += err;
436                 base = 0;
437         } else
438                 base -= xdr->head[0].iov_len;
439
440         if (base < xdr->page_len) {
441                 unsigned int len = xdr->page_len - base;
442                 remainder -= len;
443                 err = xs_send_pagedata(sock, xdr, base, remainder != 0);
444                 if (remainder == 0 || err != len)
445                         goto out;
446                 sent += err;
447                 base = 0;
448         } else
449                 base -= xdr->page_len;
450
451         if (base >= xdr->tail[0].iov_len)
452                 return sent;
453         err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
454 out:
455         if (sent == 0)
456                 return err;
457         if (err > 0)
458                 sent += err;
459         return sent;
460 }
461
462 static void xs_nospace_callback(struct rpc_task *task)
463 {
464         struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
465
466         transport->inet->sk_write_pending--;
467         clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
468 }
469
470 /**
471  * xs_nospace - place task on wait queue if transmit was incomplete
472  * @task: task to put to sleep
473  *
474  */
475 static int xs_nospace(struct rpc_task *task)
476 {
477         struct rpc_rqst *req = task->tk_rqstp;
478         struct rpc_xprt *xprt = req->rq_xprt;
479         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
480         int ret = 0;
481
482         dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
483                         task->tk_pid, req->rq_slen - req->rq_bytes_sent,
484                         req->rq_slen);
485
486         /* Protect against races with write_space */
487         spin_lock_bh(&xprt->transport_lock);
488
489         /* Don't race with disconnect */
490         if (xprt_connected(xprt)) {
491                 if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
492                         ret = -EAGAIN;
493                         /*
494                          * Notify TCP that we're limited by the application
495                          * window size
496                          */
497                         set_bit(SOCK_NOSPACE, &transport->sock->flags);
498                         transport->inet->sk_write_pending++;
499                         /* ...and wait for more buffer space */
500                         xprt_wait_for_buffer_space(task, xs_nospace_callback);
501                 }
502         } else {
503                 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
504                 ret = -ENOTCONN;
505         }
506
507         spin_unlock_bh(&xprt->transport_lock);
508         return ret;
509 }
510
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *         0:   The request has been sent
 *   -EAGAIN:   The socket was blocked, please call again later to
 *              complete the request
 * -ENOTCONN:   Caller needs to invoke connect logic then call again
 *     other:   Some other error occurred, the request was not sent
 *
 * A send that was only partly accepted is treated like -EAGAIN, so the
 * result in that case is whatever xs_nospace() returns.
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;

	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
			      xdr, req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		task->tk_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	if (status == -ENOTSOCK) {
		/* Should we call xs_close() here? */
		return -ENOTCONN;
	}
	if (status == -EAGAIN)
		return xs_nospace(task);

	if (status != -ENETUNREACH && status != -EPIPE &&
	    status != -ECONNREFUSED) {
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	}

	/*
	 * Reached for the recognized errors and for unrecognized ones.
	 * When the server has died, an ICMP port unreachable message
	 * prompts ECONNREFUSED.
	 */
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	return status;
}
573
574 /**
575  * xs_tcp_shutdown - gracefully shut down a TCP socket
576  * @xprt: transport
577  *
578  * Initiates a graceful shutdown of the TCP socket by calling the
579  * equivalent of shutdown(SHUT_WR);
580  */
581 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
582 {
583         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
584         struct socket *sock = transport->sock;
585
586         if (sock != NULL)
587                 kernel_sock_shutdown(sock, SHUT_WR);
588 }
589
590 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
591 {
592         u32 reclen = buf->len - sizeof(rpc_fraghdr);
593         rpc_fraghdr *base = buf->head[0].iov_base;
594         *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
595 }
596
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *         0:   The request has been sent
 *   -EAGAIN:   The socket was blocked, please call again later to
 *              complete the request
 * -ENOTCONN:   Caller needs to invoke connect logic then call again
 *     other:   Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	for (;;) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* No progress at all: treat it as a full socket buffer */
		if (status == 0) {
			status = -EAGAIN;
			break;
		}
	}

	if (status == -ENOTSOCK) {
		/* Should we call xs_close() here? */
		return -ENOTCONN;
	}
	if (status == -EAGAIN)
		return xs_nospace(task);

	/*
	 * -ECONNREFUSED and -ENOTCONN only clear the NOSPACE flag.
	 * Everything else also shuts the socket down, and errors other
	 * than -ECONNRESET and -EPIPE are logged first.
	 */
	if (status != -ECONNREFUSED && status != -ENOTCONN) {
		if (status != -ECONNRESET && status != -EPIPE) {
			dprintk("RPC:       sendmsg returned unrecognized error %d\n",
				-status);
		}
		xs_tcp_shutdown(xprt);
	}
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);

	return status;
}
674
675 /**
676  * xs_tcp_release_xprt - clean up after a tcp transmission
677  * @xprt: transport
678  * @task: rpc task
679  *
680  * This cleans up if an error causes us to abort the transmission of a request.
681  * In this case, the socket may need to be reset in order to avoid confusing
682  * the server.
683  */
684 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
685 {
686         struct rpc_rqst *req;
687
688         if (task != xprt->snd_task)
689                 return;
690         if (task == NULL)
691                 goto out_release;
692         req = task->tk_rqstp;
693         if (req->rq_bytes_sent == 0)
694                 goto out_release;
695         if (req->rq_bytes_sent == req->rq_snd_buf.len)
696                 goto out_release;
697         set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
698 out_release:
699         xprt_release_xprt(xprt, task);
700 }
701
702 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
703 {
704         transport->old_data_ready = sk->sk_data_ready;
705         transport->old_state_change = sk->sk_state_change;
706         transport->old_write_space = sk->sk_write_space;
707         transport->old_error_report = sk->sk_error_report;
708 }
709
710 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
711 {
712         sk->sk_data_ready = transport->old_data_ready;
713         sk->sk_state_change = transport->old_state_change;
714         sk->sk_write_space = transport->old_write_space;
715         sk->sk_error_report = transport->old_error_report;
716 }
717
718 static void xs_reset_transport(struct sock_xprt *transport)
719 {
720         struct socket *sock = transport->sock;
721         struct sock *sk = transport->inet;
722
723         if (sk == NULL)
724                 return;
725
726         write_lock_bh(&sk->sk_callback_lock);
727         transport->inet = NULL;
728         transport->sock = NULL;
729
730         sk->sk_user_data = NULL;
731
732         xs_restore_old_callbacks(transport, sk);
733         write_unlock_bh(&sk->sk_callback_lock);
734
735         sk->sk_no_check = 0;
736
737         sock_release(sock);
738 }
739
740 /**
741  * xs_close - close a socket
742  * @xprt: transport
743  *
744  * This is used when all requests are complete; ie, no DRC state remains
745  * on the server we want to save.
746  *
747  * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
748  * xs_reset_transport() zeroing the socket from underneath a writer.
749  */
750 static void xs_close(struct rpc_xprt *xprt)
751 {
752         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
753
754         dprintk("RPC:       xs_close xprt %p\n", xprt);
755
756         xs_reset_transport(transport);
757         xprt->reestablish_timeout = 0;
758
759         smp_mb__before_clear_bit();
760         clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
761         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
762         clear_bit(XPRT_CLOSING, &xprt->state);
763         smp_mb__after_clear_bit();
764         xprt_disconnect_done(xprt);
765 }
766
767 static void xs_tcp_close(struct rpc_xprt *xprt)
768 {
769         if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
770                 xs_close(xprt);
771         else
772                 xs_tcp_shutdown(xprt);
773 }
774
775 /**
776  * xs_destroy - prepare to shutdown a transport
777  * @xprt: doomed transport
778  *
779  */
780 static void xs_destroy(struct rpc_xprt *xprt)
781 {
782         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
783
784         dprintk("RPC:       xs_destroy xprt %p\n", xprt);
785
786         cancel_rearming_delayed_work(&transport->connect_worker);
787
788         xs_close(xprt);
789         xs_free_peer_addresses(xprt);
790         kfree(xprt->slot);
791         kfree(xprt);
792         module_put(THIS_MODULE);
793 }
794
795 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
796 {
797         return (struct rpc_xprt *) sk->sk_user_data;
798 }
799
800 /**
801  * xs_udp_data_ready - "data ready" callback for UDP sockets
802  * @sk: socket with data to read
803  * @len: how much data to read
804  *
805  */
806 static void xs_udp_data_ready(struct sock *sk, int len)
807 {
808         struct rpc_task *task;
809         struct rpc_xprt *xprt;
810         struct rpc_rqst *rovr;
811         struct sk_buff *skb;
812         int err, repsize, copied;
813         u32 _xid;
814         __be32 *xp;
815
816         read_lock(&sk->sk_callback_lock);
817         dprintk("RPC:       xs_udp_data_ready...\n");
818         if (!(xprt = xprt_from_sock(sk)))
819                 goto out;
820
821         if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
822                 goto out;
823
824         if (xprt->shutdown)
825                 goto dropit;
826
827         repsize = skb->len - sizeof(struct udphdr);
828         if (repsize < 4) {
829                 dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
830                 goto dropit;
831         }
832
833         /* Copy the XID from the skb... */
834         xp = skb_header_pointer(skb, sizeof(struct udphdr),
835                                 sizeof(_xid), &_xid);
836         if (xp == NULL)
837                 goto dropit;
838
839         /* Look up and lock the request corresponding to the given XID */
840         spin_lock(&xprt->transport_lock);
841         rovr = xprt_lookup_rqst(xprt, *xp);
842         if (!rovr)
843                 goto out_unlock;
844         task = rovr->rq_task;
845
846         if ((copied = rovr->rq_private_buf.buflen) > repsize)
847                 copied = repsize;
848
849         /* Suck it into the iovec, verify checksum if not done by hw. */
850         if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
851                 UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
852                 goto out_unlock;
853         }
854
855         UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
856
857         /* Something worked... */
858         dst_confirm(skb_dst(skb));
859
860         xprt_adjust_cwnd(task, copied);
861         xprt_update_rtt(task);
862         xprt_complete_rqst(task, copied);
863
864  out_unlock:
865         spin_unlock(&xprt->transport_lock);
866  dropit:
867         skb_free_datagram(sk, skb);
868  out:
869         read_unlock(&sk->sk_callback_lock);
870 }
871
872 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
873 {
874         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
875         size_t len, used;
876         char *p;
877
878         p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
879         len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
880         used = xdr_skb_read_bits(desc, p, len);
881         transport->tcp_offset += used;
882         if (used != len)
883                 return;
884
885         transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
886         if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
887                 transport->tcp_flags |= TCP_RCV_LAST_FRAG;
888         else
889                 transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
890         transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
891
892         transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
893         transport->tcp_offset = 0;
894
895         /* Sanity check of the record length */
896         if (unlikely(transport->tcp_reclen < 8)) {
897                 dprintk("RPC:       invalid TCP record fragment length\n");
898                 xprt_force_disconnect(xprt);
899                 return;
900         }
901         dprintk("RPC:       reading TCP record fragment of length %d\n",
902                         transport->tcp_reclen);
903 }
904
905 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
906 {
907         if (transport->tcp_offset == transport->tcp_reclen) {
908                 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
909                 transport->tcp_offset = 0;
910                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
911                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
912                         transport->tcp_flags |= TCP_RCV_COPY_XID;
913                         transport->tcp_copied = 0;
914                 }
915         }
916 }
917
918 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
919 {
920         size_t len, used;
921         char *p;
922
923         len = sizeof(transport->tcp_xid) - transport->tcp_offset;
924         dprintk("RPC:       reading XID (%Zu bytes)\n", len);
925         p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
926         used = xdr_skb_read_bits(desc, p, len);
927         transport->tcp_offset += used;
928         if (used != len)
929                 return;
930         transport->tcp_flags &= ~TCP_RCV_COPY_XID;
931         transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
932         transport->tcp_copied = 4;
933         dprintk("RPC:       reading %s XID %08x\n",
934                         (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
935                                                               : "request with",
936                         ntohl(transport->tcp_xid));
937         xs_tcp_check_fraghdr(transport);
938 }
939
940 static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
941                                        struct xdr_skb_reader *desc)
942 {
943         size_t len, used;
944         u32 offset;
945         __be32  calldir;
946
947         /*
948          * We want transport->tcp_offset to be 8 at the end of this routine
949          * (4 bytes for the xid and 4 bytes for the call/reply flag).
950          * When this function is called for the first time,
951          * transport->tcp_offset is 4 (after having already read the xid).
952          */
953         offset = transport->tcp_offset - sizeof(transport->tcp_xid);
954         len = sizeof(calldir) - offset;
955         dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
956         used = xdr_skb_read_bits(desc, &calldir, len);
957         transport->tcp_offset += used;
958         if (used != len)
959                 return;
960         transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
961         transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
962         transport->tcp_flags |= TCP_RCV_COPY_DATA;
963         /*
964          * We don't yet have the XDR buffer, so we will write the calldir
965          * out after we get the buffer from the 'struct rpc_rqst'
966          */
967         if (ntohl(calldir) == RPC_REPLY)
968                 transport->tcp_flags |= TCP_RPC_REPLY;
969         else
970                 transport->tcp_flags &= ~TCP_RPC_REPLY;
971         dprintk("RPC:       reading %s CALL/REPLY flag %08x\n",
972                         (transport->tcp_flags & TCP_RPC_REPLY) ?
973                                 "reply for" : "request with", calldir);
974         xs_tcp_check_fraghdr(transport);
975 }
976
977 static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
978                                      struct xdr_skb_reader *desc,
979                                      struct rpc_rqst *req)
980 {
981         struct sock_xprt *transport =
982                                 container_of(xprt, struct sock_xprt, xprt);
983         struct xdr_buf *rcvbuf;
984         size_t len;
985         ssize_t r;
986
987         rcvbuf = &req->rq_private_buf;
988
989         if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
990                 /*
991                  * Save the RPC direction in the XDR buffer
992                  */
993                 __be32  calldir = transport->tcp_flags & TCP_RPC_REPLY ?
994                                         htonl(RPC_REPLY) : 0;
995
996                 memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
997                         &calldir, sizeof(calldir));
998                 transport->tcp_copied += sizeof(calldir);
999                 transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1000         }
1001
1002         len = desc->count;
1003         if (len > transport->tcp_reclen - transport->tcp_offset) {
1004                 struct xdr_skb_reader my_desc;
1005
1006                 len = transport->tcp_reclen - transport->tcp_offset;
1007                 memcpy(&my_desc, desc, sizeof(my_desc));
1008                 my_desc.count = len;
1009                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1010                                           &my_desc, xdr_skb_read_bits);
1011                 desc->count -= r;
1012                 desc->offset += r;
1013         } else
1014                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1015                                           desc, xdr_skb_read_bits);
1016
1017         if (r > 0) {
1018                 transport->tcp_copied += r;
1019                 transport->tcp_offset += r;
1020         }
1021         if (r != len) {
1022                 /* Error when copying to the receive buffer,
1023                  * usually because we weren't able to allocate
1024                  * additional buffer pages. All we can do now
1025                  * is turn off TCP_RCV_COPY_DATA, so the request
1026                  * will not receive any additional updates,
1027                  * and time out.
1028                  * Any remaining data from this record will
1029                  * be discarded.
1030                  */
1031                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1032                 dprintk("RPC:       XID %08x truncated request\n",
1033                                 ntohl(transport->tcp_xid));
1034                 dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1035                                 "tcp_offset = %u, tcp_reclen = %u\n",
1036                                 xprt, transport->tcp_copied,
1037                                 transport->tcp_offset, transport->tcp_reclen);
1038                 return;
1039         }
1040
1041         dprintk("RPC:       XID %08x read %Zd bytes\n",
1042                         ntohl(transport->tcp_xid), r);
1043         dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1044                         "tcp_reclen = %u\n", xprt, transport->tcp_copied,
1045                         transport->tcp_offset, transport->tcp_reclen);
1046
1047         if (transport->tcp_copied == req->rq_private_buf.buflen)
1048                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1049         else if (transport->tcp_offset == transport->tcp_reclen) {
1050                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1051                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1052         }
1053
1054         return;
1055 }
1056
1057 /*
1058  * Finds the request corresponding to the RPC xid and invokes the common
1059  * tcp read code to read the data.
1060  */
1061 static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1062                                     struct xdr_skb_reader *desc)
1063 {
1064         struct sock_xprt *transport =
1065                                 container_of(xprt, struct sock_xprt, xprt);
1066         struct rpc_rqst *req;
1067
1068         dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1069
1070         /* Find and lock the request corresponding to this xid */
1071         spin_lock(&xprt->transport_lock);
1072         req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1073         if (!req) {
1074                 dprintk("RPC:       XID %08x request not found!\n",
1075                                 ntohl(transport->tcp_xid));
1076                 spin_unlock(&xprt->transport_lock);
1077                 return -1;
1078         }
1079
1080         xs_tcp_read_common(xprt, desc, req);
1081
1082         if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1083                 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1084
1085         spin_unlock(&xprt->transport_lock);
1086         return 0;
1087 }
1088
1089 #if defined(CONFIG_NFS_V4_1)
1090 /*
1091  * Obtains an rpc_rqst previously allocated and invokes the common
1092  * tcp read code to read the data.  The result is placed in the callback
1093  * queue.
1094  * If we're unable to obtain the rpc_rqst we schedule the closing of the
1095  * connection and return -1.
1096  */
1097 static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
1098                                        struct xdr_skb_reader *desc)
1099 {
1100         struct sock_xprt *transport =
1101                                 container_of(xprt, struct sock_xprt, xprt);
1102         struct rpc_rqst *req;
1103
1104         req = xprt_alloc_bc_request(xprt);
1105         if (req == NULL) {
1106                 printk(KERN_WARNING "Callback slot table overflowed\n");
1107                 xprt_force_disconnect(xprt);
1108                 return -1;
1109         }
1110
1111         req->rq_xid = transport->tcp_xid;
1112         dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1113         xs_tcp_read_common(xprt, desc, req);
1114
1115         if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
1116                 struct svc_serv *bc_serv = xprt->bc_serv;
1117
1118                 /*
1119                  * Add callback request to callback list.  The callback
1120                  * service sleeps on the sv_cb_waitq waiting for new
1121                  * requests.  Wake it up after adding enqueing the
1122                  * request.
1123                  */
1124                 dprintk("RPC:       add callback request to list\n");
1125                 spin_lock(&bc_serv->sv_cb_lock);
1126                 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1127                 spin_unlock(&bc_serv->sv_cb_lock);
1128                 wake_up(&bc_serv->sv_cb_waitq);
1129         }
1130
1131         req->rq_private_buf.len = transport->tcp_copied;
1132
1133         return 0;
1134 }
1135
1136 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1137                                         struct xdr_skb_reader *desc)
1138 {
1139         struct sock_xprt *transport =
1140                                 container_of(xprt, struct sock_xprt, xprt);
1141
1142         return (transport->tcp_flags & TCP_RPC_REPLY) ?
1143                 xs_tcp_read_reply(xprt, desc) :
1144                 xs_tcp_read_callback(xprt, desc);
1145 }
1146 #else
/*
 * Variant used when CONFIG_NFS_V4_1 is not set: all incoming data is
 * handed to the reply path.
 */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
					struct xdr_skb_reader *desc)
{
	int status = xs_tcp_read_reply(xprt, desc);

	return status;
}
1152 #endif /* CONFIG_NFS_V4_1 */
1153
1154 /*
1155  * Read data off the transport.  This can be either an RPC_CALL or an
1156  * RPC_REPLY.  Relay the processing to helper functions.
1157  */
1158 static void xs_tcp_read_data(struct rpc_xprt *xprt,
1159                                     struct xdr_skb_reader *desc)
1160 {
1161         struct sock_xprt *transport =
1162                                 container_of(xprt, struct sock_xprt, xprt);
1163
1164         if (_xs_tcp_read_data(xprt, desc) == 0)
1165                 xs_tcp_check_fraghdr(transport);
1166         else {
1167                 /*
1168                  * The transport_lock protects the request handling.
1169                  * There's no need to hold it to update the tcp_flags.
1170                  */
1171                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1172         }
1173 }
1174
1175 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1176 {
1177         size_t len;
1178
1179         len = transport->tcp_reclen - transport->tcp_offset;
1180         if (len > desc->count)
1181                 len = desc->count;
1182         desc->count -= len;
1183         desc->offset += len;
1184         transport->tcp_offset += len;
1185         dprintk("RPC:       discarded %Zu bytes\n", len);
1186         xs_tcp_check_fraghdr(transport);
1187 }
1188
1189 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1190 {
1191         struct rpc_xprt *xprt = rd_desc->arg.data;
1192         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1193         struct xdr_skb_reader desc = {
1194                 .skb    = skb,
1195                 .offset = offset,
1196                 .count  = len,
1197         };
1198
1199         dprintk("RPC:       xs_tcp_data_recv started\n");
1200         do {
1201                 /* Read in a new fragment marker if necessary */
1202                 /* Can we ever really expect to get completely empty fragments? */
1203                 if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1204                         xs_tcp_read_fraghdr(xprt, &desc);
1205                         continue;
1206                 }
1207                 /* Read in the xid if necessary */
1208                 if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1209                         xs_tcp_read_xid(transport, &desc);
1210                         continue;
1211                 }
1212                 /* Read in the call/reply flag */
1213                 if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1214                         xs_tcp_read_calldir(transport, &desc);
1215                         continue;
1216                 }
1217                 /* Read in the request data */
1218                 if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1219                         xs_tcp_read_data(xprt, &desc);
1220                         continue;
1221                 }
1222                 /* Skip over any trailing bytes on short reads */
1223                 xs_tcp_read_discard(transport, &desc);
1224         } while (desc.count);
1225         dprintk("RPC:       xs_tcp_data_recv done\n");
1226         return len - desc.count;
1227 }
1228
1229 /**
1230  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1231  * @sk: socket with data to read
1232  * @bytes: how much data to read
1233  *
1234  */
1235 static void xs_tcp_data_ready(struct sock *sk, int bytes)
1236 {
1237         struct rpc_xprt *xprt;
1238         read_descriptor_t rd_desc;
1239         int read;
1240
1241         dprintk("RPC:       xs_tcp_data_ready...\n");
1242
1243         read_lock(&sk->sk_callback_lock);
1244         if (!(xprt = xprt_from_sock(sk)))
1245                 goto out;
1246         if (xprt->shutdown)
1247                 goto out;
1248
1249         /* Any data means we had a useful conversation, so
1250          * the we don't need to delay the next reconnect
1251          */
1252         if (xprt->reestablish_timeout)
1253                 xprt->reestablish_timeout = 0;
1254
1255         /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1256         rd_desc.arg.data = xprt;
1257         do {
1258                 rd_desc.count = 65536;
1259                 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1260         } while (read > 0);
1261 out:
1262         read_unlock(&sk->sk_callback_lock);
1263 }
1264
1265 /*
1266  * Do the equivalent of linger/linger2 handling for dealing with
1267  * broken servers that don't close the socket in a timely
1268  * fashion
1269  */
1270 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1271                 unsigned long timeout)
1272 {
1273         struct sock_xprt *transport;
1274
1275         if (xprt_test_and_set_connecting(xprt))
1276                 return;
1277         set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1278         transport = container_of(xprt, struct sock_xprt, xprt);
1279         queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1280                            timeout);
1281 }
1282
1283 static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1284 {
1285         struct sock_xprt *transport;
1286
1287         transport = container_of(xprt, struct sock_xprt, xprt);
1288
1289         if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1290             !cancel_delayed_work(&transport->connect_worker))
1291                 return;
1292         clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1293         xprt_clear_connecting(xprt);
1294 }
1295
1296 static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1297 {
1298         smp_mb__before_clear_bit();
1299         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1300         clear_bit(XPRT_CLOSING, &xprt->state);
1301         smp_mb__after_clear_bit();
1302         /* Mark transport as closed and wake up all pending tasks */
1303         xprt_disconnect_done(xprt);
1304 }
1305
1306 /**
1307  * xs_tcp_state_change - callback to handle TCP socket state changes
1308  * @sk: socket whose state has changed
1309  *
1310  */
1311 static void xs_tcp_state_change(struct sock *sk)
1312 {
1313         struct rpc_xprt *xprt;
1314
1315         read_lock(&sk->sk_callback_lock);
1316         if (!(xprt = xprt_from_sock(sk)))
1317                 goto out;
1318         dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1319         dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
1320                         sk->sk_state, xprt_connected(xprt),
1321                         sock_flag(sk, SOCK_DEAD),
1322                         sock_flag(sk, SOCK_ZAPPED));
1323
1324         switch (sk->sk_state) {
1325         case TCP_ESTABLISHED:
1326                 spin_lock_bh(&xprt->transport_lock);
1327                 if (!xprt_test_and_set_connected(xprt)) {
1328                         struct sock_xprt *transport = container_of(xprt,
1329                                         struct sock_xprt, xprt);
1330
1331                         /* Reset TCP record info */
1332                         transport->tcp_offset = 0;
1333                         transport->tcp_reclen = 0;
1334                         transport->tcp_copied = 0;
1335                         transport->tcp_flags =
1336                                 TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1337
1338                         xprt_wake_pending_tasks(xprt, -EAGAIN);
1339                 }
1340                 spin_unlock_bh(&xprt->transport_lock);
1341                 break;
1342         case TCP_FIN_WAIT1:
1343                 /* The client initiated a shutdown of the socket */
1344                 xprt->connect_cookie++;
1345                 xprt->reestablish_timeout = 0;
1346                 set_bit(XPRT_CLOSING, &xprt->state);
1347                 smp_mb__before_clear_bit();
1348                 clear_bit(XPRT_CONNECTED, &xprt->state);
1349                 clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1350                 smp_mb__after_clear_bit();
1351                 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1352                 break;
1353         case TCP_CLOSE_WAIT:
1354                 /* The server initiated a shutdown of the socket */
1355                 xprt_force_disconnect(xprt);
1356         case TCP_SYN_SENT:
1357                 xprt->connect_cookie++;
1358         case TCP_CLOSING:
1359                 /*
1360                  * If the server closed down the connection, make sure that
1361                  * we back off before reconnecting
1362                  */
1363                 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1364                         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1365                 break;
1366         case TCP_LAST_ACK:
1367                 set_bit(XPRT_CLOSING, &xprt->state);
1368                 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1369                 smp_mb__before_clear_bit();
1370                 clear_bit(XPRT_CONNECTED, &xprt->state);
1371                 smp_mb__after_clear_bit();
1372                 break;
1373         case TCP_CLOSE:
1374                 xs_tcp_cancel_linger_timeout(xprt);
1375                 xs_sock_mark_closed(xprt);
1376         }
1377  out:
1378         read_unlock(&sk->sk_callback_lock);
1379 }
1380
1381 /**
1382  * xs_error_report - callback mainly for catching socket errors
1383  * @sk: socket
1384  */
1385 static void xs_error_report(struct sock *sk)
1386 {
1387         struct rpc_xprt *xprt;
1388
1389         read_lock(&sk->sk_callback_lock);
1390         if (!(xprt = xprt_from_sock(sk)))
1391                 goto out;
1392         dprintk("RPC:       %s client %p...\n"
1393                         "RPC:       error %d\n",
1394                         __func__, xprt, sk->sk_err);
1395         xprt_wake_pending_tasks(xprt, -EAGAIN);
1396 out:
1397         read_unlock(&sk->sk_callback_lock);
1398 }
1399
1400 static void xs_write_space(struct sock *sk)
1401 {
1402         struct socket *sock;
1403         struct rpc_xprt *xprt;
1404
1405         if (unlikely(!(sock = sk->sk_socket)))
1406                 return;
1407         clear_bit(SOCK_NOSPACE, &sock->flags);
1408
1409         if (unlikely(!(xprt = xprt_from_sock(sk))))
1410                 return;
1411         if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1412                 return;
1413
1414         xprt_write_space(xprt);
1415 }
1416
1417 /**
1418  * xs_udp_write_space - callback invoked when socket buffer space
1419  *                             becomes available
1420  * @sk: socket whose state has changed
1421  *
1422  * Called when more output buffer space is available for this socket.
1423  * We try not to wake our writers until they can make "significant"
1424  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1425  * with a bunch of small requests.
1426  */
1427 static void xs_udp_write_space(struct sock *sk)
1428 {
1429         read_lock(&sk->sk_callback_lock);
1430
1431         /* from net/core/sock.c:sock_def_write_space */
1432         if (sock_writeable(sk))
1433                 xs_write_space(sk);
1434
1435         read_unlock(&sk->sk_callback_lock);
1436 }
1437
1438 /**
1439  * xs_tcp_write_space - callback invoked when socket buffer space
1440  *                             becomes available
1441  * @sk: socket whose state has changed
1442  *
1443  * Called when more output buffer space is available for this socket.
1444  * We try not to wake our writers until they can make "significant"
1445  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1446  * with a bunch of small requests.
1447  */
1448 static void xs_tcp_write_space(struct sock *sk)
1449 {
1450         read_lock(&sk->sk_callback_lock);
1451
1452         /* from net/core/stream.c:sk_stream_write_space */
1453         if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1454                 xs_write_space(sk);
1455
1456         read_unlock(&sk->sk_callback_lock);
1457 }
1458
1459 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1460 {
1461         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1462         struct sock *sk = transport->inet;
1463
1464         if (transport->rcvsize) {
1465                 sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1466                 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1467         }
1468         if (transport->sndsize) {
1469                 sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1470                 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1471                 sk->sk_write_space(sk);
1472         }
1473 }
1474
1475 /**
1476  * xs_udp_set_buffer_size - set send and receive limits
1477  * @xprt: generic transport
1478  * @sndsize: requested size of send buffer, in bytes
1479  * @rcvsize: requested size of receive buffer, in bytes
1480  *
1481  * Set socket send and receive buffer size limits.
1482  */
1483 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1484 {
1485         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1486
1487         transport->sndsize = 0;
1488         if (sndsize)
1489                 transport->sndsize = sndsize + 1024;
1490         transport->rcvsize = 0;
1491         if (rcvsize)
1492                 transport->rcvsize = rcvsize + 1024;
1493
1494         xs_udp_do_set_buffer_size(xprt);
1495 }
1496
1497 /**
1498  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1499  * @task: task that timed out
1500  *
1501  * Adjust the congestion window after a retransmit timeout has occurred.
1502  */
1503 static void xs_udp_timer(struct rpc_task *task)
1504 {
1505         xprt_adjust_cwnd(task, -ETIMEDOUT);
1506 }
1507
1508 static unsigned short xs_get_random_port(void)
1509 {
1510         unsigned short range = xprt_max_resvport - xprt_min_resvport;
1511         unsigned short rand = (unsigned short) net_random() % range;
1512         return rand + xprt_min_resvport;
1513 }
1514
/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 * Stores @port in the transport's peer address and then refreshes the
 * transport's cached peer port information.
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

	rpc_set_port(xs_addr(xprt), port);
	xs_update_peer_port(xprt);
}
1528
1529 static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
1530 {
1531         unsigned short port = transport->srcport;
1532
1533         if (port == 0 && transport->xprt.resvport)
1534                 port = xs_get_random_port();
1535         return port;
1536 }
1537
1538 static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
1539 {
1540         if (transport->srcport != 0)
1541                 transport->srcport = 0;
1542         if (!transport->xprt.resvport)
1543                 return 0;
1544         if (port <= xprt_min_resvport || port > xprt_max_resvport)
1545                 return xprt_max_resvport;
1546         return --port;
1547 }
1548
1549 static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1550 {
1551         struct sockaddr_in myaddr = {
1552                 .sin_family = AF_INET,
1553         };
1554         struct sockaddr_in *sa;
1555         int err, nloop = 0;
1556         unsigned short port = xs_get_srcport(transport, sock);
1557         unsigned short last;
1558
1559         sa = (struct sockaddr_in *)&transport->srcaddr;
1560         myaddr.sin_addr = sa->sin_addr;
1561         do {
1562                 myaddr.sin_port = htons(port);
1563                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1564                                                 sizeof(myaddr));
1565                 if (port == 0)
1566                         break;
1567                 if (err == 0) {
1568                         transport->srcport = port;
1569                         break;
1570                 }
1571                 last = port;
1572                 port = xs_next_srcport(transport, sock, port);
1573                 if (port > last)
1574                         nloop++;
1575         } while (err == -EADDRINUSE && nloop != 2);
1576         dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
1577                         __func__, &myaddr.sin_addr,
1578                         port, err ? "failed" : "ok", err);
1579         return err;
1580 }
1581
1582 static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1583 {
1584         struct sockaddr_in6 myaddr = {
1585                 .sin6_family = AF_INET6,
1586         };
1587         struct sockaddr_in6 *sa;
1588         int err, nloop = 0;
1589         unsigned short port = xs_get_srcport(transport, sock);
1590         unsigned short last;
1591
1592         sa = (struct sockaddr_in6 *)&transport->srcaddr;
1593         myaddr.sin6_addr = sa->sin6_addr;
1594         do {
1595                 myaddr.sin6_port = htons(port);
1596                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1597                                                 sizeof(myaddr));
1598                 if (port == 0)
1599                         break;
1600                 if (err == 0) {
1601                         transport->srcport = port;
1602                         break;
1603                 }
1604                 last = port;
1605                 port = xs_next_srcport(transport, sock, port);
1606                 if (port > last)
1607                         nloop++;
1608         } while (err == -EADDRINUSE && nloop != 2);
1609         dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
1610                 &myaddr.sin6_addr, port, err ? "failed" : "ok", err);
1611         return err;
1612 }
1613
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/* Lock class keys for RPC sockets: index 0 is AF_INET, index 1 is AF_INET6 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

/*
 * Re-initialise the locks of an IPv4 RPC socket with their own lock
 * class and name.  The socket must not be owned by a user context.
 */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(sock_owned_by_user(inet));
	sock_lock_init_class_and_name(inet, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

/*
 * Re-initialise the locks of an IPv6 RPC socket with their own lock
 * class and name.  The socket must not be owned by a user context.
 */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(sock_owned_by_user(inet));
	sock_lock_init_class_and_name(inet, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}
#else
/* Without CONFIG_DEBUG_LOCK_ALLOC there is nothing to reclassify */
static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}
#endif
1644
1645 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1646 {
1647         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1648
1649         if (!transport->inet) {
1650                 struct sock *sk = sock->sk;
1651
1652                 write_lock_bh(&sk->sk_callback_lock);
1653
1654                 xs_save_old_callbacks(transport, sk);
1655
1656                 sk->sk_user_data = xprt;
1657                 sk->sk_data_ready = xs_udp_data_ready;
1658                 sk->sk_write_space = xs_udp_write_space;
1659                 sk->sk_error_report = xs_error_report;
1660                 sk->sk_no_check = UDP_CSUM_NORCV;
1661                 sk->sk_allocation = GFP_ATOMIC;
1662
1663                 xprt_set_connected(xprt);
1664
1665                 /* Reset to new socket */
1666                 transport->sock = sock;
1667                 transport->inet = sk;
1668
1669                 write_unlock_bh(&sk->sk_callback_lock);
1670         }
1671         xs_udp_do_set_buffer_size(xprt);
1672 }
1673
1674 /**
1675  * xs_udp_connect_worker4 - set up a UDP socket
1676  * @work: RPC transport to connect
1677  *
1678  * Invoked by a work queue tasklet.
1679  */
1680 static void xs_udp_connect_worker4(struct work_struct *work)
1681 {
1682         struct sock_xprt *transport =
1683                 container_of(work, struct sock_xprt, connect_worker.work);
1684         struct rpc_xprt *xprt = &transport->xprt;
1685         struct socket *sock = transport->sock;
1686         int err, status = -EIO;
1687
1688         if (xprt->shutdown)
1689                 goto out;
1690
1691         /* Start by resetting any existing state */
1692         xs_reset_transport(transport);
1693
1694         err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
1695         if (err < 0) {
1696                 dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1697                 goto out;
1698         }
1699         xs_reclassify_socket4(sock);
1700
1701         if (xs_bind4(transport, sock)) {
1702                 sock_release(sock);
1703                 goto out;
1704         }
1705
1706         dprintk("RPC:       worker connecting xprt %p via %s to "
1707                                 "%s (port %s)\n", xprt,
1708                         xprt->address_strings[RPC_DISPLAY_PROTO],
1709                         xprt->address_strings[RPC_DISPLAY_ADDR],
1710                         xprt->address_strings[RPC_DISPLAY_PORT]);
1711
1712         xs_udp_finish_connecting(xprt, sock);
1713         status = 0;
1714 out:
1715         xprt_clear_connecting(xprt);
1716         xprt_wake_pending_tasks(xprt, status);
1717 }
1718
1719 /**
1720  * xs_udp_connect_worker6 - set up a UDP socket
1721  * @work: RPC transport to connect
1722  *
1723  * Invoked by a work queue tasklet.
1724  */
1725 static void xs_udp_connect_worker6(struct work_struct *work)
1726 {
1727         struct sock_xprt *transport =
1728                 container_of(work, struct sock_xprt, connect_worker.work);
1729         struct rpc_xprt *xprt = &transport->xprt;
1730         struct socket *sock = transport->sock;
1731         int err, status = -EIO;
1732
1733         if (xprt->shutdown)
1734                 goto out;
1735
1736         /* Start by resetting any existing state */
1737         xs_reset_transport(transport);
1738
1739         err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
1740         if (err < 0) {
1741                 dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1742                 goto out;
1743         }
1744         xs_reclassify_socket6(sock);
1745
1746         if (xs_bind6(transport, sock) < 0) {
1747                 sock_release(sock);
1748                 goto out;
1749         }
1750
1751         dprintk("RPC:       worker connecting xprt %p via %s to "
1752                                 "%s (port %s)\n", xprt,
1753                         xprt->address_strings[RPC_DISPLAY_PROTO],
1754                         xprt->address_strings[RPC_DISPLAY_ADDR],
1755                         xprt->address_strings[RPC_DISPLAY_PORT]);
1756
1757         xs_udp_finish_connecting(xprt, sock);
1758         status = 0;
1759 out:
1760         xprt_clear_connecting(xprt);
1761         xprt_wake_pending_tasks(xprt, status);
1762 }
1763
1764 /*
1765  * We need to preserve the port number so the reply cache on the server can
1766  * find our cached RPC replies when we get around to reconnecting.
1767  */
1768 static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1769 {
1770         int result;
1771         struct sockaddr any;
1772
1773         dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1774
1775         /*
1776          * Disconnect the transport socket by doing a connect operation
1777          * with AF_UNSPEC.  This should return immediately...
1778          */
1779         memset(&any, 0, sizeof(any));
1780         any.sa_family = AF_UNSPEC;
1781         result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1782         if (!result)
1783                 xs_sock_mark_closed(xprt);
1784         else
1785                 dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1786                                 result);
1787 }
1788
1789 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1790 {
1791         unsigned int state = transport->inet->sk_state;
1792
1793         if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
1794                 return;
1795         if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
1796                 return;
1797         xs_abort_connection(xprt, transport);
1798 }
1799
1800 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1801 {
1802         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1803
1804         if (!transport->inet) {
1805                 struct sock *sk = sock->sk;
1806
1807                 write_lock_bh(&sk->sk_callback_lock);
1808
1809                 xs_save_old_callbacks(transport, sk);
1810
1811                 sk->sk_user_data = xprt;
1812                 sk->sk_data_ready = xs_tcp_data_ready;
1813                 sk->sk_state_change = xs_tcp_state_change;
1814                 sk->sk_write_space = xs_tcp_write_space;
1815                 sk->sk_error_report = xs_error_report;
1816                 sk->sk_allocation = GFP_ATOMIC;
1817
1818                 /* socket options */
1819                 sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1820                 sock_reset_flag(sk, SOCK_LINGER);
1821                 tcp_sk(sk)->linger2 = 0;
1822                 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1823
1824                 xprt_clear_connected(xprt);
1825
1826                 /* Reset to new socket */
1827                 transport->sock = sock;
1828                 transport->inet = sk;
1829
1830                 write_unlock_bh(&sk->sk_callback_lock);
1831         }
1832
1833         if (!xprt_bound(xprt))
1834                 return -ENOTCONN;
1835
1836         /* Tell the socket layer to start connecting... */
1837         xprt->stat.connect_count++;
1838         xprt->stat.connect_start = jiffies;
1839         return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1840 }
1841
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @xprt: RPC transport to connect
 * @transport: socket transport to connect
 * @create_sock: function to create a socket of the correct type
 *
 * Shared body of the address-family specific TCP connect workers.
 */
static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
		struct sock_xprt *transport,
		struct socket *(*create_sock)(struct rpc_xprt *,
			struct sock_xprt *))
{
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (sock != NULL) {
		int aborted = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);

		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt, transport);

		if (aborted)
			goto out_eagain;
	} else {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = create_sock(xprt, transport);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	}

	dprintk("RPC:       worker connecting xprt %p via %s to "
				"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);

	switch (status) {
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/*
		 * Connected, still connecting, or (for the last three) to be
		 * retried with the existing socket after a delay. Pending
		 * tasks are not woken here.
		 */
		xprt_clear_connecting(xprt);
		return;
	case -EINVAL:
		/* Happens, for instance, if the user specified a link
		 * local IPv6 address without a scope-id.
		 */
		goto out;
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		/* fall through */
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
		xprt_force_disconnect(xprt);
		break;
	}
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
1922
1923 static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
1924                 struct sock_xprt *transport)
1925 {
1926         struct socket *sock;
1927         int err;
1928
1929         /* start from scratch */
1930         err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1931         if (err < 0) {
1932                 dprintk("RPC:       can't create TCP transport socket (%d).\n",
1933                                 -err);
1934                 goto out_err;
1935         }
1936         xs_reclassify_socket4(sock);
1937
1938         if (xs_bind4(transport, sock) < 0) {
1939                 sock_release(sock);
1940                 goto out_err;
1941         }
1942         return sock;
1943 out_err:
1944         return ERR_PTR(-EIO);
1945 }
1946
1947 /**
1948  * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
1949  * @work: RPC transport to connect
1950  *
1951  * Invoked by a work queue tasklet.
1952  */
1953 static void xs_tcp_connect_worker4(struct work_struct *work)
1954 {
1955         struct sock_xprt *transport =
1956                 container_of(work, struct sock_xprt, connect_worker.work);
1957         struct rpc_xprt *xprt = &transport->xprt;
1958
1959         xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
1960 }
1961
1962 static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
1963                 struct sock_xprt *transport)
1964 {
1965         struct socket *sock;
1966         int err;
1967
1968         /* start from scratch */
1969         err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
1970         if (err < 0) {
1971                 dprintk("RPC:       can't create TCP transport socket (%d).\n",
1972                                 -err);
1973                 goto out_err;
1974         }
1975         xs_reclassify_socket6(sock);
1976
1977         if (xs_bind6(transport, sock) < 0) {
1978                 sock_release(sock);
1979                 goto out_err;
1980         }
1981         return sock;
1982 out_err:
1983         return ERR_PTR(-EIO);
1984 }
1985
1986 /**
1987  * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
1988  * @work: RPC transport to connect
1989  *
1990  * Invoked by a work queue tasklet.
1991  */
1992 static void xs_tcp_connect_worker6(struct work_struct *work)
1993 {
1994         struct sock_xprt *transport =
1995                 container_of(work, struct sock_xprt, connect_worker.work);
1996         struct rpc_xprt *xprt = &transport->xprt;
1997
1998         xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
1999 }
2000
2001 /**
2002  * xs_connect - connect a socket to a remote endpoint
2003  * @task: address of RPC task that manages state of connect request
2004  *
2005  * TCP: If the remote end dropped the connection, delay reconnecting.
2006  *
2007  * UDP socket connects are synchronous, but we use a work queue anyway
2008  * to guarantee that even unprivileged user processes can set up a
2009  * socket on a privileged port.
2010  *
2011  * If a UDP socket connect fails, the delay behavior here prevents
2012  * retry floods (hard mounts).
2013  */
2014 static void xs_connect(struct rpc_task *task)
2015 {
2016         struct rpc_xprt *xprt = task->tk_xprt;
2017         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2018
2019         if (xprt_test_and_set_connecting(xprt))
2020                 return;
2021
2022         if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
2023                 dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2024                                 "seconds\n",
2025                                 xprt, xprt->reestablish_timeout / HZ);
2026                 queue_delayed_work(rpciod_workqueue,
2027                                    &transport->connect_worker,
2028                                    xprt->reestablish_timeout);
2029                 xprt->reestablish_timeout <<= 1;
2030                 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2031                         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2032                 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2033                         xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2034         } else {
2035                 dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2036                 queue_delayed_work(rpciod_workqueue,
2037                                    &transport->connect_worker, 0);
2038         }
2039 }
2040
2041 static void xs_tcp_connect(struct rpc_task *task)
2042 {
2043         struct rpc_xprt *xprt = task->tk_xprt;
2044
2045         /* Exit if we need to wait for socket shutdown to complete */
2046         if (test_bit(XPRT_CLOSING, &xprt->state))
2047                 return;
2048         xs_connect(task);
2049 }
2050
2051 /**
2052  * xs_udp_print_stats - display UDP socket-specifc stats
2053  * @xprt: rpc_xprt struct containing statistics
2054  * @seq: output file
2055  *
2056  */
2057 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2058 {
2059         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2060
2061         seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
2062                         transport->srcport,
2063                         xprt->stat.bind_count,
2064                         xprt->stat.sends,
2065                         xprt->stat.recvs,
2066                         xprt->stat.bad_xids,
2067                         xprt->stat.req_u,
2068                         xprt->stat.bklog_u);
2069 }
2070
2071 /**
2072  * xs_tcp_print_stats - display TCP socket-specifc stats
2073  * @xprt: rpc_xprt struct containing statistics
2074  * @seq: output file
2075  *
2076  */
2077 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2078 {
2079         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2080         long idle_time = 0;
2081
2082         if (xprt_connected(xprt))
2083                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
2084
2085         seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
2086                         transport->srcport,
2087                         xprt->stat.bind_count,
2088                         xprt->stat.connect_count,
2089                         xprt->stat.connect_time,
2090                         idle_time,
2091                         xprt->stat.sends,
2092                         xprt->stat.recvs,
2093                         xprt->stat.bad_xids,
2094                         xprt->stat.req_u,
2095                         xprt->stat.bklog_u);
2096 }
2097
2098 /*
2099  * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
2100  * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
2101  * to use the server side send routines.
2102  */
2103 static void *bc_malloc(struct rpc_task *task, size_t size)
2104 {
2105         struct page *page;
2106         struct rpc_buffer *buf;
2107
2108         BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2109         page = alloc_page(GFP_KERNEL);
2110
2111         if (!page)
2112                 return NULL;
2113
2114         buf = page_address(page);
2115         buf->len = PAGE_SIZE;
2116
2117         return buf->data;
2118 }
2119
2120 /*
2121  * Free the space allocated in the bc_alloc routine
2122  */
2123 static void bc_free(void *buffer)
2124 {
2125         struct rpc_buffer *buf;
2126
2127         if (!buffer)
2128                 return;
2129
2130         buf = container_of(buffer, struct rpc_buffer, data);
2131         free_page((unsigned long)buf);
2132 }
2133
2134 /*
2135  * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
2136  * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
2137  */
2138 static int bc_sendto(struct rpc_rqst *req)
2139 {
2140         int len;
2141         struct xdr_buf *xbufp = &req->rq_snd_buf;
2142         struct rpc_xprt *xprt = req->rq_xprt;
2143         struct sock_xprt *transport =
2144                                 container_of(xprt, struct sock_xprt, xprt);
2145         struct socket *sock = transport->sock;
2146         unsigned long headoff;
2147         unsigned long tailoff;
2148
2149         /*
2150          * Set up the rpc header and record marker stuff
2151          */
2152         xs_encode_tcp_record_marker(xbufp);
2153
2154         tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
2155         headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
2156         len = svc_send_common(sock, xbufp,
2157                               virt_to_page(xbufp->head[0].iov_base), headoff,
2158                               xbufp->tail[0].iov_base, tailoff);
2159
2160         if (len != xbufp->len) {
2161                 printk(KERN_NOTICE "Error sending entire callback!\n");
2162                 len = -EAGAIN;
2163         }
2164
2165         return len;
2166 }
2167
2168 /*
2169  * The send routine. Borrows from svc_send
2170  */
2171 static int bc_send_request(struct rpc_task *task)
2172 {
2173         struct rpc_rqst *req = task->tk_rqstp;
2174         struct svc_xprt *xprt;
2175         struct svc_sock         *svsk;
2176         u32                     len;
2177
2178         dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2179         /*
2180          * Get the server socket associated with this callback xprt
2181          */
2182         xprt = req->rq_xprt->bc_xprt;
2183         svsk = container_of(xprt, struct svc_sock, sk_xprt);
2184
2185         /*
2186          * Grab the mutex to serialize data as the connection is shared
2187          * with the fore channel
2188          */
2189         if (!mutex_trylock(&xprt->xpt_mutex)) {
2190                 rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2191                 if (!mutex_trylock(&xprt->xpt_mutex))
2192                         return -EAGAIN;
2193                 rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2194         }
2195         if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2196                 len = -ENOTCONN;
2197         else
2198                 len = bc_sendto(req);
2199         mutex_unlock(&xprt->xpt_mutex);
2200
2201         if (len > 0)
2202                 len = 0;
2203
2204         return len;
2205 }
2206
/*
 * The close routine. Since this is client initiated, we do nothing
 */
static void bc_close(struct rpc_xprt *xprt)
{
	/* intentionally empty */
}
2215
/*
 * The xprt destroy routine. Again, because this connection is client
 * initiated, we do nothing
 */
static void bc_destroy(struct rpc_xprt *xprt)
{
	/* intentionally empty */
}
2225
2226 static struct rpc_xprt_ops xs_udp_ops = {
2227         .set_buffer_size        = xs_udp_set_buffer_size,
2228         .reserve_xprt           = xprt_reserve_xprt_cong,
2229         .release_xprt           = xprt_release_xprt_cong,
2230         .rpcbind                = rpcb_getport_async,
2231         .set_port               = xs_set_port,
2232         .connect                = xs_connect,
2233         .buf_alloc              = rpc_malloc,
2234         .buf_free               = rpc_free,
2235         .send_request           = xs_udp_send_request,
2236         .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
2237         .timer                  = xs_udp_timer,
2238         .release_request        = xprt_release_rqst_cong,
2239         .close                  = xs_close,
2240         .destroy                = xs_destroy,
2241         .print_stats            = xs_udp_print_stats,
2242 };
2243
2244 static struct rpc_xprt_ops xs_tcp_ops = {
2245         .reserve_xprt           = xprt_reserve_xprt,
2246         .release_xprt           = xs_tcp_release_xprt,
2247         .rpcbind                = rpcb_getport_async,
2248         .set_port               = xs_set_port,
2249         .connect                = xs_tcp_connect,
2250         .buf_alloc              = rpc_malloc,
2251         .buf_free               = rpc_free,
2252         .send_request           = xs_tcp_send_request,
2253         .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2254         .close                  = xs_tcp_close,
2255         .destroy                = xs_destroy,
2256         .print_stats            = xs_tcp_print_stats,
2257 };
2258
2259 /*
2260  * The rpc_xprt_ops for the server backchannel
2261  */
2262
2263 static struct rpc_xprt_ops bc_tcp_ops = {
2264         .reserve_xprt           = xprt_reserve_xprt,
2265         .release_xprt           = xprt_release_xprt,
2266         .buf_alloc              = bc_malloc,
2267         .buf_free               = bc_free,
2268         .send_request           = bc_send_request,
2269         .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2270         .close                  = bc_close,
2271         .destroy                = bc_destroy,
2272         .print_stats            = xs_tcp_print_stats,
2273 };
2274
2275 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2276                                       unsigned int slot_table_size)
2277 {
2278         struct rpc_xprt *xprt;
2279         struct sock_xprt *new;
2280
2281         if (args->addrlen > sizeof(xprt->addr)) {
2282                 dprintk("RPC:       xs_setup_xprt: address too large\n");
2283                 return ERR_PTR(-EBADF);
2284         }
2285
2286         new = kzalloc(sizeof(*new), GFP_KERNEL);
2287         if (new == NULL) {
2288                 dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2289                                 "rpc_xprt\n");
2290                 return ERR_PTR(-ENOMEM);
2291         }
2292         xprt = &new->xprt;
2293
2294         xprt->max_reqs = slot_table_size;
2295         xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
2296         if (xprt->slot == NULL) {
2297                 kfree(xprt);
2298                 dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
2299                                 "table\n");
2300                 return ERR_PTR(-ENOMEM);
2301         }
2302
2303         memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2304         xprt->addrlen = args->addrlen;
2305         if (args->srcaddr)
2306                 memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2307
2308         return xprt;
2309 }
2310
2311 static const struct rpc_timeout xs_udp_default_timeout = {
2312         .to_initval = 5 * HZ,
2313         .to_maxval = 30 * HZ,
2314         .to_increment = 5 * HZ,
2315         .to_retries = 5,
2316 };
2317
2318 /**
2319  * xs_setup_udp - Set up transport to use a UDP socket
2320  * @args: rpc transport creation arguments
2321  *
2322  */
2323 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2324 {
2325         struct sockaddr *addr = args->dstaddr;
2326         struct rpc_xprt *xprt;
2327         struct sock_xprt *transport;
2328
2329         xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
2330         if (IS_ERR(xprt))
2331                 return xprt;
2332         transport = container_of(xprt, struct sock_xprt, xprt);
2333
2334         xprt->prot = IPPROTO_UDP;
2335         xprt->tsh_size = 0;
2336         /* XXX: header size can vary due to auth type, IPv6, etc. */
2337         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2338
2339         xprt->bind_timeout = XS_BIND_TO;
2340         xprt->connect_timeout = XS_UDP_CONN_TO;
2341         xprt->reestablish_timeout = XS_UDP_REEST_TO;
2342         xprt->idle_timeout = XS_IDLE_DISC_TO;
2343
2344         xprt->ops = &xs_udp_ops;
2345
2346         xprt->timeout = &xs_udp_default_timeout;
2347
2348         switch (addr->sa_family) {
2349         case AF_INET:
2350                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2351                         xprt_set_bound(xprt);
2352
2353                 INIT_DELAYED_WORK(&transport->connect_worker,
2354                                         xs_udp_connect_worker4);
2355                 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2356                 break;
2357         case AF_INET6:
2358                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2359                         xprt_set_bound(xprt);
2360
2361                 INIT_DELAYED_WORK(&transport->connect_worker,
2362                                         xs_udp_connect_worker6);
2363                 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2364                 break;
2365         default:
2366                 kfree(xprt);
2367                 return ERR_PTR(-EAFNOSUPPORT);
2368         }
2369
2370         if (xprt_bound(xprt))
2371                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2372                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2373                                 xprt->address_strings[RPC_DISPLAY_PORT],
2374                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2375         else
2376                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2377                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2378                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2379
2380         if (try_module_get(THIS_MODULE))
2381                 return xprt;
2382
2383         kfree(xprt->slot);
2384         kfree(xprt);
2385         return ERR_PTR(-EINVAL);
2386 }
2387
2388 static const struct rpc_timeout xs_tcp_default_timeout = {
2389         .to_initval = 60 * HZ,
2390         .to_maxval = 60 * HZ,
2391         .to_retries = 2,
2392 };
2393
2394 /**
2395  * xs_setup_tcp - Set up transport to use a TCP socket
2396  * @args: rpc transport creation arguments
2397  *
2398  */
2399 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2400 {
2401         struct sockaddr *addr = args->dstaddr;
2402         struct rpc_xprt *xprt;
2403         struct sock_xprt *transport;
2404
2405         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2406         if (IS_ERR(xprt))
2407                 return xprt;
2408         transport = container_of(xprt, struct sock_xprt, xprt);
2409
2410         xprt->prot = IPPROTO_TCP;
2411         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2412         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2413
2414         xprt->bind_timeout = XS_BIND_TO;
2415         xprt->connect_timeout = XS_TCP_CONN_TO;
2416         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2417         xprt->idle_timeout = XS_IDLE_DISC_TO;
2418
2419         xprt->ops = &xs_tcp_ops;
2420         xprt->timeout = &xs_tcp_default_timeout;
2421
2422         switch (addr->sa_family) {
2423         case AF_INET:
2424                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2425                         xprt_set_bound(xprt);
2426
2427                 INIT_DELAYED_WORK(&transport->connect_worker,
2428                                         xs_tcp_connect_worker4);
2429                 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2430                 break;
2431         case AF_INET6:
2432                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2433                         xprt_set_bound(xprt);
2434
2435                 INIT_DELAYED_WORK(&transport->connect_worker,
2436                                         xs_tcp_connect_worker6);
2437                 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2438                 break;
2439         default:
2440                 kfree(xprt);
2441                 return ERR_PTR(-EAFNOSUPPORT);
2442         }
2443
2444         if (xprt_bound(xprt))
2445                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2446                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2447                                 xprt->address_strings[RPC_DISPLAY_PORT],
2448                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2449         else
2450                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2451                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2452                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2453
2454
2455         if (try_module_get(THIS_MODULE))
2456                 return xprt;
2457
2458         kfree(xprt->slot);
2459         kfree(xprt);
2460         return ERR_PTR(-EINVAL);
2461 }
2462
2463 /**
2464  * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2465  * @args: rpc transport creation arguments
2466  *
2467  */
2468 static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2469 {
2470         struct sockaddr *addr = args->dstaddr;
2471         struct rpc_xprt *xprt;
2472         struct sock_xprt *transport;
2473         struct svc_sock *bc_sock;
2474
2475         if (!args->bc_xprt)
2476                 ERR_PTR(-EINVAL);
2477
2478         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2479         if (IS_ERR(xprt))
2480                 return xprt;
2481         transport = container_of(xprt, struct sock_xprt, xprt);
2482
2483         xprt->prot = IPPROTO_TCP;
2484         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2485         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2486         xprt->timeout = &xs_tcp_default_timeout;
2487
2488         /* backchannel */
2489         xprt_set_bound(xprt);
2490         xprt->bind_timeout = 0;
2491         xprt->connect_timeout = 0;
2492         xprt->reestablish_timeout = 0;
2493         xprt->idle_timeout = 0;
2494
2495         /*
2496          * The backchannel uses the same socket connection as the
2497          * forechannel
2498          */
2499         xprt->bc_xprt = args->bc_xprt;
2500         bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2501         bc_sock->sk_bc_xprt = xprt;
2502         transport->sock = bc_sock->sk_sock;
2503         transport->inet = bc_sock->sk_sk;
2504
2505         xprt->ops = &bc_tcp_ops;
2506
2507         switch (addr->sa_family) {
2508         case AF_INET:
2509                 xs_format_peer_addresses(xprt, "tcp",
2510                                          RPCBIND_NETID_TCP);
2511                 break;
2512         case AF_INET6:
2513                 xs_format_peer_addresses(xprt, "tcp",
2514                                    RPCBIND_NETID_TCP6);
2515                 break;
2516         default:
2517                 kfree(xprt);
2518                 return ERR_PTR(-EAFNOSUPPORT);
2519         }
2520
2521         if (xprt_bound(xprt))
2522                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2523                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2524                                 xprt->address_strings[RPC_DISPLAY_PORT],
2525                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2526         else
2527                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2528                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2529                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2530
2531         /*
2532          * Since we don't want connections for the backchannel, we set
2533          * the xprt status to connected
2534          */
2535         xprt_set_connected(xprt);
2536
2537
2538         if (try_module_get(THIS_MODULE))
2539                 return xprt;
2540         kfree(xprt->slot);
2541         kfree(xprt);
2542         return ERR_PTR(-EINVAL);
2543 }
2544
2545 static struct xprt_class        xs_udp_transport = {
2546         .list           = LIST_HEAD_INIT(xs_udp_transport.list),
2547         .name           = "udp",
2548         .owner          = THIS_MODULE,
2549         .ident          = XPRT_TRANSPORT_UDP,
2550         .setup          = xs_setup_udp,
2551 };
2552
2553 static struct xprt_class        xs_tcp_transport = {
2554         .list           = LIST_HEAD_INIT(xs_tcp_transport.list),
2555         .name           = "tcp",
2556         .owner          = THIS_MODULE,
2557         .ident          = XPRT_TRANSPORT_TCP,
2558         .setup          = xs_setup_tcp,
2559 };
2560
2561 static struct xprt_class        xs_bc_tcp_transport = {
2562         .list           = LIST_HEAD_INIT(xs_bc_tcp_transport.list),
2563         .name           = "tcp NFSv4.1 backchannel",
2564         .owner          = THIS_MODULE,
2565         .ident          = XPRT_TRANSPORT_BC_TCP,
2566         .setup          = xs_setup_bc_tcp,
2567 };
2568
2569 /**
2570  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2571  *
2572  */
2573 int init_socket_xprt(void)
2574 {
2575 #ifdef RPC_DEBUG
2576         if (!sunrpc_table_header)
2577                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
2578 #endif
2579
2580         xprt_register_transport(&xs_udp_transport);
2581         xprt_register_transport(&xs_tcp_transport);
2582         xprt_register_transport(&xs_bc_tcp_transport);
2583
2584         return 0;
2585 }
2586
2587 /**
2588  * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2589  *
2590  */
2591 void cleanup_socket_xprt(void)
2592 {
2593 #ifdef RPC_DEBUG
2594         if (sunrpc_table_header) {
2595                 unregister_sysctl_table(sunrpc_table_header);
2596                 sunrpc_table_header = NULL;
2597         }
2598 #endif
2599
2600         xprt_unregister_transport(&xs_udp_transport);
2601         xprt_unregister_transport(&xs_tcp_transport);
2602         xprt_unregister_transport(&xs_bc_tcp_transport);
2603 }
2604
2605 static int param_set_uint_minmax(const char *val, struct kernel_param *kp,
2606                 unsigned int min, unsigned int max)
2607 {
2608         unsigned long num;
2609         int ret;
2610
2611         if (!val)
2612                 return -EINVAL;
2613         ret = strict_strtoul(val, 0, &num);
2614         if (ret == -EINVAL || num < min || num > max)
2615                 return -EINVAL;
2616         *((unsigned int *)kp->arg) = num;
2617         return 0;
2618 }
2619
2620 static int param_set_portnr(const char *val, struct kernel_param *kp)
2621 {
2622         return param_set_uint_minmax(val, kp,
2623                         RPC_MIN_RESVPORT,
2624                         RPC_MAX_RESVPORT);
2625 }
2626
2627 static int param_get_portnr(char *buffer, struct kernel_param *kp)
2628 {
2629         return param_get_uint(buffer, kp);
2630 }
2631 #define param_check_portnr(name, p) \
2632         __param_check(name, p, unsigned int);
2633
2634 module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
2635 module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
2636
2637 static int param_set_slot_table_size(const char *val, struct kernel_param *kp)
2638 {
2639         return param_set_uint_minmax(val, kp,
2640                         RPC_MIN_SLOT_TABLE,
2641                         RPC_MAX_SLOT_TABLE);
2642 }
2643
2644 static int param_get_slot_table_size(char *buffer, struct kernel_param *kp)
2645 {
2646         return param_get_uint(buffer, kp);
2647 }
2648 #define param_check_slot_table_size(name, p) \
2649         __param_check(name, p, unsigned int);
2650
2651 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
2652                    slot_table_size, 0644);
2653 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
2654                    slot_table_size, 0644);
2655