ac31cceda2f170009ca58b28f459a7a8a9483278
[linux-3.10.git] / net / rxrpc / ar-call.c
1 /* RxRPC individual remote procedure call handling
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #include <linux/module.h>
13 #include <linux/circ_buf.h>
14 #include <net/sock.h>
15 #include <net/af_rxrpc.h>
16 #include "ar-internal.h"
17
/* slab cache that call records are allocated from and returned to */
struct kmem_cache *rxrpc_call_jar;

/* list of call records (chained through their ->link member) and the lock
 * taken around additions to and removals from it */
LIST_HEAD(rxrpc_calls);
DEFINE_RWLOCK(rxrpc_call_lock);

/* timeouts in seconds; each is multiplied by HZ when its timer is armed */
static unsigned int rxrpc_call_max_lifetime = 60;
static unsigned int rxrpc_dead_call_timeout = 10;

/* work item and timer handlers defined further down the file */
static void rxrpc_destroy_call(struct work_struct *work);
static void rxrpc_call_life_expired(unsigned long _call);
static void rxrpc_dead_call_expired(unsigned long _call);
static void rxrpc_ack_time_expired(unsigned long _call);
static void rxrpc_resend_time_expired(unsigned long _call);
29
30 /*
31  * allocate a new call
32  */
33 static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
34 {
35         struct rxrpc_call *call;
36
37         call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
38         if (!call)
39                 return NULL;
40
41         call->acks_winsz = 16;
42         call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
43                                     gfp);
44         if (!call->acks_window) {
45                 kmem_cache_free(rxrpc_call_jar, call);
46                 return NULL;
47         }
48
49         setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
50                     (unsigned long) call);
51         setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
52                     (unsigned long) call);
53         setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
54                     (unsigned long) call);
55         setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
56                     (unsigned long) call);
57         INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
58         INIT_WORK(&call->processor, &rxrpc_process_call);
59         INIT_LIST_HEAD(&call->accept_link);
60         skb_queue_head_init(&call->rx_queue);
61         skb_queue_head_init(&call->rx_oos_queue);
62         init_waitqueue_head(&call->tx_waitq);
63         spin_lock_init(&call->lock);
64         rwlock_init(&call->state_lock);
65         atomic_set(&call->usage, 1);
66         call->debug_id = atomic_inc_return(&rxrpc_debug_id);
67         call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
68
69         memset(&call->sock_node, 0xed, sizeof(call->sock_node));
70
71         call->rx_data_expect = 1;
72         call->rx_data_eaten = 0;
73         call->rx_first_oos = 0;
74         call->ackr_win_top = call->rx_data_eaten + 1 + RXRPC_MAXACKS;
75         call->creation_jif = jiffies;
76         return call;
77 }
78
79 /*
80  * allocate a new client call and attempt to to get a connection slot for it
81  */
82 static struct rxrpc_call *rxrpc_alloc_client_call(
83         struct rxrpc_sock *rx,
84         struct rxrpc_transport *trans,
85         struct rxrpc_conn_bundle *bundle,
86         gfp_t gfp)
87 {
88         struct rxrpc_call *call;
89         int ret;
90
91         _enter("");
92
93         ASSERT(rx != NULL);
94         ASSERT(trans != NULL);
95         ASSERT(bundle != NULL);
96
97         call = rxrpc_alloc_call(gfp);
98         if (!call)
99                 return ERR_PTR(-ENOMEM);
100
101         sock_hold(&rx->sk);
102         call->socket = rx;
103         call->rx_data_post = 1;
104
105         ret = rxrpc_connect_call(rx, trans, bundle, call, gfp);
106         if (ret < 0) {
107                 kmem_cache_free(rxrpc_call_jar, call);
108                 return ERR_PTR(ret);
109         }
110
111         spin_lock(&call->conn->trans->peer->lock);
112         list_add(&call->error_link, &call->conn->trans->peer->error_targets);
113         spin_unlock(&call->conn->trans->peer->lock);
114
115         call->lifetimer.expires = jiffies + rxrpc_call_max_lifetime * HZ;
116         add_timer(&call->lifetimer);
117
118         _leave(" = %p", call);
119         return call;
120 }
121
122 /*
123  * set up a call for the given data
124  * - called in process context with IRQs enabled
125  */
126 struct rxrpc_call *rxrpc_get_client_call(struct rxrpc_sock *rx,
127                                          struct rxrpc_transport *trans,
128                                          struct rxrpc_conn_bundle *bundle,
129                                          unsigned long user_call_ID,
130                                          int create,
131                                          gfp_t gfp)
132 {
133         struct rxrpc_call *call, *candidate;
134         struct rb_node *p, *parent, **pp;
135
136         _enter("%p,%d,%d,%lx,%d",
137                rx, trans ? trans->debug_id : -1, bundle ? bundle->debug_id : -1,
138                user_call_ID, create);
139
140         /* search the extant calls first for one that matches the specified
141          * user ID */
142         read_lock(&rx->call_lock);
143
144         p = rx->calls.rb_node;
145         while (p) {
146                 call = rb_entry(p, struct rxrpc_call, sock_node);
147
148                 if (user_call_ID < call->user_call_ID)
149                         p = p->rb_left;
150                 else if (user_call_ID > call->user_call_ID)
151                         p = p->rb_right;
152                 else
153                         goto found_extant_call;
154         }
155
156         read_unlock(&rx->call_lock);
157
158         if (!create || !trans)
159                 return ERR_PTR(-EBADSLT);
160
161         /* not yet present - create a candidate for a new record and then
162          * redo the search */
163         candidate = rxrpc_alloc_client_call(rx, trans, bundle, gfp);
164         if (IS_ERR(candidate)) {
165                 _leave(" = %ld", PTR_ERR(candidate));
166                 return candidate;
167         }
168
169         candidate->user_call_ID = user_call_ID;
170         __set_bit(RXRPC_CALL_HAS_USERID, &candidate->flags);
171
172         write_lock(&rx->call_lock);
173
174         pp = &rx->calls.rb_node;
175         parent = NULL;
176         while (*pp) {
177                 parent = *pp;
178                 call = rb_entry(parent, struct rxrpc_call, sock_node);
179
180                 if (user_call_ID < call->user_call_ID)
181                         pp = &(*pp)->rb_left;
182                 else if (user_call_ID > call->user_call_ID)
183                         pp = &(*pp)->rb_right;
184                 else
185                         goto found_extant_second;
186         }
187
188         /* second search also failed; add the new call */
189         call = candidate;
190         candidate = NULL;
191         rxrpc_get_call(call);
192
193         rb_link_node(&call->sock_node, parent, pp);
194         rb_insert_color(&call->sock_node, &rx->calls);
195         write_unlock(&rx->call_lock);
196
197         write_lock_bh(&rxrpc_call_lock);
198         list_add_tail(&call->link, &rxrpc_calls);
199         write_unlock_bh(&rxrpc_call_lock);
200
201         _net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
202
203         _leave(" = %p [new]", call);
204         return call;
205
206         /* we found the call in the list immediately */
207 found_extant_call:
208         rxrpc_get_call(call);
209         read_unlock(&rx->call_lock);
210         _leave(" = %p [extant %d]", call, atomic_read(&call->usage));
211         return call;
212
213         /* we found the call on the second time through the list */
214 found_extant_second:
215         rxrpc_get_call(call);
216         write_unlock(&rx->call_lock);
217         rxrpc_put_call(candidate);
218         _leave(" = %p [second %d]", call, atomic_read(&call->usage));
219         return call;
220 }
221
222 /*
223  * set up an incoming call
224  * - called in process context with IRQs enabled
225  */
226 struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
227                                        struct rxrpc_connection *conn,
228                                        struct rxrpc_header *hdr,
229                                        gfp_t gfp)
230 {
231         struct rxrpc_call *call, *candidate;
232         struct rb_node **p, *parent;
233         __be32 call_id;
234
235         _enter(",%d,,%x", conn->debug_id, gfp);
236
237         ASSERT(rx != NULL);
238
239         candidate = rxrpc_alloc_call(gfp);
240         if (!candidate)
241                 return ERR_PTR(-EBUSY);
242
243         candidate->socket = rx;
244         candidate->conn = conn;
245         candidate->cid = hdr->cid;
246         candidate->call_id = hdr->callNumber;
247         candidate->channel = ntohl(hdr->cid) & RXRPC_CHANNELMASK;
248         candidate->rx_data_post = 0;
249         candidate->state = RXRPC_CALL_SERVER_ACCEPTING;
250         if (conn->security_ix > 0)
251                 candidate->state = RXRPC_CALL_SERVER_SECURING;
252
253         write_lock_bh(&conn->lock);
254
255         /* set the channel for this call */
256         call = conn->channels[candidate->channel];
257         _debug("channel[%u] is %p", candidate->channel, call);
258         if (call && call->call_id == hdr->callNumber) {
259                 /* already set; must've been a duplicate packet */
260                 _debug("extant call [%d]", call->state);
261                 ASSERTCMP(call->conn, ==, conn);
262
263                 read_lock(&call->state_lock);
264                 switch (call->state) {
265                 case RXRPC_CALL_LOCALLY_ABORTED:
266                         if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
267                                 schedule_work(&call->processor);
268                 case RXRPC_CALL_REMOTELY_ABORTED:
269                         read_unlock(&call->state_lock);
270                         goto aborted_call;
271                 default:
272                         rxrpc_get_call(call);
273                         read_unlock(&call->state_lock);
274                         goto extant_call;
275                 }
276         }
277
278         if (call) {
279                 /* it seems the channel is still in use from the previous call
280                  * - ditch the old binding if its call is now complete */
281                 _debug("CALL: %u { %s }",
282                        call->debug_id, rxrpc_call_states[call->state]);
283
284                 if (call->state >= RXRPC_CALL_COMPLETE) {
285                         conn->channels[call->channel] = NULL;
286                 } else {
287                         write_unlock_bh(&conn->lock);
288                         kmem_cache_free(rxrpc_call_jar, candidate);
289                         _leave(" = -EBUSY");
290                         return ERR_PTR(-EBUSY);
291                 }
292         }
293
294         /* check the call number isn't duplicate */
295         _debug("check dup");
296         call_id = hdr->callNumber;
297         p = &conn->calls.rb_node;
298         parent = NULL;
299         while (*p) {
300                 parent = *p;
301                 call = rb_entry(parent, struct rxrpc_call, conn_node);
302
303                 if (call_id < call->call_id)
304                         p = &(*p)->rb_left;
305                 else if (call_id > call->call_id)
306                         p = &(*p)->rb_right;
307                 else
308                         goto old_call;
309         }
310
311         /* make the call available */
312         _debug("new call");
313         call = candidate;
314         candidate = NULL;
315         rb_link_node(&call->conn_node, parent, p);
316         rb_insert_color(&call->conn_node, &conn->calls);
317         conn->channels[call->channel] = call;
318         sock_hold(&rx->sk);
319         atomic_inc(&conn->usage);
320         write_unlock_bh(&conn->lock);
321
322         spin_lock(&conn->trans->peer->lock);
323         list_add(&call->error_link, &conn->trans->peer->error_targets);
324         spin_unlock(&conn->trans->peer->lock);
325
326         write_lock_bh(&rxrpc_call_lock);
327         list_add_tail(&call->link, &rxrpc_calls);
328         write_unlock_bh(&rxrpc_call_lock);
329
330         _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
331
332         call->lifetimer.expires = jiffies + rxrpc_call_max_lifetime * HZ;
333         add_timer(&call->lifetimer);
334         _leave(" = %p {%d} [new]", call, call->debug_id);
335         return call;
336
337 extant_call:
338         write_unlock_bh(&conn->lock);
339         kmem_cache_free(rxrpc_call_jar, candidate);
340         _leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
341         return call;
342
343 aborted_call:
344         write_unlock_bh(&conn->lock);
345         kmem_cache_free(rxrpc_call_jar, candidate);
346         _leave(" = -ECONNABORTED");
347         return ERR_PTR(-ECONNABORTED);
348
349 old_call:
350         write_unlock_bh(&conn->lock);
351         kmem_cache_free(rxrpc_call_jar, candidate);
352         _leave(" = -ECONNRESET [old]");
353         return ERR_PTR(-ECONNRESET);
354 }
355
356 /*
357  * find an extant server call
358  * - called in process context with IRQs enabled
359  */
360 struct rxrpc_call *rxrpc_find_server_call(struct rxrpc_sock *rx,
361                                           unsigned long user_call_ID)
362 {
363         struct rxrpc_call *call;
364         struct rb_node *p;
365
366         _enter("%p,%lx", rx, user_call_ID);
367
368         /* search the extant calls for one that matches the specified user
369          * ID */
370         read_lock(&rx->call_lock);
371
372         p = rx->calls.rb_node;
373         while (p) {
374                 call = rb_entry(p, struct rxrpc_call, sock_node);
375
376                 if (user_call_ID < call->user_call_ID)
377                         p = p->rb_left;
378                 else if (user_call_ID > call->user_call_ID)
379                         p = p->rb_right;
380                 else
381                         goto found_extant_call;
382         }
383
384         read_unlock(&rx->call_lock);
385         _leave(" = NULL");
386         return NULL;
387
388         /* we found the call in the list immediately */
389 found_extant_call:
390         rxrpc_get_call(call);
391         read_unlock(&rx->call_lock);
392         _leave(" = %p [%d]", call, atomic_read(&call->usage));
393         return call;
394 }
395
396 /*
397  * detach a call from a socket and set up for release
398  */
399 void rxrpc_release_call(struct rxrpc_call *call)
400 {
401         struct rxrpc_sock *rx = call->socket;
402
403         _enter("{%d,%d,%d,%d}",
404                call->debug_id, atomic_read(&call->usage),
405                atomic_read(&call->ackr_not_idle),
406                call->rx_first_oos);
407
408         spin_lock_bh(&call->lock);
409         if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
410                 BUG();
411         spin_unlock_bh(&call->lock);
412
413         /* dissociate from the socket
414          * - the socket's ref on the call is passed to the death timer
415          */
416         _debug("RELEASE CALL %p (%d CONN %p)",
417                call, call->debug_id, call->conn);
418
419         write_lock_bh(&rx->call_lock);
420         if (!list_empty(&call->accept_link)) {
421                 _debug("unlinking once-pending call %p { e=%lx f=%lx }",
422                        call, call->events, call->flags);
423                 ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
424                 list_del_init(&call->accept_link);
425                 sk_acceptq_removed(&rx->sk);
426         } else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
427                 rb_erase(&call->sock_node, &rx->calls);
428                 memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
429                 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
430         }
431         write_unlock_bh(&rx->call_lock);
432
433         if (call->conn->out_clientflag)
434                 spin_lock(&call->conn->trans->client_lock);
435         write_lock_bh(&call->conn->lock);
436
437         /* free up the channel for reuse */
438         if (call->conn->out_clientflag) {
439                 call->conn->avail_calls++;
440                 if (call->conn->avail_calls == RXRPC_MAXCALLS)
441                         list_move_tail(&call->conn->bundle_link,
442                                        &call->conn->bundle->unused_conns);
443                 else if (call->conn->avail_calls == 1)
444                         list_move_tail(&call->conn->bundle_link,
445                                        &call->conn->bundle->avail_conns);
446         }
447
448         write_lock(&call->state_lock);
449         if (call->conn->channels[call->channel] == call)
450                 call->conn->channels[call->channel] = NULL;
451
452         if (call->state < RXRPC_CALL_COMPLETE &&
453             call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
454                 _debug("+++ ABORTING STATE %d +++\n", call->state);
455                 call->state = RXRPC_CALL_LOCALLY_ABORTED;
456                 call->abort_code = RX_CALL_DEAD;
457                 set_bit(RXRPC_CALL_ABORT, &call->events);
458                 schedule_work(&call->processor);
459         }
460         write_unlock(&call->state_lock);
461         write_unlock_bh(&call->conn->lock);
462         if (call->conn->out_clientflag)
463                 spin_unlock(&call->conn->trans->client_lock);
464
465         if (!skb_queue_empty(&call->rx_queue) ||
466             !skb_queue_empty(&call->rx_oos_queue)) {
467                 struct rxrpc_skb_priv *sp;
468                 struct sk_buff *skb;
469
470                 _debug("purge Rx queues");
471
472                 spin_lock_bh(&call->lock);
473                 while ((skb = skb_dequeue(&call->rx_queue)) ||
474                        (skb = skb_dequeue(&call->rx_oos_queue))) {
475                         sp = rxrpc_skb(skb);
476                         if (sp->call) {
477                                 ASSERTCMP(sp->call, ==, call);
478                                 rxrpc_put_call(call);
479                                 sp->call = NULL;
480                         }
481                         skb->destructor = NULL;
482                         spin_unlock_bh(&call->lock);
483
484                         _debug("- zap %s %%%u #%u",
485                                rxrpc_pkts[sp->hdr.type],
486                                ntohl(sp->hdr.serial),
487                                ntohl(sp->hdr.seq));
488                         rxrpc_free_skb(skb);
489                         spin_lock_bh(&call->lock);
490                 }
491                 spin_unlock_bh(&call->lock);
492
493                 ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
494         }
495
496         del_timer_sync(&call->resend_timer);
497         del_timer_sync(&call->ack_timer);
498         del_timer_sync(&call->lifetimer);
499         call->deadspan.expires = jiffies + rxrpc_dead_call_timeout * HZ;
500         add_timer(&call->deadspan);
501
502         _leave("");
503 }
504
505 /*
506  * handle a dead call being ready for reaping
507  */
508 static void rxrpc_dead_call_expired(unsigned long _call)
509 {
510         struct rxrpc_call *call = (struct rxrpc_call *) _call;
511
512         _enter("{%d}", call->debug_id);
513
514         write_lock_bh(&call->state_lock);
515         call->state = RXRPC_CALL_DEAD;
516         write_unlock_bh(&call->state_lock);
517         rxrpc_put_call(call);
518 }
519
520 /*
521  * mark a call as to be released, aborting it if it's still in progress
522  * - called with softirqs disabled
523  */
524 static void rxrpc_mark_call_released(struct rxrpc_call *call)
525 {
526         bool sched;
527
528         write_lock(&call->state_lock);
529         if (call->state < RXRPC_CALL_DEAD) {
530                 sched = false;
531                 if (call->state < RXRPC_CALL_COMPLETE) {
532                         _debug("abort call %p", call);
533                         call->state = RXRPC_CALL_LOCALLY_ABORTED;
534                         call->abort_code = RX_CALL_DEAD;
535                         if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
536                                 sched = true;
537                 }
538                 if (!test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
539                         sched = true;
540                 if (sched)
541                         schedule_work(&call->processor);
542         }
543         write_unlock(&call->state_lock);
544 }
545
546 /*
547  * release all the calls associated with a socket
548  */
549 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
550 {
551         struct rxrpc_call *call;
552         struct rb_node *p;
553
554         _enter("%p", rx);
555
556         read_lock_bh(&rx->call_lock);
557
558         /* mark all the calls as no longer wanting incoming packets */
559         for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
560                 call = rb_entry(p, struct rxrpc_call, sock_node);
561                 rxrpc_mark_call_released(call);
562         }
563
564         /* kill the not-yet-accepted incoming calls */
565         list_for_each_entry(call, &rx->secureq, accept_link) {
566                 rxrpc_mark_call_released(call);
567         }
568
569         list_for_each_entry(call, &rx->acceptq, accept_link) {
570                 rxrpc_mark_call_released(call);
571         }
572
573         read_unlock_bh(&rx->call_lock);
574         _leave("");
575 }
576
577 /*
578  * release a call
579  */
580 void __rxrpc_put_call(struct rxrpc_call *call)
581 {
582         ASSERT(call != NULL);
583
584         _enter("%p{u=%d}", call, atomic_read(&call->usage));
585
586         ASSERTCMP(atomic_read(&call->usage), >, 0);
587
588         if (atomic_dec_and_test(&call->usage)) {
589                 _debug("call %d dead", call->debug_id);
590                 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
591                 schedule_work(&call->destroyer);
592         }
593         _leave("");
594 }
595
596 /*
597  * clean up a call
598  */
599 static void rxrpc_cleanup_call(struct rxrpc_call *call)
600 {
601         _net("DESTROY CALL %d", call->debug_id);
602
603         ASSERT(call->socket);
604
605         memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
606
607         del_timer_sync(&call->lifetimer);
608         del_timer_sync(&call->deadspan);
609         del_timer_sync(&call->ack_timer);
610         del_timer_sync(&call->resend_timer);
611
612         ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
613         ASSERTCMP(call->events, ==, 0);
614         if (work_pending(&call->processor)) {
615                 _debug("defer destroy");
616                 schedule_work(&call->destroyer);
617                 return;
618         }
619
620         if (call->conn) {
621                 spin_lock(&call->conn->trans->peer->lock);
622                 list_del(&call->error_link);
623                 spin_unlock(&call->conn->trans->peer->lock);
624
625                 write_lock_bh(&call->conn->lock);
626                 rb_erase(&call->conn_node, &call->conn->calls);
627                 write_unlock_bh(&call->conn->lock);
628                 rxrpc_put_connection(call->conn);
629         }
630
631         if (call->acks_window) {
632                 _debug("kill Tx window %d",
633                        CIRC_CNT(call->acks_head, call->acks_tail,
634                                 call->acks_winsz));
635                 smp_mb();
636                 while (CIRC_CNT(call->acks_head, call->acks_tail,
637                                 call->acks_winsz) > 0) {
638                         struct rxrpc_skb_priv *sp;
639                         unsigned long _skb;
640
641                         _skb = call->acks_window[call->acks_tail] & ~1;
642                         sp = rxrpc_skb((struct sk_buff *) _skb);
643                         _debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
644                         rxrpc_free_skb((struct sk_buff *) _skb);
645                         call->acks_tail =
646                                 (call->acks_tail + 1) & (call->acks_winsz - 1);
647                 }
648
649                 kfree(call->acks_window);
650         }
651
652         rxrpc_free_skb(call->tx_pending);
653
654         rxrpc_purge_queue(&call->rx_queue);
655         ASSERT(skb_queue_empty(&call->rx_oos_queue));
656         sock_put(&call->socket->sk);
657         kmem_cache_free(rxrpc_call_jar, call);
658 }
659
660 /*
661  * destroy a call
662  */
663 static void rxrpc_destroy_call(struct work_struct *work)
664 {
665         struct rxrpc_call *call =
666                 container_of(work, struct rxrpc_call, destroyer);
667
668         _enter("%p{%d,%d,%p}",
669                call, atomic_read(&call->usage), call->channel, call->conn);
670
671         ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
672
673         write_lock_bh(&rxrpc_call_lock);
674         list_del_init(&call->link);
675         write_unlock_bh(&rxrpc_call_lock);
676
677         rxrpc_cleanup_call(call);
678         _leave("");
679 }
680
681 /*
682  * preemptively destroy all the call records from a transport endpoint rather
683  * than waiting for them to time out
684  */
685 void __exit rxrpc_destroy_all_calls(void)
686 {
687         struct rxrpc_call *call;
688
689         _enter("");
690         write_lock_bh(&rxrpc_call_lock);
691
692         while (!list_empty(&rxrpc_calls)) {
693                 call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
694                 _debug("Zapping call %p", call);
695
696                 list_del_init(&call->link);
697
698                 switch (atomic_read(&call->usage)) {
699                 case 0:
700                         ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
701                         break;
702                 case 1:
703                         if (del_timer_sync(&call->deadspan) != 0 &&
704                             call->state != RXRPC_CALL_DEAD)
705                                 rxrpc_dead_call_expired((unsigned long) call);
706                         if (call->state != RXRPC_CALL_DEAD)
707                                 break;
708                 default:
709                         printk(KERN_ERR "RXRPC:"
710                                " Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
711                                call, atomic_read(&call->usage),
712                                atomic_read(&call->ackr_not_idle),
713                                rxrpc_call_states[call->state],
714                                call->flags, call->events);
715                         if (!skb_queue_empty(&call->rx_queue))
716                                 printk(KERN_ERR"RXRPC: Rx queue occupied\n");
717                         if (!skb_queue_empty(&call->rx_oos_queue))
718                                 printk(KERN_ERR"RXRPC: OOS queue occupied\n");
719                         break;
720                 }
721
722                 write_unlock_bh(&rxrpc_call_lock);
723                 cond_resched();
724                 write_lock_bh(&rxrpc_call_lock);
725         }
726
727         write_unlock_bh(&rxrpc_call_lock);
728         _leave("");
729 }
730
731 /*
732  * handle call lifetime being exceeded
733  */
734 static void rxrpc_call_life_expired(unsigned long _call)
735 {
736         struct rxrpc_call *call = (struct rxrpc_call *) _call;
737
738         if (call->state >= RXRPC_CALL_COMPLETE)
739                 return;
740
741         _enter("{%d}", call->debug_id);
742         read_lock_bh(&call->state_lock);
743         if (call->state < RXRPC_CALL_COMPLETE) {
744                 set_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
745                 schedule_work(&call->processor);
746         }
747         read_unlock_bh(&call->state_lock);
748 }
749
750 /*
751  * handle resend timer expiry
752  */
753 static void rxrpc_resend_time_expired(unsigned long _call)
754 {
755         struct rxrpc_call *call = (struct rxrpc_call *) _call;
756
757         _enter("{%d}", call->debug_id);
758
759         if (call->state >= RXRPC_CALL_COMPLETE)
760                 return;
761
762         read_lock_bh(&call->state_lock);
763         clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
764         if (call->state < RXRPC_CALL_COMPLETE &&
765             !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
766                 schedule_work(&call->processor);
767         read_unlock_bh(&call->state_lock);
768 }
769
770 /*
771  * handle ACK timer expiry
772  */
773 static void rxrpc_ack_time_expired(unsigned long _call)
774 {
775         struct rxrpc_call *call = (struct rxrpc_call *) _call;
776
777         _enter("{%d}", call->debug_id);
778
779         if (call->state >= RXRPC_CALL_COMPLETE)
780                 return;
781
782         read_lock_bh(&call->state_lock);
783         if (call->state < RXRPC_CALL_COMPLETE &&
784             !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
785                 schedule_work(&call->processor);
786         read_unlock_bh(&call->state_lock);
787 }