net/ceph: Only clear SOCK_NOSPACE when there is sufficient space in the socket buffer
[linux-2.6.git] / net / ceph / messenger.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/crc32c.h>
4 #include <linux/ctype.h>
5 #include <linux/highmem.h>
6 #include <linux/inet.h>
7 #include <linux/kthread.h>
8 #include <linux/net.h>
9 #include <linux/slab.h>
10 #include <linux/socket.h>
11 #include <linux/string.h>
12 #include <linux/bio.h>
13 #include <linux/blkdev.h>
14 #include <linux/dns_resolver.h>
15 #include <net/tcp.h>
16
17 #include <linux/ceph/libceph.h>
18 #include <linux/ceph/messenger.h>
19 #include <linux/ceph/decode.h>
20 #include <linux/ceph/pagelist.h>
21 #include <linux/export.h>
22
23 /*
24  * Ceph uses the messenger to exchange ceph_msg messages with other
25  * hosts in the system.  The messenger provides ordered and reliable
26  * delivery.  We tolerate TCP disconnects by reconnecting (with
27  * exponential backoff) in the case of a fault (disconnection, bad
28  * crc, protocol error).  Acks allow sent messages to be discarded by
29  * the sender.
30  */
31
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;

#ifdef CONFIG_LOCKDEP
/* one lock class shared by all ceph sockets, so lockdep does not conflate
 * their sk_lock with other kernel socket users */
static struct lock_class_key socket_class;
#endif


static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
static void ceph_fault(struct ceph_connection *con);

/*
 * nicely render a sockaddr as a string.
 */
#define MAX_ADDR_STR 20
#define MAX_ADDR_STR_LEN 60
/* ring of static buffers handed out round-robin by ceph_pr_addr();
 * last_addr_str is the next slot, advanced under addr_str_lock */
static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN];
static DEFINE_SPINLOCK(addr_str_lock);
static int last_addr_str;
/*
 * Render @ss as "addr:port" (IPv4), "[addr]:port" (IPv6), or a
 * placeholder string for unknown address families.
 *
 * Returns a pointer into a global ring of MAX_ADDR_STR static buffers,
 * so the result is only valid until enough later calls wrap around to
 * the same slot.  NOTE(review): only the index advance is under
 * addr_str_lock; the snprintf() into the slot happens outside the lock
 * and can race with a concurrent caller that wraps to the same slot --
 * confirm this is acceptable for what is essentially debug output.
 */
const char *ceph_pr_addr(const struct sockaddr_storage *ss)
{
	int i;
	char *s;
	struct sockaddr_in *in4 = (void *)ss;
	struct sockaddr_in6 *in6 = (void *)ss;

	/* claim the next slot in the round-robin ring */
	spin_lock(&addr_str_lock);
	i = last_addr_str++;
	if (last_addr_str == MAX_ADDR_STR)
		last_addr_str = 0;
	spin_unlock(&addr_str_lock);
	s = addr_str[i];

	switch (ss->ss_family) {
	case AF_INET:
		snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr,
			 (unsigned int)ntohs(in4->sin_port));
		break;

	case AF_INET6:
		snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr,
			 (unsigned int)ntohs(in6->sin6_port));
		break;

	default:
		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)",
			 (int)ss->ss_family);
	}

	return s;
}
EXPORT_SYMBOL(ceph_pr_addr);
88
/*
 * Cache a wire-encoded copy of our own address in msgr->my_enc_addr,
 * ready to be sent verbatim during the banner/handshake exchange.
 */
static void encode_my_addr(struct ceph_messenger *msgr)
{
	memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
	ceph_encode_addr(&msgr->my_enc_addr);
}
94
95 /*
96  * work queue for all reading and writing to/from the socket.
97  */
98 struct workqueue_struct *ceph_msgr_wq;
99
100 int ceph_msgr_init(void)
101 {
102         ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
103         if (!ceph_msgr_wq) {
104                 pr_err("msgr_init failed to create workqueue\n");
105                 return -ENOMEM;
106         }
107         return 0;
108 }
109 EXPORT_SYMBOL(ceph_msgr_init);
110
/* Tear down the global messenger workqueue (flushes pending work). */
void ceph_msgr_exit(void)
{
	destroy_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_exit);
116
/* Wait for all currently queued messenger work items to finish. */
void ceph_msgr_flush(void)
{
	flush_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_flush);
122
123
124 /*
125  * socket callback functions
126  */
127
128 /* data available on socket, or listen socket received a connect */
129 static void ceph_data_ready(struct sock *sk, int count_unused)
130 {
131         struct ceph_connection *con =
132                 (struct ceph_connection *)sk->sk_user_data;
133         if (sk->sk_state != TCP_CLOSE_WAIT) {
134                 dout("ceph_data_ready on %p state = %lu, queueing work\n",
135                      con, con->state);
136                 queue_con(con);
137         }
138 }
139
/*
 * Socket callback: the socket has buffer space for writing.  Requeue the
 * connection's work item so try_write() can make progress.
 */
static void ceph_write_space(struct sock *sk)
{
	struct ceph_connection *con =
		(struct ceph_connection *)sk->sk_user_data;

	/* only queue to workqueue if there is data we want to write,
	 * and there is sufficient space in the socket buffer to accept
	 * more data.  clear SOCK_NOSPACE so that ceph_write_space()
	 * doesn't get called again until try_write() fills the socket
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * and net/core/stream.c:sk_stream_write_space().
	 */
	if (test_bit(WRITE_PENDING, &con->state)) {
		if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
			dout("ceph_write_space %p queueing write work\n", con);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			queue_con(con);
		}
	} else {
		dout("ceph_write_space %p nothing to write\n", con);
	}
}
163
/*
 * Socket callback: the TCP state of the underlying socket changed.
 * Translate closes into a one-shot fault (SOCK_CLOSED) and requeue the
 * connection on establishment so pending writes can proceed.
 */
static void ceph_state_change(struct sock *sk)
{
	struct ceph_connection *con =
		(struct ceph_connection *)sk->sk_user_data;

	dout("ceph_state_change %p state = %lu sk_state = %u\n",
	     con, con->state, sk->sk_state);

	/* nothing to do once the connection has been closed by us */
	if (test_bit(CLOSED, &con->state))
		return;

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("ceph_state_change TCP_CLOSE\n");
		/* fall through */
	case TCP_CLOSE_WAIT:
		dout("ceph_state_change TCP_CLOSE_WAIT\n");
		/* report the fault only once per close: whoever wins the
		 * test_and_set of SOCK_CLOSED queues the error handling */
		if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
			if (test_bit(CONNECTING, &con->state))
				con->error_msg = "connection failed";
			else
				con->error_msg = "socket closed";
			queue_con(con);
		}
		break;
	case TCP_ESTABLISHED:
		dout("ceph_state_change TCP_ESTABLISHED\n");
		queue_con(con);
		break;
	}
}
195
196 /*
197  * set up socket callbacks
198  */
199 static void set_sock_callbacks(struct socket *sock,
200                                struct ceph_connection *con)
201 {
202         struct sock *sk = sock->sk;
203         sk->sk_user_data = (void *)con;
204         sk->sk_data_ready = ceph_data_ready;
205         sk->sk_write_space = ceph_write_space;
206         sk->sk_state_change = ceph_state_change;
207 }
208
209
210 /*
211  * socket helpers
212  */
213
/*
 * Initiate a non-blocking TCP connection to the peer address stored in
 * @con.  On success the new socket is stored in con->sock and returned;
 * on failure con->sock is left NULL, con->error_msg is set, and an
 * ERR_PTR is returned.
 */
static struct socket *ceph_tcp_connect(struct ceph_connection *con)
{
	struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
	struct socket *sock;
	int ret;

	BUG_ON(con->sock);
	ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
			       IPPROTO_TCP, &sock);
	if (ret)
		return ERR_PTR(ret);
	con->sock = sock;
	/* GFP_NOFS: socket allocations must not recurse into filesystem
	 * reclaim while we may be writing out fs data over this socket */
	sock->sk->sk_allocation = GFP_NOFS;

#ifdef CONFIG_LOCKDEP
	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif

	set_sock_callbacks(sock, con);

	dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));

	/* O_NONBLOCK: completion is signalled via ceph_state_change(),
	 * so -EINPROGRESS here is the normal success path */
	ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
				 O_NONBLOCK);
	if (ret == -EINPROGRESS) {
		dout("connect %s EINPROGRESS sk_state = %u\n",
		     ceph_pr_addr(&con->peer_addr.in_addr),
		     sock->sk->sk_state);
		ret = 0;
	}
	if (ret < 0) {
		pr_err("connect %s error %d\n",
		       ceph_pr_addr(&con->peer_addr.in_addr), ret);
		sock_release(sock);
		con->sock = NULL;
		con->error_msg = "connect error";
	}

	if (ret < 0)
		return ERR_PTR(ret);
	return sock;
}
259
260 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
261 {
262         struct kvec iov = {buf, len};
263         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
264         int r;
265
266         r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
267         if (r == -EAGAIN)
268                 r = 0;
269         return r;
270 }
271
272 /*
273  * write something.  @more is true if caller will be sending more data
274  * shortly.
275  */
276 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
277                      size_t kvlen, size_t len, int more)
278 {
279         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
280         int r;
281
282         if (more)
283                 msg.msg_flags |= MSG_MORE;
284         else
285                 msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
286
287         r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
288         if (r == -EAGAIN)
289                 r = 0;
290         return r;
291 }
292
293
/*
 * Shutdown/close the socket for the given connection.
 *
 * Returns the shutdown() result, or 0 if there was no socket.
 */
static int con_close_socket(struct ceph_connection *con)
{
	int rc;

	dout("con_close_socket on %p sock %p\n", con, con->sock);
	if (!con->sock)
		return 0;
	/* set SOCK_CLOSED first so ceph_state_change() (which only acts
	 * when it wins the test_and_set of this bit) does not queue
	 * error handling for a close we initiated ourselves */
	set_bit(SOCK_CLOSED, &con->state);
	rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
	sock_release(con->sock);
	con->sock = NULL;
	clear_bit(SOCK_CLOSED, &con->state);
	return rc;
}
311
312 /*
313  * Reset a connection.  Discard all incoming and outgoing messages
314  * and clear *_seq state.
315  */
/* Unlink @msg from whatever connection list it is on and drop the
 * list's reference to it. */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);
	ceph_msg_put(msg);
}
321 static void ceph_msg_remove_list(struct list_head *head)
322 {
323         while (!list_empty(head)) {
324                 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
325                                                         list_head);
326                 ceph_msg_remove(msg);
327         }
328 }
329
330 static void reset_connection(struct ceph_connection *con)
331 {
332         /* reset connection, out_queue, msg_ and connect_seq */
333         /* discard existing out_queue and msg_seq */
334         ceph_msg_remove_list(&con->out_queue);
335         ceph_msg_remove_list(&con->out_sent);
336
337         if (con->in_msg) {
338                 ceph_msg_put(con->in_msg);
339                 con->in_msg = NULL;
340         }
341
342         con->connect_seq = 0;
343         con->out_seq = 0;
344         if (con->out_msg) {
345                 ceph_msg_put(con->out_msg);
346                 con->out_msg = NULL;
347         }
348         con->in_seq = 0;
349         con->in_seq_acked = 0;
350 }
351
/*
 * mark a peer down.  drop any open connections.
 */
