RDS: rds_send_xmit() locking/irq fixes
[linux-2.6.git] / net / rds / tcp_listen.c
1 /*
2  * Copyright (c) 2006 Oracle.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  *
32  */
33 #include <linux/kernel.h>
34 #include <linux/gfp.h>
35 #include <linux/in.h>
36 #include <net/tcp.h>
37
38 #include "rds.h"
39 #include "tcp.h"
40
41 /*
42  * cheesy, but simple..
43  */
44 static void rds_tcp_accept_worker(struct work_struct *work);
45 static DECLARE_WORK(rds_tcp_listen_work, rds_tcp_accept_worker);
46 static struct socket *rds_tcp_listen_sock;
47
48 static int rds_tcp_accept_one(struct socket *sock)
49 {
50         struct socket *new_sock = NULL;
51         struct rds_connection *conn;
52         int ret;
53         struct inet_sock *inet;
54
55         ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
56                                sock->sk->sk_protocol, &new_sock);
57         if (ret)
58                 goto out;
59
60         new_sock->type = sock->type;
61         new_sock->ops = sock->ops;
62         ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);
63         if (ret < 0)
64                 goto out;
65
66         rds_tcp_tune(new_sock);
67
68         inet = inet_sk(new_sock->sk);
69
70         rdsdebug("accepted tcp %pI4:%u -> %pI4:%u\n",
71                  &inet->inet_saddr, ntohs(inet->inet_sport),
72                  &inet->inet_daddr, ntohs(inet->inet_dport));
73
74         conn = rds_conn_create(inet->inet_saddr, inet->inet_daddr,
75                                &rds_tcp_transport, GFP_KERNEL);
76         if (IS_ERR(conn)) {
77                 ret = PTR_ERR(conn);
78                 goto out;
79         }
80
81         /*
82          * see the comment above rds_queue_delayed_reconnect()
83          */
84         if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
85                 if (rds_conn_state(conn) == RDS_CONN_UP)
86                         rds_tcp_stats_inc(s_tcp_listen_closed_stale);
87                 else
88                         rds_tcp_stats_inc(s_tcp_connect_raced);
89                 rds_conn_drop(conn);
90                 ret = 0;
91                 goto out;
92         }
93
94         rds_tcp_set_callbacks(new_sock, conn);
95         rds_connect_complete(conn);
96         new_sock = NULL;
97         ret = 0;
98
99 out:
100         if (new_sock)
101                 sock_release(new_sock);
102         return ret;
103 }
104
105 static void rds_tcp_accept_worker(struct work_struct *work)
106 {
107         while (rds_tcp_accept_one(rds_tcp_listen_sock) == 0)
108                 cond_resched();
109 }
110
111 void rds_tcp_listen_data_ready(struct sock *sk, int bytes)
112 {
113         void (*ready)(struct sock *sk, int bytes);
114
115         rdsdebug("listen data ready sk %p\n", sk);
116
117         read_lock(&sk->sk_callback_lock);
118         ready = sk->sk_user_data;
119         if (!ready) { /* check for teardown race */
120                 ready = sk->sk_data_ready;
121                 goto out;
122         }
123
124         /*
125          * ->sk_data_ready is also called for a newly established child socket
126          * before it has been accepted and the accepter has set up their
127          * data_ready.. we only want to queue listen work for our listening
128          * socket
129          */
130         if (sk->sk_state == TCP_LISTEN)
131                 queue_work(rds_wq, &rds_tcp_listen_work);
132
133 out:
134         read_unlock(&sk->sk_callback_lock);
135         ready(sk, bytes);
136 }
137
138 int __init rds_tcp_listen_init(void)
139 {
140         struct sockaddr_in sin;
141         struct socket *sock = NULL;
142         int ret;
143
144         ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
145         if (ret < 0)
146                 goto out;
147
148         sock->sk->sk_reuse = 1;
149         rds_tcp_nonagle(sock);
150
151         write_lock_bh(&sock->sk->sk_callback_lock);
152         sock->sk->sk_user_data = sock->sk->sk_data_ready;
153         sock->sk->sk_data_ready = rds_tcp_listen_data_ready;
154         write_unlock_bh(&sock->sk->sk_callback_lock);
155
156         sin.sin_family = PF_INET,
157         sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY);
158         sin.sin_port = (__force u16)htons(RDS_TCP_PORT);
159
160         ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
161         if (ret < 0)
162                 goto out;
163
164         ret = sock->ops->listen(sock, 64);
165         if (ret < 0)
166                 goto out;
167
168         rds_tcp_listen_sock = sock;
169         sock = NULL;
170 out:
171         if (sock)
172                 sock_release(sock);
173         return ret;
174 }
175
176 void rds_tcp_listen_stop(void)
177 {
178         struct socket *sock = rds_tcp_listen_sock;
179         struct sock *sk;
180
181         if (!sock)
182                 return;
183
184         sk = sock->sk;
185
186         /* serialize with and prevent further callbacks */
187         lock_sock(sk);
188         write_lock_bh(&sk->sk_callback_lock);
189         if (sk->sk_user_data) {
190                 sk->sk_data_ready = sk->sk_user_data;
191                 sk->sk_user_data = NULL;
192         }
193         write_unlock_bh(&sk->sk_callback_lock);
194         release_sock(sk);
195
196         /* wait for accepts to stop and close the socket */
197         flush_workqueue(rds_wq);
198         sock_release(sock);
199         rds_tcp_listen_sock = NULL;
200 }