RDS/IB: Use SLAB_HWCACHE_ALIGN flag for kmem_cache_create()
[linux-2.6.git] / net / rds / ib_recv.c
index e294d00..37dab28 100644 (file)
@@ -43,42 +43,6 @@ static struct kmem_cache *rds_ib_incoming_slab;
 static struct kmem_cache *rds_ib_frag_slab;
 static atomic_t        rds_ib_allocation = ATOMIC_INIT(0);
 
-static void rds_ib_frag_drop_page(struct rds_page_frag *frag)
-{
-       rdsdebug("frag %p page %p\n", frag, frag->f_page);
-       __free_page(frag->f_page);
-       frag->f_page = NULL;
-}
-
-static void rds_ib_frag_free(struct rds_page_frag *frag)
-{
-       rdsdebug("frag %p page %p\n", frag, frag->f_page);
-       BUG_ON(frag->f_page);
-       kmem_cache_free(rds_ib_frag_slab, frag);
-}
-
-/*
- * We map a page at a time.  Its fragments are posted in order.  This
- * is called in fragment order as the fragments get send completion events.
- * Only the last frag in the page performs the unmapping.
- *
- * It's OK for ring cleanup to call this in whatever order it likes because
- * DMA is not in flight and so we can unmap while other ring entries still
- * hold page references in their frags.
- */
-static void rds_ib_recv_unmap_page(struct rds_ib_connection *ic,
-                                  struct rds_ib_recv_work *recv)
-{
-       struct rds_page_frag *frag = recv->r_frag;
-
-       rdsdebug("recv %p frag %p page %p\n", recv, frag, frag->f_page);
-       if (frag->f_mapped)
-               ib_dma_unmap_page(ic->i_cm_id->device,
-                              frag->f_mapped,
-                              RDS_FRAG_SIZE, DMA_FROM_DEVICE);
-       frag->f_mapped = 0;
-}
-
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
 {
        struct rds_ib_recv_work *recv;
@@ -95,18 +59,163 @@ void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
                recv->r_wr.sg_list = recv->r_sge;
                recv->r_wr.num_sge = RDS_IB_RECV_SGE;
 
-               sge = rds_ib_data_sge(ic, recv->r_sge);
+               sge = &recv->r_sge[0];
+               sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
+               sge->length = sizeof(struct rds_header);
+               sge->lkey = ic->i_mr->lkey;
+
+               sge = &recv->r_sge[1];
                sge->addr = 0;
                sge->length = RDS_FRAG_SIZE;
                sge->lkey = ic->i_mr->lkey;
+       }
+}
 
-               sge = rds_ib_header_sge(ic, recv->r_sge);
-               sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
-               sge->length = sizeof(struct rds_header);
-               sge->lkey = ic->i_mr->lkey;
+/*
+ * The entire 'from' list, including the from element itself, is put on
+ * to the tail of the 'to' list.
+ */
+static void list_splice_entire_tail(struct list_head *from,
+                                   struct list_head *to)
+{
+       struct list_head *from_last = from->prev;
+
+       list_splice_tail(from_last, to);
+       list_add_tail(from_last, to);
+}
+
+static void rds_ib_cache_xfer_to_ready(struct rds_ib_refill_cache *cache)
+{
+       struct list_head *tmp;
+
+       tmp = xchg(&cache->xfer, NULL);
+       if (tmp) {
+               if (cache->ready)
+                       list_splice_entire_tail(tmp, cache->ready);
+               else
+                       cache->ready = tmp;
        }
 }
 