void ceph_con_close(struct ceph_connection *con)
{
	dout("con_close %p peer %s\n", con,
	     ceph_pr_addr(&con->peer_addr.in_addr));
	/* flip the state bits before taking the mutex so any concurrent
	 * con_work() sees CLOSED as early as possible */
	set_bit(CLOSED, &con->state);  /* in case there's queued work */
	clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
	clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
	clear_bit(KEEPALIVE_PENDING, &con->state);
	clear_bit(WRITE_PENDING, &con->state);
	mutex_lock(&con->mutex);
	reset_connection(con);
	con->peer_global_seq = 0;
	cancel_delayed_work(&con->work);
	mutex_unlock(&con->mutex);
	/* requeue so con_work() runs once more and tears down the socket */
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_close);
372
/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
{
	dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
	/* OPENING is set before CLOSED is cleared so con_work() never
	 * observes the connection as plainly closed mid-transition */
	set_bit(OPENING, &con->state);
	clear_bit(CLOSED, &con->state);
	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;      /* reset backoff memory */
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_open);
386
/*
 * return true if this connection ever successfully opened
 * (connect_seq is only bumped once a session is negotiated)
 */
bool ceph_con_opened(struct ceph_connection *con)
{
	return con->connect_seq > 0;
}
394
/*
 * generic get/put
 */

/*
 * Take a reference on @con.  Returns @con, or NULL if the refcount had
 * already dropped to zero (connection is being torn down).  The dout()
 * reads nref twice without synchronization -- debug output only, the
 * printed transition may be stale.
 */
struct ceph_connection *ceph_con_get(struct ceph_connection *con)
{
	dout("con_get %p nref = %d -> %d\n", con,
	     atomic_read(&con->nref), atomic_read(&con->nref) + 1);
	if (atomic_inc_not_zero(&con->nref))
		return con;
	return NULL;
}
406
/*
 * Drop a reference on @con; frees it when the last reference goes.
 * The socket must already be closed by then (BUG otherwise).
 */
void ceph_con_put(struct ceph_connection *con)
{
	dout("con_put %p nref = %d -> %d\n", con,
	     atomic_read(&con->nref), atomic_read(&con->nref) - 1);
	BUG_ON(atomic_read(&con->nref) == 0);
	if (atomic_dec_and_test(&con->nref)) {
		BUG_ON(con->sock);
		kfree(con);
	}
}
417
418 /*
419  * initialize a new connection.
420  */
421 void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
422 {
423         dout("con_init %p\n", con);
424         memset(con, 0, sizeof(*con));
425         atomic_set(&con->nref, 1);
426         con->msgr = msgr;
427         mutex_init(&con->mutex);
428         INIT_LIST_HEAD(&con->out_queue);
429         INIT_LIST_HEAD(&con->out_sent);
430         INIT_DELAYED_WORK(&con->work, con_work);
431 }
432 EXPORT_SYMBOL(ceph_con_init);
433
434
435 /*
436  * We maintain a global counter to order connection attempts.  Get
437  * a unique seq greater than @gt.
438  */
439 static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
440 {
441         u32 ret;
442
443         spin_lock(&msgr->global_seq_lock);
444         if (msgr->global_seq < gt)
445                 msgr->global_seq = gt;
446         ret = ++msgr->global_seq;
447         spin_unlock(&msgr->global_seq_lock);
448         return ret;
449 }
450
451
/*
 * Prepare footer for currently outgoing message, and finish things
 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 * @v is the index of the next free out_kvec slot.
 */
static void prepare_write_message_footer(struct ceph_connection *con, int v)
{
	struct ceph_msg *m = con->out_msg;

	dout("prepare_write_message_footer %p\n", con);
	con->out_kvec_is_msg = true;
	con->out_kvec[v].iov_base = &m->footer;
	con->out_kvec[v].iov_len = sizeof(m->footer);
	con->out_kvec_bytes += sizeof(m->footer);
	con->out_kvec_left++;
	con->out_more = m->more_to_follow;
	con->out_msg_done = true;
}
469
/*
 * Prepare headers for the next outgoing message: pop the first message
 * off out_queue, move it to out_sent, build the tag/header/front/middle
 * kvecs, and fill in the header and footer crcs (the data crc, if any,
 * is accumulated later by write_partial_msg_pages()).
 */
static void prepare_write_message(struct ceph_connection *con)
{
	struct ceph_msg *m;
	int v = 0;	/* next free out_kvec slot */

	con->out_kvec_bytes = 0;
	con->out_kvec_is_msg = true;
	con->out_msg_done = false;

	/* Sneak an ack in there first?  If we can get it into the same
	 * TCP packet that's a good thing. */
	if (con->in_seq > con->in_seq_acked) {
		con->in_seq_acked = con->in_seq;
		con->out_kvec[v].iov_base = &tag_ack;
		con->out_kvec[v++].iov_len = 1;
		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
		con->out_kvec[v].iov_base = &con->out_temp_ack;
		con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
		con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
	}

	m = list_first_entry(&con->out_queue,
		       struct ceph_msg, list_head);
	con->out_msg = m;

	/* put message on sent list */
	ceph_msg_get(m);
	list_move_tail(&m->list_head, &con->out_sent);

	/*
	 * only assign outgoing seq # if we haven't sent this message
	 * yet.  if it is requeued, resend with its original seq.
	 */
	if (m->needs_out_seq) {
		m->hdr.seq = cpu_to_le64(++con->out_seq);
		m->needs_out_seq = false;
	}

	dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
	     m, con->out_seq, le16_to_cpu(m->hdr.type),
	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
	     le32_to_cpu(m->hdr.data_len),
	     m->nr_pages);
	BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);

	/* tag + hdr + front + middle */
	con->out_kvec[v].iov_base = &tag_msg;
	con->out_kvec[v++].iov_len = 1;
	con->out_kvec[v].iov_base = &m->hdr;
	con->out_kvec[v++].iov_len = sizeof(m->hdr);
	con->out_kvec[v++] = m->front;
	if (m->middle)
		con->out_kvec[v++] = m->middle->vec;
	con->out_kvec_left = v;
	con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
		(m->middle ? m->middle->vec.iov_len : 0);
	con->out_kvec_cur = con->out_kvec;

	/* fill in crc (except data pages), footer */
	con->out_msg->hdr.crc =
		cpu_to_le32(crc32c(0, (void *)&m->hdr,
				      sizeof(m->hdr) - sizeof(m->hdr.crc)));
	con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
	con->out_msg->footer.front_crc =
		cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
	if (m->middle)
		con->out_msg->footer.middle_crc =
			cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
					   m->middle->vec.iov_len));
	else
		con->out_msg->footer.middle_crc = 0;
	con->out_msg->footer.data_crc = 0;
	dout("prepare_write_message front_crc %u data_crc %u\n",
	     le32_to_cpu(con->out_msg->footer.front_crc),
	     le32_to_cpu(con->out_msg->footer.middle_crc));

	/* is there a data payload? */
	if (le32_to_cpu(m->hdr.data_len) > 0) {
		/* initialize page iterator */
		con->out_msg_pos.page = 0;
		if (m->pages)
			con->out_msg_pos.page_pos = m->page_alignment;
		else
			con->out_msg_pos.page_pos = 0;
		con->out_msg_pos.data_pos = 0;
		con->out_msg_pos.did_page_crc = 0;
		con->out_more = 1;  /* data + footer will follow */
	} else {
		/* no, queue up footer too and be done */
		prepare_write_message_footer(con, v);
	}

	set_bit(WRITE_PENDING, &con->state);
}
567
/*
 * Prepare an ack: queue the ack tag plus the (little-endian) sequence
 * number of the last message we received.
 */
static void prepare_write_ack(struct ceph_connection *con)
{
	dout("prepare_write_ack %p %llu -> %llu\n", con,
	     con->in_seq_acked, con->in_seq);
	con->in_seq_acked = con->in_seq;

	con->out_kvec[0].iov_base = &tag_ack;
	con->out_kvec[0].iov_len = 1;
	/* out_temp_ack must outlive the send; it lives in *con */
	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
	con->out_kvec[1].iov_base = &con->out_temp_ack;
	con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
	con->out_kvec_left = 2;
	con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 1;  /* more will follow.. eventually.. */
	set_bit(WRITE_PENDING, &con->state);
}
588
/*
 * Prepare to write keepalive byte (a lone tag, no payload).
 */
static void prepare_write_keepalive(struct ceph_connection *con)
{
	dout("prepare_write_keepalive %p\n", con);
	con->out_kvec[0].iov_base = &tag_keepalive;
	con->out_kvec[0].iov_len = 1;
	con->out_kvec_left = 1;
	con->out_kvec_bytes = 1;
	con->out_kvec_cur = con->out_kvec;
	set_bit(WRITE_PENDING, &con->state);
}
602
603 /*
604  * Connection negotiation.
605  */
606
/*
 * Ask the upper layer for an authorizer and append it to the pending
 * connect message.  Called with con->mutex held; the mutex is dropped
 * around ->get_authorizer() -- presumably because it may block or
 * allocate (TODO confirm against the callback implementations) -- so
 * after relocking we must re-check that the connection was not closed
 * or reopened underneath us, returning -EAGAIN in that case.
 */
static int prepare_connect_authorizer(struct ceph_connection *con)
{
	void *auth_buf;
	int auth_len = 0;
	int auth_protocol = 0;

	mutex_unlock(&con->mutex);
	if (con->ops->get_authorizer)
		con->ops->get_authorizer(con, &auth_buf, &auth_len,
					 &auth_protocol, &con->auth_reply_buf,
					 &con->auth_reply_buf_len,
					 con->auth_retry);
	mutex_lock(&con->mutex);

	if (test_bit(CLOSED, &con->state) ||
	    test_bit(OPENING, &con->state))
		return -EAGAIN;

	con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
	con->out_connect.authorizer_len = cpu_to_le32(auth_len);

	/* append the authorizer payload after the connect struct */
	if (auth_len) {
		con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
		con->out_kvec[con->out_kvec_left].iov_len = auth_len;
		con->out_kvec_left++;
		con->out_kvec_bytes += auth_len;
	}
	return 0;
}
636
/*
 * We connected to a peer and are saying hello: queue the protocol
 * banner string followed by our wire-encoded address.
 */
static void prepare_write_banner(struct ceph_messenger *msgr,
				 struct ceph_connection *con)
{
	int len = strlen(CEPH_BANNER);

	con->out_kvec[0].iov_base = CEPH_BANNER;
	con->out_kvec[0].iov_len = len;
	con->out_kvec[1].iov_base = &msgr->my_enc_addr;
	con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
	con->out_kvec_left = 2;
	con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);
}
655
/*
 * Build the outgoing connect message for @con and queue it, then append
 * the authorizer.  @after_banner tells us whether banner kvecs are
 * already queued (in which case we append rather than reset out_kvec).
 * Returns 0 or the prepare_connect_authorizer() error.
 *
 * NOTE(review): global_seq is taken from con->msgr while features come
 * from the @msgr parameter -- presumably these are always the same
 * object; confirm against the callers.
 */
