RDS: Use page_remainder_alloc() for recv bufs
[linux-2.6.git] / net / rds / ib_recv.c
1 /*
2  * Copyright (c) 2006 Oracle.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  *
32  */
33 #include <linux/kernel.h>
34 #include <linux/slab.h>
35 #include <linux/pci.h>
36 #include <linux/dma-mapping.h>
37 #include <rdma/rdma_cm.h>
38
39 #include "rds.h"
40 #include "ib.h"
41
42 static struct kmem_cache *rds_ib_incoming_slab;
43 static struct kmem_cache *rds_ib_frag_slab;
44 static atomic_t rds_ib_allocation = ATOMIC_INIT(0);
45
46 /* Free frag and attached recv buffer f_sg */
47 static void rds_ib_frag_free(struct rds_page_frag *frag)
48 {
49         rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
50         __free_page(sg_page(&frag->f_sg));
51         kmem_cache_free(rds_ib_frag_slab, frag);
52 }
53
54 /*
55  * We map a page at a time.  Its fragments are posted in order.  This
56  * is called in fragment order as the fragments get send completion events.
57  * Only the last frag in the page performs the unmapping.
58  *
59  * It's OK for ring cleanup to call this in whatever order it likes because
60  * DMA is not in flight and so we can unmap while other ring entries still
61  * hold page references in their frags.
62  */
63 static void rds_ib_recv_unmap_page(struct rds_ib_connection *ic,
64                                    struct rds_ib_recv_work *recv)
65 {
66         struct rds_page_frag *frag = recv->r_frag;
67
68         rdsdebug("recv %p frag %p page %p\n", recv, frag, sg_page(&frag->f_sg));
69         ib_dma_unmap_sg(ic->i_cm_id->device, &frag->f_sg, 1, DMA_FROM_DEVICE);
70 }
71
72 void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
73 {
74         struct rds_ib_recv_work *recv;
75         u32 i;
76
77         for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
78                 struct ib_sge *sge;
79
80                 recv->r_ibinc = NULL;
81                 recv->r_frag = NULL;
82
83                 recv->r_wr.next = NULL;
84                 recv->r_wr.wr_id = i;
85                 recv->r_wr.sg_list = recv->r_sge;
86                 recv->r_wr.num_sge = RDS_IB_RECV_SGE;
87
88                 sge = &recv->r_sge[0];
89                 sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
90                 sge->length = sizeof(struct rds_header);
91                 sge->lkey = ic->i_mr->lkey;
92
93                 sge = &recv->r_sge[1];
94                 sge->addr = 0;
95                 sge->length = RDS_FRAG_SIZE;
96                 sge->lkey = ic->i_mr->lkey;
97         }
98 }
99
100 static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
101                                   struct rds_ib_recv_work *recv)
102 {
103         if (recv->r_ibinc) {
104                 rds_inc_put(&recv->r_ibinc->ii_inc);
105                 recv->r_ibinc = NULL;
106         }
107         if (recv->r_frag) {
108                 rds_ib_recv_unmap_page(ic, recv);
109                 rds_ib_frag_free(recv->r_frag);
110                 recv->r_frag = NULL;
111         }
112 }
113
114 void rds_ib_recv_clear_ring(struct rds_ib_connection *ic)
115 {
116         u32 i;
117
118         for (i = 0; i < ic->i_recv_ring.w_nr; i++)
119                 rds_ib_recv_clear_one(ic, &ic->i_recvs[i]);
120 }
121
122 static int rds_ib_recv_refill_one(struct rds_connection *conn,
123                                   struct rds_ib_recv_work *recv)
124 {
125         struct rds_ib_connection *ic = conn->c_transport_data;
126         struct ib_sge *sge;
127         int ret = -ENOMEM;
128
129         if (!recv->r_ibinc) {
130                 if (!atomic_add_unless(&rds_ib_allocation, 1, rds_ib_sysctl_max_recv_allocation)) {
131                         rds_ib_stats_inc(s_ib_rx_alloc_limit);
132                         goto out;
133                 }
134                 recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab, GFP_NOWAIT);
135                 if (!recv->r_ibinc) {
136                         atomic_dec(&rds_ib_allocation);
137                         goto out;
138                 }
139                 INIT_LIST_HEAD(&recv->r_ibinc->ii_frags);
140                 rds_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr);
141         }
142
143         if (!recv->r_frag) {
144                 recv->r_frag = kmem_cache_alloc(rds_ib_frag_slab, GFP_NOWAIT);
145                 if (!recv->r_frag)
146                         goto out;
147                 INIT_LIST_HEAD(&recv->r_frag->f_item);
148                 sg_init_table(&recv->r_frag->f_sg, 1);
149                 ret = rds_page_remainder_alloc(&recv->r_frag->f_sg,
150                                                RDS_FRAG_SIZE, GFP_NOWAIT);
151                 if (ret) {
152                         kmem_cache_free(rds_ib_frag_slab, recv->r_frag);
153                         recv->r_frag = NULL;
154                         goto out;
155                 }
156         }
157
158         ret = ib_dma_map_sg(ic->i_cm_id->device, &recv->r_frag->f_sg,
159                             1, DMA_FROM_DEVICE);
160         WARN_ON(ret != 1);
161
162         sge = &recv->r_sge[0];
163         sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
164         sge->length = sizeof(struct rds_header);
165
166         sge = &recv->r_sge[1];
167         sge->addr = sg_dma_address(&recv->r_frag->f_sg);
168         sge->length = sg_dma_len(&recv->r_frag->f_sg);
169
170         ret = 0;
171 out:
172         return ret;
173 }
174
175 /*
176  * This tries to allocate and post unused work requests after making sure that
177  * they have all the allocations they need to queue received fragments into
178  * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
179  * pairs don't go unmatched.
180  *
181  * -1 is returned if posting fails due to temporary resource exhaustion.
182  */
183 int rds_ib_recv_refill(struct rds_connection *conn, int prefill)
184 {
185         struct rds_ib_connection *ic = conn->c_transport_data;
186         struct rds_ib_recv_work *recv;
187         struct ib_recv_wr *failed_wr;
188         unsigned int posted = 0;
189         int ret = 0;
190         u32 pos;
191
192         while ((prefill || rds_conn_up(conn)) &&
193                rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
194                 if (pos >= ic->i_recv_ring.w_nr) {
195                         printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
196                                         pos);
197                         ret = -EINVAL;
198                         break;
199                 }
200
201                 recv = &ic->i_recvs[pos];
202                 ret = rds_ib_recv_refill_one(conn, recv);
203                 if (ret) {
204                         ret = -1;
205                         break;
206                 }
207
208                 /* XXX when can this fail? */
209                 ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
210                 rdsdebug("recv %p ibinc %p page %p addr %lu ret %d\n", recv,
211                          recv->r_ibinc, sg_page(&recv->r_frag->f_sg),
212                          (long) sg_dma_address(&recv->r_frag->f_sg), ret);
213                 if (ret) {
214                         rds_ib_conn_error(conn, "recv post on "
215                                "%pI4 returned %d, disconnecting and "
216                                "reconnecting\n", &conn->c_faddr,
217                                ret);
218                         ret = -1;
219                         break;
220                 }
221
222                 posted++;
223         }
224
225         /* We're doing flow control - update the window. */
226         if (ic->i_flowctl && posted)
227                 rds_ib_advertise_credits(conn, posted);
228
229         if (ret)
230                 rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
231         return ret;
232 }
233
234 static void rds_ib_inc_purge(struct rds_incoming *inc)
235 {
236         struct rds_ib_incoming *ibinc;
237         struct rds_page_frag *frag;
238         struct rds_page_frag *pos;
239
240         ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
241         rdsdebug("purging ibinc %p inc %p\n", ibinc, inc);
242
243         list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
244                 list_del_init(&frag->f_item);
245                 rds_ib_frag_free(frag);
246         }
247 }
248
249 void rds_ib_inc_free(struct rds_incoming *inc)
250 {
251         struct rds_ib_incoming *ibinc;
252
253         ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
254
255         rds_ib_inc_purge(inc);
256         rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
257         BUG_ON(!list_empty(&ibinc->ii_frags));
258         kmem_cache_free(rds_ib_incoming_slab, ibinc);
259         atomic_dec(&rds_ib_allocation);
260         BUG_ON(atomic_read(&rds_ib_allocation) < 0);
261 }
262
263 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
264                             size_t size)
265 {
266         struct rds_ib_incoming *ibinc;
267         struct rds_page_frag *frag;
268         struct iovec *iov = first_iov;
269         unsigned long to_copy;
270         unsigned long frag_off = 0;
271         unsigned long iov_off = 0;
272         int copied = 0;
273         int ret;
274         u32 len;
275
276         ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
277         frag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item);
278         len = be32_to_cpu(inc->i_hdr.h_len);
279
280         while (copied < size && copied < len) {
281                 if (frag_off == RDS_FRAG_SIZE) {
282                         frag = list_entry(frag->f_item.next,
283                                           struct rds_page_frag, f_item);
284                         frag_off = 0;
285                 }
286                 while (iov_off == iov->iov_len) {
287                         iov_off = 0;
288                         iov++;
289                 }
290
291                 to_copy = min(iov->iov_len - iov_off, RDS_FRAG_SIZE - frag_off);
292                 to_copy = min_t(size_t, to_copy, size - copied);
293                 to_copy = min_t(unsigned long, to_copy, len - copied);
294
295                 rdsdebug("%lu bytes to user [%p, %zu] + %lu from frag "
296                          "[%p, %u] + %lu\n",
297                          to_copy, iov->iov_base, iov->iov_len, iov_off,
298                          sg_page(&frag->f_sg), frag->f_sg.offset, frag_off);
299
300                 /* XXX needs + offset for multiple recvs per page */
301                 ret = rds_page_copy_to_user(sg_page(&frag->f_sg),
302                                             frag->f_sg.offset + frag_off,
303                                             iov->iov_base + iov_off,
304                                             to_copy);
305                 if (ret) {
306                         copied = ret;
307                         break;
308                 }
309
310                 iov_off += to_copy;
311                 frag_off += to_copy;
312                 copied += to_copy;
313         }
314
315         return copied;
316 }
317
318 /* ic starts out kzalloc()ed */
319 void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
320 {
321         struct ib_send_wr *wr = &ic->i_ack_wr;
322         struct ib_sge *sge = &ic->i_ack_sge;
323
324         sge->addr = ic->i_ack_dma;
325         sge->length = sizeof(struct rds_header);
326         sge->lkey = ic->i_mr->lkey;
327
328         wr->sg_list = sge;
329         wr->num_sge = 1;
330         wr->opcode = IB_WR_SEND;
331         wr->wr_id = RDS_IB_ACK_WR_ID;
332         wr->send_flags = IB_SEND_SIGNALED | IB_SEND_SOLICITED;
333 }
334
335 /*
336  * You'd think that with reliable IB connections you wouldn't need to ack
337  * messages that have been received.  The problem is that IB hardware generates
338  * an ack message before it has DMAed the message into memory.  This creates a
339  * potential message loss if the HCA is disabled for any reason between when it
340  * sends the ack and before the message is DMAed and processed.  This is only a
341  * potential issue if another HCA is available for fail-over.
342  *
343  * When the remote host receives our ack they'll free the sent message from
344  * their send queue.  To decrease the latency of this we always send an ack
345  * immediately after we've received messages.
346  *
347  * For simplicity, we only have one ack in flight at a time.  This puts
348  * pressure on senders to have deep enough send queues to absorb the latency of
349  * a single ack frame being in flight.  This might not be good enough.
350  *
351  * This is implemented by have a long-lived send_wr and sge which point to a
352  * statically allocated ack frame.  This ack wr does not fall under the ring
353  * accounting that the tx and rx wrs do.  The QP attribute specifically makes
354  * room for it beyond the ring size.  Send completion notices its special
355  * wr_id and avoids working with the ring in that case.
356  */
357 #ifndef KERNEL_HAS_ATOMIC64
358 static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
359                                 int ack_required)
360 {
361         unsigned long flags;
362
363         spin_lock_irqsave(&ic->i_ack_lock, flags);
364         ic->i_ack_next = seq;
365         if (ack_required)
366                 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
367         spin_unlock_irqrestore(&ic->i_ack_lock, flags);
368 }
369
370 static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
371 {
372         unsigned long flags;
373         u64 seq;
374
375         clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
376
377         spin_lock_irqsave(&ic->i_ack_lock, flags);
378         seq = ic->i_ack_next;
379         spin_unlock_irqrestore(&ic->i_ack_lock, flags);
380
381         return seq;
382 }
383 #else
384 static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
385                                 int ack_required)
386 {
387         atomic64_set(&ic->i_ack_next, seq);
388         if (ack_required) {
389                 smp_mb__before_clear_bit();
390                 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
391         }
392 }
393
394 static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
395 {
396         clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
397         smp_mb__after_clear_bit();
398
399         return atomic64_read(&ic->i_ack_next);
400 }
401 #endif
402
403
404 static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
405 {
406         struct rds_header *hdr = ic->i_ack;
407         struct ib_send_wr *failed_wr;
408         u64 seq;
409         int ret;
410
411         seq = rds_ib_get_ack(ic);
412
413         rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
414         rds_message_populate_header(hdr, 0, 0, 0);
415         hdr->h_ack = cpu_to_be64(seq);
416         hdr->h_credit = adv_credits;
417         rds_message_make_checksum(hdr);
418         ic->i_ack_queued = jiffies;
419
420         ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, &failed_wr);
421         if (unlikely(ret)) {
422                 /* Failed to send. Release the WR, and
423                  * force another ACK.
424                  */
425                 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
426                 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
427
428                 rds_ib_stats_inc(s_ib_ack_send_failure);
429
430                 rds_ib_conn_error(ic->conn, "sending ack failed\n");
431         } else
432                 rds_ib_stats_inc(s_ib_ack_sent);
433 }
434
435 /*
436  * There are 3 ways of getting acknowledgements to the peer:
437  *  1.  We call rds_ib_attempt_ack from the recv completion handler
438  *      to send an ACK-only frame.
439  *      However, there can be only one such frame in the send queue
440  *      at any time, so we may have to postpone it.
441  *  2.  When another (data) packet is transmitted while there's
442  *      an ACK in the queue, we piggyback the ACK sequence number
443  *      on the data packet.
444  *  3.  If the ACK WR is done sending, we get called from the
445  *      send queue completion handler, and check whether there's
446  *      another ACK pending (postponed because the WR was on the
447  *      queue). If so, we transmit it.
448  *
449  * We maintain 2 variables:
450  *  -   i_ack_flags, which keeps track of whether the ACK WR
451  *      is currently in the send queue or not (IB_ACK_IN_FLIGHT)
452  *  -   i_ack_next, which is the last sequence number we received
453  *
454  * Potentially, send queue and receive queue handlers can run concurrently.
455  * It would be nice to not have to use a spinlock to synchronize things,
456  * but the one problem that rules this out is that 64bit updates are
457  * not atomic on all platforms. Things would be a lot simpler if
458  * we had atomic64 or maybe cmpxchg64 everywhere.
459  *
460  * Reconnecting complicates this picture just slightly. When we
461  * reconnect, we may be seeing duplicate packets. The peer
462  * is retransmitting them, because it hasn't seen an ACK for
463  * them. It is important that we ACK these.
464  *
465  * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
466  * this flag set *MUST* be acknowledged immediately.
467  */
468
469 /*
470  * When we get here, we're called from the recv queue handler.
471  * Check whether we ought to transmit an ACK.
472  */
473 void rds_ib_attempt_ack(struct rds_ib_connection *ic)
474 {
475         unsigned int adv_credits;
476
477         if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
478                 return;
479
480         if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
481                 rds_ib_stats_inc(s_ib_ack_send_delayed);
482                 return;
483         }
484
485         /* Can we get a send credit? */
486         if (!rds_ib_send_grab_credits(ic, 1, &adv_credits, 0, RDS_MAX_ADV_CREDIT)) {
487                 rds_ib_stats_inc(s_ib_tx_throttle);
488                 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
489                 return;
490         }
491
492         clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
493         rds_ib_send_ack(ic, adv_credits);
494 }
495
496 /*
497  * We get here from the send completion handler, when the
498  * adapter tells us the ACK frame was sent.
499  */
500 void rds_ib_ack_send_complete(struct rds_ib_connection *ic)
501 {
502         clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
503         rds_ib_attempt_ack(ic);
504 }
505
506 /*
507  * This is called by the regular xmit code when it wants to piggyback
508  * an ACK on an outgoing frame.
509  */
510 u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic)
511 {
512         if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
513                 rds_ib_stats_inc(s_ib_ack_send_piggybacked);
514         return rds_ib_get_ack(ic);
515 }
516
517 /*
518  * It's kind of lame that we're copying from the posted receive pages into
519  * long-lived bitmaps.  We could have posted the bitmaps and rdma written into
520  * them.  But receiving new congestion bitmaps should be a *rare* event, so
521  * hopefully we won't need to invest that complexity in making it more
522  * efficient.  By copying we can share a simpler core with TCP which has to
523  * copy.
524  */
525 static void rds_ib_cong_recv(struct rds_connection *conn,
526                               struct rds_ib_incoming *ibinc)
527 {
528         struct rds_cong_map *map;
529         unsigned int map_off;
530         unsigned int map_page;
531         struct rds_page_frag *frag;
532         unsigned long frag_off;
533         unsigned long to_copy;
534         unsigned long copied;
535         uint64_t uncongested = 0;
536         void *addr;
537
538         /* catch completely corrupt packets */
539         if (be32_to_cpu(ibinc->ii_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
540                 return;
541
542         map = conn->c_fcong;
543         map_page = 0;
544         map_off = 0;
545
546         frag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item);
547         frag_off = 0;
548
549         copied = 0;
550
551         while (copied < RDS_CONG_MAP_BYTES) {
552                 uint64_t *src, *dst;
553                 unsigned int k;
554
555                 to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
556                 BUG_ON(to_copy & 7); /* Must be 64bit aligned. */
557
558                 addr = kmap_atomic(sg_page(&frag->f_sg), KM_SOFTIRQ0);
559
560                 src = addr + frag_off;
561                 dst = (void *)map->m_page_addrs[map_page] + map_off;
562                 for (k = 0; k < to_copy; k += 8) {
563                         /* Record ports that became uncongested, ie
564                          * bits that changed from 0 to 1. */
565                         uncongested |= ~(*src) & *dst;
566                         *dst++ = *src++;
567                 }
568                 kunmap_atomic(addr, KM_SOFTIRQ0);
569
570                 copied += to_copy;
571
572                 map_off += to_copy;
573                 if (map_off == PAGE_SIZE) {
574                         map_off = 0;
575                         map_page++;
576                 }
577
578                 frag_off += to_copy;
579                 if (frag_off == RDS_FRAG_SIZE) {
580                         frag = list_entry(frag->f_item.next,
581                                           struct rds_page_frag, f_item);
582                         frag_off = 0;
583                 }
584         }
585
586         /* the congestion map is in little endian order */
587         uncongested = le64_to_cpu(uncongested);
588
589         rds_cong_map_updated(map, uncongested);
590 }
591
592 /*
593  * Rings are posted with all the allocations they'll need to queue the
594  * incoming message to the receiving socket so this can't fail.
595  * All fragments start with a header, so we can make sure we're not receiving
596  * garbage, and we can tell a small 8 byte fragment from an ACK frame.
597  */
598 struct rds_ib_ack_state {
599         u64             ack_next;
600         u64             ack_recv;
601         unsigned int    ack_required:1;
602         unsigned int    ack_next_valid:1;
603         unsigned int    ack_recv_valid:1;
604 };
605
606 static void rds_ib_process_recv(struct rds_connection *conn,
607                                 struct rds_ib_recv_work *recv, u32 data_len,
608                                 struct rds_ib_ack_state *state)
609 {
610         struct rds_ib_connection *ic = conn->c_transport_data;
611         struct rds_ib_incoming *ibinc = ic->i_ibinc;
612         struct rds_header *ihdr, *hdr;
613
614         /* XXX shut down the connection if port 0,0 are seen? */
615
616         rdsdebug("ic %p ibinc %p recv %p byte len %u\n", ic, ibinc, recv,
617                  data_len);
618
619         if (data_len < sizeof(struct rds_header)) {
620                 rds_ib_conn_error(conn, "incoming message "
621                        "from %pI4 didn't inclue a "
622                        "header, disconnecting and "
623                        "reconnecting\n",
624                        &conn->c_faddr);
625                 return;
626         }
627         data_len -= sizeof(struct rds_header);
628
629         ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];
630
631         /* Validate the checksum. */
632         if (!rds_message_verify_checksum(ihdr)) {
633                 rds_ib_conn_error(conn, "incoming message "
634                        "from %pI4 has corrupted header - "
635                        "forcing a reconnect\n",
636                        &conn->c_faddr);
637                 rds_stats_inc(s_recv_drop_bad_checksum);
638                 return;
639         }
640
641         /* Process the ACK sequence which comes with every packet */
642         state->ack_recv = be64_to_cpu(ihdr->h_ack);
643         state->ack_recv_valid = 1;
644
645         /* Process the credits update if there was one */
646         if (ihdr->h_credit)
647                 rds_ib_send_add_credits(conn, ihdr->h_credit);
648
649         if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) {
650                 /* This is an ACK-only packet. The fact that it gets
651                  * special treatment here is that historically, ACKs
652                  * were rather special beasts.
653                  */
654                 rds_ib_stats_inc(s_ib_ack_received);
655
656                 /*
657                  * Usually the frags make their way on to incs and are then freed as
658                  * the inc is freed.  We don't go that route, so we have to drop the
659                  * page ref ourselves.  We can't just leave the page on the recv
660                  * because that confuses the dma mapping of pages and each recv's use
661                  * of a partial page.
662                  *
663                  * FIXME: Fold this into the code path below.
664                  */
665                 rds_ib_frag_free(recv->r_frag);
666                 recv->r_frag = NULL;
667                 return;
668         }
669
670         /*
671          * If we don't already have an inc on the connection then this
672          * fragment has a header and starts a message.. copy its header
673          * into the inc and save the inc so we can hang upcoming fragments
674          * off its list.
675          */
676         if (!ibinc) {
677                 ibinc = recv->r_ibinc;
678                 recv->r_ibinc = NULL;
679                 ic->i_ibinc = ibinc;
680
681                 hdr = &ibinc->ii_inc.i_hdr;
682                 memcpy(hdr, ihdr, sizeof(*hdr));
683                 ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
684
685                 rdsdebug("ic %p ibinc %p rem %u flag 0x%x\n", ic, ibinc,
686                          ic->i_recv_data_rem, hdr->h_flags);
687         } else {
688                 hdr = &ibinc->ii_inc.i_hdr;
689                 /* We can't just use memcmp here; fragments of a
690                  * single message may carry different ACKs */
691                 if (hdr->h_sequence != ihdr->h_sequence ||
692                     hdr->h_len != ihdr->h_len ||
693                     hdr->h_sport != ihdr->h_sport ||
694                     hdr->h_dport != ihdr->h_dport) {
695                         rds_ib_conn_error(conn,
696                                 "fragment header mismatch; forcing reconnect\n");
697                         return;
698                 }
699         }
700
701         list_add_tail(&recv->r_frag->f_item, &ibinc->ii_frags);
702         recv->r_frag = NULL;
703
704         if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
705                 ic->i_recv_data_rem -= RDS_FRAG_SIZE;
706         else {
707                 ic->i_recv_data_rem = 0;
708                 ic->i_ibinc = NULL;
709
710                 if (ibinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
711                         rds_ib_cong_recv(conn, ibinc);
712                 else {
713                         rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
714                                           &ibinc->ii_inc, GFP_ATOMIC,
715                                           KM_SOFTIRQ0);
716                         state->ack_next = be64_to_cpu(hdr->h_sequence);
717                         state->ack_next_valid = 1;
718                 }
719
720                 /* Evaluate the ACK_REQUIRED flag *after* we received
721                  * the complete frame, and after bumping the next_rx
722                  * sequence. */
723                 if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
724                         rds_stats_inc(s_recv_ack_required);
725                         state->ack_required = 1;
726                 }
727
728                 rds_inc_put(&ibinc->ii_inc);
729         }
730 }
731
732 /*
733  * Plucking the oldest entry from the ring can be done concurrently with
734  * the thread refilling the ring.  Each ring operation is protected by
735  * spinlocks and the transient state of refilling doesn't change the
736  * recording of which entry is oldest.
737  *
738  * This relies on IB only calling one cq comp_handler for each cq so that
739  * there will only be one caller of rds_recv_incoming() per RDS connection.
740  */
741 void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
742 {
743         struct rds_connection *conn = context;
744         struct rds_ib_connection *ic = conn->c_transport_data;
745
746         rdsdebug("conn %p cq %p\n", conn, cq);
747
748         rds_ib_stats_inc(s_ib_rx_cq_call);
749
750         tasklet_schedule(&ic->i_recv_tasklet);
751 }
752
753 static inline void rds_poll_cq(struct rds_ib_connection *ic,
754                                struct rds_ib_ack_state *state)
755 {
756         struct rds_connection *conn = ic->conn;
757         struct ib_wc wc;
758         struct rds_ib_recv_work *recv;
759
760         while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
761                 rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
762                          (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
763                          be32_to_cpu(wc.ex.imm_data));
764                 rds_ib_stats_inc(s_ib_rx_cq_event);
765
766                 recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
767
768                 rds_ib_recv_unmap_page(ic, recv);
769
770                 /*
771                  * Also process recvs in connecting state because it is possible
772                  * to get a recv completion _before_ the rdmacm ESTABLISHED
773                  * event is processed.
774                  */
775                 if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
776                         /* We expect errors as the qp is drained during shutdown */
777                         if (wc.status == IB_WC_SUCCESS) {
778                                 rds_ib_process_recv(conn, recv, wc.byte_len, state);
779                         } else {
780                                 rds_ib_conn_error(conn, "recv completion on "
781                                        "%pI4 had status %u, disconnecting and "
782                                        "reconnecting\n", &conn->c_faddr,
783                                        wc.status);
784                         }
785                 }
786
787                 rds_ib_ring_free(&ic->i_recv_ring, 1);
788         }
789 }
790
791 void rds_ib_recv_tasklet_fn(unsigned long data)
792 {
793         struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
794         struct rds_connection *conn = ic->conn;
795         struct rds_ib_ack_state state = { 0, };
796
797         rds_poll_cq(ic, &state);
798         ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
799         rds_poll_cq(ic, &state);
800
801         if (state.ack_next_valid)
802                 rds_ib_set_ack(ic, state.ack_next, state.ack_required);
803         if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
804                 rds_send_drop_acked(conn, state.ack_recv, NULL);
805                 ic->i_ack_recv = state.ack_recv;
806         }
807         if (rds_conn_up(conn))
808                 rds_ib_attempt_ack(ic);
809
810         /* If we ever end up with a really empty receive ring, we're
811          * in deep trouble, as the sender will definitely see RNR
812          * timeouts. */
813         if (rds_ib_ring_empty(&ic->i_recv_ring))
814                 rds_ib_stats_inc(s_ib_rx_ring_empty);
815
816         if (rds_ib_ring_low(&ic->i_recv_ring))
817                 rds_ib_recv_refill(conn, 0);
818 }
819
820 int rds_ib_recv(struct rds_connection *conn)
821 {
822         struct rds_ib_connection *ic = conn->c_transport_data;
823         int ret = 0;
824
825         rdsdebug("conn %p\n", conn);
826         if (rds_conn_up(conn))
827                 rds_ib_attempt_ack(ic);
828
829         return ret;
830 }
831
832 int __init rds_ib_recv_init(void)
833 {
834         struct sysinfo si;
835         int ret = -ENOMEM;
836
837         /* Default to 30% of all available RAM for recv memory */
838         si_meminfo(&si);
839         rds_ib_sysctl_max_recv_allocation = si.totalram / 3 * PAGE_SIZE / RDS_FRAG_SIZE;
840
841         rds_ib_incoming_slab = kmem_cache_create("rds_ib_incoming",
842                                         sizeof(struct rds_ib_incoming),
843                                         0, 0, NULL);
844         if (!rds_ib_incoming_slab)
845                 goto out;
846
847         rds_ib_frag_slab = kmem_cache_create("rds_ib_frag",
848                                         sizeof(struct rds_page_frag),
849                                         0, 0, NULL);
850         if (!rds_ib_frag_slab)
851                 kmem_cache_destroy(rds_ib_incoming_slab);
852         else
853                 ret = 0;
854 out:
855         return ret;
856 }
857
858 void rds_ib_recv_exit(void)
859 {
860         kmem_cache_destroy(rds_ib_incoming_slab);
861         kmem_cache_destroy(rds_ib_frag_slab);
862 }