+static int rds_ib_recv_alloc_cache(struct rds_ib_refill_cache *cache)
+{
+       struct rds_ib_cache_head *head;
+       int cpu;
+
+       cache->percpu = alloc_percpu(struct rds_ib_cache_head);
+       if (!cache->percpu)
+              return -ENOMEM;
+
+       for_each_possible_cpu(cpu) {
+               head = per_cpu_ptr(cache->percpu, cpu);
+               head->first = NULL;
+               head->count = 0;
+       }
+       cache->xfer = NULL;
+       cache->ready = NULL;
+
+       return 0;
+}
+
+int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic)
+{
+       int ret;
+
+       ret = rds_ib_recv_alloc_cache(&ic->i_cache_incs);
+       if (!ret) {
+               ret = rds_ib_recv_alloc_cache(&ic->i_cache_frags);
+               if (ret)
+                       free_percpu(ic->i_cache_incs.percpu);
+       }
+
+       return ret;
+}
+
+static void rds_ib_cache_splice_all_lists(struct rds_ib_refill_cache *cache,
+                                         struct list_head *caller_list)
+{
+       struct rds_ib_cache_head *head;
+       int cpu;
+
+       for_each_possible_cpu(cpu) {
+               head = per_cpu_ptr(cache->percpu, cpu);
+               if (head->first) {
+                       list_splice_entire_tail(head->first, caller_list);
+                       head->first = NULL;
+               }
+       }
+
+       if (cache->ready) {
+               list_splice_entire_tail(cache->ready, caller_list);
+               cache->ready = NULL;
+       }
+}
+
+void rds_ib_recv_free_caches(struct rds_ib_connection *ic)
+{
+       struct rds_ib_incoming *inc;
+       struct rds_ib_incoming *inc_tmp;
+       struct rds_page_frag *frag;
+       struct rds_page_frag *frag_tmp;
+       LIST_HEAD(list);
+
+       rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
+       rds_ib_cache_splice_all_lists(&ic->i_cache_incs, &list);
+       free_percpu(ic->i_cache_incs.percpu);
+
+       list_for_each_entry_safe(inc, inc_tmp, &list, ii_cache_entry) {
+               list_del(&inc->ii_cache_entry);
+               WARN_ON(!list_empty(&inc->ii_frags));
+               kmem_cache_free(rds_ib_incoming_slab, inc);
+       }
+
+       rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
+       rds_ib_cache_splice_all_lists(&ic->i_cache_frags, &list);
+       free_percpu(ic->i_cache_frags.percpu);
+
+       list_for_each_entry_safe(frag, frag_tmp, &list, f_cache_entry) {
+               list_del(&frag->f_cache_entry);
+               WARN_ON(!list_empty(&frag->f_item));
+               kmem_cache_free(rds_ib_frag_slab, frag);
+       }
+}
+
+/* fwd decl */
+static void rds_ib_recv_cache_put(struct list_head *new_item,
+                                 struct rds_ib_refill_cache *cache);
+static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache);
+
+
+/* Recycle frag and attached recv buffer f_sg */
+static void rds_ib_frag_free(struct rds_ib_connection *ic,
+                            struct rds_page_frag *frag)
+{
+       rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
+
+       rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags);
+}
+
+/* Recycle inc after freeing attached frags */
+void rds_ib_inc_free(struct rds_incoming *inc)
+{
+       struct rds_ib_incoming *ibinc;
+       struct rds_page_frag *frag;
+       struct rds_page_frag *pos;
+       struct rds_ib_connection *ic = inc->i_conn->c_transport_data;
+
+       ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+
+       /* Free attached frags */
+       list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
+               list_del_init(&frag->f_item);
+               rds_ib_frag_free(ic, frag);
+       }
+       BUG_ON(!list_empty(&ibinc->ii_frags));
+
+       rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
+       rds_ib_recv_cache_put(&ibinc->ii_cache_entry, &ic->i_cache_incs);
+}
+
 static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
                                  struct rds_ib_recv_work *recv)
 {
@@ -115,10 +224,8 @@ static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
                recv->r_ibinc = NULL;
        }
        if (recv->r_frag) {
-               rds_ib_recv_unmap_page(ic, recv);
-               if (recv->r_frag->f_page)
-                       rds_ib_frag_drop_page(recv->r_frag);
-               rds_ib_frag_free(recv->r_frag);
+               ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
+               rds_ib_frag_free(ic, recv->r_frag);
                recv->r_frag = NULL;
        }
 }
