[AF_RXRPC]: Add an interface to the AF_RXRPC module for the AFS filesystem to use
[linux-3.10.git] / net / rxrpc / ar-call.c
1 /* RxRPC individual remote procedure call handling
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #include <linux/module.h>
13 #include <linux/circ_buf.h>
14 #include <net/sock.h>
15 #include <net/af_rxrpc.h>
16 #include "ar-internal.h"
17
18 struct kmem_cache *rxrpc_call_jar;
19 LIST_HEAD(rxrpc_calls);
20 DEFINE_RWLOCK(rxrpc_call_lock);
21 static unsigned rxrpc_call_max_lifetime = 60;
22 static unsigned rxrpc_dead_call_timeout = 2;
23
24 static void rxrpc_destroy_call(struct work_struct *work);
25 static void rxrpc_call_life_expired(unsigned long _call);
26 static void rxrpc_dead_call_expired(unsigned long _call);
27 static void rxrpc_ack_time_expired(unsigned long _call);
28 static void rxrpc_resend_time_expired(unsigned long _call);
29
30 /*
31  * allocate a new call
32  */
33 static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
34 {
35         struct rxrpc_call *call;
36
37         call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
38         if (!call)
39                 return NULL;
40
41         call->acks_winsz = 16;
42         call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
43                                     gfp);
44         if (!call->acks_window) {
45                 kmem_cache_free(rxrpc_call_jar, call);
46                 return NULL;
47         }
48
49         setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
50                     (unsigned long) call);
51         setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
52                     (unsigned long) call);
53         setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
54                     (unsigned long) call);
55         setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
56                     (unsigned long) call);
57         INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
58         INIT_WORK(&call->processor, &rxrpc_process_call);
59         INIT_LIST_HEAD(&call->accept_link);
60         skb_queue_head_init(&call->rx_queue);
61         skb_queue_head_init(&call->rx_oos_queue);
62         init_waitqueue_head(&call->tx_waitq);
63         spin_lock_init(&call->lock);
64         rwlock_init(&call->state_lock);
65         atomic_set(&call->usage, 1);
66         call->debug_id = atomic_inc_return(&rxrpc_debug_id);
67         call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
68
69         memset(&call->sock_node, 0xed, sizeof(call->sock_node));
70
71         call->rx_data_expect = 1;
72         call->rx_data_eaten = 0;
73         call->rx_first_oos = 0;
74         call->ackr_win_top = call->rx_data_eaten + 1 + RXRPC_MAXACKS;
75         call->creation_jif = jiffies;
76         return call;
77 }
78
79 /*
80  * allocate a new client call and attempt to to get a connection slot for it
81  */
82 static struct rxrpc_call *rxrpc_alloc_client_call(
83         struct rxrpc_sock *rx,
84         struct rxrpc_transport *trans,
85         struct rxrpc_conn_bundle *bundle,
86         gfp_t gfp)
87 {
88         struct rxrpc_call *call;
89         int ret;
90
91         _enter("");
92
93         ASSERT(rx != NULL);
94         ASSERT(trans != NULL);
95         ASSERT(bundle != NULL);
96
97         call = rxrpc_alloc_call(gfp);
98         if (!call)
99                 return ERR_PTR(-ENOMEM);
100
101         sock_hold(&rx->sk);
102         call->socket = rx;
103         call->rx_data_post = 1;
104
105         ret = rxrpc_connect_call(rx, trans, bundle, call, gfp);
106         if (ret < 0) {
107                 kmem_cache_free(rxrpc_call_jar, call);
108                 return ERR_PTR(ret);
109         }
110
111         spin_lock(&call->conn->trans->peer->lock);
112         list_add(&call->error_link, &call->conn->trans->peer->error_targets);
113         spin_unlock(&call->conn->trans->peer->lock);
114
115         call->lifetimer.expires = jiffies + rxrpc_call_max_lifetime * HZ;
116         add_timer(&call->lifetimer);
117
118         _leave(" = %p", call);
119         return call;
120 }
121
122 /*
123  * set up a call for the given data
124  * - called in process context with IRQs enabled
125  */
126 struct rxrpc_call *rxrpc_get_client_call(struct rxrpc_sock *rx,
127                                          struct rxrpc_transport *trans,
128                                          struct rxrpc_conn_bundle *bundle,
129                                          unsigned long user_call_ID,
130                                          int create,
131                                          gfp_t gfp)
132 {
133         struct rxrpc_call *call, *candidate;
134         struct rb_node *p, *parent, **pp;
135
136         _enter("%p,%d,%d,%lx,%d",
137                rx, trans ? trans->debug_id : -1, bundle ? bundle->debug_id : -1,
138                user_call_ID, create);
139
140         /* search the extant calls first for one that matches the specified
141          * user ID */
142         read_lock(&rx->call_lock);
143
144         p = rx->calls.rb_node;
145         while (p) {
146                 call = rb_entry(p, struct rxrpc_call, sock_node);
147
148                 if (user_call_ID < call->user_call_ID)
149                         p = p->rb_left;
150                 else if (user_call_ID > call->user_call_ID)
151                         p = p->rb_right;
152                 else
153                         goto found_extant_call;
154         }
155
156         read_unlock(&rx->call_lock);
157
158         if (!create || !trans)
159                 return ERR_PTR(-EBADSLT);
160
161         /* not yet present - create a candidate for a new record and then
162          * redo the search */
163         candidate = rxrpc_alloc_client_call(rx, trans, bundle, gfp);
164         if (IS_ERR(candidate)) {
165                 _leave(" = %ld", PTR_ERR(candidate));
166                 return candidate;
167         }
168
169         candidate->user_call_ID = user_call_ID;
170         __set_bit(RXRPC_CALL_HAS_USERID, &candidate->flags);
171
172         write_lock(&rx->call_lock);
173
174         pp = &rx->calls.rb_node;
175         parent = NULL;
176         while (*pp) {
177                 parent = *pp;
178                 call = rb_entry(parent, struct rxrpc_call, sock_node);
179
180                 if (user_call_ID < call->user_call_ID)
181                         pp = &(*pp)->rb_left;
182                 else if (user_call_ID > call->user_call_ID)
183                         pp = &(*pp)->rb_right;
184                 else
185                         goto found_extant_second;
186         }
187
188         /* second search also failed; add the new call */
189         call = candidate;
190         candidate = NULL;
191         rxrpc_get_call(call);
192
193         rb_link_node(&call->sock_node, parent, pp);
194         rb_insert_color(&call->sock_node, &rx->calls);
195         write_unlock(&rx->call_lock);
196
197         write_lock_bh(&rxrpc_call_lock);
198         list_add_tail(&call->link, &rxrpc_calls);
199         write_unlock_bh(&rxrpc_call_lock);
200
201         _net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
202
203         _leave(" = %p [new]", call);
204         return call;
205
206         /* we found the call in the list immediately */
207 found_extant_call:
208         rxrpc_get_call(call);
209         read_unlock(&rx->call_lock);
210         _leave(" = %p [extant %d]", call, atomic_read(&call->usage));
211         return call;
212
213         /* we found the call on the second time through the list */
214 found_extant_second:
215         rxrpc_get_call(call);
216         write_unlock(&rx->call_lock);
217         rxrpc_put_call(candidate);
218         _leave(" = %p [second %d]", call, atomic_read(&call->usage));
219         return call;
220 }
221
222 /*
223  * set up an incoming call
224  * - called in process context with IRQs enabled
225  */
226 struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
227                                        struct rxrpc_connection *conn,
228                                        struct rxrpc_header *hdr,
229                                        gfp_t gfp)
230 {
231         struct rxrpc_call *call, *candidate;
232         struct rb_node **p, *parent;
233         __be32 call_id;
234
235         _enter(",%d,,%x", conn->debug_id, gfp);
236
237         ASSERT(rx != NULL);
238
239         candidate = rxrpc_alloc_call(gfp);
240         if (!candidate)
241                 return ERR_PTR(-EBUSY);
242
243         candidate->socket = rx;
244         candidate->conn = conn;
245         candidate->cid = hdr->cid;
246         candidate->call_id = hdr->callNumber;
247         candidate->channel = ntohl(hdr->cid) & RXRPC_CHANNELMASK;
248         candidate->rx_data_post = 0;
249         candidate->state = RXRPC_CALL_SERVER_ACCEPTING;
250         if (conn->security_ix > 0)
251                 candidate->state = RXRPC_CALL_SERVER_SECURING;
252
253         write_lock_bh(&conn->lock);
254
255         /* set the channel for this call */
256         call = conn->channels[candidate->channel];
257         _debug("channel[%u] is %p", candidate->channel, call);
258         if (call && call->call_id == hdr->callNumber) {
259                 /* already set; must've been a duplicate packet */
260                 _debug("extant call [%d]", call->state);
261                 ASSERTCMP(call->conn, ==, conn);
262
263                 read_lock(&call->state_lock);
264                 switch (call->state) {
265                 case RXRPC_CALL_LOCALLY_ABORTED:
266                         if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
267                                 rxrpc_queue_call(call);
268                 case RXRPC_CALL_REMOTELY_ABORTED:
269                         read_unlock(&call->state_lock);
270                         goto aborted_call;
271                 default:
272                         rxrpc_get_call(call);
273                         read_unlock(&call->state_lock);
274                         goto extant_call;
275                 }
276         }
277
278         if (call) {
279                 /* it seems the channel is still in use from the previous call
280                  * - ditch the old binding if its call is now complete */
281                 _debug("CALL: %u { %s }",
282                        call->debug_id, rxrpc_call_states[call->state]);
283
284                 if (call->state >= RXRPC_CALL_COMPLETE) {
285                         conn->channels[call->channel] = NULL;
286                 } else {
287                         write_unlock_bh(&conn->lock);
288                         kmem_cache_free(rxrpc_call_jar, candidate);
289                         _leave(" = -EBUSY");
290                         return ERR_PTR(-EBUSY);
291                 }
292         }
293
294         /* check the call number isn't duplicate */
295         _debug("check dup");
296         call_id = hdr->callNumber;
297         p = &conn->calls.rb_node;
298         parent = NULL;
299         while (*p) {
300                 parent = *p;
301                 call = rb_entry(parent, struct rxrpc_call, conn_node);
302
303                 if (call_id < call->call_id)
304                         p = &(*p)->rb_left;
305                 else if (call_id > call->call_id)
306                         p = &(*p)->rb_right;
307                 else
308                         goto old_call;
309         }
310
311         /* make the call available */
312         _debug("new call");
313         call = candidate;
314         candidate = NULL;
315         rb_link_node(&call->conn_node, parent, p);
316         rb_insert_color(&call->conn_node, &conn->calls);
317         conn->channels[call->channel] = call;
318         sock_hold(&rx->sk);
319         atomic_inc(&conn->usage);
320         write_unlock_bh(&conn->lock);
321
322         spin_lock(&conn->trans->peer->lock);
323         list_add(&call->error_link, &conn->trans->peer->error_targets);
324         spin_unlock(&conn->trans->peer->lock);
325
326         write_lock_bh(&rxrpc_call_lock);
327         list_add_tail(&call->link, &rxrpc_calls);
328         write_unlock_bh(&rxrpc_call_lock);
329
330         _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
331
332         call->lifetimer.expires = jiffies + rxrpc_call_max_lifetime * HZ;
333         add_timer(&call->lifetimer);
334         _leave(" = %p {%d} [new]", call, call->debug_id);
335         return call;
336
337 extant_call:
338         write_unlock_bh(&conn->lock);
339         kmem_cache_free(rxrpc_call_jar, candidate);
340         _leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
341         return call;
342
343 aborted_call:
344         write_unlock_bh(&conn->lock);
345         kmem_cache_free(rxrpc_call_jar, candidate);
346         _leave(" = -ECONNABORTED");
347         return ERR_PTR(-ECONNABORTED);
348
349 old_call:
350         write_unlock_bh(&conn->lock);
351         kmem_cache_free(rxrpc_call_jar, candidate);
352         _leave(" = -ECONNRESET [old]");
353         return ERR_PTR(-ECONNRESET);
354 }
355
356 /*
357  * find an extant server call
358  * - called in process context with IRQs enabled
359  */
360 struct rxrpc_call *rxrpc_find_server_call(struct rxrpc_sock *rx,
361                                           unsigned long user_call_ID)
362 {
363         struct rxrpc_call *call;
364         struct rb_node *p;
365
366         _enter("%p,%lx", rx, user_call_ID);
367
368         /* search the extant calls for one that matches the specified user
369          * ID */
370         read_lock(&rx->call_lock);
371
372         p = rx->calls.rb_node;
373         while (p) {
374                 call = rb_entry(p, struct rxrpc_call, sock_node);
375
376                 if (user_call_ID < call->user_call_ID)
377                         p = p->rb_left;
378                 else if (user_call_ID > call->user_call_ID)
379                         p = p->rb_right;
380                 else
381                         goto found_extant_call;
382         }
383
384         read_unlock(&rx->call_lock);
385         _leave(" = NULL");
386         return NULL;
387
388         /* we found the call in the list immediately */
389 found_extant_call:
390         rxrpc_get_call(call);
391         read_unlock(&rx->call_lock);
392         _leave(" = %p [%d]", call, atomic_read(&call->usage));
393         return call;
394 }
395
396 /*
397  * detach a call from a socket and set up for release
398  */
399 void rxrpc_release_call(struct rxrpc_call *call)
400 {
401         struct rxrpc_connection *conn = call->conn;
402         struct rxrpc_sock *rx = call->socket;
403
404         _enter("{%d,%d,%d,%d}",
405                call->debug_id, atomic_read(&call->usage),
406                atomic_read(&call->ackr_not_idle),
407                call->rx_first_oos);
408
409         spin_lock_bh(&call->lock);
410         if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
411                 BUG();
412         spin_unlock_bh(&call->lock);
413
414         /* dissociate from the socket
415          * - the socket's ref on the call is passed to the death timer
416          */
417         _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
418
419         write_lock_bh(&rx->call_lock);
420         if (!list_empty(&call->accept_link)) {
421                 _debug("unlinking once-pending call %p { e=%lx f=%lx }",
422                        call, call->events, call->flags);
423                 ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
424                 list_del_init(&call->accept_link);
425                 sk_acceptq_removed(&rx->sk);
426         } else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
427                 rb_erase(&call->sock_node, &rx->calls);
428                 memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
429                 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
430         }
431         write_unlock_bh(&rx->call_lock);
432
433         /* free up the channel for reuse */
434         spin_lock(&conn->trans->client_lock);
435         write_lock_bh(&conn->lock);
436         write_lock(&call->state_lock);
437
438         if (conn->channels[call->channel] == call)
439                 conn->channels[call->channel] = NULL;
440
441         if (conn->out_clientflag && conn->bundle) {
442                 conn->avail_calls++;
443                 switch (conn->avail_calls) {
444                 case 1:
445                         list_move_tail(&conn->bundle_link,
446                                        &conn->bundle->avail_conns);
447                 case 2 ... RXRPC_MAXCALLS - 1:
448                         ASSERT(conn->channels[0] == NULL ||
449                                conn->channels[1] == NULL ||
450                                conn->channels[2] == NULL ||
451                                conn->channels[3] == NULL);
452                         break;
453                 case RXRPC_MAXCALLS:
454                         list_move_tail(&conn->bundle_link,
455                                        &conn->bundle->unused_conns);
456                         ASSERT(conn->channels[0] == NULL &&
457                                conn->channels[1] == NULL &&
458                                conn->channels[2] == NULL &&
459                                conn->channels[3] == NULL);
460                         break;
461                 default:
462                         printk(KERN_ERR "RxRPC: conn->avail_calls=%d\n",
463                                conn->avail_calls);
464                         BUG();
465                 }
466         }
467
468         spin_unlock(&conn->trans->client_lock);
469
470         if (call->state < RXRPC_CALL_COMPLETE &&
471             call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
472                 _debug("+++ ABORTING STATE %d +++\n", call->state);
473                 call->state = RXRPC_CALL_LOCALLY_ABORTED;
474                 call->abort_code = RX_CALL_DEAD;
475                 set_bit(RXRPC_CALL_ABORT, &call->events);
476                 rxrpc_queue_call(call);
477         }
478         write_unlock(&call->state_lock);
479         write_unlock_bh(&conn->lock);
480
481         /* clean up the Rx queue */
482         if (!skb_queue_empty(&call->rx_queue) ||
483             !skb_queue_empty(&call->rx_oos_queue)) {
484                 struct rxrpc_skb_priv *sp;
485                 struct sk_buff *skb;
486
487                 _debug("purge Rx queues");
488
489                 spin_lock_bh(&call->lock);
490                 while ((skb = skb_dequeue(&call->rx_queue)) ||
491                        (skb = skb_dequeue(&call->rx_oos_queue))) {
492                         sp = rxrpc_skb(skb);
493                         if (sp->call) {
494                                 ASSERTCMP(sp->call, ==, call);
495                                 rxrpc_put_call(call);
496                                 sp->call = NULL;
497                         }
498                         skb->destructor = NULL;
499                         spin_unlock_bh(&call->lock);
500
501                         _debug("- zap %s %%%u #%u",
502                                rxrpc_pkts[sp->hdr.type],
503                                ntohl(sp->hdr.serial),
504                                ntohl(sp->hdr.seq));
505                         rxrpc_free_skb(skb);
506                         spin_lock_bh(&call->lock);
507                 }
508                 spin_unlock_bh(&call->lock);
509
510                 ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
511         }
512
513         del_timer_sync(&call->resend_timer);
514         del_timer_sync(&call->ack_timer);
515         del_timer_sync(&call->lifetimer);
516         call->deadspan.expires = jiffies + rxrpc_dead_call_timeout * HZ;
517         add_timer(&call->deadspan);
518
519         _leave("");
520 }
521
522 /*
523  * handle a dead call being ready for reaping
524  */
525 static void rxrpc_dead_call_expired(unsigned long _call)
526 {
527         struct rxrpc_call *call = (struct rxrpc_call *) _call;
528
529         _enter("{%d}", call->debug_id);
530
531         write_lock_bh(&call->state_lock);
532         call->state = RXRPC_CALL_DEAD;
533         write_unlock_bh(&call->state_lock);
534         rxrpc_put_call(call);
535 }
536
537 /*
538  * mark a call as to be released, aborting it if it's still in progress
539  * - called with softirqs disabled
540  */
541 static void rxrpc_mark_call_released(struct rxrpc_call *call)
542 {
543         bool sched;
544
545         write_lock(&call->state_lock);
546         if (call->state < RXRPC_CALL_DEAD) {
547                 sched = false;
548                 if (call->state < RXRPC_CALL_COMPLETE) {
549                         _debug("abort call %p", call);
550                         call->state = RXRPC_CALL_LOCALLY_ABORTED;
551                         call->abort_code = RX_CALL_DEAD;
552                         if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
553                                 sched = true;
554                 }
555                 if (!test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
556                         sched = true;
557                 if (sched)
558                         rxrpc_queue_call(call);
559         }
560         write_unlock(&call->state_lock);
561 }
562
563 /*
564  * release all the calls associated with a socket
565  */
566 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
567 {
568         struct rxrpc_call *call;
569         struct rb_node *p;
570
571         _enter("%p", rx);
572
573         read_lock_bh(&rx->call_lock);
574
575         /* mark all the calls as no longer wanting incoming packets */
576         for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
577                 call = rb_entry(p, struct rxrpc_call, sock_node);
578                 rxrpc_mark_call_released(call);
579         }
580
581         /* kill the not-yet-accepted incoming calls */
582         list_for_each_entry(call, &rx->secureq, accept_link) {
583                 rxrpc_mark_call_released(call);
584         }
585
586         list_for_each_entry(call, &rx->acceptq, accept_link) {
587                 rxrpc_mark_call_released(call);
588         }
589
590         read_unlock_bh(&rx->call_lock);
591         _leave("");
592 }
593
594 /*
595  * release a call
596  */
597 void __rxrpc_put_call(struct rxrpc_call *call)
598 {
599         ASSERT(call != NULL);
600
601         _enter("%p{u=%d}", call, atomic_read(&call->usage));
602
603         ASSERTCMP(atomic_read(&call->usage), >, 0);
604
605         if (atomic_dec_and_test(&call->usage)) {
606                 _debug("call %d dead", call->debug_id);
607                 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
608                 rxrpc_queue_work(&call->destroyer);
609         }
610         _leave("");
611 }
612
613 /*
614  * clean up a call
615  */
616 static void rxrpc_cleanup_call(struct rxrpc_call *call)
617 {
618         _net("DESTROY CALL %d", call->debug_id);
619
620         ASSERT(call->socket);
621
622         memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
623
624         del_timer_sync(&call->lifetimer);
625         del_timer_sync(&call->deadspan);
626         del_timer_sync(&call->ack_timer);
627         del_timer_sync(&call->resend_timer);
628
629         ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
630         ASSERTCMP(call->events, ==, 0);
631         if (work_pending(&call->processor)) {
632                 _debug("defer destroy");
633                 rxrpc_queue_work(&call->destroyer);
634                 return;
635         }
636
637         if (call->conn) {
638                 spin_lock(&call->conn->trans->peer->lock);
639                 list_del(&call->error_link);
640                 spin_unlock(&call->conn->trans->peer->lock);
641
642                 write_lock_bh(&call->conn->lock);
643                 rb_erase(&call->conn_node, &call->conn->calls);
644                 write_unlock_bh(&call->conn->lock);
645                 rxrpc_put_connection(call->conn);
646         }
647
648         if (call->acks_window) {
649                 _debug("kill Tx window %d",
650                        CIRC_CNT(call->acks_head, call->acks_tail,
651                                 call->acks_winsz));
652                 smp_mb();
653                 while (CIRC_CNT(call->acks_head, call->acks_tail,
654                                 call->acks_winsz) > 0) {
655                         struct rxrpc_skb_priv *sp;
656                         unsigned long _skb;
657
658                         _skb = call->acks_window[call->acks_tail] & ~1;
659                         sp = rxrpc_skb((struct sk_buff *) _skb);
660                         _debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
661                         rxrpc_free_skb((struct sk_buff *) _skb);
662                         call->acks_tail =
663                                 (call->acks_tail + 1) & (call->acks_winsz - 1);
664                 }
665
666                 kfree(call->acks_window);
667         }
668
669         rxrpc_free_skb(call->tx_pending);
670
671         rxrpc_purge_queue(&call->rx_queue);
672         ASSERT(skb_queue_empty(&call->rx_oos_queue));
673         sock_put(&call->socket->sk);
674         kmem_cache_free(rxrpc_call_jar, call);
675 }
676
677 /*
678  * destroy a call
679  */
680 static void rxrpc_destroy_call(struct work_struct *work)
681 {
682         struct rxrpc_call *call =
683                 container_of(work, struct rxrpc_call, destroyer);
684
685         _enter("%p{%d,%d,%p}",
686                call, atomic_read(&call->usage), call->channel, call->conn);
687
688         ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
689
690         write_lock_bh(&rxrpc_call_lock);
691         list_del_init(&call->link);
692         write_unlock_bh(&rxrpc_call_lock);
693
694         rxrpc_cleanup_call(call);
695         _leave("");
696 }
697
698 /*
699  * preemptively destroy all the call records from a transport endpoint rather
700  * than waiting for them to time out
701  */
702 void __exit rxrpc_destroy_all_calls(void)
703 {
704         struct rxrpc_call *call;
705
706         _enter("");
707         write_lock_bh(&rxrpc_call_lock);
708
709         while (!list_empty(&rxrpc_calls)) {
710                 call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
711                 _debug("Zapping call %p", call);
712
713                 list_del_init(&call->link);
714
715                 switch (atomic_read(&call->usage)) {
716                 case 0:
717                         ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
718                         break;
719                 case 1:
720                         if (del_timer_sync(&call->deadspan) != 0 &&
721                             call->state != RXRPC_CALL_DEAD)
722                                 rxrpc_dead_call_expired((unsigned long) call);
723                         if (call->state != RXRPC_CALL_DEAD)
724                                 break;
725                 default:
726                         printk(KERN_ERR "RXRPC:"
727                                " Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
728                                call, atomic_read(&call->usage),
729                                atomic_read(&call->ackr_not_idle),
730                                rxrpc_call_states[call->state],
731                                call->flags, call->events);
732                         if (!skb_queue_empty(&call->rx_queue))
733                                 printk(KERN_ERR"RXRPC: Rx queue occupied\n");
734                         if (!skb_queue_empty(&call->rx_oos_queue))
735                                 printk(KERN_ERR"RXRPC: OOS queue occupied\n");
736                         break;
737                 }
738
739                 write_unlock_bh(&rxrpc_call_lock);
740                 cond_resched();
741                 write_lock_bh(&rxrpc_call_lock);
742         }
743
744         write_unlock_bh(&rxrpc_call_lock);
745         _leave("");
746 }
747
748 /*
749  * handle call lifetime being exceeded
750  */
751 static void rxrpc_call_life_expired(unsigned long _call)
752 {
753         struct rxrpc_call *call = (struct rxrpc_call *) _call;
754
755         if (call->state >= RXRPC_CALL_COMPLETE)
756                 return;
757
758         _enter("{%d}", call->debug_id);
759         read_lock_bh(&call->state_lock);
760         if (call->state < RXRPC_CALL_COMPLETE) {
761                 set_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
762                 rxrpc_queue_call(call);
763         }
764         read_unlock_bh(&call->state_lock);
765 }
766
767 /*
768  * handle resend timer expiry
769  */
770 static void rxrpc_resend_time_expired(unsigned long _call)
771 {
772         struct rxrpc_call *call = (struct rxrpc_call *) _call;
773
774         _enter("{%d}", call->debug_id);
775
776         if (call->state >= RXRPC_CALL_COMPLETE)
777                 return;
778
779         read_lock_bh(&call->state_lock);
780         clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
781         if (call->state < RXRPC_CALL_COMPLETE &&
782             !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
783                 rxrpc_queue_call(call);
784         read_unlock_bh(&call->state_lock);
785 }
786
787 /*
788  * handle ACK timer expiry
789  */
790 static void rxrpc_ack_time_expired(unsigned long _call)
791 {
792         struct rxrpc_call *call = (struct rxrpc_call *) _call;
793
794         _enter("{%d}", call->debug_id);
795
796         if (call->state >= RXRPC_CALL_COMPLETE)
797                 return;
798
799         read_lock_bh(&call->state_lock);
800         if (call->state < RXRPC_CALL_COMPLETE &&
801             !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
802                 rxrpc_queue_call(call);
803         read_unlock_bh(&call->state_lock);
804 }