static int prepare_write_connect(struct ceph_messenger *msgr,
				 struct ceph_connection *con,
				 int after_banner)
{
	unsigned global_seq = get_global_seq(con->msgr, 0);
	int proto;

	/* pick the protocol matching the peer's entity type */
	switch (con->peer_name.type) {
	case CEPH_ENTITY_TYPE_MON:
		proto = CEPH_MONC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_OSD:
		proto = CEPH_OSDC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_MDS:
		proto = CEPH_MDSC_PROTOCOL;
		break;
	default:
		BUG();
	}

	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
	     con->connect_seq, global_seq, proto);

	con->out_connect.features = cpu_to_le64(msgr->supported_features);
	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
	con->out_connect.global_seq = cpu_to_le32(global_seq);
	con->out_connect.protocol_version = cpu_to_le32(proto);
	con->out_connect.flags = 0;

	if (!after_banner) {
		con->out_kvec_left = 0;
		con->out_kvec_bytes = 0;
	}
	con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
	con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
	con->out_kvec_left++;
	con->out_kvec_bytes += sizeof(con->out_connect);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);

	return prepare_connect_authorizer(con);
}
701
702
/*
 * write as much of pending kvecs to the socket as we can.
 *  1 -> done
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_kvec(struct ceph_connection *con)
{
	int ret;

	dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
	while (con->out_kvec_bytes > 0) {
		ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
				       con->out_kvec_left, con->out_kvec_bytes,
				       con->out_more);
		if (ret <= 0)
			goto out;	/* 0 == would block; keep state as-is */
		con->out_kvec_bytes -= ret;
		if (con->out_kvec_bytes == 0)
			break;            /* done */
		/* account the short write: skip fully-sent kvecs, then
		 * trim the partially-sent one in place */
		while (ret > 0) {
			if (ret >= con->out_kvec_cur->iov_len) {
				ret -= con->out_kvec_cur->iov_len;
				con->out_kvec_cur++;
				con->out_kvec_left--;
			} else {
				con->out_kvec_cur->iov_len -= ret;
				con->out_kvec_cur->iov_base += ret;
				ret = 0;
				break;
			}
		}
	}
	/* everything flushed; reset for the next batch */
	con->out_kvec_left = 0;
	con->out_kvec_is_msg = false;
	ret = 1;
out:
	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
	     con->out_kvec_bytes, con->out_kvec_left, ret);
	return ret;  /* done! */
}
744
745 #ifdef CONFIG_BLOCK
746 static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
747 {
748         if (!bio) {
749                 *iter = NULL;
750                 *seg = 0;
751                 return;
752         }
753         *iter = bio;
754         *seg = bio->bi_idx;
755 }
756
757 static void iter_bio_next(struct bio **bio_iter, int *seg)
758 {
759         if (*bio_iter == NULL)
760                 return;
761
762         BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
763
764         (*seg)++;
765         if (*seg == (*bio_iter)->bi_vcnt)
766                 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
767 }
768 #endif
769
/*
 * Write as much message data payload as we can.  If we finish, queue
 * up the footer.
 *  1 -> done, footer is now queued in out_kvec[].
 *  0 -> socket full, but more to do
 * <0 -> error
 *
 * The payload may come from msg->pages, msg->pagelist, msg->bio, or the
 * trailing msg->trail pagelist; revoked data falls back to zero_page.
 */
static int write_partial_msg_pages(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->out_msg;
	unsigned data_len = le32_to_cpu(msg->hdr.data_len);
	size_t len;
	int crc = con->msgr->nocrc;
	int ret;
	int total_max_write;
	int in_trail = 0;
	size_t trail_len = (msg->trail ? msg->trail->length : 0);

	dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
	     con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
	     con->out_msg_pos.page_pos);

#ifdef CONFIG_BLOCK
	if (msg->bio && !msg->bio_iter)
		init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
#endif

	while (data_len > con->out_msg_pos.data_pos) {
		struct page *page = NULL;
		void *kaddr = NULL;
		int max_write = PAGE_SIZE;
		int page_shift = 0;

		total_max_write = data_len - trail_len -
			con->out_msg_pos.data_pos;

		/*
		 * if we are calculating the data crc (the default), we need
		 * to map the page.  if our pages[] has been revoked, use the
		 * zero page.
		 */

		/* have we reached the trail part of the data? */
		if (con->out_msg_pos.data_pos >= data_len - trail_len) {
			in_trail = 1;

			total_max_write = data_len - con->out_msg_pos.data_pos;

			page = list_first_entry(&msg->trail->head,
						struct page, lru);
			if (crc)
				kaddr = kmap(page);
			max_write = PAGE_SIZE;
		} else if (msg->pages) {
			page = msg->pages[con->out_msg_pos.page];
			if (crc)
				kaddr = kmap(page);
		} else if (msg->pagelist) {
			page = list_first_entry(&msg->pagelist->head,
						struct page, lru);
			if (crc)
				kaddr = kmap(page);
#ifdef CONFIG_BLOCK
		} else if (msg->bio) {
			struct bio_vec *bv;

			bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
			page = bv->bv_page;
			page_shift = bv->bv_offset;
			if (crc)
				kaddr = kmap(page) + page_shift;
			max_write = bv->bv_len;
#endif
		} else {
			/* revoked: send zeros; page_address, so no kunmap */
			page = con->msgr->zero_page;
			if (crc)
				kaddr = page_address(con->msgr->zero_page);
		}
		len = min_t(int, max_write - con->out_msg_pos.page_pos,
			    total_max_write);

		/* NOTE(review): the crc is folded in over the full @len
		 * BEFORE the send; if kernel_sendpage() does a short write
		 * below, did_page_crc stays set so the unsent tail is not
		 * re-crc'd on retry -- confirm the accounting still matches
		 * what the peer checksums. */
		if (crc && !con->out_msg_pos.did_page_crc) {
			void *base = kaddr + con->out_msg_pos.page_pos;
			u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);

			BUG_ON(kaddr == NULL);
			con->out_msg->footer.data_crc =
				cpu_to_le32(crc32c(tmpcrc, base, len));
			con->out_msg_pos.did_page_crc = 1;
		}
		ret = kernel_sendpage(con->sock, page,
				      con->out_msg_pos.page_pos + page_shift,
				      len,
				      MSG_DONTWAIT | MSG_NOSIGNAL |
				      MSG_MORE);

		/* unmap everything we kmap'd above (zero page was not) */
		if (crc &&
		    (msg->pages || msg->pagelist || msg->bio || in_trail))
			kunmap(page);

		if (ret == -EAGAIN)
			ret = 0;
		if (ret <= 0)
			goto out;

		con->out_msg_pos.data_pos += ret;
		con->out_msg_pos.page_pos += ret;
		if (ret == len) {
			/* finished this page; advance to the next one */
			con->out_msg_pos.page_pos = 0;
			con->out_msg_pos.page++;
			con->out_msg_pos.did_page_crc = 0;
			if (in_trail)
				list_move_tail(&page->lru,
					       &msg->trail->head);
			else if (msg->pagelist)
				list_move_tail(&page->lru,
					       &msg->pagelist->head);
#ifdef CONFIG_BLOCK
			else if (msg->bio)
				iter_bio_next(&msg->bio_iter, &msg->bio_seg);
#endif
		}
	}

	dout("write_partial_msg_pages %p msg %p done\n", con, msg);

	/* prepare and queue up footer, too */
	if (!crc)
		con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
	con->out_kvec_bytes = 0;
	con->out_kvec_left = 0;
	con->out_kvec_cur = con->out_kvec;
	prepare_write_message_footer(con, 0);
	ret = 1;
out:
	return ret;
}
907
908 /*
909  * write some zeros
910  */
911 static int write_partial_skip(struct ceph_connection *con)
912 {
913         int ret;
914
915         while (con->out_skip > 0) {
916                 struct kvec iov = {
917                         .iov_base = page_address(con->msgr->zero_page),
918                         .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
919                 };
920
921                 ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
922                 if (ret <= 0)
923                         goto out;
924                 con->out_skip -= ret;
925         }
926         ret = 1;
927 out:
928         return ret;
929 }
930
931 /*
932  * Prepare to read connection handshake, or an ack.
933  */
/*
 * Prepare to read the peer's banner: rewind the incoming byte counter
 * so read_partial_banner() starts from offset 0.
 */
static void prepare_read_banner(struct ceph_connection *con)
{
	dout("prepare_read_banner %p\n", con);
	con->in_base_pos = 0;
}
939
/*
 * Prepare to read the connect reply: rewind the incoming byte counter
 * so read_partial_connect() starts from offset 0.
 */
static void prepare_read_connect(struct ceph_connection *con)
{
	dout("prepare_read_connect %p\n", con);
	con->in_base_pos = 0;
}
945
/*
 * Prepare to read an ack: rewind the incoming byte counter so
 * read_partial_ack() starts from offset 0.
 */
static void prepare_read_ack(struct ceph_connection *con)
{
	dout("prepare_read_ack %p\n", con);
	con->in_base_pos = 0;
}
951
/*
 * Prepare to read the next protocol tag byte: rewind the incoming
 * byte counter and mark the connection as ready for a new tag.
 */
static void prepare_read_tag(struct ceph_connection *con)
{
	dout("prepare_read_tag %p\n", con);
	con->in_base_pos = 0;
	con->in_tag = CEPH_MSGR_TAG_READY;
}
958
959 /*
960  * Prepare to read a message.
961  */
static int prepare_read_message(struct ceph_connection *con)
{
	dout("prepare_read_message %p\n", con);
	BUG_ON(con->in_msg != NULL);	/* previous incoming msg must be consumed */
	con->in_base_pos = 0;
	/* reset the per-section crc accumulators for the new message */
	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
	return 0;
}
970
971
972 static int read_partial(struct ceph_connection *con,
973                         int *to, int size, void *object)
974 {
975         *to += size;
976         while (con->in_base_pos < *to) {
977                 int left = *to - con->in_base_pos;
978                 int have = size - left;
979                 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
980                 if (ret <= 0)
981                         return ret;
982                 con->in_base_pos += ret;
983         }
984         return 1;
985 }
986
987
988 /*
989  * Read all or part of the connect-side handshake on a new connection
990  */
/*
 * Read the connect-side handshake preamble: the peer's banner, the
 * peer's advertised address, and our own address as seen by the peer.
 * Resumable across calls: con->in_base_pos records overall progress
 * and `to` accumulates each field's end offset for read_partial().
 * Returns 1 when everything has been read, 0/negative otherwise.
 */