@@ -129,84 +236,110 @@ void rds_ib_recv_clear_ring(struct rds_ib_connection *ic)
 
        for (i = 0; i < ic->i_recv_ring.w_nr; i++)
                rds_ib_recv_clear_one(ic, &ic->i_recvs[i]);
-
-       if (ic->i_frag.f_page)
-               rds_ib_frag_drop_page(&ic->i_frag);
 }
 
-static int rds_ib_recv_refill_one(struct rds_connection *conn,
-                                 struct rds_ib_recv_work *recv,
-                                 gfp_t kptr_gfp, gfp_t page_gfp)
+static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *ic,
+                                                    gfp_t slab_mask)
 {
-       struct rds_ib_connection *ic = conn->c_transport_data;
-       dma_addr_t dma_addr;
-       struct ib_sge *sge;
-       int ret = -ENOMEM;
+       struct rds_ib_incoming *ibinc;
+       struct list_head *cache_item;
+       int avail_allocs;
 
-       if (!recv->r_ibinc) {
-               if (!atomic_add_unless(&rds_ib_allocation, 1, rds_ib_sysctl_max_recv_allocation)) {
+       cache_item = rds_ib_recv_cache_get(&ic->i_cache_incs);
+       if (cache_item) {
+               ibinc = container_of(cache_item, struct rds_ib_incoming, ii_cache_entry);
+       } else {
+               avail_allocs = atomic_add_unless(&rds_ib_allocation,
+                                                1, rds_ib_sysctl_max_recv_allocation);
+               if (!avail_allocs) {
                        rds_ib_stats_inc(s_ib_rx_alloc_limit);
-                       goto out;
+                       return NULL;
                }
-               recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab,
-                                                kptr_gfp);
-               if (!recv->r_ibinc) {
+               ibinc = kmem_cache_alloc(rds_ib_incoming_slab, slab_mask);
+               if (!ibinc) {
                        atomic_dec(&rds_ib_allocation);
-                       goto out;
+                       return NULL;
                }
-               INIT_LIST_HEAD(&recv->r_ibinc->ii_frags);
-               rds_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr);
        }
+       INIT_LIST_HEAD(&ibinc->ii_frags);
+       rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr);
 
-       if (!recv->r_frag) {
-               recv->r_frag = kmem_cache_alloc(rds_ib_frag_slab, kptr_gfp);
-               if (!recv->r_frag)
-                       goto out;
-               INIT_LIST_HEAD(&recv->r_frag->f_item);
-               recv->r_frag->f_page = NULL;
+       return ibinc;
+}
+
+static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic,
+                                                   gfp_t slab_mask, gfp_t page_mask)
+{
+       struct rds_page_frag *frag;
+       struct list_head *cache_item;
+       int ret;
+
+       cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags);
+       if (cache_item) {
+               frag = container_of(cache_item, struct rds_page_frag, f_cache_entry);
+       } else {
+               frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask);
+               if (!frag)
+                       return NULL;
+
+               ret = rds_page_remainder_alloc(&frag->f_sg,
+                                              RDS_FRAG_SIZE, page_mask);
+               if (ret) {
+                       kmem_cache_free(rds_ib_frag_slab, frag);
+                       return NULL;
+               }
        }
 
