Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/sparc-2.6
[linux-2.6.git] / net / sunrpc / xprtrdma / verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/pci.h>  /* for Tavor hack below */
51
52 #include "xprt_rdma.h"
53
54 /*
55  * Globals/Macros
56  */
57
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY        RPCDBG_TRANS
60 #endif
61
62 /*
63  * internal functions
64  */
65
66 /*
67  * handle replies in tasklet context, using a single, global list
68  * rdma tasklet function -- just turn around and call the func
69  * for all replies on the list
70  */
71
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78         struct rpcrdma_rep *rep;
79         void (*func)(struct rpcrdma_rep *);
80         unsigned long flags;
81
82         data = data;
83         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84         while (!list_empty(&rpcrdma_tasklets_g)) {
85                 rep = list_entry(rpcrdma_tasklets_g.next,
86                                  struct rpcrdma_rep, rr_list);
87                 list_del(&rep->rr_list);
88                 func = rep->rr_func;
89                 rep->rr_func = NULL;
90                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91
92                 if (func)
93                         func(rep);
94                 else
95                         rpcrdma_recv_buffer_put(rep);
96
97                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98         }
99         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107         unsigned long flags;
108
109         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112         tasklet_schedule(&rpcrdma_tasklet_g);
113 }
114
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118         struct rpcrdma_ep *ep = context;
119
120         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121                 __func__, event->event, event->device->name, context);
122         if (ep->rep_connected == 1) {
123                 ep->rep_connected = -EIO;
124                 ep->rep_func(ep);
125                 wake_up_all(&ep->rep_connect_wait);
126         }
127 }
128
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132         struct rpcrdma_ep *ep = context;
133
134         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135                 __func__, event->event, event->device->name, context);
136         if (ep->rep_connected == 1) {
137                 ep->rep_connected = -EIO;
138                 ep->rep_func(ep);
139                 wake_up_all(&ep->rep_connect_wait);
140         }
141 }
142
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
145 {
146         struct rpcrdma_rep *rep =
147                         (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148
149         dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
151
152         if (!rep) /* send or bind completion that we don't care about */
153                 return;
154
155         if (IB_WC_SUCCESS != wc->status) {
156                 dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157                         __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158                          wc->status);
159                 rep->rr_len = ~0U;
160                 rpcrdma_schedule_tasklet(rep);
161                 return;
162         }
163
164         switch (wc->opcode) {
165         case IB_WC_RECV:
166                 rep->rr_len = wc->byte_len;
167                 ib_dma_sync_single_for_cpu(
168                         rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170                 /* Keep (only) the most recent credits, after check validity */
171                 if (rep->rr_len >= 16) {
172                         struct rpcrdma_msg *p =
173                                         (struct rpcrdma_msg *) rep->rr_base;
174                         unsigned int credits = ntohl(p->rm_credit);
175                         if (credits == 0) {
176                                 dprintk("RPC:       %s: server"
177                                         " dropped credits to 0!\n", __func__);
178                                 /* don't deadlock */
179                                 credits = 1;
180                         } else if (credits > rep->rr_buffer->rb_max_requests) {
181                                 dprintk("RPC:       %s: server"
182                                         " over-crediting: %d (%d)\n",
183                                         __func__, credits,
184                                         rep->rr_buffer->rb_max_requests);
185                                 credits = rep->rr_buffer->rb_max_requests;
186                         }
187                         atomic_set(&rep->rr_buffer->rb_credits, credits);
188                 }
189                 /* fall through */
190         case IB_WC_BIND_MW:
191                 rpcrdma_schedule_tasklet(rep);
192                 break;
193         default:
194                 dprintk("RPC:       %s: unexpected WC event %X\n",
195                         __func__, wc->opcode);
196                 break;
197         }
198 }
199
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203         struct ib_wc wc;
204         int rc;
205
206         for (;;) {
207                 rc = ib_poll_cq(cq, 1, &wc);
208                 if (rc < 0) {
209                         dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210                                 __func__, rc);
211                         return rc;
212                 }
213                 if (rc == 0)
214                         break;
215
216                 rpcrdma_event_process(&wc);
217         }
218
219         return 0;
220 }
221
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant but processes single events in order to maintain
227  * ordering of receives to keep server credits.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that send events are suppressed and do not result in an upcall.
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240         int rc;
241
242         rc = rpcrdma_cq_poll(cq);
243         if (rc)
244                 return;
245
246         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247         if (rc) {
248                 dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249                         __func__, rc);
250                 return;
251         }
252
253         rpcrdma_cq_poll(cq);
254 }
255
#ifdef RPC_DEBUG
/* Human-readable names for CM events 0..11, indexed by event number. */
static const char * const conn[] = {
        "address resolved",
        "address error",
        "route resolved",
        "route error",
        "connect request",
        "connect response",
        "connect error",
        "unreachable",
        "rejected",
        "established",
        "disconnected",
        "device removal"
};
#endif

/*
 * RDMA connection manager event handler; id->context is the transport
 * (struct rpcrdma_xprt) passed to rdma_create_id().
 *
 *  o Address/route resolution events record their outcome in
 *    ia->ri_async_rc and complete ia->ri_done.
 *  o Connection state events reset the credit count to 1, store the new
 *    state in ep->rep_connected (1 when established, a negative errno
 *    otherwise), invoke ep->rep_func and wake rep_connect_wait sleepers.
 *  o Any other event is only logged.
 *
 * Always returns 0.
 */
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
        struct rpcrdma_xprt *xprt = id->context;
        struct rpcrdma_ia *ia = &xprt->rx_ia;
        struct rpcrdma_ep *ep = &xprt->rx_ep;
#ifdef RPC_DEBUG
        struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
#endif
        struct ib_qp_attr attr;
        struct ib_qp_init_attr iattr;
        int connstate = 0;
        int rc;

        switch (event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
                ia->ri_async_rc = 0;
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ADDR_ERROR:
                ia->ri_async_rc = -EHOSTUNREACH;
                dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ROUTE_ERROR:
                ia->ri_async_rc = -ENETUNREACH;
                dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ESTABLISHED:
                connstate = 1;
                /*
                 * The two fields reported below are pre-cleared so that
                 * a failed query yields zeros instead of whatever was
                 * on the stack.
                 */
                attr.max_dest_rd_atomic = 0;
                attr.max_rd_atomic = 0;
                rc = ib_query_qp(ia->ri_id->qp, &attr,
                        IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
                        &iattr);
                if (rc)
                        dprintk("RPC:       %s: ib_query_qp failed %i\n",
                                __func__, rc);
                dprintk("RPC:       %s: %d responder resources"
                        " (%d initiator)\n",
                        __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
                goto connected;
        case RDMA_CM_EVENT_CONNECT_ERROR:
                connstate = -ENOTCONN;
                goto connected;
        case RDMA_CM_EVENT_UNREACHABLE:
                connstate = -ENETDOWN;
                goto connected;
        case RDMA_CM_EVENT_REJECTED:
                connstate = -ECONNREFUSED;
                goto connected;
        case RDMA_CM_EVENT_DISCONNECTED:
                connstate = -ECONNABORTED;
                goto connected;
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                connstate = -ENODEV;
connected:
                dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
                        __func__,
                        (event->event <= 11) ? conn[event->event] :
                                                "unknown connection error",
                        &addr->sin_addr.s_addr,
                        ntohs(addr->sin_port),
                        ep, event->event);
                atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
                dprintk("RPC:       %s: %sconnected\n",
                                        __func__, connstate > 0 ? "" : "dis");
                ep->rep_connected = connstate;
                ep->rep_func(ep);
                wake_up_all(&ep->rep_connect_wait);
                break;
        default:
                dprintk("RPC:       %s: unexpected CM event %d\n",
                        __func__, event->event);
                break;
        }