static int read_partial_banner(struct ceph_connection *con)
{
	int ret, to = 0;

	dout("read_partial_banner %p at %d\n", con, con->in_base_pos);

	/* peer's banner */
	ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
	if (ret <= 0)
		goto out;
	/* the address the peer claims for itself */
	ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
			   &con->actual_peer_addr);
	if (ret <= 0)
		goto out;
	/* our address, as observed by the peer */
	ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
			   &con->peer_addr_for_me);
	if (ret <= 0)
		goto out;
out:
	return ret;
}
1012
1013 static int read_partial_connect(struct ceph_connection *con)
1014 {
1015         int ret, to = 0;
1016
1017         dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
1018
1019         ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
1020         if (ret <= 0)
1021                 goto out;
1022         ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
1023                            con->auth_reply_buf);
1024         if (ret <= 0)
1025                 goto out;
1026
1027         dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
1028              con, (int)con->in_reply.tag,
1029              le32_to_cpu(con->in_reply.connect_seq),
1030              le32_to_cpu(con->in_reply.global_seq));
1031 out:
1032         return ret;
1033
1034 }
1035
1036 /*
1037  * Verify the hello banner looks okay.
1038  */
/*
 * Compare the bytes accumulated in con->in_banner against CEPH_BANNER.
 * On mismatch, log it, set con->error_msg, and return -1 so the caller
 * faults the connection; return 0 if the banner matches.
 */
static int verify_hello(struct ceph_connection *con)
{
	if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
		pr_err("connect to %s got bad banner\n",
		       ceph_pr_addr(&con->peer_addr.in_addr));
		con->error_msg = "protocol error, bad banner";
		return -1;
	}
	return 0;
}
1049
/*
 * Report whether an address is the wildcard ("any") address:
 * 0.0.0.0 for IPv4, or all 128 bits zero for IPv6.  Unknown address
 * families are never considered blank.
 */
static bool addr_is_blank(struct sockaddr_storage *ss)
{
	if (ss->ss_family == AF_INET)
		return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;

	if (ss->ss_family == AF_INET6) {
		const unsigned char *b =
			((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr;
		int i;

		for (i = 0; i < 16; i++)
			if (b[i])
				return false;
		return true;
	}

	return false;
}
1064
/*
 * Fetch the port, in host byte order, from a v4 or v6 sockaddr.
 * Unknown address families report port 0.
 */
static int addr_port(struct sockaddr_storage *ss)
{
	const struct sockaddr_in *in4 = (const struct sockaddr_in *)ss;
	const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *)ss;

	if (ss->ss_family == AF_INET)
		return ntohs(in4->sin_port);
	if (ss->ss_family == AF_INET6)
		return ntohs(in6->sin6_port);
	return 0;
}
1075
/*
 * Store port p (host byte order) into a v4 or v6 sockaddr.  Other
 * address families are left untouched.
 */
static void addr_set_port(struct sockaddr_storage *ss, int p)
{
	struct sockaddr_in *in4 = (struct sockaddr_in *)ss;
	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)ss;

	if (ss->ss_family == AF_INET)
		in4->sin_port = htons(p);
	else if (ss->ss_family == AF_INET6)
		in6->sin6_port = htons(p);
}
1087
1088 /*
1089  * Unlike other *_pton function semantics, zero indicates success.
1090  */
static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
		char delim, const char **ipend)
{
	/* overlapping v4/v6 views of the same storage */
	struct sockaddr_in *in4 = (void *)ss;
	struct sockaddr_in6 *in6 = (void *)ss;

	memset(ss, 0, sizeof(*ss));

	/* try IPv4 first; in4_pton stops at delim and reports the end via ipend */
	if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
		ss->ss_family = AF_INET;
		return 0;
	}

	/* then IPv6 */
	if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
		ss->ss_family = AF_INET6;
		return 0;
	}

	return -EINVAL;
}
1111
1112 /*
1113  * Extract hostname string and resolve using kernel DNS facility.
1114  */
1115 #ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
/*
 * Resolve a hostname via the kernel DNS resolver (dns_query upcall).
 * On success fills *ss with the resolved address and sets *ipend to
 * the character just past the hostname.  Returns 0 on success,
 * -EINVAL for an empty name, -ESRCH if the upcall found nothing, or
 * the ceph_pton() result for the returned address string.
 */
static int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct sockaddr_storage *ss, char delim, const char **ipend)
{
	const char *end, *delim_p;
	char *colon_p, *ip_addr = NULL;
	int ip_len, ret;

	/*
	 * The end of the hostname occurs immediately preceding the delimiter or
	 * the port marker (':') where the delimiter takes precedence.
	 */
	delim_p = memchr(name, delim, namelen);
	colon_p = memchr(name, ':', namelen);

	if (delim_p && colon_p)
		end = delim_p < colon_p ? delim_p : colon_p;
	else if (!delim_p && colon_p)
		end = colon_p;
	else {
		end = delim_p;
		if (!end) /* case: hostname:/ */
			end = name + namelen;
	}

	if (end <= name)
		return -EINVAL;

	/* do dns_resolve upcall */
	ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
	if (ip_len > 0)
		ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
	else
		ret = -ESRCH;

	kfree(ip_addr);	/* dns_query() allocated the result string */

	*ipend = end;

	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
			ret, ret ? "failed" : ceph_pr_addr(ss));

	return ret;
}
1159 #else
static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct sockaddr_storage *ss, char delim, const char **ipend)
{
	/* no DNS resolver configured: hostname lookup always fails */
	return -EINVAL;
}
1165 #endif
1166
1167 /*
1168  * Parse a server name (IP or hostname). If a valid IP address is not found
1169  * then try to extract a hostname to resolve using userspace DNS upcall.
1170  */
1171 static int ceph_parse_server_name(const char *name, size_t namelen,
1172                         struct sockaddr_storage *ss, char delim, const char **ipend)
1173 {
1174         int ret;
1175
1176         ret = ceph_pton(name, namelen, ss, delim, ipend);
1177         if (ret)
1178                 ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
1179
1180         return ret;
1181 }
1182
1183 /*
1184  * Parse an ip[:port] list into an addr array.  Use the default
1185  * monitor port if a port isn't specified.
1186  */
1187 int ceph_parse_ips(const char *c, const char *end,
1188                    struct ceph_entity_addr *addr,
1189                    int max_count, int *count)
1190 {
1191         int i, ret = -EINVAL;
1192         const char *p = c;
1193
1194         dout("parse_ips on '%.*s'\n", (int)(end-c), c);
1195         for (i = 0; i < max_count; i++) {
1196                 const char *ipend;
1197                 struct sockaddr_storage *ss = &addr[i].in_addr;
1198                 int port;
1199                 char delim = ',';
1200
1201                 if (*p == '[') {
1202                         delim = ']';
1203                         p++;
1204                 }
1205
1206                 ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
1207                 if (ret)
1208                         goto bad;
1209                 ret = -EINVAL;
1210
1211                 p = ipend;
1212
1213                 if (delim == ']') {
1214                         if (*p != ']') {
1215                                 dout("missing matching ']'\n");
1216                                 goto bad;
1217                         }
1218                         p++;
1219                 }
1220
1221                 /* port? */
1222                 if (p < end && *p == ':') {
1223                         port = 0;
1224                         p++;
1225                         while (p < end && *p >= '0' && *p <= '9') {
1226                                 port = (port * 10) + (*p - '0');
1227                                 p++;
1228                         }
1229                         if (port > 65535 || port == 0)
1230                                 goto bad;
1231                 } else {
1232                         port = CEPH_MON_PORT;
1233                 }
1234
1235                 addr_set_port(ss, port);
1236
1237                 dout("parse_ips got %s\n", ceph_pr_addr(ss));
1238
1239                 if (p == end)
1240                         break;
1241                 if (*p != ',')
1242                         goto bad;
1243                 p++;
1244         }
1245
1246         if (p != end)
1247                 goto bad;
1248
1249         if (count)
1250                 *count = i + 1;
1251         return 0;
1252
1253 bad:
1254         pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
1255         return ret;
1256 }
1257 EXPORT_SYMBOL(ceph_parse_ips);
1258
/*
 * Process the handshake preamble read by read_partial_banner():
 * verify the banner, decode the wire-format addresses, check that we
 * reached the peer we intended, and learn our own address from the
 * peer if we don't know it yet.  Returns 0 and enters NEGOTIATING on
 * success, -1 on mismatch (with con->error_msg set).
 */
static int process_banner(struct ceph_connection *con)
{
	dout("process_banner on %p\n", con);

	if (verify_hello(con) < 0)
		return -1;

	ceph_decode_addr(&con->actual_peer_addr);
	ceph_decode_addr(&con->peer_addr_for_me);

	/*
	 * Make sure the other end is who we wanted.  note that the other
	 * end may not yet know their ip address, so if it's 0.0.0.0, give
	 * them the benefit of the doubt.
	 */
	if (memcmp(&con->peer_addr, &con->actual_peer_addr,
		   sizeof(con->peer_addr)) != 0 &&
	    !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
		pr_warning("wrong peer, want %s/%d, got %s/%d\n",
			   ceph_pr_addr(&con->peer_addr.in_addr),
			   (int)le32_to_cpu(con->peer_addr.nonce),
			   ceph_pr_addr(&con->actual_peer_addr.in_addr),
			   (int)le32_to_cpu(con->actual_peer_addr.nonce));
		con->error_msg = "wrong peer at address";
		return -1;
	}

	/*
	 * did we learn our address?
	 */
	if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
		int port = addr_port(&con->msgr->inst.addr.in_addr);

		/* adopt the peer's view of our IP, but keep our own port */
		memcpy(&con->msgr->inst.addr.in_addr,
		       &con->peer_addr_for_me.in_addr,
		       sizeof(con->peer_addr_for_me.in_addr));
		addr_set_port(&con->msgr->inst.addr.in_addr, port);
		encode_my_addr(con->msgr);
		dout("process_banner learned my addr is %s\n",
		     ceph_pr_addr(&con->msgr->inst.addr.in_addr));
	}

	set_bit(NEGOTIATING, &con->state);
	prepare_read_connect(con);
	return 0;
}
1306
/*
 * Fatal protocol-level failure: tear down the connection state and
 * mark it CLOSED, then notify the upper layer via the bad_proto
 * callback.  The callback runs with con->mutex dropped since it may
 * call back into the messenger.
 */
