RDS: Add flag for silent ops. Do atomic op before RDMA
Andy Grover [Tue, 2 Mar 2010 00:10:40 +0000 (16:10 -0800)]
Add a flag to the API so users can indicate they want
silent operations. This is needed because silent ops
cannot be used with USE_ONCE MRs, so we can't just
assume silent.

Also, change send_xmit to do atomic op before rdma op if
both are present, and centralize the hairy logic to determine if
we want to attempt silent, or not.

Signed-off-by: Andy Grover <andy.grover@oracle.com>

include/linux/rds.h
net/rds/rdma.c
net/rds/rds.h
net/rds/send.c

index 9239152..109f1d3 100644 (file)
@@ -276,5 +276,6 @@ struct rds_rdma_notify {
 #define RDS_RDMA_USE_ONCE      0x0008  /* free MR after use */
 #define RDS_RDMA_DONTWAIT      0x0010  /* Don't wait in SET_BARRIER */
 #define RDS_RDMA_NOTIFY_ME     0x0020  /* Notify when operation completes */
+#define RDS_RDMA_SILENT                0x0040  /* Do not interrupt remote */
 
 #endif /* IB_RDS_H */
index 5ba5146..48781fe 100644 (file)
@@ -559,6 +559,7 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
        op->op_write = !!(args->flags & RDS_RDMA_READWRITE);
        op->op_fence = !!(args->flags & RDS_RDMA_FENCE);
        op->op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+       op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
        op->op_active = 1;
        op->op_recverr = rs->rs_recverr;
        WARN_ON(!nr_pages);
@@ -747,6 +748,7 @@ int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
        }
 
        rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+       rm->atomic.op_silent = !!(args->flags & RDS_RDMA_SILENT);
        rm->atomic.op_active = 1;
        rm->atomic.op_recverr = rs->rs_recverr;
        rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1);
index 46d190d..23b9210 100644 (file)
@@ -319,6 +319,7 @@ struct rds_message {
                        unsigned int            op_notify:1;
                        unsigned int            op_recverr:1;
                        unsigned int            op_mapped:1;
+                       unsigned int            op_silent:1;
                        unsigned int            op_active:1;
                        struct scatterlist      *op_sg;
                        struct rds_notifier     *op_notifier;
@@ -333,6 +334,7 @@ struct rds_message {
                        unsigned int            op_notify:1;
                        unsigned int            op_recverr:1;
                        unsigned int            op_mapped:1;
+                       unsigned int            op_silent:1;
                        unsigned int            op_active:1;
                        unsigned int            op_bytes;
                        unsigned int            op_nents;
index cdca974..38567f3 100644 (file)
@@ -250,42 +250,50 @@ int rds_send_xmit(struct rds_connection *conn)
                        conn->c_xmit_rm = rm;
                }
 
-               if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
-                       ret = conn->c_trans->xmit_atomic(conn, rm);
+               /* The transport either sends the whole rdma or none of it */
+               if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+                       ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
                        if (ret)
                                break;
-                       conn->c_xmit_atomic_sent = 1;
+                       conn->c_xmit_rdma_sent = 1;
+
                        /* The transport owns the mapped memory for now.
                         * You can't unmap it while it's on the send queue */
                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
-
-                       /*
-                        * This is evil, muahaha.
-                        * We permit 0-byte sends. (rds-ping depends on this.)
-                        * BUT if there is an atomic op and no sent data,
-                        * we turn off sending the header, to achieve
-                        * "silent" atomics.
-                        * But see below; RDMA op might toggle this back on!
-                        */
-                       if (rm->data.op_nents == 0)
-                               rm->data.op_active = 0;
                }
 
-               /* The transport either sends the whole rdma or none of it */
-               if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
-                       ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
+               if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+                       ret = conn->c_trans->xmit_atomic(conn, rm);
                        if (ret)
                                break;
-                       conn->c_xmit_rdma_sent = 1;
-
-                       /* rdmas need data sent, even if just the header */
-                       rm->data.op_active = 1;
-
+                       conn->c_xmit_atomic_sent = 1;
                        /* The transport owns the mapped memory for now.
                         * You can't unmap it while it's on the send queue */
                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                }
 
+               /*
+                * A number of cases require an RDS header to be sent
+                * even if there is no data.
+                * We permit 0-byte sends; rds-ping depends on this.
+                * However, if there are exclusively attached silent ops,
+                * we skip the hdr/data send, to enable silent operation.
+                */
+               if (rm->data.op_nents == 0) {
+                       int ops_present;
+                       int all_ops_are_silent = 1;
+
+                       ops_present = (rm->atomic.op_active || rm->rdma.op_active);
+                       if (rm->atomic.op_active && !rm->atomic.op_silent)
+                               all_ops_are_silent = 0;
+                       if (rm->rdma.op_active && !rm->rdma.op_silent)
+                               all_ops_are_silent = 0;
+
+                       if (ops_present && all_ops_are_silent
+                           && !rm->m_rdma_cookie)
+                               rm->data.op_active = 0;
+               }
+
                if (rm->data.op_active && !conn->c_xmit_data_sent) {
                        ret = conn->c_trans->xmit(conn, rm,
                                                  conn->c_xmit_hdr_off,
@@ -1009,8 +1017,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
        if (ret)
                goto out;
 
-       if ((rm->m_rdma_cookie || rm->rdma.op_active) &&
-           !conn->c_trans->xmit_rdma) {
+       if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
                if (printk_ratelimit())
                        printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
                               &rm->rdma, conn->c_trans->xmit_rdma);