-       if (!ic->i_frag.f_page) {
-               ic->i_frag.f_page = alloc_page(page_gfp);
-               if (!ic->i_frag.f_page)
-                       goto out;
-               ic->i_frag.f_offset = 0;
+       INIT_LIST_HEAD(&frag->f_item);
+
+       return frag;
+}
+
+static int rds_ib_recv_refill_one(struct rds_connection *conn,
+                                 struct rds_ib_recv_work *recv, int prefill)
+{
+       struct rds_ib_connection *ic = conn->c_transport_data;
+       struct ib_sge *sge;
+       int ret = -ENOMEM;
+       gfp_t slab_mask = GFP_NOWAIT;
+       gfp_t page_mask = GFP_NOWAIT;
+
+       if (prefill) {
+               slab_mask = GFP_KERNEL;
+               page_mask = GFP_HIGHUSER;
        }
 
-       dma_addr = ib_dma_map_page(ic->i_cm_id->device,
-                                 ic->i_frag.f_page,
-                                 ic->i_frag.f_offset,
-                                 RDS_FRAG_SIZE,
-                                 DMA_FROM_DEVICE);
-       if (ib_dma_mapping_error(ic->i_cm_id->device, dma_addr))
-               goto out;
+       if (!ic->i_cache_incs.ready)
+               rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
+       if (!ic->i_cache_frags.ready)
+               rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
 
        /*
-        * Once we get the RDS_PAGE_LAST_OFF frag then rds_ib_frag_unmap()
-        * must be called on this recv.  This happens as completions hit
-        * in order or on connection shutdown.
+        * ibinc was taken from recv if recv contained the start of a message.
+        * recvs that were continuations will still have this allocated.
         */
-       recv->r_frag->f_page = ic->i_frag.f_page;
-       recv->r_frag->f_offset = ic->i_frag.f_offset;
-       recv->r_frag->f_mapped = dma_addr;
+       if (!recv->r_ibinc) {
+               recv->r_ibinc = rds_ib_refill_one_inc(ic, slab_mask);
+               if (!recv->r_ibinc)
+                       goto out;
+       }
 
-       sge = rds_ib_data_sge(ic, recv->r_sge);
-       sge->addr = dma_addr;
-       sge->length = RDS_FRAG_SIZE;
+       WARN_ON(recv->r_frag); /* leak! */
+       recv->r_frag = rds_ib_refill_one_frag(ic, slab_mask, page_mask);
+       if (!recv->r_frag)
+               goto out;
+
+       ret = ib_dma_map_sg(ic->i_cm_id->device, &recv->r_frag->f_sg,
+                           1, DMA_FROM_DEVICE);
+       WARN_ON(ret != 1);
 
-       sge = rds_ib_header_sge(ic, recv->r_sge);
+       sge = &recv->r_sge[0];
        sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
        sge->length = sizeof(struct rds_header);
 
-       get_page(recv->r_frag->f_page);
-
-       if (ic->i_frag.f_offset < RDS_PAGE_LAST_OFF) {
-               ic->i_frag.f_offset += RDS_FRAG_SIZE;
-       } else {
-               put_page(ic->i_frag.f_page);
-               ic->i_frag.f_page = NULL;
-               ic->i_frag.f_offset = 0;
-       }
+       sge = &recv->r_sge[1];
+       sge->addr = sg_dma_address(&recv->r_frag->f_sg);
+       sge->length = sg_dma_len(&recv->r_frag->f_sg);
 
        ret = 0;
 out:
@@ -216,13 +349,11 @@ out:
 /*
  * This tries to allocate and post unused work requests after making sure that
  * they have all the allocations they need to queue received fragments into
- * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
- * pairs don't go unmatched.
+ * sockets.
  *
  * -1 is returned if posting fails due to temporary resource exhaustion.
  */
-int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
-                      gfp_t page_gfp, int prefill)
+void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
 {
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct rds_ib_recv_work *recv;
@@ -236,28 +367,25 @@ int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
                if (pos >= ic->i_recv_ring.w_nr) {
                        printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
                                        pos);
-                       ret = -EINVAL;
                        break;
                }
 
                recv = &ic->i_recvs[pos];