static void fail_protocol(struct ceph_connection *con)
{
	reset_connection(con);
	set_bit(CLOSED, &con->state);  /* in case there's queued work */

	mutex_unlock(&con->mutex);
	if (con->ops->bad_proto)
		con->ops->bad_proto(con);
	mutex_lock(&con->mutex);
}
1317
1318 static int process_connect(struct ceph_connection *con)
1319 {
1320         u64 sup_feat = con->msgr->supported_features;
1321         u64 req_feat = con->msgr->required_features;
1322         u64 server_feat = le64_to_cpu(con->in_reply.features);
1323         int ret;
1324
1325         dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
1326
1327         switch (con->in_reply.tag) {
1328         case CEPH_MSGR_TAG_FEATURES:
1329                 pr_err("%s%lld %s feature set mismatch,"
1330                        " my %llx < server's %llx, missing %llx\n",
1331                        ENTITY_NAME(con->peer_name),
1332                        ceph_pr_addr(&con->peer_addr.in_addr),
1333                        sup_feat, server_feat, server_feat & ~sup_feat);
1334                 con->error_msg = "missing required protocol features";
1335                 fail_protocol(con);
1336                 return -1;
1337
1338         case CEPH_MSGR_TAG_BADPROTOVER:
1339                 pr_err("%s%lld %s protocol version mismatch,"
1340                        " my %d != server's %d\n",
1341                        ENTITY_NAME(con->peer_name),
1342                        ceph_pr_addr(&con->peer_addr.in_addr),
1343                        le32_to_cpu(con->out_connect.protocol_version),
1344                        le32_to_cpu(con->in_reply.protocol_version));
1345                 con->error_msg = "protocol version mismatch";
1346                 fail_protocol(con);
1347                 return -1;
1348
1349         case CEPH_MSGR_TAG_BADAUTHORIZER:
1350                 con->auth_retry++;
1351                 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
1352                      con->auth_retry);
1353                 if (con->auth_retry == 2) {
1354                         con->error_msg = "connect authorization failure";
1355                         return -1;
1356                 }
1357                 con->auth_retry = 1;
1358                 ret = prepare_write_connect(con->msgr, con, 0);
1359                 if (ret < 0)
1360                         return ret;
1361                 prepare_read_connect(con);
1362                 break;
1363
1364         case CEPH_MSGR_TAG_RESETSESSION:
1365                 /*
1366                  * If we connected with a large connect_seq but the peer
1367                  * has no record of a session with us (no connection, or
1368                  * connect_seq == 0), they will send RESETSESION to indicate
1369                  * that they must have reset their session, and may have
1370                  * dropped messages.
1371                  */
1372                 dout("process_connect got RESET peer seq %u\n",
1373                      le32_to_cpu(con->in_connect.connect_seq));
1374                 pr_err("%s%lld %s connection reset\n",
1375                        ENTITY_NAME(con->peer_name),
1376                        ceph_pr_addr(&con->peer_addr.in_addr));
1377                 reset_connection(con);
1378                 prepare_write_connect(con->msgr, con, 0);
1379                 prepare_read_connect(con);
1380
1381                 /* Tell ceph about it. */
1382                 mutex_unlock(&con->mutex);
1383                 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
1384                 if (con->ops->peer_reset)
1385                         con->ops->peer_reset(con);
1386                 mutex_lock(&con->mutex);
1387                 if (test_bit(CLOSED, &con->state) ||
1388                     test_bit(OPENING, &con->state))
1389                         return -EAGAIN;
1390                 break;
1391
1392         case CEPH_MSGR_TAG_RETRY_SESSION:
1393                 /*
1394                  * If we sent a smaller connect_seq than the peer has, try
1395                  * again with a larger value.
1396                  */
1397                 dout("process_connect got RETRY my seq = %u, peer_seq = %u\n",
1398                      le32_to_cpu(con->out_connect.connect_seq),
1399                      le32_to_cpu(con->in_connect.connect_seq));
1400                 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
1401                 prepare_write_connect(con->msgr, con, 0);
1402                 prepare_read_connect(con);
1403                 break;
1404
1405         case CEPH_MSGR_TAG_RETRY_GLOBAL:
1406                 /*
1407                  * If we sent a smaller global_seq than the peer has, try
1408                  * again with a larger value.
1409                  */
1410                 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
1411                      con->peer_global_seq,
1412                      le32_to_cpu(con->in_connect.global_seq));
1413                 get_global_seq(con->msgr,
1414                                le32_to_cpu(con->in_connect.global_seq));
1415                 prepare_write_connect(con->msgr, con, 0);
1416                 prepare_read_connect(con);
1417                 break;
1418
1419         case CEPH_MSGR_TAG_READY:
1420                 if (req_feat & ~server_feat) {
1421                         pr_err("%s%lld %s protocol feature mismatch,"
1422                                " my required %llx > server's %llx, need %llx\n",
1423                                ENTITY_NAME(con->peer_name),
1424                                ceph_pr_addr(&con->peer_addr.in_addr),
1425                                req_feat, server_feat, req_feat & ~server_feat);
1426                         con->error_msg = "missing required protocol features";
1427                         fail_protocol(con);
1428                         return -1;
1429                 }
1430                 clear_bit(CONNECTING, &con->state);
1431                 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1432                 con->connect_seq++;
1433                 con->peer_features = server_feat;
1434                 dout("process_connect got READY gseq %d cseq %d (%d)\n",
1435                      con->peer_global_seq,
1436                      le32_to_cpu(con->in_reply.connect_seq),
1437                      con->connect_seq);
1438                 WARN_ON(con->connect_seq !=
1439                         le32_to_cpu(con->in_reply.connect_seq));
1440
1441                 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
1442                         set_bit(LOSSYTX, &con->state);
1443
1444                 prepare_read_tag(con);
1445                 break;
1446
1447         case CEPH_MSGR_TAG_WAIT:
1448                 /*
1449                  * If there is a connection race (we are opening
1450                  * connections to each other), one of us may just have
1451                  * to WAIT.  This shouldn't happen if we are the
1452                  * client.
1453                  */
1454                 pr_err("process_connect got WAIT as client\n");
1455                 con->error_msg = "protocol error, got WAIT as client";
1456                 return -1;
1457
1458         default:
1459                 pr_err("connect protocol error, will retry\n");
1460                 con->error_msg = "protocol error, garbage tag during connect";
1461                 return -1;
1462         }
1463         return 0;
1464 }
1465
1466
1467 /*
1468  * read (part of) an ack
1469  */
static int read_partial_ack(struct ceph_connection *con)
{
	int to = 0;	/* fresh field offset for read_partial() */

	/* accumulate the acked seq into con->in_temp_ack; 1 when complete */
	return read_partial(con, &to, sizeof(con->in_temp_ack),
			    &con->in_temp_ack);
}
1477
1478
1479 /*
1480  * We can finally discard anything that's been acked.
1481  */
static void process_ack(struct ceph_connection *con)
{
	struct ceph_msg *m;
	u64 ack = le64_to_cpu(con->in_temp_ack);	/* highest seq the peer got */
	u64 seq;

	/* drop acked messages from the head of out_sent, stopping at the
	 * first one with seq > ack */
	while (!list_empty(&con->out_sent)) {
		m = list_first_entry(&con->out_sent, struct ceph_msg,
				     list_head);
		seq = le64_to_cpu(m->hdr.seq);
		if (seq > ack)
			break;
		dout("got ack for seq %llu type %d at %p\n", seq,
		     le16_to_cpu(m->hdr.type), m);
		m->ack_stamp = jiffies;
		ceph_msg_remove(m);
	}
	/* go back to waiting for the next tag byte */
	prepare_read_tag(con);
}
1501
1502
1503
1504
1505 static int read_partial_message_section(struct ceph_connection *con,
1506                                         struct kvec *section,
1507                                         unsigned int sec_len, u32 *crc)
1508 {
1509         int ret, left;
1510
1511         BUG_ON(!section);
1512
1513         while (section->iov_len < sec_len) {
1514                 BUG_ON(section->iov_base == NULL);
1515                 left = sec_len - section->iov_len;
1516                 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
1517                                        section->iov_len, left);
1518                 if (ret <= 0)
1519                         return ret;
1520                 section->iov_len += ret;
1521                 if (section->iov_len == sec_len)
1522                         *crc = crc32c(0, section->iov_base,
1523                                       section->iov_len);
1524         }
1525
1526         return 1;
1527 }
1528
1529 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1530                                 struct ceph_msg_header *hdr,
1531                                 int *skip);
1532
1533
/*
 * Receive message payload data into the msg's page vector.  Reads at
 * most the rest of the current page (or the rest of the data,
 * whichever is smaller) in a single recvmsg, optionally folds the
 * received bytes into con->in_data_crc, and advances the in_msg_pos
 * cursor.  Returns the byte count received, or <= 0 from the socket.
 */
static int read_partial_message_pages(struct ceph_connection *con,
				      struct page **pages,
				      unsigned data_len, int datacrc)
{
	void *p;
	int ret;
	int left;

	/* bytes still wanted, clamped to what fits in the current page */
	left = min((int)(data_len - con->in_msg_pos.data_pos),
		   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
	/* (page) data */
	BUG_ON(pages == NULL);
	p = kmap(pages[con->in_msg_pos.page]);
	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
			       left);
	/* crc while the page is still mapped, before releasing it */
	if (ret > 0 && datacrc)
		con->in_data_crc =
			crc32c(con->in_data_crc,
				  p + con->in_msg_pos.page_pos, ret);
	kunmap(pages[con->in_msg_pos.page]);
	if (ret <= 0)
		return ret;
	con->in_msg_pos.data_pos += ret;
	con->in_msg_pos.page_pos += ret;
	if (con->in_msg_pos.page_pos == PAGE_SIZE) {
		/* filled this page; move on to the next one */
		con->in_msg_pos.page_pos = 0;
		con->in_msg_pos.page++;
	}

	return ret;
}
1565
1566 #ifdef CONFIG_BLOCK
/*
 * Receive message payload data into the current bio segment,
 * mirroring read_partial_message_pages() but walking bio_vecs with
 * iter_bio_next().  Returns the byte count received, or <= 0 from
 * the socket (or a PTR_ERR from bio_iovec_idx()).
 */
