/* call.c: Rx call routines * * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. * Written by David Howells (dhowells@redhat.com) * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version * 2 of the License, or (at your option) any later version. */ #include #include #include #include #include #include #include #include #include #include "internal.h" __RXACCT_DECL(atomic_t rxrpc_call_count); __RXACCT_DECL(atomic_t rxrpc_message_count); LIST_HEAD(rxrpc_calls); DECLARE_RWSEM(rxrpc_calls_sem); unsigned rxrpc_call_rcv_timeout = HZ/3; static unsigned rxrpc_call_acks_timeout = HZ/3; static unsigned rxrpc_call_dfr_ack_timeout = HZ/20; static unsigned short rxrpc_call_max_resend = HZ/10; const char *rxrpc_call_states[] = { "COMPLETE", "ERROR", "SRVR_RCV_OPID", "SRVR_RCV_ARGS", "SRVR_GOT_ARGS", "SRVR_SND_REPLY", "SRVR_RCV_FINAL_ACK", "CLNT_SND_ARGS", "CLNT_RCV_REPLY", "CLNT_GOT_REPLY" }; const char *rxrpc_call_error_states[] = { "NO_ERROR", "LOCAL_ABORT", "PEER_ABORT", "LOCAL_ERROR", "REMOTE_ERROR" }; const char *rxrpc_pkts[] = { "?00", "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug", "?09", "?10", "?11", "?12", "?13", "?14", "?15" }; static const char *rxrpc_acks[] = { "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL", "-?-" }; static const char _acktype[] = "NA-"; static void rxrpc_call_receive_packet(struct rxrpc_call *call); static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, struct rxrpc_message *msg); static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, struct rxrpc_message *msg); static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, rxrpc_seq_t higest); static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest); static int __rxrpc_call_read_data(struct rxrpc_call *call); static int rxrpc_call_record_ACK(struct rxrpc_call *call, struct rxrpc_message *msg, rxrpc_seq_t seq, size_t count); static int rxrpc_call_flush(struct rxrpc_call *call); #define _state(call) \ _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]); static void rxrpc_call_default_attn_func(struct rxrpc_call *call) { wake_up(&call->waitq); } static void rxrpc_call_default_error_func(struct rxrpc_call *call) { wake_up(&call->waitq); } static void rxrpc_call_default_aemap_func(struct rxrpc_call *call) { switch (call->app_err_state) { case RXRPC_ESTATE_LOCAL_ABORT: call->app_abort_code = -call->app_errno; case RXRPC_ESTATE_PEER_ABORT: call->app_errno = -ECONNABORTED; default: break; } } static void __rxrpc_call_acks_timeout(unsigned long _call) { struct rxrpc_call *call = (struct rxrpc_call *) _call; _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif); call->flags |= RXRPC_CALL_ACKS_TIMO; rxrpc_krxiod_queue_call(call); } static void __rxrpc_call_rcv_timeout(unsigned long _call) { struct rxrpc_call *call = (struct rxrpc_call *) _call; _debug("RCV TIMEOUT %05lu", jiffies - call->cjif); call->flags |= RXRPC_CALL_RCV_TIMO; rxrpc_krxiod_queue_call(call); } static void __rxrpc_call_ackr_timeout(unsigned long _call) { struct rxrpc_call *call = (struct rxrpc_call *) _call; _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif); call->flags |= RXRPC_CALL_ACKR_TIMO; rxrpc_krxiod_queue_call(call); } /*****************************************************************************/ /* * calculate a timeout based on an RTT value */ static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call, unsigned long val) { unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ); expiry += 10; if (expiry < HZ / 25) expiry = HZ / 25; if (expiry > HZ) expiry = HZ; _leave(" = %lu jiffies", expiry); return jiffies + expiry; } /* end __rxrpc_rtt_based_timeout() */ /*****************************************************************************/ /* * create a new call record */ static inline int __rxrpc_create_call(struct rxrpc_connection *conn, struct rxrpc_call **_call) { struct rxrpc_call *call; _enter("%p", conn); /* allocate and initialise a call record */ call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL); if (!call) { _leave(" ENOMEM"); return -ENOMEM; } atomic_set(&call->usage, 1); init_waitqueue_head(&call->waitq); spin_lock_init(&call->lock); INIT_LIST_HEAD(&call->link); INIT_LIST_HEAD(&call->acks_pendq); INIT_LIST_HEAD(&call->rcv_receiveq); INIT_LIST_HEAD(&call->rcv_krxiodq_lk); INIT_LIST_HEAD(&call->app_readyq); INIT_LIST_HEAD(&call->app_unreadyq); INIT_LIST_HEAD(&call->app_link); INIT_LIST_HEAD(&call->app_attn_link); init_timer(&call->acks_timeout); call->acks_timeout.data = (unsigned long) call; call->acks_timeout.function = __rxrpc_call_acks_timeout; init_timer(&call->rcv_timeout); call->rcv_timeout.data = (unsigned long) call; call->rcv_timeout.function = __rxrpc_call_rcv_timeout; init_timer(&call->ackr_dfr_timo); call->ackr_dfr_timo.data = (unsigned long) call; call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout; call->conn = conn; call->ackr_win_bot = 1; call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1; call->ackr_prev_seq = 0; call->app_mark = RXRPC_APP_MARK_EOF; call->app_attn_func = rxrpc_call_default_attn_func; call->app_error_func = rxrpc_call_default_error_func; call->app_aemap_func = rxrpc_call_default_aemap_func; call->app_scr_alloc = call->app_scratch; call->cjif = jiffies; _leave(" = 0 (%p)", call); *_call = call; return 0; } /* end __rxrpc_create_call() */ /*****************************************************************************/ /* * create a new call record for outgoing calls */ int rxrpc_create_call(struct rxrpc_connection *conn, rxrpc_call_attn_func_t attn, rxrpc_call_error_func_t error, rxrpc_call_aemap_func_t aemap, struct rxrpc_call **_call) { DECLARE_WAITQUEUE(myself, current); struct rxrpc_call *call; int ret, cix, loop; _enter("%p", conn); /* allocate and initialise a call record */ ret = __rxrpc_create_call(conn, &call); if (ret < 0) { _leave(" = %d", ret); return ret; } call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS; if (attn) call->app_attn_func = attn; if (error) call->app_error_func = error; if (aemap) call->app_aemap_func = aemap; _state(call); spin_lock(&conn->lock); set_current_state(TASK_INTERRUPTIBLE); add_wait_queue(&conn->chanwait, &myself); try_again: /* try to find an unused channel */ for (cix = 0; cix < 4; cix++) if (!conn->channels[cix]) goto obtained_chan; /* no free channels - wait for one to become available */ ret = -EINTR; if (signal_pending(current)) goto error_unwait; spin_unlock(&conn->lock); schedule(); set_current_state(TASK_INTERRUPTIBLE); spin_lock(&conn->lock); goto try_again; /* got a channel - now attach to the connection */ obtained_chan: remove_wait_queue(&conn->chanwait, &myself); set_current_state(TASK_RUNNING); /* concoct a unique call number */ next_callid: call->call_id = htonl(++conn->call_counter); for (loop = 0; loop < 4; loop++) if (conn->channels[loop] && conn->channels[loop]->call_id == call->call_id) goto next_callid; rxrpc_get_connection(conn); conn->channels[cix] = call; /* assign _after_ done callid check loop */ do_gettimeofday(&conn->atime); call->chan_ix = htonl(cix); spin_unlock(&conn->lock); down_write(&rxrpc_calls_sem); list_add_tail(&call->call_link, &rxrpc_calls); up_write(&rxrpc_calls_sem); __RXACCT(atomic_inc(&rxrpc_call_count)); *_call = call; _leave(" = 0 (call=%p cix=%u)", call, cix); return 0; error_unwait: remove_wait_queue(&conn->chanwait, &myself); set_current_state(TASK_RUNNING); spin_unlock(&conn->lock); free_page((unsigned long) call); _leave(" = %d", ret); return ret; } /* end rxrpc_create_call() */ /*****************************************************************************/ /* * create a new call record for incoming calls */ int rxrpc_incoming_call(struct rxrpc_connection *conn, struct rxrpc_message *msg, struct rxrpc_call **_call) { struct rxrpc_call *call; unsigned cix; int ret; cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK; _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix); /* allocate and initialise a call record */ ret = __rxrpc_create_call(conn, &call); if (ret < 0) { _leave(" = %d", ret); return ret; } call->pkt_rcv_count = 1; call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID; call->app_mark = sizeof(uint32_t); _state(call); /* attach to the connection */ ret = -EBUSY; call->chan_ix = htonl(cix); call->call_id = msg->hdr.callNumber; spin_lock(&conn->lock); if (!conn->channels[cix] || conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE || conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR ) { conn->channels[cix] = call; rxrpc_get_connection(conn); ret = 0; } spin_unlock(&conn->lock); if (ret < 0) { free_page((unsigned long) call); call = NULL; } if (ret == 0) { down_write(&rxrpc_calls_sem); list_add_tail(&call->call_link, &rxrpc_calls); up_write(&rxrpc_calls_sem); __RXACCT(atomic_inc(&rxrpc_call_count)); *_call = call; } _leave(" = %d [%p]", ret, call); return ret; } /* end rxrpc_incoming_call() */ /*****************************************************************************/ /* * free a call record */ void rxrpc_put_call(struct rxrpc_call *call) { struct rxrpc_connection *conn = call->conn; struct rxrpc_message *msg; _enter("%p{u=%d}",call,atomic_read(&call->usage)); /* sanity check */ if (atomic_read(&call->usage) <= 0) BUG(); /* to prevent a race, the decrement and the de-list must be effectively * atomic */ spin_lock(&conn->lock); if (likely(!atomic_dec_and_test(&call->usage))) { spin_unlock(&conn->lock); _leave(""); return; } if (conn->channels[ntohl(call->chan_ix)] == call) conn->channels[ntohl(call->chan_ix)] = NULL; spin_unlock(&conn->lock); wake_up(&conn->chanwait); rxrpc_put_connection(conn); /* clear the timers and dequeue from krxiod */ del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); rxrpc_krxiod_dequeue_call(call); /* clean up the contents of the struct */ if (call->snd_nextmsg) rxrpc_put_message(call->snd_nextmsg); if (call->snd_ping) rxrpc_put_message(call->snd_ping); while (!list_empty(&call->acks_pendq)) { msg = list_entry(call->acks_pendq.next, struct rxrpc_message, link); list_del(&msg->link); rxrpc_put_message(msg); } while (!list_empty(&call->rcv_receiveq)) { msg = list_entry(call->rcv_receiveq.next, struct rxrpc_message, link); list_del(&msg->link); rxrpc_put_message(msg); } while (!list_empty(&call->app_readyq)) { msg = list_entry(call->app_readyq.next, struct rxrpc_message, link); list_del(&msg->link); rxrpc_put_message(msg); } while (!list_empty(&call->app_unreadyq)) { msg = list_entry(call->app_unreadyq.next, struct rxrpc_message, link); list_del(&msg->link); rxrpc_put_message(msg); } module_put(call->owner); down_write(&rxrpc_calls_sem); list_del(&call->call_link); up_write(&rxrpc_calls_sem); __RXACCT(atomic_dec(&rxrpc_call_count)); free_page((unsigned long) call); _leave(" [destroyed]"); } /* end rxrpc_put_call() */ /*****************************************************************************/ /* * actually generate a normal ACK */ static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call, rxrpc_seq_t seq) { struct rxrpc_message *msg; struct kvec diov[3]; __be32 aux[4]; int delta, ret; /* ACKs default to DELAY */ if (!call->ackr.reason) call->ackr.reason = RXRPC_ACK_DELAY; _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", jiffies - call->cjif, ntohs(call->ackr.maxSkew), ntohl(call->ackr.firstPacket), ntohl(call->ackr.previousPacket), ntohl(call->ackr.serial), rxrpc_acks[call->ackr.reason], call->ackr.nAcks); aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */ aux[1] = htonl(1444); /* max MTU */ aux[2] = htonl(16); /* rwind */ aux[3] = htonl(4); /* max packets */ diov[0].iov_len = sizeof(struct rxrpc_ackpacket); diov[0].iov_base = &call->ackr; diov[1].iov_len = call->ackr_pend_cnt + 3; diov[1].iov_base = call->ackr_array; diov[2].iov_len = sizeof(aux); diov[2].iov_base = &aux; /* build and send the message */ ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, 3, diov, GFP_KERNEL, &msg); if (ret < 0) goto out; msg->seq = seq; msg->hdr.seq = htonl(seq); msg->hdr.flags |= RXRPC_SLOW_START_OK; ret = rxrpc_conn_sendmsg(call->conn, msg); rxrpc_put_message(msg); if (ret < 0) goto out; call->pkt_snd_count++; /* count how many actual ACKs there were at the front */ for (delta = 0; delta < call->ackr_pend_cnt; delta++) if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK) break; call->ackr_pend_cnt -= delta; /* all ACK'd to this point */ /* crank the ACK window around */ if (delta == 0) { /* un-ACK'd window */ } else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) { /* partially ACK'd window * - shuffle down to avoid losing out-of-sequence packets */ call->ackr_win_bot += delta; call->ackr_win_top += delta; memmove(&call->ackr_array[0], &call->ackr_array[delta], call->ackr_pend_cnt); memset(&call->ackr_array[call->ackr_pend_cnt], RXRPC_ACK_TYPE_NACK, sizeof(call->ackr_array) - call->ackr_pend_cnt); } else { /* fully ACK'd window * - just clear the whole thing */ memset(&call->ackr_array, RXRPC_ACK_TYPE_NACK, sizeof(call->ackr_array)); } /* clear this ACK */ memset(&call->ackr, 0, sizeof(call->ackr)); out: if (!call->app_call_state) printk("___ STATE 0 ___\n"); return ret; } /* end __rxrpc_call_gen_normal_ACK() */ /*****************************************************************************/ /* * note the reception of a packet in the call's ACK records and generate an * appropriate ACK packet if necessary * - returns 0 if packet should be processed, 1 if packet should be ignored * and -ve on an error */ static int rxrpc_call_generate_ACK(struct rxrpc_call *call, struct rxrpc_header *hdr, struct rxrpc_ackpacket *ack) { struct rxrpc_message *msg; rxrpc_seq_t seq; unsigned offset; int ret = 0, err; u8 special_ACK, do_ACK, force; _enter("%p,%p { seq=%d tp=%d fl=%02x }", call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags); seq = ntohl(hdr->seq); offset = seq - call->ackr_win_bot; do_ACK = RXRPC_ACK_DELAY; special_ACK = 0; force = (seq == 1); if (call->ackr_high_seq < seq) call->ackr_high_seq = seq; /* deal with generation of obvious special ACKs first */ if (ack && ack->reason == RXRPC_ACK_PING) { special_ACK = RXRPC_ACK_PING_RESPONSE; ret = 1; goto gen_ACK; } if (seq < call->ackr_win_bot) { special_ACK = RXRPC_ACK_DUPLICATE; ret = 1; goto gen_ACK; } if (seq >= call->ackr_win_top) { special_ACK = RXRPC_ACK_EXCEEDS_WINDOW; ret = 1; goto gen_ACK; } if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) { special_ACK = RXRPC_ACK_DUPLICATE; ret = 1; goto gen_ACK; } /* okay... it's a normal data packet inside the ACK window */ call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK; if (offset < call->ackr_pend_cnt) { } else if (offset > call->ackr_pend_cnt) { do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE; call->ackr_pend_cnt = offset; goto gen_ACK; } if (hdr->flags & RXRPC_REQUEST_ACK) { do_ACK = RXRPC_ACK_REQUESTED; } /* generate an ACK on the final packet of a reply just received */ if (hdr->flags & RXRPC_LAST_PACKET) { if (call->conn->out_clientflag) force = 1; } else if (!(hdr->flags & RXRPC_MORE_PACKETS)) { do_ACK = RXRPC_ACK_REQUESTED; } /* re-ACK packets previously received out-of-order */ for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++) if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK) break; call->ackr_pend_cnt = offset; /* generate an ACK if we fill up the window */ if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE) force = 1; gen_ACK: _debug("%05lu ACKs pend=%u norm=%s special=%s%s", jiffies - call->cjif, call->ackr_pend_cnt, rxrpc_acks[do_ACK], rxrpc_acks[special_ACK], force ? " immediate" : do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" : hdr->flags & RXRPC_LAST_PACKET ? " finalise" : " defer" ); /* send any pending normal ACKs if need be */ if (call->ackr_pend_cnt > 0) { /* fill out the appropriate form */ call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq, 65535U)); call->ackr.firstPacket = htonl(call->ackr_win_bot); call->ackr.previousPacket = call->ackr_prev_seq; call->ackr.serial = hdr->serial; call->ackr.nAcks = call->ackr_pend_cnt; if (do_ACK == RXRPC_ACK_REQUESTED) call->ackr.reason = do_ACK; /* generate the ACK immediately if necessary */ if (special_ACK || force) { err = __rxrpc_call_gen_normal_ACK( call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq); if (err < 0) { ret = err; goto out; } } } if (call->ackr.reason == RXRPC_ACK_REQUESTED) call->ackr_dfr_seq = seq; /* start the ACK timer if not running if there are any pending deferred * ACKs */ if (call->ackr_pend_cnt > 0 && call->ackr.reason != RXRPC_ACK_REQUESTED && !timer_pending(&call->ackr_dfr_timo) ) { unsigned long timo; timo = rxrpc_call_dfr_ack_timeout + jiffies; _debug("START ACKR TIMER for cj=%lu", timo - call->cjif); spin_lock(&call->lock); mod_timer(&call->ackr_dfr_timo, timo); spin_unlock(&call->lock); } else if ((call->ackr_pend_cnt == 0 || call->ackr.reason == RXRPC_ACK_REQUESTED) && timer_pending(&call->ackr_dfr_timo) ) { /* stop timer if no pending ACKs */ _debug("CLEAR ACKR TIMER"); del_timer_sync(&call->ackr_dfr_timo); } /* send a special ACK if one is required */ if (special_ACK) { struct rxrpc_ackpacket ack; struct kvec diov[2]; uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK }; /* fill out the appropriate form */ ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); ack.maxSkew = htons(min(call->ackr_high_seq - seq, 65535U)); ack.firstPacket = htonl(call->ackr_win_bot); ack.previousPacket = call->ackr_prev_seq; ack.serial = hdr->serial; ack.reason = special_ACK; ack.nAcks = 0; _proto("Rx Sending s-ACK" " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", ntohs(ack.maxSkew), ntohl(ack.firstPacket), ntohl(ack.previousPacket), ntohl(ack.serial), rxrpc_acks[ack.reason], ack.nAcks); diov[0].iov_len = sizeof(struct rxrpc_ackpacket); diov[0].iov_base = &ack; diov[1].iov_len = sizeof(acks); diov[1].iov_base = acks; /* build and send the message */ err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, hdr->seq ? 2 : 1, diov, GFP_KERNEL, &msg); if (err < 0) { ret = err; goto out; } msg->seq = seq; msg->hdr.seq = htonl(seq); msg->hdr.flags |= RXRPC_SLOW_START_OK; err = rxrpc_conn_sendmsg(call->conn, msg); rxrpc_put_message(msg); if (err < 0) { ret = err; goto out; } call->pkt_snd_count++; } out: if (hdr->seq) call->ackr_prev_seq = hdr->seq; _leave(" = %d", ret); return ret; } /* end rxrpc_call_generate_ACK() */ /*****************************************************************************/ /* * handle work to be done on a call * - includes packet reception and timeout processing */ void rxrpc_call_do_stuff(struct rxrpc_call *call) { _enter("%p{flags=%lx}", call, call->flags); /* handle packet reception */ if (call->flags & RXRPC_CALL_RCV_PKT) { _debug("- receive packet"); call->flags &= ~RXRPC_CALL_RCV_PKT; rxrpc_call_receive_packet(call); } /* handle overdue ACKs */ if (call->flags & RXRPC_CALL_ACKS_TIMO) { _debug("- overdue ACK timeout"); call->flags &= ~RXRPC_CALL_ACKS_TIMO; rxrpc_call_resend(call, call->snd_seq_count); } /* handle lack of reception */ if (call->flags & RXRPC_CALL_RCV_TIMO) { _debug("- reception timeout"); call->flags &= ~RXRPC_CALL_RCV_TIMO; rxrpc_call_abort(call, -EIO); } /* handle deferred ACKs */ if (call->flags & RXRPC_CALL_ACKR_TIMO || (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED) ) { _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u", jiffies - call->cjif, rxrpc_acks[call->ackr.reason], call->ackr.nAcks); call->flags &= ~RXRPC_CALL_ACKR_TIMO; if (call->ackr.nAcks > 0 && call->app_call_state != RXRPC_CSTATE_ERROR) { /* generate ACK */ __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq); call->ackr_dfr_seq = 0; } } _leave(""); } /* end rxrpc_call_do_stuff() */ /*****************************************************************************/ /* * send an abort message at call or connection level * - must be called with call->lock held * - the supplied error code is sent as the packet data */ static int __rxrpc_call_abort(struct rxrpc_call *call, int errno) { struct rxrpc_connection *conn = call->conn; struct rxrpc_message *msg; struct kvec diov[1]; int ret; __be32 _error; _enter("%p{%08x},%p{%d},%d", conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno); /* if this call is already aborted, then just wake up any waiters */ if (call->app_call_state == RXRPC_CSTATE_ERROR) { spin_unlock(&call->lock); call->app_error_func(call); _leave(" = 0"); return 0; } rxrpc_get_call(call); /* change the state _with_ the lock still held */ call->app_call_state = RXRPC_CSTATE_ERROR; call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT; call->app_errno = errno; call->app_mark = RXRPC_APP_MARK_EOF; call->app_read_buf = NULL; call->app_async_read = 0; _state(call); /* ask the app to translate the error code */ call->app_aemap_func(call); spin_unlock(&call->lock); /* flush any outstanding ACKs */ del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); if (rxrpc_call_is_ack_pending(call)) __rxrpc_call_gen_normal_ACK(call, 0); /* send the abort packet only if we actually traded some other * packets */ ret = 0; if (call->pkt_snd_count || call->pkt_rcv_count) { /* actually send the abort */ _proto("Rx Sending Call ABORT { data=%d }", call->app_abort_code); _error = htonl(call->app_abort_code); diov[0].iov_len = sizeof(_error); diov[0].iov_base = &_error; ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT, 1, diov, GFP_KERNEL, &msg); if (ret == 0) { ret = rxrpc_conn_sendmsg(conn, msg); rxrpc_put_message(msg); } } /* tell the app layer to let go */ call->app_error_func(call); rxrpc_put_call(call); _leave(" = %d", ret); return ret; } /* end __rxrpc_call_abort() */ /*****************************************************************************/ /* * send an abort message at call or connection level * - the supplied error code is sent as the packet data */ int rxrpc_call_abort(struct rxrpc_call *call, int error) { spin_lock(&call->lock); return __rxrpc_call_abort(call, error); } /* end rxrpc_call_abort() */ /*****************************************************************************/ /* * process packets waiting for this call */ static void rxrpc_call_receive_packet(struct rxrpc_call *call) { struct rxrpc_message *msg; struct list_head *_p; _enter("%p", call); rxrpc_get_call(call); /* must not go away too soon if aborted by * app-layer */ while (!list_empty(&call->rcv_receiveq)) { /* try to get next packet */ _p = NULL; spin_lock(&call->lock); if (!list_empty(&call->rcv_receiveq)) { _p = call->rcv_receiveq.next; list_del_init(_p); } spin_unlock(&call->lock); if (!_p) break; msg = list_entry(_p, struct rxrpc_message, link); _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)", jiffies - call->cjif, rxrpc_pkts[msg->hdr.type], ntohl(msg->hdr.serial), msg->seq, msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-', msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-', msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-', msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-', msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S' ); switch (msg->hdr.type) { /* deal with data packets */ case RXRPC_PACKET_TYPE_DATA: /* ACK the packet if necessary */ switch (rxrpc_call_generate_ACK(call, &msg->hdr, NULL)) { case 0: /* useful packet */ rxrpc_call_receive_data_packet(call, msg); break; case 1: /* duplicate or out-of-window packet */ break; default: rxrpc_put_message(msg); goto out; } break; /* deal with ACK packets */ case RXRPC_PACKET_TYPE_ACK: rxrpc_call_receive_ack_packet(call, msg); break; /* deal with abort packets */ case RXRPC_PACKET_TYPE_ABORT: { __be32 _dbuf, *dp; dp = skb_header_pointer(msg->pkt, msg->offset, sizeof(_dbuf), &_dbuf); if (dp == NULL) printk("Rx Received short ABORT packet\n"); _proto("Rx Received Call ABORT { data=%d }", (dp ? ntohl(*dp) : 0)); spin_lock(&call->lock); call->app_call_state = RXRPC_CSTATE_ERROR; call->app_err_state = RXRPC_ESTATE_PEER_ABORT; call->app_abort_code = (dp ? ntohl(*dp) : 0); call->app_errno = -ECONNABORTED; call->app_mark = RXRPC_APP_MARK_EOF; call->app_read_buf = NULL; call->app_async_read = 0; /* ask the app to translate the error code */ call->app_aemap_func(call); _state(call); spin_unlock(&call->lock); call->app_error_func(call); break; } default: /* deal with other packet types */ _proto("Rx Unsupported packet type %u (#%u)", msg->hdr.type, msg->seq); break; } rxrpc_put_message(msg); } out: rxrpc_put_call(call); _leave(""); } /* end rxrpc_call_receive_packet() */ /*****************************************************************************/ /* * process next data packet * - as the next data packet arrives: * - it is queued on app_readyq _if_ it is the next one expected * (app_ready_seq+1) * - it is queued on app_unreadyq _if_ it is not the next one expected * - if a packet placed on app_readyq completely fills a hole leading up to * the first packet on app_unreadyq, then packets now in sequence are * tranferred to app_readyq * - the application layer can only see packets on app_readyq * (app_ready_qty bytes) * - the application layer is prodded every time a new packet arrives */ static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, struct rxrpc_message *msg) { const struct rxrpc_operation *optbl, *op; struct rxrpc_message *pmsg; struct list_head *_p; int ret, lo, hi, rmtimo; __be32 opid; _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); rxrpc_get_message(msg); /* add to the unready queue if we'd have to create a hole in the ready * queue otherwise */ if (msg->seq != call->app_ready_seq + 1) { _debug("Call add packet %d to unreadyq", msg->seq); /* insert in seq order */ list_for_each(_p, &call->app_unreadyq) { pmsg = list_entry(_p, struct rxrpc_message, link); if (pmsg->seq > msg->seq) break; } list_add_tail(&msg->link, _p); _leave(" [unreadyq]"); return; } /* next in sequence - simply append into the call's ready queue */ _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)", msg->seq, msg->dsize, call->app_ready_qty); spin_lock(&call->lock); call->app_ready_seq = msg->seq; call->app_ready_qty += msg->dsize; list_add_tail(&msg->link, &call->app_readyq); /* move unready packets to the readyq if we got rid of a hole */ while (!list_empty(&call->app_unreadyq)) { pmsg = list_entry(call->app_unreadyq.next, struct rxrpc_message, link); if (pmsg->seq != call->app_ready_seq + 1) break; /* next in sequence - just move list-to-list */ _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)", pmsg->seq, pmsg->dsize, call->app_ready_qty); call->app_ready_seq = pmsg->seq; call->app_ready_qty += pmsg->dsize; list_del_init(&pmsg->link); list_add_tail(&pmsg->link, &call->app_readyq); } /* see if we've got the last packet yet */ if (!list_empty(&call->app_readyq)) { pmsg = list_entry(call->app_readyq.prev, struct rxrpc_message, link); if (pmsg->hdr.flags & RXRPC_LAST_PACKET) { call->app_last_rcv = 1; _debug("Last packet on readyq"); } } switch (call->app_call_state) { /* do nothing if call already aborted */ case RXRPC_CSTATE_ERROR: spin_unlock(&call->lock); _leave(" [error]"); return; /* extract the operation ID from an incoming call if that's not * yet been done */ case RXRPC_CSTATE_SRVR_RCV_OPID: spin_unlock(&call->lock); /* handle as yet insufficient data for the operation ID */ if (call->app_ready_qty < 4) { if (call->app_last_rcv) /* trouble - last packet seen */ rxrpc_call_abort(call, -EINVAL); _leave(""); return; } /* pull the operation ID out of the buffer */ ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0); if (ret < 0) { printk("Unexpected error from read-data: %d\n", ret); if (call->app_call_state != RXRPC_CSTATE_ERROR) rxrpc_call_abort(call, ret); _leave(""); return; } call->app_opcode = ntohl(opid); /* locate the operation in the available ops table */ optbl = call->conn->service->ops_begin; lo = 0; hi = call->conn->service->ops_end - optbl; while (lo < hi) { int mid = (hi + lo) / 2; op = &optbl[mid]; if (call->app_opcode == op->id) goto found_op; if (call->app_opcode > op->id) lo = mid + 1; else hi = mid; } /* search failed */ kproto("Rx Client requested operation %d from %s service", call->app_opcode, call->conn->service->name); rxrpc_call_abort(call, -EINVAL); _leave(" [inval]"); return; found_op: _proto("Rx Client requested operation %s from %s service", op->name, call->conn->service->name); /* we're now waiting for the argument block (unless the call * was aborted) */ spin_lock(&call->lock); if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID || call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) { if (!call->app_last_rcv) call->app_call_state = RXRPC_CSTATE_SRVR_RCV_ARGS; else if (call->app_ready_qty > 0) call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS; else call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; call->app_mark = op->asize; call->app_user = op->user; } spin_unlock(&call->lock); _state(call); break; case RXRPC_CSTATE_SRVR_RCV_ARGS: /* change state if just received last packet of arg block */ if (call->app_last_rcv) call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS; spin_unlock(&call->lock); _state(call); break; case RXRPC_CSTATE_CLNT_RCV_REPLY: /* change state if just received last packet of reply block */ rmtimo = 0; if (call->app_last_rcv) { call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY; rmtimo = 1; } spin_unlock(&call->lock); if (rmtimo) { del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); } _state(call); break; default: /* deal with data reception in an unexpected state */ printk("Unexpected state [[[ %u ]]]\n", call->app_call_state); __rxrpc_call_abort(call, -EBADMSG); _leave(""); return; } if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY && call->app_last_rcv) BUG(); /* otherwise just invoke the data function whenever we can satisfy its desire for more * data */ _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s", call->app_call_state, call->app_ready_qty, call->app_mark, call->app_last_rcv ? " last-rcvd" : ""); spin_lock(&call->lock); ret = __rxrpc_call_read_data(call); switch (ret) { case 0: spin_unlock(&call->lock); call->app_attn_func(call); break; case -EAGAIN: spin_unlock(&call->lock); break; case -ECONNABORTED: spin_unlock(&call->lock); break; default: __rxrpc_call_abort(call, ret); break; } _state(call); _leave(""); } /* end rxrpc_call_receive_data_packet() */ /*****************************************************************************/ /* * received an ACK packet */ static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, struct rxrpc_message *msg) { struct rxrpc_ackpacket _ack, *ap; rxrpc_serial_net_t serial; rxrpc_seq_t seq; int ret; _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); /* extract the basic ACK record */ ap = skb_header_pointer(msg->pkt, msg->offset, sizeof(_ack), &_ack); if (ap == NULL) { printk("Rx Received short ACK packet\n"); return; } msg->offset += sizeof(_ack); serial = ap->serial; seq = ntohl(ap->firstPacket); _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }", ntohl(msg->hdr.serial), ntohs(ap->bufferSpace), ntohs(ap->maxSkew), seq, ntohl(ap->previousPacket), ntohl(serial), rxrpc_acks[ap->reason], call->ackr.nAcks ); /* check the other side isn't ACK'ing a sequence number I haven't sent * yet */ if (ap->nAcks > 0 && (seq > call->snd_seq_count || seq + ap->nAcks - 1 > call->snd_seq_count)) { printk("Received ACK (#%u-#%u) for unsent packet\n", seq, seq + ap->nAcks - 1); rxrpc_call_abort(call, -EINVAL); _leave(""); return; } /* deal with RTT calculation */ if (serial) { struct rxrpc_message *rttmsg; /* find the prompting packet */ spin_lock(&call->lock); if (call->snd_ping && call->snd_ping->hdr.serial == serial) { /* it was a ping packet */ rttmsg = call->snd_ping; call->snd_ping = NULL; spin_unlock(&call->lock); if (rttmsg) { rttmsg->rttdone = 1; rxrpc_peer_calculate_rtt(call->conn->peer, rttmsg, msg); rxrpc_put_message(rttmsg); } } else { struct list_head *_p; /* it ought to be a data packet - look in the pending * ACK list */ list_for_each(_p, &call->acks_pendq) { rttmsg = list_entry(_p, struct rxrpc_message, link); if (rttmsg->hdr.serial == serial) { if (rttmsg->rttdone) /* never do RTT twice without * resending */ break; rttmsg->rttdone = 1; rxrpc_peer_calculate_rtt( call->conn->peer, rttmsg, msg); break; } } spin_unlock(&call->lock); } } switch (ap->reason) { /* deal with negative/positive acknowledgement of data * packets */ case RXRPC_ACK_REQUESTED: case RXRPC_ACK_DELAY: case RXRPC_ACK_IDLE: rxrpc_call_definitively_ACK(call, seq - 1); case RXRPC_ACK_DUPLICATE: case RXRPC_ACK_OUT_OF_SEQUENCE: case RXRPC_ACK_EXCEEDS_WINDOW: call->snd_resend_cnt = 0; ret = rxrpc_call_record_ACK(call, msg, seq, ap->nAcks); if (ret < 0) rxrpc_call_abort(call, ret); break; /* respond to ping packets immediately */ case RXRPC_ACK_PING: rxrpc_call_generate_ACK(call, &msg->hdr, ap); break; /* only record RTT on ping response packets */ case RXRPC_ACK_PING_RESPONSE: if (call->snd_ping) { struct rxrpc_message *rttmsg; /* only do RTT stuff if the response matches the * retained ping */ rttmsg = NULL; spin_lock(&call->lock); if (call->snd_ping && call->snd_ping->hdr.serial == ap->serial) { rttmsg = call->snd_ping; call->snd_ping = NULL; } spin_unlock(&call->lock); if (rttmsg) { rttmsg->rttdone = 1; rxrpc_peer_calculate_rtt(call->conn->peer, rttmsg, msg); rxrpc_put_message(rttmsg); } } break; default: printk("Unsupported ACK reason %u\n", ap->reason); break; } _leave(""); } /* end rxrpc_call_receive_ack_packet() */ /*****************************************************************************/ /* * record definitive ACKs for all messages up to and including the one with the * 'highest' seq */ static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, rxrpc_seq_t highest) { struct rxrpc_message *msg; int now_complete; _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest); while (call->acks_dftv_seq < highest) { call->acks_dftv_seq++; _proto("Definitive ACK on packet #%u", call->acks_dftv_seq); /* discard those at front of queue until message with highest * ACK is found */ spin_lock(&call->lock); msg = NULL; if (!list_empty(&call->acks_pendq)) { msg = list_entry(call->acks_pendq.next, struct rxrpc_message, link); list_del_init(&msg->link); /* dequeue */ if (msg->state == RXRPC_MSG_SENT) call->acks_pend_cnt--; } spin_unlock(&call->lock); /* insanity check */ if (!msg) panic("%s(): acks_pendq unexpectedly empty\n", __FUNCTION__); if (msg->seq != call->acks_dftv_seq) panic("%s(): Packet #%u expected at front of acks_pendq" " (#%u found)\n", __FUNCTION__, call->acks_dftv_seq, msg->seq); /* discard the message */ msg->state = RXRPC_MSG_DONE; rxrpc_put_message(msg); } /* if all sent packets are definitively ACK'd then prod any sleepers just in case */ now_complete = 0; spin_lock(&call->lock); if (call->acks_dftv_seq == call->snd_seq_count) { if (call->app_call_state != RXRPC_CSTATE_COMPLETE) { call->app_call_state = RXRPC_CSTATE_COMPLETE; _state(call); now_complete = 1; } } spin_unlock(&call->lock); if (now_complete) { del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); call->app_attn_func(call); } _leave(""); } /* end rxrpc_call_definitively_ACK() */ /*****************************************************************************/ /* * record the specified amount of ACKs/NAKs */ static int rxrpc_call_record_ACK(struct rxrpc_call *call, struct rxrpc_message *msg, rxrpc_seq_t seq, size_t count) { struct rxrpc_message *dmsg; struct list_head *_p; rxrpc_seq_t highest; unsigned ix; size_t chunk; char resend, now_complete; u8 acks[16]; _enter("%p{apc=%u ads=%u},%p,%u,%Zu", call, call->acks_pend_cnt, call->acks_dftv_seq, msg, seq, count); /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order * ACKs) */ if (seq <= call->acks_dftv_seq) { unsigned delta = call->acks_dftv_seq - seq; if (count <= delta) { _leave(" = 0 [all definitively ACK'd]"); return 0; } seq += delta; count -= delta; msg->offset += delta; } highest = seq + count - 1; resend = 0; while (count > 0) { /* extract up to 16 ACK slots at a time */ chunk = min(count, sizeof(acks)); count -= chunk; memset(acks, 2, sizeof(acks)); if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) { printk("Rx Received short ACK packet\n"); _leave(" = -EINVAL"); return -EINVAL; } msg->offset += chunk; /* check that the ACK set is valid */ for (ix = 0; ix < chunk; ix++) { switch (acks[ix]) { case RXRPC_ACK_TYPE_ACK: break; case RXRPC_ACK_TYPE_NACK: resend = 1; break; default: printk("Rx Received unsupported ACK state" " %u\n", acks[ix]); _leave(" = -EINVAL"); return -EINVAL; } } _proto("Rx ACK of packets #%u-#%u " "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)", seq, (unsigned) (seq + chunk - 1), _acktype[acks[0x0]], _acktype[acks[0x1]], _acktype[acks[0x2]], _acktype[acks[0x3]], _acktype[acks[0x4]], _acktype[acks[0x5]], _acktype[acks[0x6]], _acktype[acks[0x7]], _acktype[acks[0x8]], _acktype[acks[0x9]], _acktype[acks[0xA]], _acktype[acks[0xB]], _acktype[acks[0xC]], _acktype[acks[0xD]], _acktype[acks[0xE]], _acktype[acks[0xF]], call->acks_pend_cnt ); /* mark the packets in the ACK queue as being provisionally * ACK'd */ ix = 0; spin_lock(&call->lock); /* find the first packet ACK'd/NAK'd here */ list_for_each(_p, &call->acks_pendq) { dmsg = list_entry(_p, struct rxrpc_message, link); if (dmsg->seq == seq) goto found_first; _debug("- %u: skipping #%u", ix, dmsg->seq); } goto bad_queue; found_first: do { _debug("- %u: processing #%u (%c) apc=%u", ix, dmsg->seq, _acktype[acks[ix]], call->acks_pend_cnt); if (acks[ix] == RXRPC_ACK_TYPE_ACK) { if (dmsg->state == RXRPC_MSG_SENT) call->acks_pend_cnt--; dmsg->state = RXRPC_MSG_ACKED; } else { if (dmsg->state == RXRPC_MSG_ACKED) call->acks_pend_cnt++; dmsg->state = RXRPC_MSG_SENT; } ix++; seq++; _p = dmsg->link.next; dmsg = list_entry(_p, struct rxrpc_message, link); } while(ix < chunk && _p != &call->acks_pendq && dmsg->seq == seq); if (ix < chunk) goto bad_queue; spin_unlock(&call->lock); } if (resend) rxrpc_call_resend(call, highest); /* if all packets are provisionally ACK'd, then wake up anyone who's * waiting for that */ now_complete = 0; spin_lock(&call->lock); if (call->acks_pend_cnt == 0) { if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) { call->app_call_state = RXRPC_CSTATE_COMPLETE; _state(call); } now_complete = 1; } spin_unlock(&call->lock); if (now_complete) { _debug("- wake up waiters"); del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); call->app_attn_func(call); } _leave(" = 0 (apc=%u)", call->acks_pend_cnt); return 0; bad_queue: panic("%s(): acks_pendq in bad state (packet #%u absent)\n", __FUNCTION__, seq); } /* end rxrpc_call_record_ACK() */ /*****************************************************************************/ /* * transfer data from the ready packet queue to the asynchronous read buffer * - since this func is the only one going to look at packets queued on * app_readyq, we don't need a lock to modify or access them, only to modify * the queue pointers * - called with call->lock held * - the buffer must be in kernel space * - returns: * 0 if buffer filled * -EAGAIN if buffer not filled and more data to come * -EBADMSG if last packet received and insufficient data left * -ECONNABORTED if the call has in an error state */ static int __rxrpc_call_read_data(struct rxrpc_call *call) { struct rxrpc_message *msg; size_t qty; int ret; _enter("%p{as=%d buf=%p qty=%Zu/%Zu}", call, call->app_async_read, call->app_read_buf, call->app_ready_qty, call->app_mark); /* check the state */ switch (call->app_call_state) { case RXRPC_CSTATE_SRVR_RCV_ARGS: case RXRPC_CSTATE_CLNT_RCV_REPLY: if (call->app_last_rcv) { printk("%s(%p,%p,%Zd):" " Inconsistent call state (%s, last pkt)", __FUNCTION__, call, call->app_read_buf, call->app_mark, rxrpc_call_states[call->app_call_state]); BUG(); } break; case RXRPC_CSTATE_SRVR_RCV_OPID: case RXRPC_CSTATE_SRVR_GOT_ARGS: case RXRPC_CSTATE_CLNT_GOT_REPLY: break; case RXRPC_CSTATE_SRVR_SND_REPLY: if (!call->app_last_rcv) { printk("%s(%p,%p,%Zd):" " Inconsistent call state (%s, not last pkt)", __FUNCTION__, call, call->app_read_buf, call->app_mark, rxrpc_call_states[call->app_call_state]); BUG(); } _debug("Trying to read data from call in SND_REPLY state"); break; case RXRPC_CSTATE_ERROR: _leave(" = -ECONNABORTED"); return -ECONNABORTED; default: printk("reading in unexpected state [[[ %u ]]]\n", call->app_call_state); BUG(); } /* handle the case of not having an async buffer */ if (!call->app_async_read) { if (call->app_mark == RXRPC_APP_MARK_EOF) { ret = call->app_last_rcv ? 0 : -EAGAIN; } else { if (call->app_mark >= call->app_ready_qty) { call->app_mark = RXRPC_APP_MARK_EOF; ret = 0; } else { ret = call->app_last_rcv ? -EBADMSG : -EAGAIN; } } _leave(" = %d [no buf]", ret); return 0; } while (!list_empty(&call->app_readyq) && call->app_mark > 0) { msg = list_entry(call->app_readyq.next, struct rxrpc_message, link); /* drag as much data as we need out of this packet */ qty = min(call->app_mark, msg->dsize); _debug("reading %Zu from skb=%p off=%lu", qty, msg->pkt, msg->offset); if (call->app_read_buf) if (skb_copy_bits(msg->pkt, msg->offset, call->app_read_buf, qty) < 0) panic("%s: Failed to copy data from packet:" " (%p,%p,%Zd)", __FUNCTION__, call, call->app_read_buf, qty); /* if that packet is now empty, discard it */ call->app_ready_qty -= qty; msg->dsize -= qty; if (msg->dsize == 0) { list_del_init(&msg->link); rxrpc_put_message(msg); } else { msg->offset += qty; } call->app_mark -= qty; if (call->app_read_buf) call->app_read_buf += qty; } if (call->app_mark == 0) { call->app_async_read = 0; call->app_mark = RXRPC_APP_MARK_EOF; call->app_read_buf = NULL; /* adjust the state if used up all packets */ if (list_empty(&call->app_readyq) && call->app_last_rcv) { switch (call->app_call_state) { case RXRPC_CSTATE_SRVR_RCV_OPID: call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; call->app_mark = RXRPC_APP_MARK_EOF; _state(call); del_timer_sync(&call->rcv_timeout); break; case RXRPC_CSTATE_SRVR_GOT_ARGS: call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; _state(call); del_timer_sync(&call->rcv_timeout); break; default: call->app_call_state = RXRPC_CSTATE_COMPLETE; _state(call); del_timer_sync(&call->acks_timeout); del_timer_sync(&call->ackr_dfr_timo); del_timer_sync(&call->rcv_timeout); break; } } _leave(" = 0"); return 0; } if (call->app_last_rcv) { _debug("Insufficient data (%Zu/%Zu)", call->app_ready_qty, call->app_mark); call->app_async_read = 0; call->app_mark = RXRPC_APP_MARK_EOF; call->app_read_buf = NULL; _leave(" = -EBADMSG"); return -EBADMSG; } _leave(" = -EAGAIN"); return -EAGAIN; } /* end __rxrpc_call_read_data() */ /*****************************************************************************/ /* * attempt to read the specified amount of data from the call's ready queue * into the buffer provided * - since this func is the only one going to look at packets queued on * app_readyq, we don't need a lock to modify or access them, only to modify * the queue pointers * - if the buffer pointer is NULL, then data is merely drained, not copied * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is * enough data or an error will be generated * - note that the caller must have added the calling task to the call's wait * queue beforehand * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this * function doesn't read all available data */ int rxrpc_call_read_data(struct rxrpc_call *call, void *buffer, size_t size, int flags) { int ret; _enter("%p{arq=%Zu},%p,%Zd,%x", call, call->app_ready_qty, buffer, size, flags); spin_lock(&call->lock); if (unlikely(!!call->app_read_buf)) { spin_unlock(&call->lock); _leave(" = -EBUSY"); return -EBUSY; } call->app_mark = size; call->app_read_buf = buffer; call->app_async_read = 1; call->app_read_count++; /* read as much data as possible */ ret = __rxrpc_call_read_data(call); switch (ret) { case 0: if (flags & RXRPC_CALL_READ_ALL && (!call->app_last_rcv || call->app_ready_qty > 0)) { _leave(" = -EBADMSG"); __rxrpc_call_abort(call, -EBADMSG); return -EBADMSG; } spin_unlock(&call->lock); call->app_attn_func(call); _leave(" = 0"); return ret; case -ECONNABORTED: spin_unlock(&call->lock); _leave(" = %d [aborted]", ret); return ret; default: __rxrpc_call_abort(call, ret); _leave(" = %d", ret); return ret; case -EAGAIN: spin_unlock(&call->lock); if (!(flags & RXRPC_CALL_READ_BLOCK)) { _leave(" = -EAGAIN"); return -EAGAIN; } /* wait for the data to arrive */ _debug("blocking for data arrival"); for (;;) { set_current_state(TASK_INTERRUPTIBLE); if (!call->app_async_read || signal_pending(current)) break; schedule(); } set_current_state(TASK_RUNNING); if (signal_pending(current)) { _leave(" = -EINTR"); return -EINTR; } if (call->app_call_state == RXRPC_CSTATE_ERROR) { _leave(" = -ECONNABORTED"); return -ECONNABORTED; } _leave(" = 0"); return 0; } } /* end rxrpc_call_read_data() */ /*****************************************************************************/ /* * write data to a call * - the data may not be sent immediately if it doesn't fill a buffer * - if we can't queue all the data for buffering now, siov[] will have been * adjusted to take account of what has been sent */ int rxrpc_call_write_data(struct rxrpc_call *call, size_t sioc, struct kvec *siov, u8 rxhdr_flags, unsigned int __nocast alloc_flags, int dup_data, size_t *size_sent) { struct rxrpc_message *msg; struct kvec *sptr; size_t space, size, chunk, tmp; char *buf; int ret; _enter("%p,%Zu,%p,%02x,%x,%d,%p", call, sioc, siov, rxhdr_flags, alloc_flags, dup_data, size_sent); *size_sent = 0; size = 0; ret = -EINVAL; /* can't send more if we've sent last packet from this end */ switch (call->app_call_state) { case RXRPC_CSTATE_SRVR_SND_REPLY: case RXRPC_CSTATE_CLNT_SND_ARGS: break; case RXRPC_CSTATE_ERROR: ret = call->app_errno; default: goto out; } /* calculate how much data we've been given */ sptr = siov; for (; sioc > 0; sptr++, sioc--) { if (!sptr->iov_len) continue; if (!sptr->iov_base) goto out; size += sptr->iov_len; } _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size); do { /* make sure there's a message under construction */ if (!call->snd_nextmsg) { /* no - allocate a message with no data yet attached */ ret = rxrpc_conn_newmsg(call->conn, call, RXRPC_PACKET_TYPE_DATA, 0, NULL, alloc_flags, &call->snd_nextmsg); if (ret < 0) goto out; _debug("- allocated new message [ds=%Zu]", call->snd_nextmsg->dsize); } msg = call->snd_nextmsg; msg->hdr.flags |= rxhdr_flags; /* deal with zero-length terminal packet */ if (size == 0) { if (rxhdr_flags & RXRPC_LAST_PACKET) { ret = rxrpc_call_flush(call); if (ret < 0) goto out; } break; } /* work out how much space current packet has available */ space = call->conn->mtu_size - msg->dsize; chunk = min(space, size); _debug("- [before] space=%Zu chunk=%Zu", space, chunk); while (!siov->iov_len) siov++; /* if we are going to have to duplicate the data then coalesce * it too */ if (dup_data) { /* don't allocate more that 1 page at a time */ if (chunk > PAGE_SIZE) chunk = PAGE_SIZE; /* allocate a data buffer and attach to the message */ buf = kmalloc(chunk, alloc_flags); if (unlikely(!buf)) { if (msg->dsize == sizeof(struct rxrpc_header)) { /* discard an empty msg and wind back * the seq counter */ rxrpc_put_message(msg); call->snd_nextmsg = NULL; call->snd_seq_count--; } ret = -ENOMEM; goto out; } tmp = msg->dcount++; set_bit(tmp, &msg->dfree); msg->data[tmp].iov_base = buf; msg->data[tmp].iov_len = chunk; msg->dsize += chunk; *size_sent += chunk; size -= chunk; /* load the buffer with data */ while (chunk > 0) { tmp = min(chunk, siov->iov_len); memcpy(buf, siov->iov_base, tmp); buf += tmp; siov->iov_base += tmp; siov->iov_len -= tmp; if (!siov->iov_len) siov++; chunk -= tmp; } } else { /* we want to attach the supplied buffers directly */ while (chunk > 0 && msg->dcount < RXRPC_MSG_MAX_IOCS) { tmp = msg->dcount++; msg->data[tmp].iov_base = siov->iov_base; msg->data[tmp].iov_len = siov->iov_len; msg->dsize += siov->iov_len; *size_sent += siov->iov_len; size -= siov->iov_len; chunk -= siov->iov_len; siov++; } } _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size); /* dispatch the message when full, final or requesting ACK */ if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) { ret = rxrpc_call_flush(call); if (ret < 0) goto out; } } while(size > 0); ret = 0; out: _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size); return ret; } /* end rxrpc_call_write_data() */ /*****************************************************************************/ /* * flush outstanding packets to the network */ static int rxrpc_call_flush(struct rxrpc_call *call) { struct rxrpc_message *msg; int ret = 0; _enter("%p", call); rxrpc_get_call(call); /* if there's a packet under construction, then dispatch it now */ if (call->snd_nextmsg) { msg = call->snd_nextmsg; call->snd_nextmsg = NULL; if (msg->hdr.flags & RXRPC_LAST_PACKET) { msg->hdr.flags &= ~RXRPC_MORE_PACKETS; if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS) msg->hdr.flags |= RXRPC_REQUEST_ACK; } else { msg->hdr.flags |= RXRPC_MORE_PACKETS; } _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }", msg->dsize, msg->dcount, msg->dfree); /* queue and adjust call state */ spin_lock(&call->lock); list_add_tail(&msg->link, &call->acks_pendq); /* decide what to do depending on current state and if this is * the last packet */ ret = -EINVAL; switch (call->app_call_state) { case RXRPC_CSTATE_SRVR_SND_REPLY: if (msg->hdr.flags & RXRPC_LAST_PACKET) { call->app_call_state = RXRPC_CSTATE_SRVR_RCV_FINAL_ACK; _state(call); } break; case RXRPC_CSTATE_CLNT_SND_ARGS: if (msg->hdr.flags & RXRPC_LAST_PACKET) { call->app_call_state = RXRPC_CSTATE_CLNT_RCV_REPLY; _state(call); } break; case RXRPC_CSTATE_ERROR: ret = call->app_errno; default: spin_unlock(&call->lock); goto out; } call->acks_pend_cnt++; mod_timer(&call->acks_timeout, __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout)); spin_unlock(&call->lock); ret = rxrpc_conn_sendmsg(call->conn, msg); if (ret == 0) call->pkt_snd_count++; } out: rxrpc_put_call(call); _leave(" = %d", ret); return ret; } /* end rxrpc_call_flush() */ /*****************************************************************************/ /* * resend NAK'd or unacknowledged packets up to the highest one specified */ static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest) { struct rxrpc_message *msg; struct list_head *_p; rxrpc_seq_t seq = 0; _enter("%p,%u", call, highest); _proto("Rx Resend required"); /* handle too many resends */ if (call->snd_resend_cnt >= rxrpc_call_max_resend) { _debug("Aborting due to too many resends (rcv=%d)", call->pkt_rcv_count); rxrpc_call_abort(call, call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT); _leave(""); return; } spin_lock(&call->lock); call->snd_resend_cnt++; for (;;) { /* determine which the next packet we might need to ACK is */ if (seq <= call->acks_dftv_seq) seq = call->acks_dftv_seq; seq++; if (seq > highest) break; /* look for the packet in the pending-ACK queue */ list_for_each(_p, &call->acks_pendq) { msg = list_entry(_p, struct rxrpc_message, link); if (msg->seq == seq) goto found_msg; } panic("%s(%p,%d):" " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n", __FUNCTION__, call, highest, call->acks_dftv_seq, call->snd_seq_count, seq); found_msg: if (msg->state != RXRPC_MSG_SENT) continue; /* only un-ACK'd packets */ rxrpc_get_message(msg); spin_unlock(&call->lock); /* send each message again (and ignore any errors we might * incur) */ _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }", msg->dsize, msg->dcount, msg->dfree); if (rxrpc_conn_sendmsg(call->conn, msg) == 0) call->pkt_snd_count++; rxrpc_put_message(msg); spin_lock(&call->lock); } /* reset the timeout */ mod_timer(&call->acks_timeout, __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout)); spin_unlock(&call->lock); _leave(""); } /* end rxrpc_call_resend() */ /*****************************************************************************/ /* * handle an ICMP error being applied to a call */ void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno) { _enter("%p{%u},%d", call, ntohl(call->call_id), errno); /* if this call is already aborted, then just wake up any waiters */ if (call->app_call_state == RXRPC_CSTATE_ERROR) { call->app_error_func(call); } else { /* tell the app layer what happened */ spin_lock(&call->lock); call->app_call_state = RXRPC_CSTATE_ERROR; _state(call); if (local) call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR; else call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR; call->app_errno = errno; call->app_mark = RXRPC_APP_MARK_EOF; call->app_read_buf = NULL; call->app_async_read = 0; /* map the error */ call->app_aemap_func(call); del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); spin_unlock(&call->lock); call->app_error_func(call); } _leave(""); } /* end rxrpc_call_handle_error() */