-               ret = rds_ib_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
+               ret = rds_ib_recv_refill_one(conn, recv, prefill);
                if (ret) {
-                       ret = -1;
                        break;
                }
 
                /* XXX when can this fail? */
                ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
                rdsdebug("recv %p ibinc %p page %p addr %lu ret %d\n", recv,
-                        recv->r_ibinc, recv->r_frag->f_page,
-                        (long) recv->r_frag->f_mapped, ret);
+                        recv->r_ibinc, sg_page(&recv->r_frag->f_sg),
+                        (long) sg_dma_address(&recv->r_frag->f_sg), ret);
                if (ret) {
                        rds_ib_conn_error(conn, "recv post on "
                               "%pI4 returned %d, disconnecting and "
                               "reconnecting\n", &conn->c_faddr,
                               ret);
-                       ret = -1;
                        break;
                }
 
@@ -270,37 +398,73 @@ int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
 
        if (ret)
                rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
-       return ret;
 }
 
-static void rds_ib_inc_purge(struct rds_incoming *inc)
+/*
+ * We want to recycle several types of recv allocations, like incs and frags.
+ * To use this, the *_free() function passes in the ptr to a list_head within
+ * the recyclee, as well as the cache to put it on.
+ *
+ * First, we put the memory on a percpu list. When this reaches a certain size,
+ * We move it to an intermediate non-percpu list in a lockless manner, with some
+ * xchg/compxchg wizardry.
+ *
+ * N.B. Instead of a list_head as the anchor, we use a single pointer, which can
+ * be NULL and xchg'd. The list is actually empty when the pointer is NULL, and
+ * list_empty() will return true with one element is actually present.
+ */
+static void rds_ib_recv_cache_put(struct list_head *new_item,
+                                struct rds_ib_refill_cache *cache)
 {
-       struct rds_ib_incoming *ibinc;
-       struct rds_page_frag *frag;
-       struct rds_page_frag *pos;
+       unsigned long flags;
+       struct rds_ib_cache_head *chp;
+       struct list_head *old;
 
-       ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
-       rdsdebug("purging ibinc %p inc %p\n", ibinc, inc);
+       local_irq_save(flags);
 
-       list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
-               list_del_init(&frag->f_item);
-               rds_ib_frag_drop_page(frag);
-               rds_ib_frag_free(frag);
-       }
+       chp = per_cpu_ptr(cache->percpu, smp_processor_id());
+       if (!chp->first)
+               INIT_LIST_HEAD(new_item);
+       else /* put on front */
+               list_add_tail(new_item, chp->first);
+       chp->first = new_item;
+       chp->count++;
+
+       if (chp->count < RDS_IB_RECYCLE_BATCH_COUNT)
+               goto end;
+
+       /*
+        * Return our per-cpu first list to the cache's xfer by atomically
+        * grabbing the current xfer list, appending it to our per-cpu list,
+        * and then atomically returning that entire list back to the
+        * cache's xfer list as long as it's still empty.
+        */
+       do {
+               old = xchg(&cache->xfer, NULL);
+               if (old)
+                       list_splice_entire_tail(old, chp->first);
+               old = cmpxchg(&cache->xfer, NULL, chp->first);
+       } while (old);
+
+       chp->first = NULL;
+       chp->count = 0;
+end:
+       local_irq_restore(flags);
 }
 
-void rds_ib_inc_free(struct rds_incoming *inc)
+static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache)
 {
-       struct rds_ib_incoming *ibinc;
-
-       ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+       struct list_head *head = cache->ready;
+
+       if (head) {
+               if (!list_empty(head)) {
+                       cache->ready = head->next;
+                       list_del_init(head);
+               } else
+                       cache->ready = NULL;
+       }
 
-       rds_ib_inc_purge(inc);
-       rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
-       BUG_ON(!list_empty(&ibinc->ii_frags));
-       kmem_cache_free(rds_ib_incoming_slab, ibinc);
-       atomic_dec(&rds_ib_allocation);
-       BUG_ON(atomic_read(&rds_ib_allocation) < 0);
+       return head;
 }
 
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
@@ -336,13 +500,13 @@ int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
                to_copy = min_t(unsigned long, to_copy, len - copied);
 
                rdsdebug("%lu bytes to user [%p, %zu] + %lu from frag "
-                        "[%p, %lu] + %lu\n",
+                        "[%p, %u] + %lu\n",
                         to_copy, iov->iov_base, iov->iov_len, iov_off,
-                        frag->f_page, frag->f_offset, frag_off);
+                        sg_page(&frag->f_sg), frag->f_sg.offset, frag_off);
 
                /* XXX needs + offset for multiple recvs per page */