static int read_partial_message_bio(struct ceph_connection *con,
				    struct bio **bio_iter, int *bio_seg,
				    unsigned data_len, int datacrc)
{
	struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
	void *p;
	int ret, left;

	if (IS_ERR(bv))
		return PTR_ERR(bv);

	/* bytes still wanted, clamped to the rest of this bio_vec */
	left = min((int)(data_len - con->in_msg_pos.data_pos),
		   (int)(bv->bv_len - con->in_msg_pos.page_pos));

	p = kmap(bv->bv_page) + bv->bv_offset;

	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
			       left);
	/* crc while the page is still mapped, before releasing it */
	if (ret > 0 && datacrc)
		con->in_data_crc =
			crc32c(con->in_data_crc,
				  p + con->in_msg_pos.page_pos, ret);
	kunmap(bv->bv_page);
	if (ret <= 0)
		return ret;
	con->in_msg_pos.data_pos += ret;
	con->in_msg_pos.page_pos += ret;
	if (con->in_msg_pos.page_pos == bv->bv_len) {
		/* consumed this segment; advance to the next bio_vec */
		con->in_msg_pos.page_pos = 0;
		iter_bio_next(bio_iter, bio_seg);
	}

	return ret;
}
1601 #endif
1602
1603 /*
1604  * read (part of) a message.
1605  */
1606 static int read_partial_message(struct ceph_connection *con)
1607 {
1608         struct ceph_msg *m = con->in_msg;
1609         int ret;
1610         int to, left;
1611         unsigned front_len, middle_len, data_len;
1612         int datacrc = con->msgr->nocrc;
1613         int skip;
1614         u64 seq;
1615
1616         dout("read_partial_message con %p msg %p\n", con, m);
1617
1618         /* header */
1619         while (con->in_base_pos < sizeof(con->in_hdr)) {
1620                 left = sizeof(con->in_hdr) - con->in_base_pos;
1621                 ret = ceph_tcp_recvmsg(con->sock,
1622                                        (char *)&con->in_hdr + con->in_base_pos,
1623                                        left);
1624                 if (ret <= 0)
1625                         return ret;
1626                 con->in_base_pos += ret;
1627                 if (con->in_base_pos == sizeof(con->in_hdr)) {
1628                         u32 crc = crc32c(0, (void *)&con->in_hdr,
1629                                  sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
1630                         if (crc != le32_to_cpu(con->in_hdr.crc)) {
1631                                 pr_err("read_partial_message bad hdr "
1632                                        " crc %u != expected %u\n",
1633                                        crc, con->in_hdr.crc);
1634                                 return -EBADMSG;
1635                         }
1636                 }
1637         }
1638         front_len = le32_to_cpu(con->in_hdr.front_len);
1639         if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1640                 return -EIO;
1641         middle_len = le32_to_cpu(con->in_hdr.middle_len);
1642         if (middle_len > CEPH_MSG_MAX_DATA_LEN)
1643                 return -EIO;
1644         data_len = le32_to_cpu(con->in_hdr.data_len);
1645         if (data_len > CEPH_MSG_MAX_DATA_LEN)
1646                 return -EIO;
1647
1648         /* verify seq# */
1649         seq = le64_to_cpu(con->in_hdr.seq);
1650         if ((s64)seq - (s64)con->in_seq < 1) {
1651                 pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1652                         ENTITY_NAME(con->peer_name),
1653                         ceph_pr_addr(&con->peer_addr.in_addr),
1654                         seq, con->in_seq + 1);
1655                 con->in_base_pos = -front_len - middle_len - data_len -
1656                         sizeof(m->footer);
1657                 con->in_tag = CEPH_MSGR_TAG_READY;
1658                 return 0;
1659         } else if ((s64)seq - (s64)con->in_seq > 1) {
1660                 pr_err("read_partial_message bad seq %lld expected %lld\n",
1661                        seq, con->in_seq + 1);
1662                 con->error_msg = "bad message sequence # for incoming message";
1663                 return -EBADMSG;
1664         }
1665
1666         /* allocate message? */
1667         if (!con->in_msg) {
1668                 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1669                      con->in_hdr.front_len, con->in_hdr.data_len);
1670                 skip = 0;
1671                 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
1672                 if (skip) {
1673                         /* skip this message */
1674                         dout("alloc_msg said skip message\n");
1675                         BUG_ON(con->in_msg);
1676                         con->in_base_pos = -front_len - middle_len - data_len -
1677                                 sizeof(m->footer);
1678                         con->in_tag = CEPH_MSGR_TAG_READY;
1679                         con->in_seq++;
1680                         return 0;
1681                 }
1682                 if (!con->in_msg) {
1683                         con->error_msg =
1684                                 "error allocating memory for incoming message";
1685                         return -ENOMEM;
1686                 }
1687                 m = con->in_msg;
1688                 m->front.iov_len = 0;    /* haven't read it yet */
1689                 if (m->middle)
1690                         m->middle->vec.iov_len = 0;
1691
1692                 con->in_msg_pos.page = 0;
1693                 if (m->pages)
1694                         con->in_msg_pos.page_pos = m->page_alignment;
1695                 else
1696                         con->in_msg_pos.page_pos = 0;
1697                 con->in_msg_pos.data_pos = 0;
1698         }
1699
1700         /* front */
1701         ret = read_partial_message_section(con, &m->front, front_len,
1702                                            &con->in_front_crc);
1703         if (ret <= 0)
1704                 return ret;
1705
1706         /* middle */
1707         if (m->middle) {
1708                 ret = read_partial_message_section(con, &m->middle->vec,
1709                                                    middle_len,
1710                                                    &con->in_middle_crc);
1711                 if (ret <= 0)
1712                         return ret;
1713         }
1714 #ifdef CONFIG_BLOCK
1715         if (m->bio && !m->bio_iter)
1716                 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1717 #endif
1718
1719         /* (page) data */
1720         while (con->in_msg_pos.data_pos < data_len) {
1721                 if (m->pages) {
1722                         ret = read_partial_message_pages(con, m->pages,
1723                                                  data_len, datacrc);
1724                         if (ret <= 0)
1725                                 return ret;
1726 #ifdef CONFIG_BLOCK
1727                 } else if (m->bio) {
1728
1729                         ret = read_partial_message_bio(con,
1730                                                  &m->bio_iter, &m->bio_seg,
1731                                                  data_len, datacrc);
1732                         if (ret <= 0)
1733                                 return ret;
1734 #endif
1735                 } else {
1736                         BUG_ON(1);
1737                 }
1738         }
1739
1740         /* footer */
1741         to = sizeof(m->hdr) + sizeof(m->footer);
1742         while (con->in_base_pos < to) {
1743                 left = to - con->in_base_pos;
1744                 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
1745                                        (con->in_base_pos - sizeof(m->hdr)),
1746                                        left);
1747                 if (ret <= 0)
1748                         return ret;
1749                 con->in_base_pos += ret;
1750         }
1751         dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1752              m, front_len, m->footer.front_crc, middle_len,
1753              m->footer.middle_crc, data_len, m->footer.data_crc);
1754
1755         /* crc ok? */
1756         if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1757                 pr_err("read_partial_message %p front crc %u != exp. %u\n",
1758                        m, con->in_front_crc, m->footer.front_crc);
1759                 return -EBADMSG;
1760         }
1761         if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1762                 pr_err("read_partial_message %p middle crc %u != exp %u\n",
1763                        m, con->in_middle_crc, m->footer.middle_crc);
1764                 return -EBADMSG;
1765         }
1766         if (datacrc &&
1767             (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1768             con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1769                 pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1770                        con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1771                 return -EBADMSG;
1772         }
1773
1774         return 1; /* done! */
1775 }
1776
1777 /*
1778  * Process message.  This happens in the worker thread.  The callback should
1779  * be careful not to do anything that waits on other incoming messages or it
1780  * may deadlock.
1781  */
1782 static void process_message(struct ceph_connection *con)
1783 {
1784         struct ceph_msg *msg;
1785
1786         msg = con->in_msg;
1787         con->in_msg = NULL;
1788
1789         /* if first message, set peer_name */
1790         if (con->peer_name.type == 0)
1791                 con->peer_name = msg->hdr.src;
1792
1793         con->in_seq++;
1794         mutex_unlock(&con->mutex);
1795
1796         dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
1797              msg, le64_to_cpu(msg->hdr.seq),
1798              ENTITY_NAME(msg->hdr.src),
1799              le16_to_cpu(msg->hdr.type),
1800              ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1801              le32_to_cpu(msg->hdr.front_len),
1802              le32_to_cpu(msg->hdr.data_len),
1803              con->in_front_crc, con->in_middle_crc, con->in_data_crc);
1804         con->ops->dispatch(con, msg);
1805
1806         mutex_lock(&con->mutex);
1807         prepare_read_tag(con);
1808 }
1809
1810
1811 /*
1812  * Write something to the socket.  Called in a worker thread when the
1813  * socket appears to be writeable and we have something ready to send.
1814  */
1815 static int try_write(struct ceph_connection *con)
1816 {
1817         struct ceph_messenger *msgr = con->msgr;
1818         int ret = 1;
1819
1820         dout("try_write start %p state %lu nref %d\n", con, con->state,
1821              atomic_read(&con->nref));
1822
1823 more:
1824         dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1825
1826         /* open the socket first? */
1827         if (con->sock == NULL) {
1828                 prepare_write_banner(msgr, con);
1829                 prepare_write_connect(msgr, con, 1);
1830                 prepare_read_banner(con);
1831                 set_bit(CONNECTING, &con->state);
1832                 clear_bit(NEGOTIATING, &con->state);
1833
1834                 BUG_ON(con->in_msg);
1835                 con->in_tag = CEPH_MSGR_TAG_READY;
1836                 dout("try_write initiating connect on %p new state %lu\n",
1837                      con, con->state);
1838                 con->sock = ceph_tcp_connect(con);
1839                 if (IS_ERR(con->sock)) {
1840                         con->sock = NULL;
1841                         con->error_msg = "connect error";
1842                         ret = -1;
1843                         goto out;
1844                 }
1845         }
1846
1847 more_kvec:
1848         /* kvec data queued? */
1849         if (con->out_skip) {
1850                 ret = write_partial_skip(con);
1851                 if (ret <= 0)
1852                         goto out;
1853         }
1854         if (con->out_kvec_left) {
1855                 ret = write_partial_kvec(con);
1856                 if (ret <= 0)
1857                         goto out;
1858         }
1859
1860         /* msg pages? */
1861         if (con->out_msg) {
1862                 if (con->out_msg_done) {
1863                         ceph_msg_put(con->out_msg);
1864                         con->out_msg = NULL;   /* we're done with this one */
1865                         goto do_next;
1866                 }
1867
1868                 ret = write_partial_msg_pages(con);
1869                 if (ret == 1)
1870                         goto more_kvec;  /* we need to send the footer, too! */
1871                 if (ret == 0)
1872                         goto out;
1873                 if (ret < 0) {
1874                         dout("try_write write_partial_msg_pages err %d\n",
1875                              ret);
1876                         goto out;
1877                 }
1878         }
1879
1880 do_next:
1881         if (!test_bit(CONNECTING, &con->state)) {
1882                 /* is anything else pending? */
1883                 if (!list_empty(&con->out_queue)) {
1884                         prepare_write_message(con);
1885                         goto more;
1886                 }
1887                 if (con->in_seq > con->in_seq_acked) {
1888                         prepare_write_ack(con);
1889                         goto more;
1890                 }
1891                 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
1892                         prepare_write_keepalive(con);
1893                         goto more;
1894                 }
1895         }
1896
1897         /* Nothing to do! */
1898         clear_bit(WRITE_PENDING, &con->state);
1899         dout("try_write nothing else to write.\n");
1900         ret = 0;
1901 out:
1902         dout("try_write done on %p ret %d\n", con, ret);
1903         return ret;
1904 }
1905
1906
1907
1908 /*
1909  * Read what we can from the socket.
1910  */
1911 static int try_read(struct ceph_connection *con)
1912 {
1913         int ret = -1;
1914
1915         if (!con->sock)
1916                 return 0;
1917
1918         if (test_bit(STANDBY, &con->state))
1919                 return 0;
1920
1921         dout("try_read start on %p\n", con);
1922
1923 more:
1924         dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
1925              con->in_base_pos);
1926
1927         /*
1928          * process_connect and process_message drop and re-take
1929          * con->mutex.  make sure we handle a racing close or reopen.
1930          */
1931         if (test_bit(CLOSED, &con->state) ||
1932             test_bit(OPENING, &con->state)) {
1933                 ret = -EAGAIN;
1934                 goto out;
1935         }
1936
1937         if (test_bit(CONNECTING, &con->state)) {
1938                 if (!test_bit(NEGOTIATING, &con->state)) {
1939                         dout("try_read connecting\n");
1940                         ret = read_partial_banner(con);
1941                         if (ret <= 0)
1942                                 goto out;
1943                         ret = process_banner(con);
1944                         if (ret < 0)
1945                                 goto out;
1946                 }
1947                 ret = read_partial_connect(con);
1948                 if (ret <= 0)
1949                         goto out;
1950                 ret = process_connect(con);
1951                 if (ret < 0)
1952                         goto out;
1953                 goto more;
1954         }
1955
1956         if (con->in_base_pos < 0) {
1957                 /*
1958                  * skipping + discarding content.
1959                  *
1960                  * FIXME: there must be a better way to do this!
1961                  */
1962                 static char buf[1024];
1963                 int skip = min(1024, -con->in_base_pos);
1964                 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
1965                 ret = ceph_tcp_recvmsg(con->sock, buf, skip);
1966                 if (ret <= 0)
1967                         goto out;
1968                 con->in_base_pos += ret;
1969                 if (con->in_base_pos)
1970                         goto more;
1971         }
1972         if (con->in_tag == CEPH_MSGR_TAG_READY) {
1973                 /*
1974                  * what's next?
1975                  */
1976                 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
1977                 if (ret <= 0)
1978                         goto out;
1979                 dout("try_read got tag %d\n", (int)con->in_tag);
1980                 switch (con->in_tag) {
1981                 case CEPH_MSGR_TAG_MSG:
1982                         prepare_read_message(con);
1983                         break;
1984                 case CEPH_MSGR_TAG_ACK:
1985                         prepare_read_ack(con);
1986                         break;
1987                 case CEPH_MSGR_TAG_CLOSE:
1988                         set_bit(CLOSED, &con->state);   /* fixme */
1989                         goto out;
1990                 default:
1991                         goto bad_tag;
1992                 }
1993         }
1994         if (con->in_tag == CEPH_MSGR_TAG_MSG) {
1995                 ret = read_partial_message(con);
1996                 if (ret <= 0) {
1997                         switch (ret) {
1998                         case -EBADMSG:
1999                                 con->error_msg = "bad crc";
2000                                 ret = -EIO;
2001                                 break;
2002                         case -EIO:
2003                                 con->error_msg = "io error";
2004                                 break;
2005                         }
2006                         goto out;
2007                 }
2008                 if (con->in_tag == CEPH_MSGR_TAG_READY)
2009                         goto more;
2010                 process_message(con);
2011                 goto more;
2012         }
2013         if (con->in_tag == CEPH_MSGR_TAG_ACK) {
2014                 ret = read_partial_ack(con);
2015                 if (ret <= 0)
2016                         goto out;
2017                 process_ack(con);
2018                 goto more;
2019         }
2020
2021 out:
2022         dout("try_read done on %p ret %d\n", con, ret);
2023         return ret;
2024
2025 bad_tag:
2026         pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
2027         con->error_msg = "protocol error, garbage tag";
2028         ret = -1;
2029         goto out;
2030 }
2031
2032
2033 /*
2034  * Atomically queue work on a connection.  Bump @con reference to
2035  * avoid races with connection teardown.
2036  */
2037 static void queue_con(struct ceph_connection *con)
2038 {
2039         if (test_bit(DEAD, &con->state)) {
2040                 dout("queue_con %p ignoring: DEAD\n",
2041                      con);
2042                 return;
2043         }
2044
2045         if (!con->ops->get(con)) {
2046                 dout("queue_con %p ref count 0\n", con);
2047                 return;
2048         }
2049
2050         if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
2051                 dout("queue_con %p - already queued\n", con);
2052                 con->ops->put(con);
2053         } else {
2054                 dout("queue_con %p\n", con);
2055         }
2056 }
2057
2058 /*
2059  * Do some work on a connection.  Drop a connection ref when we're done.
2060  */
2061 static void con_work(struct work_struct *work)
2062 {
2063         struct ceph_connection *con = container_of(work, struct ceph_connection,
2064                                                    work.work);
2065         int ret;
2066
2067         mutex_lock(&con->mutex);
2068 restart:
2069         if (test_and_clear_bit(BACKOFF, &con->state)) {
2070                 dout("con_work %p backing off\n", con);
2071                 if (queue_delayed_work(ceph_msgr_wq, &con->work,
2072                                        round_jiffies_relative(con->delay))) {
2073                         dout("con_work %p backoff %lu\n", con, con->delay);
2074                         mutex_unlock(&con->mutex);
2075                         return;
2076                 } else {
2077                         con->ops->put(con);
2078                         dout("con_work %p FAILED to back off %lu\n", con,
2079                              con->delay);
2080                 }
2081         }
2082
2083         if (test_bit(STANDBY, &con->state)) {
2084                 dout("con_work %p STANDBY\n", con);
2085                 goto done;
2086         }
2087         if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
2088                 dout("con_work CLOSED\n");
2089                 con_close_socket(con);
2090                 goto done;
2091         }
2092         if (test_and_clear_bit(OPENING, &con->state)) {
2093                 /* reopen w/ new peer */
2094                 dout("con_work OPENING\n");
2095                 con_close_socket(con);
2096         }
2097
2098         if (test_and_clear_bit(SOCK_CLOSED, &con->state))
2099                 goto fault;
2100
2101         ret = try_read(con);
2102         if (ret == -EAGAIN)
2103                 goto restart;
2104         if (ret < 0)
2105                 goto fault;
2106
2107         ret = try_write(con);
2108         if (ret == -EAGAIN)
2109                 goto restart;
2110         if (ret < 0)
2111                 goto fault;
2112
2113 done:
2114         mutex_unlock(&con->mutex);
2115 done_unlocked:
2116         con->ops->put(con);
2117         return;
2118
2119 fault:
2120         mutex_unlock(&con->mutex);
2121         ceph_fault(con);     /* error/fault path */
2122         goto done_unlocked;
2123 }
2124
2125
2126 /*
2127  * Generic error/fault handler.  A retry mechanism is used with
2128  * exponential backoff
2129  */
2130 static void ceph_fault(struct ceph_connection *con)
2131 {
2132         pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2133                ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
2134         dout("fault %p state %lu to peer %s\n",
2135              con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2136
2137         if (test_bit(LOSSYTX, &con->state)) {
2138                 dout("fault on LOSSYTX channel\n");
2139                 goto out;
2140         }
2141
2142         mutex_lock(&con->mutex);
2143         if (test_bit(CLOSED, &con->state))
2144                 goto out_unlock;
2145
2146         con_close_socket(con);
2147
2148         if (con->in_msg) {
2149                 ceph_msg_put(con->in_msg);
2150                 con->in_msg = NULL;
2151         }
2152
2153         /* Requeue anything that hasn't been acked */
2154         list_splice_init(&con->out_sent, &con->out_queue);
2155
2156         /* If there are no messages queued or keepalive pending, place
2157          * the connection in a STANDBY state */
2158         if (list_empty(&con->out_queue) &&
2159             !test_bit(KEEPALIVE_PENDING, &con->state)) {
2160                 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2161                 clear_bit(WRITE_PENDING, &con->state);
2162                 set_bit(STANDBY, &con->state);
2163         } else {
2164                 /* retry after a delay. */
2165                 if (con->delay == 0)
2166                         con->delay = BASE_DELAY_INTERVAL;
2167                 else if (con->delay < MAX_DELAY_INTERVAL)
2168                         con->delay *= 2;
2169                 con->ops->get(con);
2170                 if (queue_delayed_work(ceph_msgr_wq, &con->work,
2171                                        round_jiffies_relative(con->delay))) {
2172                         dout("fault queued %p delay %lu\n", con, con->delay);
2173                 } else {
2174                         con->ops->put(con);
2175                         dout("fault failed to queue %p delay %lu, backoff\n",
2176                              con, con->delay);
2177                         /*
2178                          * In many cases we see a socket state change
2179                          * while con_work is running and end up
2180                          * queuing (non-delayed) work, such that we
2181                          * can't backoff with a delay.  Set a flag so
2182                          * that when con_work restarts we schedule the
2183                          * delay then.
2184                          */
2185                         set_bit(BACKOFF, &con->state);
2186                 }
2187         }
2188
2189 out_unlock:
2190         mutex_unlock(&con->mutex);
2191 out:
2192         /*
2193          * in case we faulted due to authentication, invalidate our
2194          * current tickets so that we can get new ones.
2195          */
2196         if (con->auth_retry && con->ops->invalidate_authorizer) {
2197                 dout("calling invalidate_authorizer()\n");
2198                 con->ops->invalidate_authorizer(con);
2199         }
2200
2201         if (con->ops->fault)
2202                 con->ops->fault(con);
2203 }
2204
2205
2206
2207 /*
2208  * create a new messenger instance
2209  */
2210 struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
2211                                              u32 supported_features,
2212                                              u32 required_features)
2213 {
2214         struct ceph_messenger *msgr;
2215
2216         msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
2217         if (msgr == NULL)
2218                 return ERR_PTR(-ENOMEM);
2219
2220         msgr->supported_features = supported_features;
2221         msgr->required_features = required_features;
2222
2223         spin_lock_init(&msgr->global_seq_lock);
2224
2225         /* the zero page is needed if a request is "canceled" while the message
2226          * is being written over the socket */
2227         msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
2228         if (!msgr->zero_page) {
2229                 kfree(msgr);
2230                 return ERR_PTR(-ENOMEM);
2231         }
2232         kmap(msgr->zero_page);
2233
2234         if (myaddr)
2235                 msgr->inst.addr = *myaddr;
2236
2237         /* select a random nonce */
2238         msgr->inst.addr.type = 0;
2239         get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
2240         encode_my_addr(msgr);
2241
2242         dout("messenger_create %p\n", msgr);
2243         return msgr;
2244 }
2245 EXPORT_SYMBOL(ceph_messenger_create);
2246
/*
 * Tear down a messenger created by ceph_messenger_create: release the
 * zero page (and its kmap) and the messenger struct itself.
 */
