[AF_RXRPC]: Add an interface to the AF_RXRPC module for the AFS filesystem to use
[linux-2.6.git] / net / rxrpc / ar-connection.c
1 /* RxRPC virtual connection handler
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #include <linux/module.h>
13 #include <linux/net.h>
14 #include <linux/skbuff.h>
15 #include <linux/crypto.h>
16 #include <net/sock.h>
17 #include <net/af_rxrpc.h>
18 #include "ar-internal.h"
19
20 static void rxrpc_connection_reaper(struct work_struct *work);
21
22 LIST_HEAD(rxrpc_connections);
23 DEFINE_RWLOCK(rxrpc_connection_lock);
24 static unsigned long rxrpc_connection_timeout = 10 * 60;
25 static DECLARE_DELAYED_WORK(rxrpc_connection_reap, rxrpc_connection_reaper);
26
27 /*
28  * allocate a new client connection bundle
29  */
30 static struct rxrpc_conn_bundle *rxrpc_alloc_bundle(gfp_t gfp)
31 {
32         struct rxrpc_conn_bundle *bundle;
33
34         _enter("");
35
36         bundle = kzalloc(sizeof(struct rxrpc_conn_bundle), gfp);
37         if (bundle) {
38                 INIT_LIST_HEAD(&bundle->unused_conns);
39                 INIT_LIST_HEAD(&bundle->avail_conns);
40                 INIT_LIST_HEAD(&bundle->busy_conns);
41                 init_waitqueue_head(&bundle->chanwait);
42                 atomic_set(&bundle->usage, 1);
43         }
44
45         _leave(" = %p", bundle);
46         return bundle;
47 }
48
49 /*
50  * compare bundle parameters with what we're looking for
51  * - return -ve, 0 or +ve
52  */
53 static inline
54 int rxrpc_cmp_bundle(const struct rxrpc_conn_bundle *bundle,
55                      struct key *key, __be16 service_id)
56 {
57         return (bundle->service_id - service_id) ?:
58                 ((unsigned long) bundle->key - (unsigned long) key);
59 }
60
61 /*
62  * get bundle of client connections that a client socket can make use of
63  */
64 struct rxrpc_conn_bundle *rxrpc_get_bundle(struct rxrpc_sock *rx,
65                                            struct rxrpc_transport *trans,
66                                            struct key *key,
67                                            __be16 service_id,
68                                            gfp_t gfp)
69 {
70         struct rxrpc_conn_bundle *bundle, *candidate;
71         struct rb_node *p, *parent, **pp;
72
73         _enter("%p{%x},%x,%hx,",
74                rx, key_serial(key), trans->debug_id, ntohl(service_id));
75
76         if (rx->trans == trans && rx->bundle) {
77                 atomic_inc(&rx->bundle->usage);
78                 return rx->bundle;
79         }
80
81         /* search the extant bundles first for one that matches the specified
82          * user ID */
83         spin_lock(&trans->client_lock);
84
85         p = trans->bundles.rb_node;
86         while (p) {
87                 bundle = rb_entry(p, struct rxrpc_conn_bundle, node);
88
89                 if (rxrpc_cmp_bundle(bundle, key, service_id) < 0)
90                         p = p->rb_left;
91                 else if (rxrpc_cmp_bundle(bundle, key, service_id) > 0)
92                         p = p->rb_right;
93                 else
94                         goto found_extant_bundle;
95         }
96
97         spin_unlock(&trans->client_lock);
98
99         /* not yet present - create a candidate for a new record and then
100          * redo the search */
101         candidate = rxrpc_alloc_bundle(gfp);
102         if (!candidate) {
103                 _leave(" = -ENOMEM");
104                 return ERR_PTR(-ENOMEM);
105         }
106
107         candidate->key = key_get(key);
108         candidate->service_id = service_id;
109
110         spin_lock(&trans->client_lock);
111
112         pp = &trans->bundles.rb_node;
113         parent = NULL;
114         while (*pp) {
115                 parent = *pp;
116                 bundle = rb_entry(parent, struct rxrpc_conn_bundle, node);
117
118                 if (rxrpc_cmp_bundle(bundle, key, service_id) < 0)
119                         pp = &(*pp)->rb_left;
120                 else if (rxrpc_cmp_bundle(bundle, key, service_id) > 0)
121                         pp = &(*pp)->rb_right;
122                 else
123                         goto found_extant_second;
124         }
125
126         /* second search also failed; add the new bundle */
127         bundle = candidate;
128         candidate = NULL;
129
130         rb_link_node(&bundle->node, parent, pp);
131         rb_insert_color(&bundle->node, &trans->bundles);
132         spin_unlock(&trans->client_lock);
133         _net("BUNDLE new on trans %d", trans->debug_id);
134         if (!rx->bundle && rx->sk.sk_state == RXRPC_CLIENT_CONNECTED) {
135                 atomic_inc(&bundle->usage);
136                 rx->bundle = bundle;
137         }
138         _leave(" = %p [new]", bundle);
139         return bundle;
140
141         /* we found the bundle in the list immediately */
142 found_extant_bundle:
143         atomic_inc(&bundle->usage);
144         spin_unlock(&trans->client_lock);
145         _net("BUNDLE old on trans %d", trans->debug_id);
146         if (!rx->bundle && rx->sk.sk_state == RXRPC_CLIENT_CONNECTED) {
147                 atomic_inc(&bundle->usage);
148                 rx->bundle = bundle;
149         }
150         _leave(" = %p [extant %d]", bundle, atomic_read(&bundle->usage));
151         return bundle;
152
153         /* we found the bundle on the second time through the list */
154 found_extant_second:
155         atomic_inc(&bundle->usage);
156         spin_unlock(&trans->client_lock);
157         kfree(candidate);
158         _net("BUNDLE old2 on trans %d", trans->debug_id);
159         if (!rx->bundle && rx->sk.sk_state == RXRPC_CLIENT_CONNECTED) {
160                 atomic_inc(&bundle->usage);
161                 rx->bundle = bundle;
162         }
163         _leave(" = %p [second %d]", bundle, atomic_read(&bundle->usage));
164         return bundle;
165 }
166
167 /*
168  * release a bundle
169  */
170 void rxrpc_put_bundle(struct rxrpc_transport *trans,
171                       struct rxrpc_conn_bundle *bundle)
172 {
173         _enter("%p,%p{%d}",trans, bundle, atomic_read(&bundle->usage));
174
175         if (atomic_dec_and_lock(&bundle->usage, &trans->client_lock)) {
176                 _debug("Destroy bundle");
177                 rb_erase(&bundle->node, &trans->bundles);
178                 spin_unlock(&trans->client_lock);
179                 ASSERT(list_empty(&bundle->unused_conns));
180                 ASSERT(list_empty(&bundle->avail_conns));
181                 ASSERT(list_empty(&bundle->busy_conns));
182                 ASSERTCMP(bundle->num_conns, ==, 0);
183                 key_put(bundle->key);
184                 kfree(bundle);
185         }
186
187         _leave("");
188 }
189
190 /*
191  * allocate a new connection
192  */
193 static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
194 {
195         struct rxrpc_connection *conn;
196
197         _enter("");
198
199         conn = kzalloc(sizeof(struct rxrpc_connection), gfp);
200         if (conn) {
201                 INIT_WORK(&conn->processor, &rxrpc_process_connection);
202                 INIT_LIST_HEAD(&conn->bundle_link);
203                 conn->calls = RB_ROOT;
204                 skb_queue_head_init(&conn->rx_queue);
205                 rwlock_init(&conn->lock);
206                 spin_lock_init(&conn->state_lock);
207                 atomic_set(&conn->usage, 1);
208                 conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
209                 conn->avail_calls = RXRPC_MAXCALLS;
210                 conn->size_align = 4;
211                 conn->header_size = sizeof(struct rxrpc_header);
212         }
213
214         _leave(" = %p{%d}", conn, conn->debug_id);
215         return conn;
216 }
217
218 /*
219  * assign a connection ID to a connection and add it to the transport's
220  * connection lookup tree
221  * - called with transport client lock held
222  */
223 static void rxrpc_assign_connection_id(struct rxrpc_connection *conn)
224 {
225         struct rxrpc_connection *xconn;
226         struct rb_node *parent, **p;
227         __be32 epoch;
228         u32 real_conn_id;
229
230         _enter("");
231
232         epoch = conn->epoch;
233
234         write_lock_bh(&conn->trans->conn_lock);
235
236         conn->trans->conn_idcounter += RXRPC_CID_INC;
237         if (conn->trans->conn_idcounter < RXRPC_CID_INC)
238                 conn->trans->conn_idcounter = RXRPC_CID_INC;
239         real_conn_id = conn->trans->conn_idcounter;
240
241 attempt_insertion:
242         parent = NULL;
243         p = &conn->trans->client_conns.rb_node;
244
245         while (*p) {
246                 parent = *p;
247                 xconn = rb_entry(parent, struct rxrpc_connection, node);
248
249                 if (epoch < xconn->epoch)
250                         p = &(*p)->rb_left;
251                 else if (epoch > xconn->epoch)
252                         p = &(*p)->rb_right;
253                 else if (real_conn_id < xconn->real_conn_id)
254                         p = &(*p)->rb_left;
255                 else if (real_conn_id > xconn->real_conn_id)
256                         p = &(*p)->rb_right;
257                 else
258                         goto id_exists;
259         }
260
261         /* we've found a suitable hole - arrange for this connection to occupy
262          * it */
263         rb_link_node(&conn->node, parent, p);
264         rb_insert_color(&conn->node, &conn->trans->client_conns);
265
266         conn->real_conn_id = real_conn_id;
267         conn->cid = htonl(real_conn_id);
268         write_unlock_bh(&conn->trans->conn_lock);
269         _leave(" [CONNID %x CID %x]", real_conn_id, ntohl(conn->cid));
270         return;
271
272         /* we found a connection with the proposed ID - walk the tree from that
273          * point looking for the next unused ID */
274 id_exists:
275         for (;;) {
276                 real_conn_id += RXRPC_CID_INC;
277                 if (real_conn_id < RXRPC_CID_INC) {
278                         real_conn_id = RXRPC_CID_INC;
279                         conn->trans->conn_idcounter = real_conn_id;
280                         goto attempt_insertion;
281                 }
282
283                 parent = rb_next(parent);
284                 if (!parent)
285                         goto attempt_insertion;
286
287                 xconn = rb_entry(parent, struct rxrpc_connection, node);
288                 if (epoch < xconn->epoch ||
289                     real_conn_id < xconn->real_conn_id)
290                         goto attempt_insertion;
291         }
292 }
293
294 /*
295  * add a call to a connection's call-by-ID tree
296  */
297 static void rxrpc_add_call_ID_to_conn(struct rxrpc_connection *conn,
298                                       struct rxrpc_call *call)
299 {
300         struct rxrpc_call *xcall;
301         struct rb_node *parent, **p;
302         __be32 call_id;
303
304         write_lock_bh(&conn->lock);
305
306         call_id = call->call_id;
307         p = &conn->calls.rb_node;
308         parent = NULL;
309         while (*p) {
310                 parent = *p;
311                 xcall = rb_entry(parent, struct rxrpc_call, conn_node);
312
313                 if (call_id < xcall->call_id)
314                         p = &(*p)->rb_left;
315                 else if (call_id > xcall->call_id)
316                         p = &(*p)->rb_right;
317                 else
318                         BUG();
319         }
320
321         rb_link_node(&call->conn_node, parent, p);
322         rb_insert_color(&call->conn_node, &conn->calls);
323
324         write_unlock_bh(&conn->lock);
325 }
326
327 /*
328  * connect a call on an exclusive connection
329  */
330 static int rxrpc_connect_exclusive(struct rxrpc_sock *rx,
331                                    struct rxrpc_transport *trans,
332                                    __be16 service_id,
333                                    struct rxrpc_call *call,
334                                    gfp_t gfp)
335 {
336         struct rxrpc_connection *conn;
337         int chan, ret;
338
339         _enter("");
340
341         conn = rx->conn;
342         if (!conn) {
343                 /* not yet present - create a candidate for a new connection
344                  * and then redo the check */
345                 conn = rxrpc_alloc_connection(gfp);
346                 if (IS_ERR(conn)) {
347                         _leave(" = %ld", PTR_ERR(conn));
348                         return PTR_ERR(conn);
349                 }
350
351                 conn->trans = trans;
352                 conn->bundle = NULL;
353                 conn->service_id = service_id;
354                 conn->epoch = rxrpc_epoch;
355                 conn->in_clientflag = 0;
356                 conn->out_clientflag = RXRPC_CLIENT_INITIATED;
357                 conn->cid = 0;
358                 conn->state = RXRPC_CONN_CLIENT;
359                 conn->avail_calls = RXRPC_MAXCALLS - 1;
360                 conn->security_level = rx->min_sec_level;
361                 conn->key = key_get(rx->key);
362
363                 ret = rxrpc_init_client_conn_security(conn);
364                 if (ret < 0) {
365                         key_put(conn->key);
366                         kfree(conn);
367                         _leave(" = %d [key]", ret);
368                         return ret;
369                 }
370
371                 write_lock_bh(&rxrpc_connection_lock);
372                 list_add_tail(&conn->link, &rxrpc_connections);
373                 write_unlock_bh(&rxrpc_connection_lock);
374
375                 spin_lock(&trans->client_lock);
376                 atomic_inc(&trans->usage);
377
378                 _net("CONNECT EXCL new %d on TRANS %d",
379                      conn->debug_id, conn->trans->debug_id);
380
381                 rxrpc_assign_connection_id(conn);
382                 rx->conn = conn;
383         }
384
385         /* we've got a connection with a free channel and we can now attach the
386          * call to it
387          * - we're holding the transport's client lock
388          * - we're holding a reference on the connection
389          */
390         for (chan = 0; chan < RXRPC_MAXCALLS; chan++)
391                 if (!conn->channels[chan])
392                         goto found_channel;
393         goto no_free_channels;
394
395 found_channel:
396         atomic_inc(&conn->usage);
397         conn->channels[chan] = call;
398         call->conn = conn;
399         call->channel = chan;
400         call->cid = conn->cid | htonl(chan);
401         call->call_id = htonl(++conn->call_counter);
402
403         _net("CONNECT client on conn %d chan %d as call %x",
404              conn->debug_id, chan, ntohl(call->call_id));
405
406         spin_unlock(&trans->client_lock);
407
408         rxrpc_add_call_ID_to_conn(conn, call);
409         _leave(" = 0");
410         return 0;
411
412 no_free_channels:
413         spin_unlock(&trans->client_lock);
414         _leave(" = -ENOSR");
415         return -ENOSR;
416 }
417
418 /*
419  * find a connection for a call
420  * - called in process context with IRQs enabled
421  */
422 int rxrpc_connect_call(struct rxrpc_sock *rx,
423                        struct rxrpc_transport *trans,
424                        struct rxrpc_conn_bundle *bundle,
425                        struct rxrpc_call *call,
426                        gfp_t gfp)
427 {
428         struct rxrpc_connection *conn, *candidate;
429         int chan, ret;
430
431         DECLARE_WAITQUEUE(myself, current);
432
433         _enter("%p,%lx,", rx, call->user_call_ID);
434
435         if (test_bit(RXRPC_SOCK_EXCLUSIVE_CONN, &rx->flags))
436                 return rxrpc_connect_exclusive(rx, trans, bundle->service_id,
437                                                call, gfp);
438
439         spin_lock(&trans->client_lock);
440         for (;;) {
441                 /* see if the bundle has a call slot available */
442                 if (!list_empty(&bundle->avail_conns)) {
443                         _debug("avail");
444                         conn = list_entry(bundle->avail_conns.next,
445                                           struct rxrpc_connection,
446                                           bundle_link);
447                         if (--conn->avail_calls == 0)
448                                 list_move(&conn->bundle_link,
449                                           &bundle->busy_conns);
450                         ASSERTCMP(conn->avail_calls, <, RXRPC_MAXCALLS);
451                         ASSERT(conn->channels[0] == NULL ||
452                                conn->channels[1] == NULL ||
453                                conn->channels[2] == NULL ||
454                                conn->channels[3] == NULL);
455                         atomic_inc(&conn->usage);
456                         break;
457                 }
458
459                 if (!list_empty(&bundle->unused_conns)) {
460                         _debug("unused");
461                         conn = list_entry(bundle->unused_conns.next,
462                                           struct rxrpc_connection,
463                                           bundle_link);
464                         ASSERTCMP(conn->avail_calls, ==, RXRPC_MAXCALLS);
465                         conn->avail_calls = RXRPC_MAXCALLS - 1;
466                         ASSERT(conn->channels[0] == NULL &&
467                                conn->channels[1] == NULL &&
468                                conn->channels[2] == NULL &&
469                                conn->channels[3] == NULL);
470                         atomic_inc(&conn->usage);
471                         list_move(&conn->bundle_link, &bundle->avail_conns);
472                         break;
473                 }
474
475                 /* need to allocate a new connection */
476                 _debug("get new conn [%d]", bundle->num_conns);
477
478                 spin_unlock(&trans->client_lock);
479
480                 if (signal_pending(current))
481                         goto interrupted;
482
483                 if (bundle->num_conns >= 20) {
484                         _debug("too many conns");
485
486                         if (!(gfp & __GFP_WAIT)) {
487                                 _leave(" = -EAGAIN");
488                                 return -EAGAIN;
489                         }
490
491                         add_wait_queue(&bundle->chanwait, &myself);
492                         for (;;) {
493                                 set_current_state(TASK_INTERRUPTIBLE);
494                                 if (bundle->num_conns < 20 ||
495                                     !list_empty(&bundle->unused_conns) ||
496                                     !list_empty(&bundle->avail_conns))
497                                         break;
498                                 if (signal_pending(current))
499                                         goto interrupted_dequeue;
500                                 schedule();
501                         }
502                         remove_wait_queue(&bundle->chanwait, &myself);
503                         __set_current_state(TASK_RUNNING);
504                         spin_lock(&trans->client_lock);
505                         continue;
506                 }
507
508                 /* not yet present - create a candidate for a new connection and then
509                  * redo the check */
510                 candidate = rxrpc_alloc_connection(gfp);
511                 if (IS_ERR(candidate)) {
512                         _leave(" = %ld", PTR_ERR(candidate));
513                         return PTR_ERR(candidate);
514                 }
515
516                 candidate->trans = trans;
517                 candidate->bundle = bundle;
518                 candidate->service_id = bundle->service_id;
519                 candidate->epoch = rxrpc_epoch;
520                 candidate->in_clientflag = 0;
521                 candidate->out_clientflag = RXRPC_CLIENT_INITIATED;
522                 candidate->cid = 0;
523                 candidate->state = RXRPC_CONN_CLIENT;
524                 candidate->avail_calls = RXRPC_MAXCALLS;
525                 candidate->security_level = rx->min_sec_level;
526                 candidate->key = key_get(bundle->key);
527
528                 ret = rxrpc_init_client_conn_security(candidate);
529                 if (ret < 0) {
530                         key_put(candidate->key);
531                         kfree(candidate);
532                         _leave(" = %d [key]", ret);
533                         return ret;
534                 }
535
536                 write_lock_bh(&rxrpc_connection_lock);
537                 list_add_tail(&candidate->link, &rxrpc_connections);
538                 write_unlock_bh(&rxrpc_connection_lock);
539
540                 spin_lock(&trans->client_lock);
541
542                 list_add(&candidate->bundle_link, &bundle->unused_conns);
543                 bundle->num_conns++;
544                 atomic_inc(&bundle->usage);
545                 atomic_inc(&trans->usage);
546
547                 _net("CONNECT new %d on TRANS %d",
548                      candidate->debug_id, candidate->trans->debug_id);
549
550                 rxrpc_assign_connection_id(candidate);
551                 if (candidate->security)
552                         candidate->security->prime_packet_security(candidate);
553
554                 /* leave the candidate lurking in zombie mode attached to the
555                  * bundle until we're ready for it */
556                 rxrpc_put_connection(candidate);
557                 candidate = NULL;
558         }
559
560         /* we've got a connection with a free channel and we can now attach the
561          * call to it
562          * - we're holding the transport's client lock
563          * - we're holding a reference on the connection
564          * - we're holding a reference on the bundle
565          */
566         for (chan = 0; chan < RXRPC_MAXCALLS; chan++)
567                 if (!conn->channels[chan])
568                         goto found_channel;
569         ASSERT(conn->channels[0] == NULL ||
570                conn->channels[1] == NULL ||
571                conn->channels[2] == NULL ||
572                conn->channels[3] == NULL);
573         BUG();
574
575 found_channel:
576         conn->channels[chan] = call;
577         call->conn = conn;
578         call->channel = chan;
579         call->cid = conn->cid | htonl(chan);
580         call->call_id = htonl(++conn->call_counter);
581
582         _net("CONNECT client on conn %d chan %d as call %x",
583              conn->debug_id, chan, ntohl(call->call_id));
584
585         ASSERTCMP(conn->avail_calls, <, RXRPC_MAXCALLS);
586         spin_unlock(&trans->client_lock);
587
588         rxrpc_add_call_ID_to_conn(conn, call);
589
590         _leave(" = 0");
591         return 0;
592
593 interrupted_dequeue:
594         remove_wait_queue(&bundle->chanwait, &myself);
595         __set_current_state(TASK_RUNNING);
596 interrupted:
597         _leave(" = -ERESTARTSYS");
598         return -ERESTARTSYS;
599 }
600
601 /*
602  * get a record of an incoming connection
603  */
604 struct rxrpc_connection *
605 rxrpc_incoming_connection(struct rxrpc_transport *trans,
606                           struct rxrpc_header *hdr,
607                           gfp_t gfp)
608 {
609         struct rxrpc_connection *conn, *candidate = NULL;
610         struct rb_node *p, **pp;
611         const char *new = "old";
612         __be32 epoch;
613         u32 conn_id;
614
615         _enter("");
616
617         ASSERT(hdr->flags & RXRPC_CLIENT_INITIATED);
618
619         epoch = hdr->epoch;
620         conn_id = ntohl(hdr->cid) & RXRPC_CIDMASK;
621
622         /* search the connection list first */
623         read_lock_bh(&trans->conn_lock);
624
625         p = trans->server_conns.rb_node;
626         while (p) {
627                 conn = rb_entry(p, struct rxrpc_connection, node);
628
629                 _debug("maybe %x", conn->real_conn_id);
630
631                 if (epoch < conn->epoch)
632                         p = p->rb_left;
633                 else if (epoch > conn->epoch)
634                         p = p->rb_right;
635                 else if (conn_id < conn->real_conn_id)
636                         p = p->rb_left;
637                 else if (conn_id > conn->real_conn_id)
638                         p = p->rb_right;
639                 else
640                         goto found_extant_connection;
641         }
642         read_unlock_bh(&trans->conn_lock);
643
644         /* not yet present - create a candidate for a new record and then
645          * redo the search */
646         candidate = rxrpc_alloc_connection(gfp);
647         if (!candidate) {
648                 _leave(" = -ENOMEM");
649                 return ERR_PTR(-ENOMEM);
650         }
651
652         candidate->trans = trans;
653         candidate->epoch = hdr->epoch;
654         candidate->cid = hdr->cid & __constant_cpu_to_be32(RXRPC_CIDMASK);
655         candidate->service_id = hdr->serviceId;
656         candidate->security_ix = hdr->securityIndex;
657         candidate->in_clientflag = RXRPC_CLIENT_INITIATED;
658         candidate->out_clientflag = 0;
659         candidate->real_conn_id = conn_id;
660         candidate->state = RXRPC_CONN_SERVER;
661         if (candidate->service_id)
662                 candidate->state = RXRPC_CONN_SERVER_UNSECURED;
663
664         write_lock_bh(&trans->conn_lock);
665
666         pp = &trans->server_conns.rb_node;
667         p = NULL;
668         while (*pp) {
669                 p = *pp;
670                 conn = rb_entry(p, struct rxrpc_connection, node);
671
672                 if (epoch < conn->epoch)
673                         pp = &(*pp)->rb_left;
674                 else if (epoch > conn->epoch)
675                         pp = &(*pp)->rb_right;
676                 else if (conn_id < conn->real_conn_id)
677                         pp = &(*pp)->rb_left;
678                 else if (conn_id > conn->real_conn_id)
679                         pp = &(*pp)->rb_right;
680                 else
681                         goto found_extant_second;
682         }
683
684         /* we can now add the new candidate to the list */
685         conn = candidate;
686         candidate = NULL;
687         rb_link_node(&conn->node, p, pp);
688         rb_insert_color(&conn->node, &trans->server_conns);
689         atomic_inc(&conn->trans->usage);
690
691         write_unlock_bh(&trans->conn_lock);
692
693         write_lock_bh(&rxrpc_connection_lock);
694         list_add_tail(&conn->link, &rxrpc_connections);
695         write_unlock_bh(&rxrpc_connection_lock);
696
697         new = "new";
698
699 success:
700         _net("CONNECTION %s %d {%x}", new, conn->debug_id, conn->real_conn_id);
701
702         _leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
703         return conn;
704
705         /* we found the connection in the list immediately */
706 found_extant_connection:
707         if (hdr->securityIndex != conn->security_ix) {
708                 read_unlock_bh(&trans->conn_lock);
709                 goto security_mismatch;
710         }
711         atomic_inc(&conn->usage);
712         read_unlock_bh(&trans->conn_lock);
713         goto success;
714
715         /* we found the connection on the second time through the list */
716 found_extant_second:
717         if (hdr->securityIndex != conn->security_ix) {
718                 write_unlock_bh(&trans->conn_lock);
719                 goto security_mismatch;
720         }
721         atomic_inc(&conn->usage);
722         write_unlock_bh(&trans->conn_lock);
723         kfree(candidate);
724         goto success;
725
726 security_mismatch:
727         kfree(candidate);
728         _leave(" = -EKEYREJECTED");
729         return ERR_PTR(-EKEYREJECTED);
730 }
731
732 /*
733  * find a connection based on transport and RxRPC connection ID for an incoming
734  * packet
735  */
736 struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_transport *trans,
737                                                struct rxrpc_header *hdr)
738 {
739         struct rxrpc_connection *conn;
740         struct rb_node *p;
741         __be32 epoch;
742         u32 conn_id;
743
744         _enter(",{%x,%x}", ntohl(hdr->cid), hdr->flags);
745
746         read_lock_bh(&trans->conn_lock);
747
748         conn_id = ntohl(hdr->cid) & RXRPC_CIDMASK;
749         epoch = hdr->epoch;
750
751         if (hdr->flags & RXRPC_CLIENT_INITIATED)
752                 p = trans->server_conns.rb_node;
753         else
754                 p = trans->client_conns.rb_node;
755
756         while (p) {
757                 conn = rb_entry(p, struct rxrpc_connection, node);
758
759                 _debug("maybe %x", conn->real_conn_id);
760
761                 if (epoch < conn->epoch)
762                         p = p->rb_left;
763                 else if (epoch > conn->epoch)
764                         p = p->rb_right;
765                 else if (conn_id < conn->real_conn_id)
766                         p = p->rb_left;
767                 else if (conn_id > conn->real_conn_id)
768                         p = p->rb_right;
769                 else
770                         goto found;
771         }
772
773         read_unlock_bh(&trans->conn_lock);
774         _leave(" = NULL");
775         return NULL;
776
777 found:
778         atomic_inc(&conn->usage);
779         read_unlock_bh(&trans->conn_lock);
780         _leave(" = %p", conn);
781         return conn;
782 }
783
784 /*
785  * release a virtual connection
786  */
787 void rxrpc_put_connection(struct rxrpc_connection *conn)
788 {
789         _enter("%p{u=%d,d=%d}",
790                conn, atomic_read(&conn->usage), conn->debug_id);
791
792         ASSERTCMP(atomic_read(&conn->usage), >, 0);
793
794         conn->put_time = xtime.tv_sec;
795         if (atomic_dec_and_test(&conn->usage)) {
796                 _debug("zombie");
797                 rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
798         }
799
800         _leave("");
801 }
802
803 /*
804  * destroy a virtual connection
805  */
806 static void rxrpc_destroy_connection(struct rxrpc_connection *conn)
807 {
808         _enter("%p{%d}", conn, atomic_read(&conn->usage));
809
810         ASSERTCMP(atomic_read(&conn->usage), ==, 0);
811
812         _net("DESTROY CONN %d", conn->debug_id);
813
814         if (conn->bundle)
815                 rxrpc_put_bundle(conn->trans, conn->bundle);
816
817         ASSERT(RB_EMPTY_ROOT(&conn->calls));
818         rxrpc_purge_queue(&conn->rx_queue);
819
820         rxrpc_clear_conn_security(conn);
821         rxrpc_put_transport(conn->trans);
822         kfree(conn);
823         _leave("");
824 }
825
826 /*
827  * reap dead connections
828  */
829 void rxrpc_connection_reaper(struct work_struct *work)
830 {
831         struct rxrpc_connection *conn, *_p;
832         unsigned long now, earliest, reap_time;
833
834         LIST_HEAD(graveyard);
835
836         _enter("");
837
838         now = xtime.tv_sec;
839         earliest = ULONG_MAX;
840
841         write_lock_bh(&rxrpc_connection_lock);
842         list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) {
843                 _debug("reap CONN %d { u=%d,t=%ld }",
844                        conn->debug_id, atomic_read(&conn->usage),
845                        (long) now - (long) conn->put_time);
846
847                 if (likely(atomic_read(&conn->usage) > 0))
848                         continue;
849
850                 spin_lock(&conn->trans->client_lock);
851                 write_lock(&conn->trans->conn_lock);
852                 reap_time = conn->put_time + rxrpc_connection_timeout;
853
854                 if (atomic_read(&conn->usage) > 0) {
855                         ;
856                 } else if (reap_time <= now) {
857                         list_move_tail(&conn->link, &graveyard);
858                         if (conn->out_clientflag)
859                                 rb_erase(&conn->node,
860                                          &conn->trans->client_conns);
861                         else
862                                 rb_erase(&conn->node,
863                                          &conn->trans->server_conns);
864                         if (conn->bundle) {
865                                 list_del_init(&conn->bundle_link);
866                                 conn->bundle->num_conns--;
867                         }
868
869                 } else if (reap_time < earliest) {
870                         earliest = reap_time;
871                 }
872
873                 write_unlock(&conn->trans->conn_lock);
874                 spin_unlock(&conn->trans->client_lock);
875         }
876         write_unlock_bh(&rxrpc_connection_lock);
877
878         if (earliest != ULONG_MAX) {
879                 _debug("reschedule reaper %ld", (long) earliest - now);
880                 ASSERTCMP(earliest, >, now);
881                 rxrpc_queue_delayed_work(&rxrpc_connection_reap,
882                                          (earliest - now) * HZ);
883         }
884
885         /* then destroy all those pulled out */
886         while (!list_empty(&graveyard)) {
887                 conn = list_entry(graveyard.next, struct rxrpc_connection,
888                                   link);
889                 list_del_init(&conn->link);
890
891                 ASSERTCMP(atomic_read(&conn->usage), ==, 0);
892                 rxrpc_destroy_connection(conn);
893         }
894
895         _leave("");
896 }
897
898 /*
899  * preemptively destroy all the connection records rather than waiting for them
900  * to time out
901  */
902 void __exit rxrpc_destroy_all_connections(void)
903 {
904         _enter("");
905
906         rxrpc_connection_timeout = 0;
907         cancel_delayed_work(&rxrpc_connection_reap);
908         rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
909
910         _leave("");
911 }