-               ret = rds_page_copy_to_user(frag->f_page,
-                                           frag->f_offset + frag_off,
+               ret = rds_page_copy_to_user(sg_page(&frag->f_sg),
+                                           frag->f_sg.offset + frag_off,
                                            iov->iov_base + iov_off,
                                            to_copy);
                if (ret) {
@@ -557,47 +721,6 @@ u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic)
        return rds_ib_get_ack(ic);
 }
 
-static struct rds_header *rds_ib_get_header(struct rds_connection *conn,
-                                           struct rds_ib_recv_work *recv,
-                                           u32 data_len)
-{
-       struct rds_ib_connection *ic = conn->c_transport_data;
-       void *hdr_buff = &ic->i_recv_hdrs[recv - ic->i_recvs];
-       void *addr;
-       u32 misplaced_hdr_bytes;
-
-       /*
-        * Support header at the front (RDS 3.1+) as well as header-at-end.
-        *
-        * Cases:
-        * 1) header all in header buff (great!)
-        * 2) header all in data page (copy all to header buff)
-        * 3) header split across hdr buf + data page
-        *    (move bit in hdr buff to end before copying other bit from data page)
-        */
-       if (conn->c_version > RDS_PROTOCOL_3_0 || data_len == RDS_FRAG_SIZE)
-               return hdr_buff;
-
-       if (data_len <= (RDS_FRAG_SIZE - sizeof(struct rds_header))) {
-               addr = kmap_atomic(recv->r_frag->f_page, KM_SOFTIRQ0);
-               memcpy(hdr_buff,
-                      addr + recv->r_frag->f_offset + data_len,
-                      sizeof(struct rds_header));
-               kunmap_atomic(addr, KM_SOFTIRQ0);
-               return hdr_buff;
-       }
-
-       misplaced_hdr_bytes = (sizeof(struct rds_header) - (RDS_FRAG_SIZE - data_len));
-
-       memmove(hdr_buff + misplaced_hdr_bytes, hdr_buff, misplaced_hdr_bytes);
-
-       addr = kmap_atomic(recv->r_frag->f_page, KM_SOFTIRQ0);
-       memcpy(hdr_buff, addr + recv->r_frag->f_offset + data_len,
-              sizeof(struct rds_header) - misplaced_hdr_bytes);
-       kunmap_atomic(addr, KM_SOFTIRQ0);
-       return hdr_buff;
-}
-
 /*
  * It's kind of lame that we're copying from the posted receive pages into
  * long-lived bitmaps.  We could have posted the bitmaps and rdma written into
@@ -639,7 +762,7 @@ static void rds_ib_cong_recv(struct rds_connection *conn,
                to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
                BUG_ON(to_copy & 7); /* Must be 64bit aligned. */
 
-               addr = kmap_atomic(frag->f_page, KM_SOFTIRQ0);
+               addr = kmap_atomic(sg_page(&frag->f_sg), KM_SOFTIRQ0);
 
                src = addr + frag_off;
                dst = (void *)map->m_page_addrs[map_page] + map_off;
@@ -710,7 +833,7 @@ static void rds_ib_process_recv(struct rds_connection *conn,
        }
        data_len -= sizeof(struct rds_header);
 