#ifdef RPC_DEBUG
        if (connstate == 1) {
                /* connstate is 1 only after the ESTABLISHED case set attr */
                int ird = attr.max_dest_rd_atomic;
                int tird = ep->rep_remote_cma.responder_resources;
                printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
                        "on %s, memreg %d slots %d ird %d%s\n",
                        &addr->sin_addr.s_addr,
                        ntohs(addr->sin_port),
                        ia->ri_id->device->name,
                        ia->ri_memreg_strategy,
                        xprt->rx_buf.rb_max_requests,
                        ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
        } else if (connstate < 0) {
                printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
                        &addr->sin_addr.s_addr,
                        ntohs(addr->sin_port),
                        connstate);
        }
#endif

        return 0;
}
370
371 static struct rdma_cm_id *
372 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
373                         struct rpcrdma_ia *ia, struct sockaddr *addr)
374 {
375         struct rdma_cm_id *id;
376         int rc;
377
378         init_completion(&ia->ri_done);
379
380         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
381         if (IS_ERR(id)) {
382                 rc = PTR_ERR(id);
383                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
384                         __func__, rc);
385                 return id;
386         }
387
388         ia->ri_async_rc = -ETIMEDOUT;
389         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
390         if (rc) {
391                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
392                         __func__, rc);
393                 goto out;
394         }
395         wait_for_completion_interruptible_timeout(&ia->ri_done,
396                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
397         rc = ia->ri_async_rc;
398         if (rc)
399                 goto out;
400
401         ia->ri_async_rc = -ETIMEDOUT;
402         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
403         if (rc) {
404                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
405                         __func__, rc);
406                 goto out;
407         }
408         wait_for_completion_interruptible_timeout(&ia->ri_done,
409                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
410         rc = ia->ri_async_rc;
411         if (rc)
412                 goto out;
413
414         return id;
415
416 out:
417         rdma_destroy_id(id);
418         return ERR_PTR(rc);
419 }
420
421 /*
422  * Drain any cq, prior to teardown.
423  */
424 static void
425 rpcrdma_clean_cq(struct ib_cq *cq)
426 {
427         struct ib_wc wc;
428         int count = 0;
429
430         while (1 == ib_poll_cq(cq, 1, &wc))
431                 ++count;
432
433         if (count)
434                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
435                         __func__, count, wc.opcode);
436 }
437
438 /*
439  * Exported functions.
440  */
441
442 /*
443  * Open and initialize an Interface Adapter.
444  *  o initializes fields of struct rpcrdma_ia, including
445  *    interface and provider attributes and protection zone.
446  */
447 int
448 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
449 {
450         int rc, mem_priv;
451         struct ib_device_attr devattr;
452         struct rpcrdma_ia *ia = &xprt->rx_ia;
453
454         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
455         if (IS_ERR(ia->ri_id)) {
456                 rc = PTR_ERR(ia->ri_id);
457                 goto out1;
458         }
459
460         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
461         if (IS_ERR(ia->ri_pd)) {
462                 rc = PTR_ERR(ia->ri_pd);
463                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
464                         __func__, rc);
465                 goto out2;
466         }
467
468         /*
469          * Query the device to determine if the requested memory
470          * registration strategy is supported. If it isn't, set the
471          * strategy to a globally supported model.
472          */
473         rc = ib_query_device(ia->ri_id->device, &devattr);
474         if (rc) {
475                 dprintk("RPC:       %s: ib_query_device failed %d\n",
476                         __func__, rc);
477                 goto out2;
478         }
479
480         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
481                 ia->ri_have_dma_lkey = 1;
482                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
483         }
484
485         switch (memreg) {
486         case RPCRDMA_MEMWINDOWS:
487         case RPCRDMA_MEMWINDOWS_ASYNC:
488                 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
489                         dprintk("RPC:       %s: MEMWINDOWS registration "
490                                 "specified but not supported by adapter, "
491                                 "using slower RPCRDMA_REGISTER\n",
492                                 __func__);
493                         memreg = RPCRDMA_REGISTER;
494                 }
495                 break;
496         case RPCRDMA_MTHCAFMR:
497                 if (!ia->ri_id->device->alloc_fmr) {
498 #if RPCRDMA_PERSISTENT_REGISTRATION
499                         dprintk("RPC:       %s: MTHCAFMR registration "
500                                 "specified but not supported by adapter, "
501                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
502                                 __func__);
503                         memreg = RPCRDMA_ALLPHYSICAL;
504 #else
505                         dprintk("RPC:       %s: MTHCAFMR registration "
506                                 "specified but not supported by adapter, "
507                                 "using slower RPCRDMA_REGISTER\n",
508                                 __func__);
509                         memreg = RPCRDMA_REGISTER;
510 #endif
511                 }
512                 break;
513         case RPCRDMA_FRMR:
514                 /* Requires both frmr reg and local dma lkey */
515                 if ((devattr.device_cap_flags &
516                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
517                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
518 #if RPCRDMA_PERSISTENT_REGISTRATION
519                         dprintk("RPC:       %s: FRMR registration "
520                                 "specified but not supported by adapter, "
521                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
522                                 __func__);
523                         memreg = RPCRDMA_ALLPHYSICAL;
524 #else
525                         dprintk("RPC:       %s: FRMR registration "
526                                 "specified but not supported by adapter, "
527                                 "using slower RPCRDMA_REGISTER\n",
528                                 __func__);
529                         memreg = RPCRDMA_REGISTER;
530 #endif
531                 }
532                 break;
533         }
534
535         /*
536          * Optionally obtain an underlying physical identity mapping in
537          * order to do a memory window-based bind. This base registration
538          * is protected from remote access - that is enabled only by binding
539          * for the specific bytes targeted during each RPC operation, and
540          * revoked after the corresponding completion similar to a storage
541          * adapter.
542          */
543         switch (memreg) {
544         case RPCRDMA_BOUNCEBUFFERS:
545         case RPCRDMA_REGISTER:
546         case RPCRDMA_FRMR:
547                 break;
548 #if RPCRDMA_PERSISTENT_REGISTRATION
549         case RPCRDMA_ALLPHYSICAL:
550                 mem_priv = IB_ACCESS_LOCAL_WRITE |
551                                 IB_ACCESS_REMOTE_WRITE |
552                                 IB_ACCESS_REMOTE_READ;
553                 goto register_setup;
554 #endif
555         case RPCRDMA_MEMWINDOWS_ASYNC:
556         case RPCRDMA_MEMWINDOWS:
557                 mem_priv = IB_ACCESS_LOCAL_WRITE |
558                                 IB_ACCESS_MW_BIND;
559                 goto register_setup;
560         case RPCRDMA_MTHCAFMR:
561                 if (ia->ri_have_dma_lkey)
562                         break;
563                 mem_priv = IB_ACCESS_LOCAL_WRITE;
564         register_setup:
565                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
566                 if (IS_ERR(ia->ri_bind_mem)) {
567                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
568                                 "phys register failed with %lX\n\t"
569                                 "Will continue with degraded performance\n",
570                                 __func__, PTR_ERR(ia->ri_bind_mem));
571                         memreg = RPCRDMA_REGISTER;
572                         ia->ri_bind_mem = NULL;
573                 }
574                 break;
575         default:
576                 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
577                                 __func__, memreg);
578                 rc = -EINVAL;
579                 goto out2;
580         }
581         dprintk("RPC:       %s: memory registration strategy is %d\n",
582                 __func__, memreg);
583
584         /* Else will do memory reg/dereg for each chunk */
585         ia->ri_memreg_strategy = memreg;
586
587         return 0;
588 out2:
589         rdma_destroy_id(ia->ri_id);
590         ia->ri_id = NULL;
591 out1:
592         return rc;
593 }
594
595 /*
596  * Clean up/close an IA.
597  *   o if event handles and PD have been initialized, free them.
598  *   o close the IA
599  */
600 void
601 rpcrdma_ia_close(struct rpcrdma_ia *ia)
602 {
603         int rc;
604
605         dprintk("RPC:       %s: entering\n", __func__);
606         if (ia->ri_bind_mem != NULL) {
607                 rc = ib_dereg_mr(ia->ri_bind_mem);
608                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
609                         __func__, rc);
610         }
611         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
612                 if (ia->ri_id->qp)
613                         rdma_destroy_qp(ia->ri_id);
614                 rdma_destroy_id(ia->ri_id);
615                 ia->ri_id = NULL;
616         }
617         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
618                 rc = ib_dealloc_pd(ia->ri_pd);
619                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
620                         __func__, rc);
621         }
622 }
623
624 /*
625  * Create unconnected endpoint.
626  */
627 int
628 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
629                                 struct rpcrdma_create_data_internal *cdata)
630 {
631         struct ib_device_attr devattr;
632         int rc, err;
633
634         rc = ib_query_device(ia->ri_id->device, &devattr);
635         if (rc) {
636                 dprintk("RPC:       %s: ib_query_device failed %d\n",
637                         __func__, rc);
638                 return rc;
639         }
640
641         /* check provider's send/recv wr limits */
642         if (cdata->max_requests > devattr.max_qp_wr)
643                 cdata->max_requests = devattr.max_qp_wr;
644
645         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
646         ep->rep_attr.qp_context = ep;
647         /* send_cq and recv_cq initialized below */
648         ep->rep_attr.srq = NULL;
649         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
650         switch (ia->ri_memreg_strategy) {
651         case RPCRDMA_FRMR:
652                 /* Add room for frmr register and invalidate WRs */
653                 ep->rep_attr.cap.max_send_wr *= 3;
654                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
655                         return -EINVAL;
656                 break;
657         case RPCRDMA_MEMWINDOWS_ASYNC:
658         case RPCRDMA_MEMWINDOWS:
659                 /* Add room for mw_binds+unbinds - overkill! */
660                 ep->rep_attr.cap.max_send_wr++;
661                 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
662                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
663                         return -EINVAL;
664                 break;
665         default:
666                 break;
667         }
668         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
669         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
670         ep->rep_attr.cap.max_recv_sge = 1;
671         ep->rep_attr.cap.max_inline_data = 0;
672         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
673         ep->rep_attr.qp_type = IB_QPT_RC;
674         ep->rep_attr.port_num = ~0;
675
676         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
677                 "iovs: send %d recv %d\n",
678                 __func__,
679                 ep->rep_attr.cap.max_send_wr,
680                 ep->rep_attr.cap.max_recv_wr,
681                 ep->rep_attr.cap.max_send_sge,
682                 ep->rep_attr.cap.max_recv_sge);
683
684         /* set trigger for requesting send completion */
685         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
686         switch (ia->ri_memreg_strategy) {
687         case RPCRDMA_MEMWINDOWS_ASYNC:
688         case RPCRDMA_MEMWINDOWS:
689                 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
690                 break;
691         default:
692                 break;
693         }
694         if (ep->rep_cqinit <= 2)
695                 ep->rep_cqinit = 0;
696         INIT_CQCOUNT(ep);
697         ep->rep_ia = ia;
698         init_waitqueue_head(&ep->rep_connect_wait);
699
700         /*
701          * Create a single cq for receive dto and mw_bind (only ever
702          * care about unbind, really). Send completions are suppressed.
703          * Use single threaded tasklet upcalls to maintain ordering.
704          */
705         ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
706                                   rpcrdma_cq_async_error_upcall, NULL,
707                                   ep->rep_attr.cap.max_recv_wr +
708                                   ep->rep_attr.cap.max_send_wr + 1, 0);
709         if (IS_ERR(ep->rep_cq)) {
710                 rc = PTR_ERR(ep->rep_cq);
711                 dprintk("RPC:       %s: ib_create_cq failed: %i\n",
712                         __func__, rc);
713                 goto out1;
714         }
715
716         rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
717         if (rc) {
718                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
719                         __func__, rc);
720                 goto out2;
721         }
722
723         ep->rep_attr.send_cq = ep->rep_cq;
724         ep->rep_attr.recv_cq = ep->rep_cq;
725
726         /* Initialize cma parameters */
727
728         /* RPC/RDMA does not use private data */
729         ep->rep_remote_cma.private_data = NULL;
730         ep->rep_remote_cma.private_data_len = 0;
731
732         /* Client offers RDMA Read but does not initiate */
733         ep->rep_remote_cma.initiator_depth = 0;
734         if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
735                 ep->rep_remote_cma.responder_resources = 0;
736         else if (devattr.max_qp_rd_atom > 32)   /* arbitrary but <= 255 */
737                 ep->rep_remote_cma.responder_resources = 32;
738         else
739                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
740
741         ep->rep_remote_cma.retry_count = 7;
742         ep->rep_remote_cma.flow_control = 0;
743         ep->rep_remote_cma.rnr_retry_count = 0;
744
745         return 0;
746
747 out2:
748         err = ib_destroy_cq(ep->rep_cq);
749         if (err)
750                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
751                         __func__, err);
752 out1:
753         return rc;
754 }
755
756 /*
757  * rpcrdma_ep_destroy
758  *
759  * Disconnect and destroy endpoint. After this, the only
760  * valid operations on the ep are to free it (if dynamically
761  * allocated) or re-create it.
762  *
763  * The caller's error handling must be sure to not leak the endpoint
764  * if this function fails.
765  */
766 int
767 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
768 {
769         int rc;
770
771         dprintk("RPC:       %s: entering, connected is %d\n",
772                 __func__, ep->rep_connected);
773
774         if (ia->ri_id->qp) {
775                 rc = rpcrdma_ep_disconnect(ep, ia);
776                 if (rc)
777                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
778                                 " returned %i\n", __func__, rc);
779                 rdma_destroy_qp(ia->ri_id);
780                 ia->ri_id->qp = NULL;
781         }
782
783         /* padding - could be done in rpcrdma_buffer_destroy... */
784         if (ep->rep_pad_mr) {
785                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
786                 ep->rep_pad_mr = NULL;
787         }
788
789         rpcrdma_clean_cq(ep->rep_cq);
790         rc = ib_destroy_cq(ep->rep_cq);
791         if (rc)
792                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
793                         __func__, rc);
794
795         return rc;
796 }
797
798 /*
799  * Connect unconnected endpoint.
800  */
801 int
802 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
803 {
804         struct rdma_cm_id *id;
805         int rc = 0;
806         int retry_count = 0;
807
808         if (ep->rep_connected != 0) {
809                 struct rpcrdma_xprt *xprt;
810 retry:
811                 rc = rpcrdma_ep_disconnect(ep, ia);
812                 if (rc && rc != -ENOTCONN)
813                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
814                                 " status %i\n", __func__, rc);
815                 rpcrdma_clean_cq(ep->rep_cq);
816
817                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
818                 id = rpcrdma_create_id(xprt, ia,
819                                 (struct sockaddr *)&xprt->rx_data.addr);
820                 if (IS_ERR(id)) {
821                         rc = PTR_ERR(id);
822                         goto out;
823                 }
824                 /* TEMP TEMP TEMP - fail if new device:
825                  * Deregister/remarshal *all* requests!
826                  * Close and recreate adapter, pd, etc!
827                  * Re-determine all attributes still sane!
828                  * More stuff I haven't thought of!
829                  * Rrrgh!
830                  */
831                 if (ia->ri_id->device != id->device) {
832                         printk("RPC:       %s: can't reconnect on "
833                                 "different device!\n", __func__);
834                         rdma_destroy_id(id);
835                         rc = -ENETDOWN;
836                         goto out;
837                 }
838                 /* END TEMP */
839                 rdma_destroy_qp(ia->ri_id);
840                 rdma_destroy_id(ia->ri_id);
841                 ia->ri_id = id;
842         }
843
844         rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
845         if (rc) {
846                 dprintk("RPC:       %s: rdma_create_qp failed %i\n",
847                         __func__, rc);
848                 goto out;
849         }
850
851 /* XXX Tavor device performs badly with 2K MTU! */
852 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
853         struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
854         if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
855             (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
856              pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
857                 struct ib_qp_attr attr = {
858                         .path_mtu = IB_MTU_1024
859                 };
860                 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
861         }
862 }
863
864         ep->rep_connected = 0;
865
866         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
867         if (rc) {
868                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
869                                 __func__, rc);
870                 goto out;
871         }
872
873         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
874
875         /*
876          * Check state. A non-peer reject indicates no listener
877          * (ECONNREFUSED), which may be a transient state. All
878          * others indicate a transport condition which has already
879          * undergone a best-effort.
880          */
881         if (ep->rep_connected == -ECONNREFUSED &&
882             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
883                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
884                 goto retry;
885         }
886         if (ep->rep_connected <= 0) {
887                 /* Sometimes, the only way to reliably connect to remote
888                  * CMs is to use same nonzero values for ORD and IRD. */
889                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
890                     (ep->rep_remote_cma.responder_resources == 0 ||
891                      ep->rep_remote_cma.initiator_depth !=
892                                 ep->rep_remote_cma.responder_resources)) {
893                         if (ep->rep_remote_cma.responder_resources == 0)
894                                 ep->rep_remote_cma.responder_resources = 1;
895                         ep->rep_remote_cma.initiator_depth =
896                                 ep->rep_remote_cma.responder_resources;
897                         goto retry;
898                 }
899                 rc = ep->rep_connected;
900         } else {
901                 dprintk("RPC:       %s: connected\n", __func__);
902         }
903
904 out:
905         if (rc)
906                 ep->rep_connected = rc;
907         return rc;
908 }
909
910 /*
911  * rpcrdma_ep_disconnect
912  *
913  * This is separate from destroy to facilitate the ability
914  * to reconnect without recreating the endpoint.
915  *
916  * This call is not reentrant, and must not be made in parallel
917  * on the same endpoint.
918  */
919 int
920 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
921 {
922         int rc;
923
924         rpcrdma_clean_cq(ep->rep_cq);
925         rc = rdma_disconnect(ia->ri_id);
926         if (!rc) {
927                 /* returns without wait if not connected */
928                 wait_event_interruptible(ep->rep_connect_wait,
929                                                         ep->rep_connected != 1);
930                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
931                         (ep->rep_connected == 1) ? "still " : "dis");
932         } else {
933                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
934                 ep->rep_connected = rc;
935         }
936         return rc;
937 }
938
939 /*
940  * Initialize buffer memory
941  */
942 int
943 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
944         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
945 {
946         char *p;
947         size_t len;
948         int i, rc;
949         struct rpcrdma_mw *r;
950
951         buf->rb_max_requests = cdata->max_requests;
952         spin_lock_init(&buf->rb_lock);
953         atomic_set(&buf->rb_credits, 1);
954
955         /* Need to allocate:
956          *   1.  arrays for send and recv pointers
957          *   2.  arrays of struct rpcrdma_req to fill in pointers
958          *   3.  array of struct rpcrdma_rep for replies
959          *   4.  padding, if any
960          *   5.  mw's, fmr's or frmr's, if any
961          * Send/recv buffers in req/rep need to be registered
962          */
963
964         len = buf->rb_max_requests *
965                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
966         len += cdata->padding;
967         switch (ia->ri_memreg_strategy) {
968         case RPCRDMA_FRMR:
969                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
970                                 sizeof(struct rpcrdma_mw);
971                 break;
972         case RPCRDMA_MTHCAFMR:
973                 /* TBD we are perhaps overallocating here */
974                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
975                                 sizeof(struct rpcrdma_mw);
976                 break;
977         case RPCRDMA_MEMWINDOWS_ASYNC:
978         case RPCRDMA_MEMWINDOWS:
979                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
980                                 sizeof(struct rpcrdma_mw);
981                 break;
982         default:
983                 break;
984         }
985
986         /* allocate 1, 4 and 5 in one shot */
987         p = kzalloc(len, GFP_KERNEL);
988         if (p == NULL) {
989                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
990                         __func__, len);
991                 rc = -ENOMEM;
992                 goto out;
993         }
994         buf->rb_pool = p;       /* for freeing it later */
995
996         buf->rb_send_bufs = (struct rpcrdma_req **) p;
997         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
998         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
999         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1000
1001         /*
1002          * Register the zeroed pad buffer, if any.
1003          */
1004         if (cdata->padding) {
1005                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1006                                             &ep->rep_pad_mr, &ep->rep_pad);
1007                 if (rc)
1008                         goto out;
1009         }
1010         p += cdata->padding;
1011
1012         /*
1013          * Allocate the fmr's, or mw's for mw_bind chunk registration.
1014          * We "cycle" the mw's in order to minimize rkey reuse,
1015          * and also reduce unbind-to-bind collision.
1016          */
1017         INIT_LIST_HEAD(&buf->rb_mws);
1018         r = (struct rpcrdma_mw *)p;
1019         switch (ia->ri_memreg_strategy) {
1020         case RPCRDMA_FRMR:
1021                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1022                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1023                                                          RPCRDMA_MAX_SEGS);
1024                         if (IS_ERR(r->r.frmr.fr_mr)) {
1025                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1026                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1027                                         " failed %i\n", __func__, rc);
1028                                 goto out;
1029                         }
1030                         r->r.frmr.fr_pgl =
1031                                 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1032                                                             RPCRDMA_MAX_SEGS);
1033                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1034                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1035                                 dprintk("RPC:       %s: "
1036                                         "ib_alloc_fast_reg_page_list "
1037                                         "failed %i\n", __func__, rc);
1038                                 goto out;
1039                         }
1040                         list_add(&r->mw_list, &buf->rb_mws);
1041                         ++r;
1042                 }
1043                 break;
1044         case RPCRDMA_MTHCAFMR:
1045                 /* TBD we are perhaps overallocating here */
1046                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1047                         static struct ib_fmr_attr fa =
1048                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1049                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1050                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1051                                 &fa);
1052                         if (IS_ERR(r->r.fmr)) {
1053                                 rc = PTR_ERR(r->r.fmr);
1054                                 dprintk("RPC:       %s: ib_alloc_fmr"
1055                                         " failed %i\n", __func__, rc);
1056                                 goto out;
1057                         }
1058                         list_add(&r->mw_list, &buf->rb_mws);
1059                         ++r;
1060                 }
1061                 break;
1062         case RPCRDMA_MEMWINDOWS_ASYNC:
1063         case RPCRDMA_MEMWINDOWS:
1064                 /* Allocate one extra request's worth, for full cycling */
1065                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1066                         r->r.mw = ib_alloc_mw(ia->ri_pd);
1067                         if (IS_ERR(r->r.mw)) {
1068                                 rc = PTR_ERR(r->r.mw);
1069                                 dprintk("RPC:       %s: ib_alloc_mw"
1070                                         " failed %i\n", __func__, rc);
1071                                 goto out;
1072                         }
1073                         list_add(&r->mw_list, &buf->rb_mws);
1074                         ++r;
1075                 }
1076                 break;
1077         default:
1078                 break;
1079         }
1080
1081         /*
1082          * Allocate/init the request/reply buffers. Doing this
1083          * using kmalloc for now -- one for each buf.
1084          */
1085         for (i = 0; i < buf->rb_max_requests; i++) {
1086                 struct rpcrdma_req *req;
1087                 struct rpcrdma_rep *rep;
1088
1089                 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1090                 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1091                 /* Typical ~2400b, so rounding up saves work later */
1092                 if (len < 4096)
1093                         len = 4096;
1094                 req = kmalloc(len, GFP_KERNEL);
1095                 if (req == NULL) {
1096                         dprintk("RPC:       %s: request buffer %d alloc"
1097                                 " failed\n", __func__, i);
1098                         rc = -ENOMEM;
1099                         goto out;
1100                 }
1101                 memset(req, 0, sizeof(struct rpcrdma_req));
1102                 buf->rb_send_bufs[i] = req;
1103                 buf->rb_send_bufs[i]->rl_buffer = buf;
1104
1105                 rc = rpcrdma_register_internal(ia, req->rl_base,
1106                                 len - offsetof(struct rpcrdma_req, rl_base),
1107                                 &buf->rb_send_bufs[i]->rl_handle,
1108                                 &buf->rb_send_bufs[i]->rl_iov);
1109                 if (rc)
1110                         goto out;
1111
1112                 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1113
1114                 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1115                 rep = kmalloc(len, GFP_KERNEL);
1116                 if (rep == NULL) {
1117                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1118                                 __func__, i);
1119                         rc = -ENOMEM;
1120                         goto out;
1121                 }
1122                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1123                 buf->rb_recv_bufs[i] = rep;
1124                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1125                 init_waitqueue_head(&rep->rr_unbind);
1126
1127                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1128                                 len - offsetof(struct rpcrdma_rep, rr_base),
1129                                 &buf->rb_recv_bufs[i]->rr_handle,
1130                                 &buf->rb_recv_bufs[i]->rr_iov);
1131                 if (rc)
1132                         goto out;
1133
1134         }
1135         dprintk("RPC:       %s: max_requests %d\n",
1136                 __func__, buf->rb_max_requests);
1137         /* done */
1138         return 0;
1139 out:
1140         rpcrdma_buffer_destroy(buf);
1141         return rc;
1142 }
1143
1144 /*
1145  * Unregister and destroy buffer memory. Need to deal with
1146  * partial initialization, so it's callable from failed create.
1147  * Must be called before destroying endpoint, as registrations
1148  * reference it.
1149  */
1150 void
1151 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1152 {
1153         int rc, i;
1154         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1155         struct rpcrdma_mw *r;
1156
1157         /* clean up in reverse order from create
1158          *   1.  recv mr memory (mr free, then kfree)
1159          *   1a. bind mw memory
1160          *   2.  send mr memory (mr free, then kfree)
1161          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1162          *   4.  arrays
1163          */
1164         dprintk("RPC:       %s: entering\n", __func__);
1165
1166         for (i = 0; i < buf->rb_max_requests; i++) {
1167                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1168                         rpcrdma_deregister_internal(ia,
1169                                         buf->rb_recv_bufs[i]->rr_handle,
1170                                         &buf->rb_recv_bufs[i]->rr_iov);
1171                         kfree(buf->rb_recv_bufs[i]);
1172                 }
1173                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1174                         while (!list_empty(&buf->rb_mws)) {
1175                                 r = list_entry(buf->rb_mws.next,
1176                                         struct rpcrdma_mw, mw_list);
1177                                 list_del(&r->mw_list);
1178                                 switch (ia->ri_memreg_strategy) {
1179                                 case RPCRDMA_FRMR:
1180                                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1181                                         if (rc)
1182                                                 dprintk("RPC:       %s:"
1183                                                         " ib_dereg_mr"
1184                                                         " failed %i\n",
1185                                                         __func__, rc);
1186                                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1187                                         break;
1188                                 case RPCRDMA_MTHCAFMR:
1189                                         rc = ib_dealloc_fmr(r->r.fmr);
1190                                         if (rc)
1191                                                 dprintk("RPC:       %s:"
1192                                                         " ib_dealloc_fmr"
1193                                                         " failed %i\n",
1194                                                         __func__, rc);
1195                                         break;
1196                                 case RPCRDMA_MEMWINDOWS_ASYNC:
1197                                 case RPCRDMA_MEMWINDOWS:
1198                                         rc = ib_dealloc_mw(r->r.mw);
1199                                         if (rc)
1200                                                 dprintk("RPC:       %s:"
1201                                                         " ib_dealloc_mw"
1202                                                         " failed %i\n",
1203                                                         __func__, rc);
1204                                         break;
1205                                 default:
1206                                         break;
1207                                 }
1208                         }
1209                         rpcrdma_deregister_internal(ia,
1210                                         buf->rb_send_bufs[i]->rl_handle,
1211                                         &buf->rb_send_bufs[i]->rl_iov);
1212                         kfree(buf->rb_send_bufs[i]);
1213                 }
1214         }
1215
1216         kfree(buf->rb_pool);
1217 }
1218
1219 /*
1220  * Get a set of request/reply buffers.
1221  *
1222  * Reply buffer (if needed) is attached to send buffer upon return.
1223  * Rule:
1224  *    rb_send_index and rb_recv_index MUST always be pointing to the
1225  *    *next* available buffer (non-NULL). They are incremented after
1226  *    removing buffers, and decremented *before* returning them.
1227  */
1228 struct rpcrdma_req *
1229 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1230 {
1231         struct rpcrdma_req *req;
1232         unsigned long flags;
1233         int i;
1234         struct rpcrdma_mw *r;
1235
1236         spin_lock_irqsave(&buffers->rb_lock, flags);
1237         if (buffers->rb_send_index == buffers->rb_max_requests) {
1238                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1239                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1240                 return ((struct rpcrdma_req *)NULL);
1241         }
1242
1243         req = buffers->rb_send_bufs[buffers->rb_send_index];
1244         if (buffers->rb_send_index < buffers->rb_recv_index) {
1245                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1246                         __func__,
1247                         buffers->rb_recv_index - buffers->rb_send_index);
1248                 req->rl_reply = NULL;
1249         } else {
1250                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1251                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1252         }
1253         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1254         if (!list_empty(&buffers->rb_mws)) {
1255                 i = RPCRDMA_MAX_SEGS - 1;
1256                 do {
1257                         r = list_entry(buffers->rb_mws.next,
1258                                         struct rpcrdma_mw, mw_list);
1259                         list_del(&r->mw_list);
1260                         req->rl_segments[i].mr_chunk.rl_mw = r;
1261                 } while (--i >= 0);
1262         }
1263         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1264         return req;
1265 }
1266
1267 /*
1268  * Put request/reply buffers back into pool.
1269  * Pre-decrement counter/array index.
1270  */
1271 void
1272 rpcrdma_buffer_put(struct rpcrdma_req *req)
1273 {
1274         struct rpcrdma_buffer *buffers = req->rl_buffer;
1275         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1276         int i;
1277         unsigned long flags;
1278
1279         BUG_ON(req->rl_nchunks != 0);
1280         spin_lock_irqsave(&buffers->rb_lock, flags);
1281         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1282         req->rl_niovs = 0;
1283         if (req->rl_reply) {
1284                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1285                 init_waitqueue_head(&req->rl_reply->rr_unbind);
1286                 req->rl_reply->rr_func = NULL;
1287                 req->rl_reply = NULL;
1288         }
1289         switch (ia->ri_memreg_strategy) {
1290         case RPCRDMA_FRMR:
1291         case RPCRDMA_MTHCAFMR:
1292         case RPCRDMA_MEMWINDOWS_ASYNC:
1293         case RPCRDMA_MEMWINDOWS:
1294                 /*
1295                  * Cycle mw's back in reverse order, and "spin" them.
1296                  * This delays and scrambles reuse as much as possible.
1297                  */
1298                 i = 1;
1299                 do {
1300                         struct rpcrdma_mw **mw;
1301                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1302                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1303                         *mw = NULL;
1304                 } while (++i < RPCRDMA_MAX_SEGS);
1305                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1306                                         &buffers->rb_mws);
1307                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1308                 break;
1309         default:
1310                 break;
1311         }
1312         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1313 }
1314
1315 /*
1316  * Recover reply buffers from pool.
1317  * This happens when recovering from error conditions.
1318  * Post-increment counter/array index.
1319  */
1320 void
1321 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1322 {
1323         struct rpcrdma_buffer *buffers = req->rl_buffer;
1324         unsigned long flags;
1325
1326         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1327                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1328         spin_lock_irqsave(&buffers->rb_lock, flags);
1329         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1330                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1331                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1332         }
1333         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1334 }
1335
1336 /*
1337  * Put reply buffers back into pool when not attached to
1338  * request. This happens in error conditions, and when
1339  * aborting unbinds. Pre-decrement counter/array index.
1340  */
1341 void
1342 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1343 {
1344         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1345         unsigned long flags;
1346
1347         rep->rr_func = NULL;
1348         spin_lock_irqsave(&buffers->rb_lock, flags);
1349         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1350         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1351 }
1352
1353 /*
1354  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1355  */
1356
1357 int
1358 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1359                                 struct ib_mr **mrp, struct ib_sge *iov)
1360 {
1361         struct ib_phys_buf ipb;
1362         struct ib_mr *mr;
1363         int rc;
1364
1365         /*
1366          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1367          */
1368         iov->addr = ib_dma_map_single(ia->ri_id->device,
1369                         va, len, DMA_BIDIRECTIONAL);
1370         iov->length = len;
1371
1372         if (ia->ri_have_dma_lkey) {
1373                 *mrp = NULL;
1374                 iov->lkey = ia->ri_dma_lkey;
1375                 return 0;
1376         } else if (ia->ri_bind_mem != NULL) {
1377                 *mrp = NULL;
1378                 iov->lkey = ia->ri_bind_mem->lkey;
1379                 return 0;
1380         }
1381
1382         ipb.addr = iov->addr;
1383         ipb.size = iov->length;
1384         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1385                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1386
1387         dprintk("RPC:       %s: phys convert: 0x%llx "
1388                         "registered 0x%llx length %d\n",
1389                         __func__, (unsigned long long)ipb.addr,
1390                         (unsigned long long)iov->addr, len);
1391
1392         if (IS_ERR(mr)) {
1393                 *mrp = NULL;
1394                 rc = PTR_ERR(mr);
1395                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1396         } else {
1397                 *mrp = mr;
1398                 iov->lkey = mr->lkey;
1399                 rc = 0;
1400         }
1401
1402         return rc;
1403 }
1404
1405 int
1406 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1407                                 struct ib_mr *mr, struct ib_sge *iov)
1408 {
1409         int rc;
1410
1411         ib_dma_unmap_single(ia->ri_id->device,
1412                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1413
1414         if (NULL == mr)
1415                 return 0;
1416
1417         rc = ib_dereg_mr(mr);
1418         if (rc)
1419                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1420         return rc;
1421 }
1422
1423 /*
1424  * Wrappers for chunk registration, shared by read/write chunk code.
1425  */
1426
1427 static void
1428 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1429 {
1430         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1431         seg->mr_dmalen = seg->mr_len;
1432         if (seg->mr_page)
1433                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1434                                 seg->mr_page, offset_in_page(seg->mr_offset),
1435                                 seg->mr_dmalen, seg->mr_dir);
1436         else
1437                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1438                                 seg->mr_offset,
1439                                 seg->mr_dmalen, seg->mr_dir);
1440 }
1441
1442 static void
1443 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1444 {
1445         if (seg->mr_page)
1446                 ib_dma_unmap_page(ia->ri_id->device,
1447                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1448         else
1449                 ib_dma_unmap_single(ia->ri_id->device,
1450                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1451 }
1452
1453 static int
1454 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1455                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1456                         struct rpcrdma_xprt *r_xprt)
1457 {
1458         struct rpcrdma_mr_seg *seg1 = seg;
1459         struct ib_send_wr frmr_wr, *bad_wr;
1460         u8 key;
1461         int len, pageoff;
1462         int i, rc;
1463
1464         pageoff = offset_in_page(seg1->mr_offset);
1465         seg1->mr_offset -= pageoff;     /* start of page */
1466         seg1->mr_len += pageoff;
1467         len = -pageoff;
1468         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1469                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1470         for (i = 0; i < *nsegs;) {
1471                 rpcrdma_map_one(ia, seg, writing);
1472                 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1473                 len += seg->mr_len;
1474                 ++seg;
1475                 ++i;
1476                 /* Check for holes */
1477                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1478                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1479                         break;
1480         }
1481         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1482                 __func__, seg1->mr_chunk.rl_mw, i);
1483
1484         /* Bump the key */
1485         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1486         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1487
1488         /* Prepare FRMR WR */
1489         memset(&frmr_wr, 0, sizeof frmr_wr);
1490         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1491         frmr_wr.send_flags = 0;                 /* unsignaled */
1492         frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1493         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1494         frmr_wr.wr.fast_reg.page_list_len = i;
1495         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1496         frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1497         frmr_wr.wr.fast_reg.access_flags = (writing ?
1498                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1499                                 IB_ACCESS_REMOTE_READ);
1500         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1501         DECR_CQCOUNT(&r_xprt->rx_ep);
1502
1503         rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1504
1505         if (rc) {
1506                 dprintk("RPC:       %s: failed ib_post_send for register,"
1507                         " status %i\n", __func__, rc);
1508                 while (i--)
1509                         rpcrdma_unmap_one(ia, --seg);
1510         } else {
1511                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1512                 seg1->mr_base = seg1->mr_dma + pageoff;
1513                 seg1->mr_nsegs = i;
1514                 seg1->mr_len = len;
1515         }
1516         *nsegs = i;
1517         return rc;
1518 }
1519
1520 static int
1521 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1522                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1523 {
1524         struct rpcrdma_mr_seg *seg1 = seg;
1525         struct ib_send_wr invalidate_wr, *bad_wr;
1526         int rc;
1527
1528         while (seg1->mr_nsegs--)
1529                 rpcrdma_unmap_one(ia, seg++);
1530
1531         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1532         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1533         invalidate_wr.send_flags = 0;                   /* unsignaled */
1534         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1535         DECR_CQCOUNT(&r_xprt->rx_ep);
1536
1537         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1538         if (rc)
1539                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1540                         " status %i\n", __func__, rc);
1541         return rc;
1542 }
1543
1544 static int
1545 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1546                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1547 {
1548         struct rpcrdma_mr_seg *seg1 = seg;
1549         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1550         int len, pageoff, i, rc;
1551
1552         pageoff = offset_in_page(seg1->mr_offset);
1553         seg1->mr_offset -= pageoff;     /* start of page */
1554         seg1->mr_len += pageoff;
1555         len = -pageoff;
1556         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1557                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1558         for (i = 0; i < *nsegs;) {
1559                 rpcrdma_map_one(ia, seg, writing);
1560                 physaddrs[i] = seg->mr_dma;
1561                 len += seg->mr_len;
1562                 ++seg;
1563                 ++i;
1564                 /* Check for holes */
1565                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1566                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1567                         break;
1568         }
1569         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1570                                 physaddrs, i, seg1->mr_dma);
1571         if (rc) {
1572                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1573                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1574                         len, (unsigned long long)seg1->mr_dma,
1575                         pageoff, i, rc);
1576                 while (i--)
1577                         rpcrdma_unmap_one(ia, --seg);
1578         } else {
1579                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1580                 seg1->mr_base = seg1->mr_dma + pageoff;
1581                 seg1->mr_nsegs = i;
1582                 seg1->mr_len = len;
1583         }
1584         *nsegs = i;
1585         return rc;
1586 }
1587
1588 static int
1589 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1590                         struct rpcrdma_ia *ia)
1591 {
1592         struct rpcrdma_mr_seg *seg1 = seg;
1593         LIST_HEAD(l);
1594         int rc;
1595
1596         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1597         rc = ib_unmap_fmr(&l);
1598         while (seg1->mr_nsegs--)
1599                 rpcrdma_unmap_one(ia, seg++);
1600         if (rc)
1601                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1602                         " status %i\n", __func__, rc);
1603         return rc;
1604 }
1605
1606 static int
1607 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1608                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1609                         struct rpcrdma_xprt *r_xprt)
1610 {
1611         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1612                                   IB_ACCESS_REMOTE_READ);
1613         struct ib_mw_bind param;
1614         int rc;
1615
1616         *nsegs = 1;
1617         rpcrdma_map_one(ia, seg, writing);
1618         param.mr = ia->ri_bind_mem;
1619         param.wr_id = 0ULL;     /* no send cookie */
1620         param.addr = seg->mr_dma;
1621         param.length = seg->mr_len;
1622         param.send_flags = 0;
1623         param.mw_access_flags = mem_priv;
1624
1625         DECR_CQCOUNT(&r_xprt->rx_ep);
1626         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1627         if (rc) {
1628                 dprintk("RPC:       %s: failed ib_bind_mw "
1629                         "%u@0x%llx status %i\n",
1630                         __func__, seg->mr_len,
1631                         (unsigned long long)seg->mr_dma, rc);
1632                 rpcrdma_unmap_one(ia, seg);
1633         } else {
1634                 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1635                 seg->mr_base = param.addr;
1636                 seg->mr_nsegs = 1;
1637         }
1638         return rc;
1639 }
1640
1641 static int
1642 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1643                         struct rpcrdma_ia *ia,
1644                         struct rpcrdma_xprt *r_xprt, void **r)
1645 {
1646         struct ib_mw_bind param;
1647         LIST_HEAD(l);
1648         int rc;
1649
1650         BUG_ON(seg->mr_nsegs != 1);
1651         param.mr = ia->ri_bind_mem;
1652         param.addr = 0ULL;      /* unbind */
1653         param.length = 0;
1654         param.mw_access_flags = 0;
1655         if (*r) {
1656                 param.wr_id = (u64) (unsigned long) *r;
1657                 param.send_flags = IB_SEND_SIGNALED;
1658                 INIT_CQCOUNT(&r_xprt->rx_ep);
1659         } else {
1660                 param.wr_id = 0ULL;
1661                 param.send_flags = 0;
1662                 DECR_CQCOUNT(&r_xprt->rx_ep);
1663         }
1664         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1665         rpcrdma_unmap_one(ia, seg);
1666         if (rc)
1667                 dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1668                         " status %i\n", __func__, rc);
1669         else
1670                 *r = NULL;      /* will upcall on completion */
1671         return rc;
1672 }
1673
1674 static int
1675 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1676                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1677 {
1678         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1679                                   IB_ACCESS_REMOTE_READ);
1680         struct rpcrdma_mr_seg *seg1 = seg;
1681         struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1682         int len, i, rc = 0;
1683
1684         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1685                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1686         for (len = 0, i = 0; i < *nsegs;) {
1687                 rpcrdma_map_one(ia, seg, writing);
1688                 ipb[i].addr = seg->mr_dma;
1689                 ipb[i].size = seg->mr_len;
1690                 len += seg->mr_len;
1691                 ++seg;
1692                 ++i;
1693                 /* Check for holes */
1694                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1695                     offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1696                         break;
1697         }
1698         seg1->mr_base = seg1->mr_dma;
1699         seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1700                                 ipb, i, mem_priv, &seg1->mr_base);
1701         if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1702                 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1703                 dprintk("RPC:       %s: failed ib_reg_phys_mr "
1704                         "%u@0x%llx (%d)... status %i\n",
1705                         __func__, len,
1706                         (unsigned long long)seg1->mr_dma, i, rc);
1707                 while (i--)
1708                         rpcrdma_unmap_one(ia, --seg);
1709         } else {
1710                 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1711                 seg1->mr_nsegs = i;
1712                 seg1->mr_len = len;
1713         }
1714         *nsegs = i;
1715         return rc;
1716 }
1717
1718 static int
1719 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1720                         struct rpcrdma_ia *ia)
1721 {
1722         struct rpcrdma_mr_seg *seg1 = seg;
1723         int rc;
1724
1725         rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1726         seg1->mr_chunk.rl_mr = NULL;
1727         while (seg1->mr_nsegs--)
1728                 rpcrdma_unmap_one(ia, seg++);
1729         if (rc)
1730                 dprintk("RPC:       %s: failed ib_dereg_mr,"
1731                         " status %i\n", __func__, rc);
1732         return rc;
1733 }
1734
1735 int
1736 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1737                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1738 {
1739         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1740         int rc = 0;
1741
1742         switch (ia->ri_memreg_strategy) {
1743
1744 #if RPCRDMA_PERSISTENT_REGISTRATION
1745         case RPCRDMA_ALLPHYSICAL:
1746                 rpcrdma_map_one(ia, seg, writing);
1747                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1748                 seg->mr_base = seg->mr_dma;
1749                 seg->mr_nsegs = 1;
1750                 nsegs = 1;
1751                 break;
1752 #endif
1753
1754         /* Registration using frmr registration */
1755         case RPCRDMA_FRMR:
1756                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1757                 break;
1758
1759         /* Registration using fmr memory registration */
1760         case RPCRDMA_MTHCAFMR:
1761                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1762                 break;
1763
1764         /* Registration using memory windows */
1765         case RPCRDMA_MEMWINDOWS_ASYNC:
1766         case RPCRDMA_MEMWINDOWS:
1767                 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1768                 break;
1769
1770         /* Default registration each time */
1771         default:
1772                 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1773                 break;
1774         }
1775         if (rc)
1776                 return -1;
1777
1778         return nsegs;
1779 }
1780
1781 int
1782 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1783                 struct rpcrdma_xprt *r_xprt, void *r)
1784 {
1785         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1786         int nsegs = seg->mr_nsegs, rc;
1787
1788         switch (ia->ri_memreg_strategy) {
1789
1790 #if RPCRDMA_PERSISTENT_REGISTRATION
1791         case RPCRDMA_ALLPHYSICAL:
1792                 BUG_ON(nsegs != 1);
1793                 rpcrdma_unmap_one(ia, seg);
1794                 rc = 0;
1795                 break;
1796 #endif
1797
1798         case RPCRDMA_FRMR:
1799                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1800                 break;
1801
1802         case RPCRDMA_MTHCAFMR:
1803                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1804                 break;
1805
1806         case RPCRDMA_MEMWINDOWS_ASYNC:
1807         case RPCRDMA_MEMWINDOWS:
1808                 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1809                 break;
1810
1811         default:
1812                 rc = rpcrdma_deregister_default_external(seg, ia);
1813                 break;
1814         }
1815         if (r) {
1816                 struct rpcrdma_rep *rep = r;
1817                 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1818                 rep->rr_func = NULL;
1819                 func(rep);      /* dereg done, callback now */
1820         }
1821         return nsegs;
1822 }
1823
1824 /*
1825  * Prepost any receive buffer, then post send.
1826  *
1827  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1828  */
1829 int
1830 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1831                 struct rpcrdma_ep *ep,
1832                 struct rpcrdma_req *req)
1833 {
1834         struct ib_send_wr send_wr, *send_wr_fail;
1835         struct rpcrdma_rep *rep = req->rl_reply;
1836         int rc;
1837
1838         if (rep) {
1839                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1840                 if (rc)
1841                         goto out;
1842                 req->rl_reply = NULL;
1843         }
1844
1845         send_wr.next = NULL;
1846         send_wr.wr_id = 0ULL;   /* no send cookie */
1847         send_wr.sg_list = req->rl_send_iov;
1848         send_wr.num_sge = req->rl_niovs;
1849         send_wr.opcode = IB_WR_SEND;
1850         if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
1851                 ib_dma_sync_single_for_device(ia->ri_id->device,
1852                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1853                         DMA_TO_DEVICE);
1854         ib_dma_sync_single_for_device(ia->ri_id->device,
1855                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1856                 DMA_TO_DEVICE);
1857         ib_dma_sync_single_for_device(ia->ri_id->device,
1858                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1859                 DMA_TO_DEVICE);
1860
1861         if (DECR_CQCOUNT(ep) > 0)
1862                 send_wr.send_flags = 0;
1863         else { /* Provider must take a send completion every now and then */
1864                 INIT_CQCOUNT(ep);
1865                 send_wr.send_flags = IB_SEND_SIGNALED;
1866         }
1867
1868         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1869         if (rc)
1870                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1871                         rc);
1872 out:
1873         return rc;
1874 }
1875
1876 /*
1877  * (Re)post a receive buffer.
1878  */
1879 int
1880 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1881                      struct rpcrdma_ep *ep,
1882                      struct rpcrdma_rep *rep)
1883 {
1884         struct ib_recv_wr recv_wr, *recv_wr_fail;
1885         int rc;
1886
1887         recv_wr.next = NULL;
1888         recv_wr.wr_id = (u64) (unsigned long) rep;
1889         recv_wr.sg_list = &rep->rr_iov;
1890         recv_wr.num_sge = 1;
1891
1892         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1893                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1894
1895         DECR_CQCOUNT(ep);
1896         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1897
1898         if (rc)
1899                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1900                         rc);
1901         return rc;
1902 }