void ceph_messenger_destroy(struct ceph_messenger *msgr)
{
	dout("destroy %p\n", msgr);
	kunmap(msgr->zero_page);	/* undo the kmap done at create */
	__free_page(msgr->zero_page);
	kfree(msgr);
	dout("destroyed messenger %p\n", msgr);
}
EXPORT_SYMBOL(ceph_messenger_destroy);
2256
/*
 * Bring a connection out of STANDBY (entered by ceph_fault when there
 * was nothing left to send) so that new traffic can be queued.
 *
 * Note the STANDBY bit is tested and cleared before taking con->mutex.
 */
static void clear_standby(struct ceph_connection *con)
{
	/* come back from STANDBY? */
	if (test_and_clear_bit(STANDBY, &con->state)) {
		mutex_lock(&con->mutex);
		dout("clear_standby %p and ++connect_seq\n", con);
		/* advance connect_seq -- presumably so the peer treats the
		 * next session as a reconnect; confirm against protocol */
		con->connect_seq++;
		/* ceph_fault cleared these when it set STANDBY */
		WARN_ON(test_bit(WRITE_PENDING, &con->state));
		WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
		mutex_unlock(&con->mutex);
	}
}
2269
2270 /*
2271  * Queue up an outgoing message on the given connection.
2272  */
2273 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2274 {
2275         if (test_bit(CLOSED, &con->state)) {
2276                 dout("con_send %p closed, dropping %p\n", con, msg);
2277                 ceph_msg_put(msg);
2278                 return;
2279         }
2280
2281         /* set src+dst */
2282         msg->hdr.src = con->msgr->inst.name;
2283
2284         BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
2285
2286         msg->needs_out_seq = true;
2287
2288         /* queue */
2289         mutex_lock(&con->mutex);
2290         BUG_ON(!list_empty(&msg->list_head));
2291         list_add_tail(&msg->list_head, &con->out_queue);
2292         dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
2293              ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
2294              ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
2295              le32_to_cpu(msg->hdr.front_len),
2296              le32_to_cpu(msg->hdr.middle_len),
2297              le32_to_cpu(msg->hdr.data_len));
2298         mutex_unlock(&con->mutex);
2299
2300         /* if there wasn't anything waiting to send before, queue
2301          * new work */
2302         clear_standby(con);
2303         if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
2304                 queue_con(con);
2305 }
2306 EXPORT_SYMBOL(ceph_con_send);
2307
2308 /*
2309  * Revoke a message that was previously queued for send
2310  */
2311 void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
2312 {
2313         mutex_lock(&con->mutex);
2314         if (!list_empty(&msg->list_head)) {
2315                 dout("con_revoke %p msg %p - was on queue\n", con, msg);
2316                 list_del_init(&msg->list_head);
2317                 ceph_msg_put(msg);
2318                 msg->hdr.seq = 0;
2319         }
2320         if (con->out_msg == msg) {
2321                 dout("con_revoke %p msg %p - was sending\n", con, msg);
2322                 con->out_msg = NULL;
2323                 if (con->out_kvec_is_msg) {
2324                         con->out_skip = con->out_kvec_bytes;
2325                         con->out_kvec_is_msg = false;
2326                 }
2327                 ceph_msg_put(msg);
2328                 msg->hdr.seq = 0;
2329         }
2330         mutex_unlock(&con->mutex);
2331 }
2332
2333 /*
2334  * Revoke a message that we may be reading data into
2335  */
2336 void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
2337 {
2338         mutex_lock(&con->mutex);
2339         if (con->in_msg && con->in_msg == msg) {
2340                 unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
2341                 unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
2342                 unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
2343
2344                 /* skip rest of message */
2345                 dout("con_revoke_pages %p msg %p revoked\n", con, msg);
2346                         con->in_base_pos = con->in_base_pos -
2347                                 sizeof(struct ceph_msg_header) -
2348                                 front_len -
2349                                 middle_len -
2350                                 data_len -
2351                                 sizeof(struct ceph_msg_footer);
2352                 ceph_msg_put(con->in_msg);
2353                 con->in_msg = NULL;
2354                 con->in_tag = CEPH_MSGR_TAG_READY;
2355                 con->in_seq++;
2356         } else {
2357                 dout("con_revoke_pages %p msg %p pages %p no-op\n",
2358                      con, con->in_msg, msg);
2359         }
2360         mutex_unlock(&con->mutex);
2361 }
2362
2363 /*
2364  * Queue a keepalive byte to ensure the tcp connection is alive.
2365  */
2366 void ceph_con_keepalive(struct ceph_connection *con)
2367 {
2368         dout("con_keepalive %p\n", con);
2369         clear_standby(con);
2370         if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
2371             test_and_set_bit(WRITE_PENDING, &con->state) == 0)
2372                 queue_con(con);
2373 }
2374 EXPORT_SYMBOL(ceph_con_keepalive);
2375
2376
2377 /*
2378  * construct a new message with given type, size
2379  * the new msg has a ref count of 1.
2380  */
2381 struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2382                               bool can_fail)
2383 {
2384         struct ceph_msg *m;
2385
2386         m = kmalloc(sizeof(*m), flags);
2387         if (m == NULL)
2388                 goto out;
2389         kref_init(&m->kref);
2390         INIT_LIST_HEAD(&m->list_head);
2391
2392         m->hdr.tid = 0;
2393         m->hdr.type = cpu_to_le16(type);
2394         m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
2395         m->hdr.version = 0;
2396         m->hdr.front_len = cpu_to_le32(front_len);
2397         m->hdr.middle_len = 0;
2398         m->hdr.data_len = 0;
2399         m->hdr.data_off = 0;
2400         m->hdr.reserved = 0;
2401         m->footer.front_crc = 0;
2402         m->footer.middle_crc = 0;
2403         m->footer.data_crc = 0;
2404         m->footer.flags = 0;
2405         m->front_max = front_len;
2406         m->front_is_vmalloc = false;
2407         m->more_to_follow = false;
2408         m->ack_stamp = 0;
2409         m->pool = NULL;
2410
2411         /* middle */
2412         m->middle = NULL;
2413
2414         /* data */
2415         m->nr_pages = 0;
2416         m->page_alignment = 0;
2417         m->pages = NULL;
2418         m->pagelist = NULL;
2419         m->bio = NULL;
2420         m->bio_iter = NULL;
2421         m->bio_seg = 0;
2422         m->trail = NULL;
2423
2424         /* front */
2425         if (front_len) {
2426                 if (front_len > PAGE_CACHE_SIZE) {
2427                         m->front.iov_base = __vmalloc(front_len, flags,
2428                                                       PAGE_KERNEL);
2429                         m->front_is_vmalloc = true;
2430                 } else {
2431                         m->front.iov_base = kmalloc(front_len, flags);
2432                 }
2433                 if (m->front.iov_base == NULL) {
2434                         dout("ceph_msg_new can't allocate %d bytes\n",
2435                              front_len);
2436                         goto out2;
2437                 }
2438         } else {
2439                 m->front.iov_base = NULL;
2440         }
2441         m->front.iov_len = front_len;
2442
2443         dout("ceph_msg_new %p front %d\n", m, front_len);
2444         return m;
2445
2446 out2:
2447         ceph_msg_put(m);
2448 out:
2449         if (!can_fail) {
2450                 pr_err("msg_new can't create type %d front %d\n", type,
2451                        front_len);
2452                 WARN_ON(1);
2453         } else {
2454                 dout("msg_new can't create type %d front %d\n", type,
2455                      front_len);
2456         }
2457         return NULL;
2458 }
2459 EXPORT_SYMBOL(ceph_msg_new);
2460
2461 /*
2462  * Allocate "middle" portion of a message, if it is needed and wasn't
2463  * allocated by alloc_msg.  This allows us to read a small fixed-size
2464  * per-type header in the front and then gracefully fail (i.e.,
2465  * propagate the error to the caller based on info in the front) when
2466  * the middle is too large.
2467  */
2468 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2469 {
2470         int type = le16_to_cpu(msg->hdr.type);
2471         int middle_len = le32_to_cpu(msg->hdr.middle_len);
2472
2473         dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
2474              ceph_msg_type_name(type), middle_len);
2475         BUG_ON(!middle_len);
2476         BUG_ON(msg->middle);
2477
2478         msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
2479         if (!msg->middle)
2480                 return -ENOMEM;
2481         return 0;
2482 }
2483
2484 /*
2485  * Generic message allocator, for incoming messages.
2486  */
2487 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2488                                 struct ceph_msg_header *hdr,
2489                                 int *skip)
2490 {
2491         int type = le16_to_cpu(hdr->type);
2492         int front_len = le32_to_cpu(hdr->front_len);
2493         int middle_len = le32_to_cpu(hdr->middle_len);
2494         struct ceph_msg *msg = NULL;
2495         int ret;
2496
2497         if (con->ops->alloc_msg) {
2498                 mutex_unlock(&con->mutex);
2499                 msg = con->ops->alloc_msg(con, hdr, skip);
2500                 mutex_lock(&con->mutex);
2501                 if (!msg || *skip)
2502                         return NULL;
2503         }
2504         if (!msg) {
2505                 *skip = 0;
2506                 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
2507                 if (!msg) {
2508                         pr_err("unable to allocate msg type %d len %d\n",
2509                                type, front_len);
2510                         return NULL;
2511                 }
2512                 msg->page_alignment = le16_to_cpu(hdr->data_off);
2513         }
2514         memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2515
2516         if (middle_len && !msg->middle) {
2517                 ret = ceph_alloc_middle(con, msg);
2518                 if (ret < 0) {
2519                         ceph_msg_put(msg);
2520                         return NULL;
2521                 }
2522         }
2523
2524         return msg;
2525 }
2526
2527
2528 /*
2529  * Free a generically kmalloc'd message.
2530  */
2531 void ceph_msg_kfree(struct ceph_msg *m)
2532 {
2533         dout("msg_kfree %p\n", m);
2534         if (m->front_is_vmalloc)
2535                 vfree(m->front.iov_base);
2536         else
2537                 kfree(m->front.iov_base);
2538         kfree(m);
2539 }
2540
2541 /*
2542  * Drop a msg ref.  Destroy as needed.
2543  */
2544 void ceph_msg_last_put(struct kref *kref)
2545 {
2546         struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
2547
2548         dout("ceph_msg_put last one on %p\n", m);
2549         WARN_ON(!list_empty(&m->list_head));
2550
2551         /* drop middle, data, if any */
2552         if (m->middle) {
2553                 ceph_buffer_put(m->middle);
2554                 m->middle = NULL;
2555         }
2556         m->nr_pages = 0;
2557         m->pages = NULL;
2558
2559         if (m->pagelist) {
2560                 ceph_pagelist_release(m->pagelist);
2561                 kfree(m->pagelist);
2562                 m->pagelist = NULL;
2563         }
2564
2565         m->trail = NULL;
2566
2567         if (m->pool)
2568                 ceph_msgpool_put(m->pool, m);
2569         else
2570                 ceph_msg_kfree(m);
2571 }
2572 EXPORT_SYMBOL(ceph_msg_last_put);
2573
2574 void ceph_msg_dump(struct ceph_msg *msg)
2575 {
2576         pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
2577                  msg->front_max, msg->nr_pages);
2578         print_hex_dump(KERN_DEBUG, "header: ",
2579                        DUMP_PREFIX_OFFSET, 16, 1,
2580                        &msg->hdr, sizeof(msg->hdr), true);
2581         print_hex_dump(KERN_DEBUG, " front: ",
2582                        DUMP_PREFIX_OFFSET, 16, 1,
2583                        msg->front.iov_base, msg->front.iov_len, true);
2584         if (msg->middle)
2585                 print_hex_dump(KERN_DEBUG, "middle: ",
2586                                DUMP_PREFIX_OFFSET, 16, 1,
2587                                msg->middle->vec.iov_base,
2588                                msg->middle->vec.iov_len, true);
2589         print_hex_dump(KERN_DEBUG, "footer: ",
2590                        DUMP_PREFIX_OFFSET, 16, 1,
2591                        &msg->footer, sizeof(msg->footer), true);
2592 }
2593 EXPORT_SYMBOL(ceph_msg_dump);