-       ihdr = rds_ib_get_header(conn, recv, data_len);
+       ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];
 
        /* Validate the checksum. */
        if (!rds_message_verify_checksum(ihdr)) {
@@ -742,12 +865,12 @@ static void rds_ib_process_recv(struct rds_connection *conn,
                 * the inc is freed.  We don't go that route, so we have to drop the
                 * page ref ourselves.  We can't just leave the page on the recv
                 * because that confuses the dma mapping of pages and each recv's use
-                * of a partial page.  We can leave the frag, though, it will be
-                * reused.
+                * of a partial page.
                 *
                 * FIXME: Fold this into the code path below.
                 */
-               rds_ib_frag_drop_page(recv->r_frag);
+               rds_ib_frag_free(ic, recv->r_frag);
+               recv->r_frag = NULL;
                return;
        }
 
@@ -849,25 +972,29 @@ static inline void rds_poll_cq(struct rds_ib_connection *ic,
 
                recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
 
-               rds_ib_recv_unmap_page(ic, recv);
+               ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
 
                /*
                 * Also process recvs in connecting state because it is possible
                 * to get a recv completion _before_ the rdmacm ESTABLISHED
                 * event is processed.
                 */
-               if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
+               if (wc.status == IB_WC_SUCCESS) {
+                       rds_ib_process_recv(conn, recv, wc.byte_len, state);
+               } else {
                        /* We expect errors as the qp is drained during shutdown */
-                       if (wc.status == IB_WC_SUCCESS) {
-                               rds_ib_process_recv(conn, recv, wc.byte_len, state);
-                       } else {
+                       if (rds_conn_up(conn) || rds_conn_connecting(conn))
                                rds_ib_conn_error(conn, "recv completion on "
-                                      "%pI4 had status %u, disconnecting and "
-                                      "reconnecting\n", &conn->c_faddr,
-                                      wc.status);
-                       }
+                                                 "%pI4 had status %u, disconnecting and "
+                                                 "reconnecting\n", &conn->c_faddr,
+                                                 wc.status);
                }
 
+               /*
+                * It's very important that we only free this ring entry if we've truly
+                * freed the resources allocated to the entry.  The refilling path can
+                * leak if we don't.
+                */
                rds_ib_ring_free(&ic->i_recv_ring, 1);
        }
 }
@@ -897,11 +1024,8 @@ void rds_ib_recv_tasklet_fn(unsigned long data)
        if (rds_ib_ring_empty(&ic->i_recv_ring))
                rds_ib_stats_inc(s_ib_rx_ring_empty);
 
-       /*
-        * If the ring is running low, then schedule the thread to refill.
-        */
        if (rds_ib_ring_low(&ic->i_recv_ring))
-               queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
+               rds_ib_recv_refill(conn, 0);
 }
 
 int rds_ib_recv(struct rds_connection *conn)
@@ -910,18 +1034,6 @@ int rds_ib_recv(struct rds_connection *conn)
        int ret = 0;
 
        rdsdebug("conn %p\n", conn);
-
-       /*
-        * If we get a temporary posting failure in this context then
-        * we're really low and we want the caller to back off for a bit.
-        */
-       mutex_lock(&ic->i_recv_mutex);
-       if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
-               ret = -ENOMEM;
-       else
-               rds_ib_stats_inc(s_ib_rx_refill_from_thread);
-       mutex_unlock(&ic->i_recv_mutex);
-
        if (rds_conn_up(conn))
                rds_ib_attempt_ack(ic);
 
@@ -939,13 +1051,13 @@ int __init rds_ib_recv_init(void)
 
        rds_ib_incoming_slab = kmem_cache_create("rds_ib_incoming",
                                        sizeof(struct rds_ib_incoming),
-                                       0, 0, NULL);
+                                       0, SLAB_HWCACHE_ALIGN, NULL);
        if (!rds_ib_incoming_slab)
                goto out;
 
        rds_ib_frag_slab = kmem_cache_create("rds_ib_frag",
                                        sizeof(struct rds_page_frag),
-                                       0, 0, NULL);
+                                       0, SLAB_HWCACHE_ALIGN, NULL);
        if (!rds_ib_frag_slab)
                kmem_cache_destroy(rds_ib_incoming_slab);
        else