67aa9510f09be520e0fd514dd5d17d7bc8fb1bbf
[linux-3.10.git] / net / rxrpc / ar-output.c
1 /* RxRPC packet transmission
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #include <linux/net.h>
13 #include <linux/skbuff.h>
14 #include <linux/circ_buf.h>
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18
/* Delay, in seconds, added to the current jiffies value (after scaling by HZ)
 * to give a newly queued packet's resend time */
int rxrpc_resend_timeout = 4;

/* defined at the bottom of the file; used by both sendmsg entry points */
static int rxrpc_send_data(struct kiocb *iocb, struct rxrpc_sock *rx,
                           struct rxrpc_call *call, struct msghdr *msg,
                           size_t len);
25
26 /*
27  * extract control messages from the sendmsg() control buffer
28  */
29 static int rxrpc_sendmsg_cmsg(struct rxrpc_sock *rx, struct msghdr *msg,
30                               unsigned long *user_call_ID,
31                               enum rxrpc_command *command,
32                               u32 *abort_code,
33                               bool server)
34 {
35         struct cmsghdr *cmsg;
36         int len;
37
38         *command = RXRPC_CMD_SEND_DATA;
39
40         if (msg->msg_controllen == 0)
41                 return -EINVAL;
42
43         for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
44                 if (!CMSG_OK(msg, cmsg))
45                         return -EINVAL;
46
47                 len = cmsg->cmsg_len - CMSG_ALIGN(sizeof(struct cmsghdr));
48                 _debug("CMSG %d, %d, %d",
49                        cmsg->cmsg_level, cmsg->cmsg_type, len);
50
51                 if (cmsg->cmsg_level != SOL_RXRPC)
52                         continue;
53
54                 switch (cmsg->cmsg_type) {
55                 case RXRPC_USER_CALL_ID:
56                         if (msg->msg_flags & MSG_CMSG_COMPAT) {
57                                 if (len != sizeof(u32))
58                                         return -EINVAL;
59                                 *user_call_ID = *(u32 *) CMSG_DATA(cmsg);
60                         } else {
61                                 if (len != sizeof(unsigned long))
62                                         return -EINVAL;
63                                 *user_call_ID = *(unsigned long *)
64                                         CMSG_DATA(cmsg);
65                         }
66                         _debug("User Call ID %lx", *user_call_ID);
67                         break;
68
69                 case RXRPC_ABORT:
70                         if (*command != RXRPC_CMD_SEND_DATA)
71                                 return -EINVAL;
72                         *command = RXRPC_CMD_SEND_ABORT;
73                         if (len != sizeof(*abort_code))
74                                 return -EINVAL;
75                         *abort_code = *(unsigned int *) CMSG_DATA(cmsg);
76                         _debug("Abort %x", *abort_code);
77                         if (*abort_code == 0)
78                                 return -EINVAL;
79                         break;
80
81                 case RXRPC_ACCEPT:
82                         if (*command != RXRPC_CMD_SEND_DATA)
83                                 return -EINVAL;
84                         *command = RXRPC_CMD_ACCEPT;
85                         if (len != 0)
86                                 return -EINVAL;
87                         if (!server)
88                                 return -EISCONN;
89                         break;
90
91                 default:
92                         return -EINVAL;
93                 }
94         }
95
96         _leave(" = 0");
97         return 0;
98 }
99
100 /*
101  * abort a call, sending an ABORT packet to the peer
102  */
103 static void rxrpc_send_abort(struct rxrpc_call *call, u32 abort_code)
104 {
105         write_lock_bh(&call->state_lock);
106
107         if (call->state <= RXRPC_CALL_COMPLETE) {
108                 call->state = RXRPC_CALL_LOCALLY_ABORTED;
109                 call->abort_code = abort_code;
110                 set_bit(RXRPC_CALL_ABORT, &call->events);
111                 del_timer_sync(&call->resend_timer);
112                 del_timer_sync(&call->ack_timer);
113                 clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
114                 clear_bit(RXRPC_CALL_ACK, &call->events);
115                 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
116                 schedule_work(&call->processor);
117         }
118
119         write_unlock_bh(&call->state_lock);
120 }
121
122 /*
123  * send a message forming part of a client call through an RxRPC socket
124  * - caller holds the socket locked
125  * - the socket may be either a client socket or a server socket
126  */
127 int rxrpc_client_sendmsg(struct kiocb *iocb, struct rxrpc_sock *rx,
128                          struct rxrpc_transport *trans, struct msghdr *msg,
129                          size_t len)
130 {
131         struct rxrpc_conn_bundle *bundle;
132         enum rxrpc_command cmd;
133         struct rxrpc_call *call;
134         unsigned long user_call_ID = 0;
135         struct key *key;
136         __be16 service_id;
137         u32 abort_code = 0;
138         int ret;
139
140         _enter("");
141
142         ASSERT(trans != NULL);
143
144         ret = rxrpc_sendmsg_cmsg(rx, msg, &user_call_ID, &cmd, &abort_code,
145                                  false);
146         if (ret < 0)
147                 return ret;
148
149         bundle = NULL;
150         if (trans) {
151                 service_id = rx->service_id;
152                 if (msg->msg_name) {
153                         struct sockaddr_rxrpc *srx =
154                                 (struct sockaddr_rxrpc *) msg->msg_name;
155                         service_id = htons(srx->srx_service);
156                 }
157                 key = rx->key;
158                 if (key && !rx->key->payload.data)
159                         key = NULL;
160                 bundle = rxrpc_get_bundle(rx, trans, key, service_id,
161                                           GFP_KERNEL);
162                 if (IS_ERR(bundle))
163                         return PTR_ERR(bundle);
164         }
165
166         call = rxrpc_get_client_call(rx, trans, bundle, user_call_ID,
167                                      abort_code == 0, GFP_KERNEL);
168         if (trans)
169                 rxrpc_put_bundle(trans, bundle);
170         if (IS_ERR(call)) {
171                 _leave(" = %ld", PTR_ERR(call));
172                 return PTR_ERR(call);
173         }
174
175         _debug("CALL %d USR %lx ST %d on CONN %p",
176                call->debug_id, call->user_call_ID, call->state, call->conn);
177
178         if (call->state >= RXRPC_CALL_COMPLETE) {
179                 /* it's too late for this call */
180                 ret = -ESHUTDOWN;
181         } else if (cmd == RXRPC_CMD_SEND_ABORT) {
182                 rxrpc_send_abort(call, abort_code);
183         } else if (cmd != RXRPC_CMD_SEND_DATA) {
184                 ret = -EINVAL;
185         } else if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST) {
186                 /* request phase complete for this client call */
187                 ret = -EPROTO;
188         } else {
189                 ret = rxrpc_send_data(iocb, rx, call, msg, len);
190         }
191
192         rxrpc_put_call(call);
193         _leave(" = %d", ret);
194         return ret;
195 }
196
197 /*
198  * send a message through a server socket
199  * - caller holds the socket locked
200  */
201 int rxrpc_server_sendmsg(struct kiocb *iocb, struct rxrpc_sock *rx,
202                          struct msghdr *msg, size_t len)
203 {
204         enum rxrpc_command cmd;
205         struct rxrpc_call *call;
206         unsigned long user_call_ID = 0;
207         u32 abort_code = 0;
208         int ret;
209
210         _enter("");
211
212         ret = rxrpc_sendmsg_cmsg(rx, msg, &user_call_ID, &cmd, &abort_code,
213                                  true);
214         if (ret < 0)
215                 return ret;
216
217         if (cmd == RXRPC_CMD_ACCEPT)
218                 return rxrpc_accept_call(rx, user_call_ID);
219
220         call = rxrpc_find_server_call(rx, user_call_ID);
221         if (!call)
222                 return -EBADSLT;
223         if (call->state >= RXRPC_CALL_COMPLETE) {
224                 ret = -ESHUTDOWN;
225                 goto out;
226         }
227
228         switch (cmd) {
229         case RXRPC_CMD_SEND_DATA:
230                 if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
231                     call->state != RXRPC_CALL_SERVER_ACK_REQUEST &&
232                     call->state != RXRPC_CALL_SERVER_SEND_REPLY) {
233                         /* Tx phase not yet begun for this call */
234                         ret = -EPROTO;
235                         break;
236                 }
237
238                 ret = rxrpc_send_data(iocb, rx, call, msg, len);
239                 break;
240
241         case RXRPC_CMD_SEND_ABORT:
242                 rxrpc_send_abort(call, abort_code);
243                 break;
244         default:
245                 BUG();
246         }
247
248         out:
249         rxrpc_put_call(call);
250         _leave(" = %d", ret);
251         return ret;
252 }
253
254 /*
255  * send a packet through the transport endpoint
256  */
257 int rxrpc_send_packet(struct rxrpc_transport *trans, struct sk_buff *skb)
258 {
259         struct kvec iov[1];
260         struct msghdr msg;
261         int ret, opt;
262
263         _enter(",{%d}", skb->len);
264
265         iov[0].iov_base = skb->head;
266         iov[0].iov_len = skb->len;
267
268         msg.msg_name = &trans->peer->srx.transport.sin;
269         msg.msg_namelen = sizeof(trans->peer->srx.transport.sin);
270         msg.msg_control = NULL;
271         msg.msg_controllen = 0;
272         msg.msg_flags = 0;
273
274         /* send the packet with the don't fragment bit set if we currently
275          * think it's small enough */
276         if (skb->len - sizeof(struct rxrpc_header) < trans->peer->maxdata) {
277                 down_read(&trans->local->defrag_sem);
278                 /* send the packet by UDP
279                  * - returns -EMSGSIZE if UDP would have to fragment the packet
280                  *   to go out of the interface
281                  *   - in which case, we'll have processed the ICMP error
282                  *     message and update the peer record
283                  */
284                 ret = kernel_sendmsg(trans->local->socket, &msg, iov, 1,
285                                      iov[0].iov_len);
286
287                 up_read(&trans->local->defrag_sem);
288                 if (ret == -EMSGSIZE)
289                         goto send_fragmentable;
290
291                 _leave(" = %d [%u]", ret, trans->peer->maxdata);
292                 return ret;
293         }
294
295 send_fragmentable:
296         /* attempt to send this message with fragmentation enabled */
297         _debug("send fragment");
298
299         down_write(&trans->local->defrag_sem);
300         opt = IP_PMTUDISC_DONT;
301         ret = kernel_setsockopt(trans->local->socket, SOL_IP, IP_MTU_DISCOVER,
302                                 (char *) &opt, sizeof(opt));
303         if (ret == 0) {
304                 ret = kernel_sendmsg(trans->local->socket, &msg, iov, 1,
305                                      iov[0].iov_len);
306
307                 opt = IP_PMTUDISC_DO;
308                 kernel_setsockopt(trans->local->socket, SOL_IP,
309                                   IP_MTU_DISCOVER, (char *) &opt, sizeof(opt));
310         }
311
312         up_write(&trans->local->defrag_sem);
313         _leave(" = %d [frag %u]", ret, trans->peer->maxdata);
314         return ret;
315 }
316
317 /*
318  * wait for space to appear in the transmit/ACK window
319  * - caller holds the socket locked
320  */
321 static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
322                                     struct rxrpc_call *call,
323                                     long *timeo)
324 {
325         DECLARE_WAITQUEUE(myself, current);
326         int ret;
327
328         _enter(",{%d},%ld",
329                CIRC_SPACE(call->acks_head, call->acks_tail, call->acks_winsz),
330                *timeo);
331
332         add_wait_queue(&call->tx_waitq, &myself);
333
334         for (;;) {
335                 set_current_state(TASK_INTERRUPTIBLE);
336                 ret = 0;
337                 if (CIRC_SPACE(call->acks_head, call->acks_tail,
338                                call->acks_winsz) > 0)
339                         break;
340                 if (signal_pending(current)) {
341                         ret = sock_intr_errno(*timeo);
342                         break;
343                 }
344
345                 release_sock(&rx->sk);
346                 *timeo = schedule_timeout(*timeo);
347                 lock_sock(&rx->sk);
348         }
349
350         remove_wait_queue(&call->tx_waitq, &myself);
351         set_current_state(TASK_RUNNING);
352         _leave(" = %d", ret);
353         return ret;
354 }
355
356 /*
357  * attempt to schedule an instant Tx resend
358  */
359 static inline void rxrpc_instant_resend(struct rxrpc_call *call)
360 {
361         read_lock_bh(&call->state_lock);
362         if (try_to_del_timer_sync(&call->resend_timer) >= 0) {
363                 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
364                 if (call->state < RXRPC_CALL_COMPLETE &&
365                     !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
366                         schedule_work(&call->processor);
367         }
368         read_unlock_bh(&call->state_lock);
369 }
370
371 /*
372  * queue a packet for transmission, set the resend timer and attempt
373  * to send the packet immediately
374  */
375 static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
376                                bool last)
377 {
378         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
379         int ret;
380
381         _net("queue skb %p [%d]", skb, call->acks_head);
382
383         ASSERT(call->acks_window != NULL);
384         call->acks_window[call->acks_head] = (unsigned long) skb;
385         smp_wmb();
386         call->acks_head = (call->acks_head + 1) & (call->acks_winsz - 1);
387
388         if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
389                 _debug("________awaiting reply/ACK__________");
390                 write_lock_bh(&call->state_lock);
391                 switch (call->state) {
392                 case RXRPC_CALL_CLIENT_SEND_REQUEST:
393                         call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
394                         break;
395                 case RXRPC_CALL_SERVER_ACK_REQUEST:
396                         call->state = RXRPC_CALL_SERVER_SEND_REPLY;
397                         if (!last)
398                                 break;
399                 case RXRPC_CALL_SERVER_SEND_REPLY:
400                         call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
401                         break;
402                 default:
403                         break;
404                 }
405                 write_unlock_bh(&call->state_lock);
406         }
407
408         _proto("Tx DATA %%%u { #%u }",
409                ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
410
411         sp->need_resend = 0;
412         sp->resend_at = jiffies + rxrpc_resend_timeout * HZ;
413         if (!test_and_set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags)) {
414                 _debug("run timer");
415                 call->resend_timer.expires = sp->resend_at;
416                 add_timer(&call->resend_timer);
417         }
418
419         /* attempt to cancel the rx-ACK timer, deferring reply transmission if
420          * we're ACK'ing the request phase of an incoming call */
421         ret = -EAGAIN;
422         if (try_to_del_timer_sync(&call->ack_timer) >= 0) {
423                 /* the packet may be freed by rxrpc_process_call() before this
424                  * returns */
425                 ret = rxrpc_send_packet(call->conn->trans, skb);
426                 _net("sent skb %p", skb);
427         } else {
428                 _debug("failed to delete ACK timer");
429         }
430
431         if (ret < 0) {
432                 _debug("need instant resend %d", ret);
433                 sp->need_resend = 1;
434                 rxrpc_instant_resend(call);
435         }
436
437         _leave("");
438 }
439
440 /*
441  * send data through a socket
442  * - must be called in process context
443  * - caller holds the socket locked
444  */
445 static int rxrpc_send_data(struct kiocb *iocb,
446                            struct rxrpc_sock *rx,
447                            struct rxrpc_call *call,
448                            struct msghdr *msg, size_t len)
449 {
450         struct rxrpc_skb_priv *sp;
451         unsigned char __user *from;
452         struct sk_buff *skb;
453         struct iovec *iov;
454         struct sock *sk = &rx->sk;
455         long timeo;
456         bool more;
457         int ret, ioc, segment, copied;
458
459         _enter(",,,{%zu},%zu", msg->msg_iovlen, len);
460
461         timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
462
463         /* this should be in poll */
464         clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
465
466         if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
467                 return -EPIPE;
468
469         iov = msg->msg_iov;
470         ioc = msg->msg_iovlen - 1;
471         from = iov->iov_base;
472         segment = iov->iov_len;
473         iov++;
474         more = msg->msg_flags & MSG_MORE;
475
476         skb = call->tx_pending;
477         call->tx_pending = NULL;
478
479         copied = 0;
480         do {
481                 int copy;
482
483                 if (segment > len)
484                         segment = len;
485
486                 _debug("SEGMENT %d @%p", segment, from);
487
488                 if (!skb) {
489                         size_t size, chunk, max, space;
490
491                         _debug("alloc");
492
493                         if (CIRC_SPACE(call->acks_head, call->acks_tail,
494                                        call->acks_winsz) <= 0) {
495                                 ret = -EAGAIN;
496                                 if (msg->msg_flags & MSG_DONTWAIT)
497                                         goto maybe_error;
498                                 ret = rxrpc_wait_for_tx_window(rx, call,
499                                                                &timeo);
500                                 if (ret < 0)
501                                         goto maybe_error;
502                         }
503
504                         max = call->conn->trans->peer->maxdata;
505                         max -= call->conn->security_size;
506                         max &= ~(call->conn->size_align - 1UL);
507
508                         chunk = max;
509                         if (chunk > len)
510                                 chunk = len;
511
512                         space = chunk + call->conn->size_align;
513                         space &= ~(call->conn->size_align - 1UL);
514
515                         size = space + call->conn->header_size;
516
517                         _debug("SIZE: %zu/%zu/%zu", chunk, space, size);
518
519                         /* create a buffer that we can retain until it's ACK'd */
520                         skb = sock_alloc_send_skb(
521                                 sk, size, msg->msg_flags & MSG_DONTWAIT, &ret);
522                         if (!skb)
523                                 goto maybe_error;
524
525                         rxrpc_new_skb(skb);
526
527                         _debug("ALLOC SEND %p", skb);
528
529                         ASSERTCMP(skb->mark, ==, 0);
530
531                         _debug("HS: %u", call->conn->header_size);
532                         skb_reserve(skb, call->conn->header_size);
533                         skb->len += call->conn->header_size;
534
535                         sp = rxrpc_skb(skb);
536                         sp->remain = chunk;
537                         if (sp->remain > skb_tailroom(skb))
538                                 sp->remain = skb_tailroom(skb);
539
540                         _net("skb: hr %d, tr %d, hl %d, rm %d",
541                                skb_headroom(skb),
542                                skb_tailroom(skb),
543                                skb_headlen(skb),
544                                sp->remain);
545
546                         skb->ip_summed = CHECKSUM_UNNECESSARY;
547                 }
548
549                 _debug("append");
550                 sp = rxrpc_skb(skb);
551
552                 /* append next segment of data to the current buffer */
553                 copy = skb_tailroom(skb);
554                 ASSERTCMP(copy, >, 0);
555                 if (copy > segment)
556                         copy = segment;
557                 if (copy > sp->remain)
558                         copy = sp->remain;
559
560                 _debug("add");
561                 ret = skb_add_data(skb, from, copy);
562                 _debug("added");
563                 if (ret < 0)
564                         goto efault;
565                 sp->remain -= copy;
566                 skb->mark += copy;
567
568                 len -= copy;
569                 segment -= copy;
570                 from += copy;
571                 while (segment == 0 && ioc > 0) {
572                         from = iov->iov_base;
573                         segment = iov->iov_len;
574                         iov++;
575                         ioc--;
576                 }
577                 if (len == 0) {
578                         segment = 0;
579                         ioc = 0;
580                 }
581
582                 /* check for the far side aborting the call or a network error
583                  * occurring */
584                 if (call->state > RXRPC_CALL_COMPLETE)
585                         goto call_aborted;
586
587                 /* add the packet to the send queue if it's now full */
588                 if (sp->remain <= 0 || (segment == 0 && !more)) {
589                         struct rxrpc_connection *conn = call->conn;
590                         size_t pad;
591
592                         /* pad out if we're using security */
593                         if (conn->security) {
594                                 pad = conn->security_size + skb->mark;
595                                 pad = conn->size_align - pad;
596                                 pad &= conn->size_align - 1;
597                                 _debug("pad %zu", pad);
598                                 if (pad)
599                                         memset(skb_put(skb, pad), 0, pad);
600                         }
601
602                         sp->hdr.epoch = conn->epoch;
603                         sp->hdr.cid = call->cid;
604                         sp->hdr.callNumber = call->call_id;
605                         sp->hdr.seq =
606                                 htonl(atomic_inc_return(&call->sequence));
607                         sp->hdr.serial =
608                                 htonl(atomic_inc_return(&conn->serial));
609                         sp->hdr.type = RXRPC_PACKET_TYPE_DATA;
610                         sp->hdr.userStatus = 0;
611                         sp->hdr.securityIndex = conn->security_ix;
612                         sp->hdr._rsvd = 0;
613                         sp->hdr.serviceId = conn->service_id;
614
615                         sp->hdr.flags = conn->out_clientflag;
616                         if (len == 0 && !more)
617                                 sp->hdr.flags |= RXRPC_LAST_PACKET;
618                         else if (CIRC_SPACE(call->acks_head, call->acks_tail,
619                                             call->acks_winsz) > 1)
620                                 sp->hdr.flags |= RXRPC_MORE_PACKETS;
621
622                         ret = rxrpc_secure_packet(
623                                 call, skb, skb->mark,
624                                 skb->head + sizeof(struct rxrpc_header));
625                         if (ret < 0)
626                                 goto out;
627
628                         memcpy(skb->head, &sp->hdr,
629                                sizeof(struct rxrpc_header));
630                         rxrpc_queue_packet(call, skb, segment == 0 && !more);
631                         skb = NULL;
632                 }
633
634         } while (segment > 0);
635
636 out:
637         call->tx_pending = skb;
638         _leave(" = %d", ret);
639         return ret;
640
641 call_aborted:
642         rxrpc_free_skb(skb);
643         if (call->state == RXRPC_CALL_NETWORK_ERROR)
644                 ret = call->conn->trans->peer->net_error;
645         else
646                 ret = -ECONNABORTED;
647         _leave(" = %d", ret);
648         return ret;
649
650 maybe_error:
651         if (copied)
652                 ret = copied;
653         goto out;
654
655 efault:
656         ret = -EFAULT;
657         goto out;
658 }