inetpeer: fix a race in inetpeer_gc_worker()
[linux-2.6.git] / net / ceph / messenger.c
index 0e8157e..f0993af 100644 (file)
 #include <linux/string.h>
 #include <linux/bio.h>
 #include <linux/blkdev.h>
+#include <linux/dns_resolver.h>
 #include <net/tcp.h>
 
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/decode.h>
 #include <linux/ceph/pagelist.h>
+#include <linux/export.h>
 
 /*
  * Ceph uses the messenger to exchange ceph_msg messages with other
@@ -36,47 +38,54 @@ static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
 static struct lock_class_key socket_class;
 #endif
 
+/*
+ * When skipping (ignoring) a block of input we read it into a "skip
+ * buffer," which is this many bytes in size.
+ */
+#define SKIP_BUF_SIZE  1024
 
 static void queue_con(struct ceph_connection *con);
 static void con_work(struct work_struct *);
 static void ceph_fault(struct ceph_connection *con);
 
 /*
- * nicely render a sockaddr as a string.
+ * Nicely render a sockaddr as a string.  An array of formatted
+ * strings is used, to approximate reentrancy.
  */
-#define MAX_ADDR_STR 20
-#define MAX_ADDR_STR_LEN 60
-static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN];
-static DEFINE_SPINLOCK(addr_str_lock);
-static int last_addr_str;
+#define ADDR_STR_COUNT_LOG     5       /* log2(# address strings in array) */
+#define ADDR_STR_COUNT         (1 << ADDR_STR_COUNT_LOG)
+#define ADDR_STR_COUNT_MASK    (ADDR_STR_COUNT - 1)
+#define MAX_ADDR_STR_LEN       64      /* 54 is enough */
+
+static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
+static atomic_t addr_str_seq = ATOMIC_INIT(0);
+
+static struct page *zero_page;         /* used in certain error cases */
 
 const char *ceph_pr_addr(const struct sockaddr_storage *ss)
 {
        int i;
        char *s;
-       struct sockaddr_in *in4 = (void *)ss;
-       struct sockaddr_in6 *in6 = (void *)ss;
-
-       spin_lock(&addr_str_lock);
-       i = last_addr_str++;
-       if (last_addr_str == MAX_ADDR_STR)
-               last_addr_str = 0;
-       spin_unlock(&addr_str_lock);
+       struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
+       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
+
+       i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
        s = addr_str[i];
 
        switch (ss->ss_family) {
        case AF_INET:
-               snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr,
-                        (unsigned int)ntohs(in4->sin_port));
+               snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
+                        ntohs(in4->sin_port));
                break;
 
        case AF_INET6:
-               snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr,
-                        (unsigned int)ntohs(in6->sin6_port));
+               snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr,
+                        ntohs(in6->sin6_port));
                break;
 
        default:
-               sprintf(s, "(unknown sockaddr family %d)", (int)ss->ss_family);
+               snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
+                        ss->ss_family);
        }
 
        return s;
@@ -92,24 +101,43 @@ static void encode_my_addr(struct ceph_messenger *msgr)
 /*
  * work queue for all reading and writing to/from the socket.
  */
-struct workqueue_struct *ceph_msgr_wq;
+static struct workqueue_struct *ceph_msgr_wq;
 
-int ceph_msgr_init(void)
+void _ceph_msgr_exit(void)
 {
-       ceph_msgr_wq = create_workqueue("ceph-msgr");
-       if (IS_ERR(ceph_msgr_wq)) {
-               int ret = PTR_ERR(ceph_msgr_wq);
-               pr_err("msgr_init failed to create workqueue: %d\n", ret);
+       if (ceph_msgr_wq) {
+               destroy_workqueue(ceph_msgr_wq);
                ceph_msgr_wq = NULL;
-               return ret;
        }
-       return 0;
+
+       BUG_ON(zero_page == NULL);
+       kunmap(zero_page);
+       page_cache_release(zero_page);
+       zero_page = NULL;
+}
+
+int ceph_msgr_init(void)
+{
+       BUG_ON(zero_page != NULL);
+       zero_page = ZERO_PAGE(0);
+       page_cache_get(zero_page);
+
+       ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
+       if (ceph_msgr_wq)
+               return 0;
+
+       pr_err("msgr_init failed to create workqueue\n");
+       _ceph_msgr_exit();
+
+       return -ENOMEM;
 }
 EXPORT_SYMBOL(ceph_msgr_init);
 
 void ceph_msgr_exit(void)
 {
-       destroy_workqueue(ceph_msgr_wq);
+       BUG_ON(ceph_msgr_wq == NULL);
+
+       _ceph_msgr_exit();
 }
 EXPORT_SYMBOL(ceph_msgr_exit);
 
@@ -127,8 +155,8 @@ EXPORT_SYMBOL(ceph_msgr_flush);
 /* data available on socket, or listen socket received a connect */
 static void ceph_data_ready(struct sock *sk, int count_unused)
 {
-       struct ceph_connection *con =
-               (struct ceph_connection *)sk->sk_user_data;
+       struct ceph_connection *con = sk->sk_user_data;
+
        if (sk->sk_state != TCP_CLOSE_WAIT) {
                dout("ceph_data_ready on %p state = %lu, queueing work\n",
                     con, con->state);
@@ -139,26 +167,30 @@ static void ceph_data_ready(struct sock *sk, int count_unused)
 /* socket has buffer space for writing */
 static void ceph_write_space(struct sock *sk)
 {
-       struct ceph_connection *con =
-               (struct ceph_connection *)sk->sk_user_data;
+       struct ceph_connection *con = sk->sk_user_data;
 
-       /* only queue to workqueue if there is data we want to write. */
+       /* only queue to workqueue if there is data we want to write,
+        * and there is sufficient space in the socket buffer to accept
+        * more data.  clear SOCK_NOSPACE so that ceph_write_space()
+        * doesn't get called again until try_write() fills the socket
+        * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
+        * and net/core/stream.c:sk_stream_write_space().
+        */
        if (test_bit(WRITE_PENDING, &con->state)) {
-               dout("ceph_write_space %p queueing write work\n", con);
-               queue_con(con);
+               if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
+                       dout("ceph_write_space %p queueing write work\n", con);
+                       clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+                       queue_con(con);
+               }
        } else {
                dout("ceph_write_space %p nothing to write\n", con);
        }
-
-       /* since we have our own write_space, clear the SOCK_NOSPACE flag */
-       clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 }
 
 /* socket's state has changed */
 static void ceph_state_change(struct sock *sk)
 {
-       struct ceph_connection *con =
-               (struct ceph_connection *)sk->sk_user_data;
+       struct ceph_connection *con = sk->sk_user_data;
 
        dout("ceph_state_change %p state = %lu sk_state = %u\n",
             con, con->state, sk->sk_state);
@@ -183,6 +215,8 @@ static void ceph_state_change(struct sock *sk)
                dout("ceph_state_change TCP_ESTABLISHED\n");
                queue_con(con);
                break;
+       default:        /* Everything else is uninteresting */
+               break;
        }
 }
 
@@ -193,7 +227,7 @@ static void set_sock_callbacks(struct socket *sock,
                               struct ceph_connection *con)
 {
        struct sock *sk = sock->sk;
-       sk->sk_user_data = (void *)con;
+       sk->sk_user_data = con;
        sk->sk_data_ready = ceph_data_ready;
        sk->sk_write_space = ceph_write_space;
        sk->sk_state_change = ceph_state_change;
@@ -207,7 +241,7 @@ static void set_sock_callbacks(struct socket *sock,
 /*
  * initiate connection to a remote socket.
  */
-static struct socket *ceph_tcp_connect(struct ceph_connection *con)
+static int ceph_tcp_connect(struct ceph_connection *con)
 {
        struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
        struct socket *sock;
@@ -217,8 +251,7 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
        ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
                               IPPROTO_TCP, &sock);
        if (ret)
-               return ERR_PTR(ret);
-       con->sock = sock;
+               return ret;
        sock->sk->sk_allocation = GFP_NOFS;
 
 #ifdef CONFIG_LOCKDEP
@@ -235,27 +268,29 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
                dout("connect %s EINPROGRESS sk_state = %u\n",
                     ceph_pr_addr(&con->peer_addr.in_addr),
                     sock->sk->sk_state);
-               ret = 0;
-       }
-       if (ret < 0) {
+       } else if (ret < 0) {
                pr_err("connect %s error %d\n",
                       ceph_pr_addr(&con->peer_addr.in_addr), ret);
                sock_release(sock);
-               con->sock = NULL;
                con->error_msg = "connect error";
+
+               return ret;
        }
+       con->sock = sock;
 
-       if (ret < 0)
-               return ERR_PTR(ret);
-       return sock;
+       return 0;
 }
 
 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
 {
        struct kvec iov = {buf, len};
        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
+       int r;
 
-       return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
+       r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
+       if (r == -EAGAIN)
+               r = 0;
+       return r;
 }
 
 /*
@@ -266,13 +301,30 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
                     size_t kvlen, size_t len, int more)
 {
        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
+       int r;
 
        if (more)
                msg.msg_flags |= MSG_MORE;
        else
                msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
 
-       return kernel_sendmsg(sock, &msg, iov, kvlen, len);
+       r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
+       if (r == -EAGAIN)
+               r = 0;
+       return r;
+}
+
+static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
+                    int offset, size_t size, int more)
+{
+       int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
+       int ret;
+
+       ret = kernel_sendpage(sock, page, offset, size, flags);
+       if (ret == -EAGAIN)
+               ret = 0;
+
+       return ret;
 }
 
 
@@ -330,7 +382,6 @@ static void reset_connection(struct ceph_connection *con)
                ceph_msg_put(con->out_msg);
                con->out_msg = NULL;
        }
-       con->out_keepalive_pending = false;
        con->in_seq = 0;
        con->in_seq_acked = 0;
 }
@@ -383,22 +434,23 @@ bool ceph_con_opened(struct ceph_connection *con)
  */
 struct ceph_connection *ceph_con_get(struct ceph_connection *con)
 {
-       dout("con_get %p nref = %d -> %d\n", con,
-            atomic_read(&con->nref), atomic_read(&con->nref) + 1);
-       if (atomic_inc_not_zero(&con->nref))
-               return con;
-       return NULL;
+       int nref = __atomic_add_unless(&con->nref, 1, 0);
+
+       dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
+
+       return nref ? con : NULL;
 }
 
 void ceph_con_put(struct ceph_connection *con)
 {
-       dout("con_put %p nref = %d -> %d\n", con,
-            atomic_read(&con->nref), atomic_read(&con->nref) - 1);
-       BUG_ON(atomic_read(&con->nref) == 0);
-       if (atomic_dec_and_test(&con->nref)) {
+       int nref = atomic_dec_return(&con->nref);
+
+       BUG_ON(nref < 0);
+       if (nref == 0) {
                BUG_ON(con->sock);
                kfree(con);
        }
+       dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
 }
 
 /*
@@ -434,14 +486,35 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
        return ret;
 }
 
+static void ceph_con_out_kvec_reset(struct ceph_connection *con)
+{
+       con->out_kvec_left = 0;
+       con->out_kvec_bytes = 0;
+       con->out_kvec_cur = &con->out_kvec[0];
+}
+
+static void ceph_con_out_kvec_add(struct ceph_connection *con,
+                               size_t size, void *data)
+{
+       int index;
+
+       index = con->out_kvec_left;
+       BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
+
+       con->out_kvec[index].iov_len = size;
+       con->out_kvec[index].iov_base = data;
+       con->out_kvec_left++;
+       con->out_kvec_bytes += size;
+}
 
 /*
  * Prepare footer for currently outgoing message, and finish things
  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
  */
-static void prepare_write_message_footer(struct ceph_connection *con, int v)
+static void prepare_write_message_footer(struct ceph_connection *con)
 {
        struct ceph_msg *m = con->out_msg;
+       int v = con->out_kvec_left;
 
        dout("prepare_write_message_footer %p\n", con);
        con->out_kvec_is_msg = true;
@@ -459,9 +532,9 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v)
 static void prepare_write_message(struct ceph_connection *con)
 {
        struct ceph_msg *m;
-       int v = 0;
+       u32 crc;
 
-       con->out_kvec_bytes = 0;
+       ceph_con_out_kvec_reset(con);
        con->out_kvec_is_msg = true;
        con->out_msg_done = false;
 
@@ -469,24 +542,18 @@ static void prepare_write_message(struct ceph_connection *con)
         * TCP packet that's a good thing. */
        if (con->in_seq > con->in_seq_acked) {
                con->in_seq_acked = con->in_seq;
-               con->out_kvec[v].iov_base = &tag_ack;
-               con->out_kvec[v++].iov_len = 1;
+               ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
                con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-               con->out_kvec[v].iov_base = &con->out_temp_ack;
-               con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
-               con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
+               ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+                       &con->out_temp_ack);
        }
 
-       m = list_first_entry(&con->out_queue,
-                      struct ceph_msg, list_head);
+       m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
        con->out_msg = m;
-       if (test_bit(LOSSYTX, &con->state)) {
-               list_del_init(&m->list_head);
-       } else {
-               /* put message on sent list */
-               ceph_msg_get(m);
-               list_move_tail(&m->list_head, &con->out_sent);
-       }
+
+       /* put message on sent list */
+       ceph_msg_get(m);
+       list_move_tail(&m->list_head, &con->out_sent);
 
        /*
         * only assign outgoing seq # if we haven't sent this message
@@ -505,30 +572,26 @@ static void prepare_write_message(struct ceph_connection *con)
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
        /* tag + hdr + front + middle */
-       con->out_kvec[v].iov_base = &tag_msg;
-       con->out_kvec[v++].iov_len = 1;
-       con->out_kvec[v].iov_base = &m->hdr;
-       con->out_kvec[v++].iov_len = sizeof(m->hdr);
-       con->out_kvec[v++] = m->front;
+       ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
+       ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
+       ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
+
        if (m->middle)
-               con->out_kvec[v++] = m->middle->vec;
-       con->out_kvec_left = v;
-       con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
-               (m->middle ? m->middle->vec.iov_len : 0);
-       con->out_kvec_cur = con->out_kvec;
+               ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
+                       m->middle->vec.iov_base);
 
        /* fill in crc (except data pages), footer */
-       con->out_msg->hdr.crc =
-               cpu_to_le32(crc32c(0, (void *)&m->hdr,
-                                     sizeof(m->hdr) - sizeof(m->hdr.crc)));
+       crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
+       con->out_msg->hdr.crc = cpu_to_le32(crc);
        con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
-       con->out_msg->footer.front_crc =
-               cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
-       if (m->middle)
-               con->out_msg->footer.middle_crc =
-                       cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
-                                          m->middle->vec.iov_len));
-       else
+
+       crc = crc32c(0, m->front.iov_base, m->front.iov_len);
+       con->out_msg->footer.front_crc = cpu_to_le32(crc);
+       if (m->middle) {
+               crc = crc32c(0, m->middle->vec.iov_base,
+                               m->middle->vec.iov_len);
+               con->out_msg->footer.middle_crc = cpu_to_le32(crc);
+       } else
                con->out_msg->footer.middle_crc = 0;
        con->out_msg->footer.data_crc = 0;
        dout("prepare_write_message front_crc %u data_crc %u\n",
@@ -540,16 +603,15 @@ static void prepare_write_message(struct ceph_connection *con)
                /* initialize page iterator */
                con->out_msg_pos.page = 0;
                if (m->pages)
-                       con->out_msg_pos.page_pos =
-                               le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+                       con->out_msg_pos.page_pos = m->page_alignment;
                else
                        con->out_msg_pos.page_pos = 0;
                con->out_msg_pos.data_pos = 0;
-               con->out_msg_pos.did_page_crc = 0;
+               con->out_msg_pos.did_page_crc = false;
                con->out_more = 1;  /* data + footer will follow */
        } else {
                /* no, queue up footer too and be done */
-               prepare_write_message_footer(con, v);
+               prepare_write_message_footer(con);
        }
 
        set_bit(WRITE_PENDING, &con->state);
@@ -564,14 +626,14 @@ static void prepare_write_ack(struct ceph_connection *con)
             con->in_seq_acked, con->in_seq);
        con->in_seq_acked = con->in_seq;
 
-       con->out_kvec[0].iov_base = &tag_ack;
-       con->out_kvec[0].iov_len = 1;
+       ceph_con_out_kvec_reset(con);
+
+       ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+
        con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-       con->out_kvec[1].iov_base = &con->out_temp_ack;
-       con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
-       con->out_kvec_left = 2;
-       con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
-       con->out_kvec_cur = con->out_kvec;
+       ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+                               &con->out_temp_ack);
+
        con->out_more = 1;  /* more will follow.. eventually.. */
        set_bit(WRITE_PENDING, &con->state);
 }
@@ -582,11 +644,8 @@ static void prepare_write_ack(struct ceph_connection *con)
 static void prepare_write_keepalive(struct ceph_connection *con)
 {
        dout("prepare_write_keepalive %p\n", con);
-       con->out_kvec[0].iov_base = &tag_keepalive;
-       con->out_kvec[0].iov_len = 1;
-       con->out_kvec_left = 1;
-       con->out_kvec_bytes = 1;
-       con->out_kvec_cur = con->out_kvec;
+       ceph_con_out_kvec_reset(con);
+       ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
        set_bit(WRITE_PENDING, &con->state);
 }
 
@@ -594,7 +653,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
  * Connection negotiation.
  */
 
-static void prepare_connect_authorizer(struct ceph_connection *con)
+static int prepare_connect_authorizer(struct ceph_connection *con)
 {
        void *auth_buf;
        int auth_len = 0;
@@ -608,13 +667,17 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
                                         con->auth_retry);
        mutex_lock(&con->mutex);
 
+       if (test_bit(CLOSED, &con->state) ||
+           test_bit(OPENING, &con->state))
+               return -EAGAIN;
+
        con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
        con->out_connect.authorizer_len = cpu_to_le32(auth_len);
 
-       con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
-       con->out_kvec[con->out_kvec_left].iov_len = auth_len;
-       con->out_kvec_left++;
-       con->out_kvec_bytes += auth_len;
+       if (auth_len)
+               ceph_con_out_kvec_add(con, auth_len, auth_buf);
+
+       return 0;
 }
 
 /*
@@ -623,22 +686,18 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
 static void prepare_write_banner(struct ceph_messenger *msgr,
                                 struct ceph_connection *con)
 {
-       int len = strlen(CEPH_BANNER);
+       ceph_con_out_kvec_reset(con);
+       ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
+       ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
+                                       &msgr->my_enc_addr);
 
-       con->out_kvec[0].iov_base = CEPH_BANNER;
-       con->out_kvec[0].iov_len = len;
-       con->out_kvec[1].iov_base = &msgr->my_enc_addr;
-       con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
-       con->out_kvec_left = 2;
-       con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
-       con->out_kvec_cur = con->out_kvec;
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 }
 
-static void prepare_write_connect(struct ceph_messenger *msgr,
-                                 struct ceph_connection *con,
-                                 int after_banner)
+static int prepare_write_connect(struct ceph_messenger *msgr,
+                                struct ceph_connection *con,
+                                int include_banner)
 {
        unsigned global_seq = get_global_seq(con->msgr, 0);
        int proto;
@@ -667,22 +726,18 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
        con->out_connect.protocol_version = cpu_to_le32(proto);
        con->out_connect.flags = 0;
 
-       if (!after_banner) {
-               con->out_kvec_left = 0;
-               con->out_kvec_bytes = 0;
-       }
-       con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
-       con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
-       con->out_kvec_left++;
-       con->out_kvec_bytes += sizeof(con->out_connect);
-       con->out_kvec_cur = con->out_kvec;
+       if (include_banner)
+               prepare_write_banner(msgr, con);
+       else
+               ceph_con_out_kvec_reset(con);
+       ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
+
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 
-       prepare_connect_authorizer(con);
+       return prepare_connect_authorizer(con);
 }
 
-
 /*
  * write as much of pending kvecs to the socket as we can.
  *  1 -> done
@@ -703,17 +758,18 @@ static int write_partial_kvec(struct ceph_connection *con)
                con->out_kvec_bytes -= ret;
                if (con->out_kvec_bytes == 0)
                        break;            /* done */
-               while (ret > 0) {
-                       if (ret >= con->out_kvec_cur->iov_len) {
-                               ret -= con->out_kvec_cur->iov_len;
-                               con->out_kvec_cur++;
-                               con->out_kvec_left--;
-                       } else {
-                               con->out_kvec_cur->iov_len -= ret;
-                               con->out_kvec_cur->iov_base += ret;
-                               ret = 0;
-                               break;
-                       }
+
+               /* account for full iov entries consumed */
+               while (ret >= con->out_kvec_cur->iov_len) {
+                       BUG_ON(!con->out_kvec_left);
+                       ret -= con->out_kvec_cur->iov_len;
+                       con->out_kvec_cur++;
+                       con->out_kvec_left--;
+               }
+               /* and for a partially-consumed entry */
+               if (ret) {
+                       con->out_kvec_cur->iov_len -= ret;
+                       con->out_kvec_cur->iov_base += ret;
                }
        }
        con->out_kvec_left = 0;
@@ -762,7 +818,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
        struct ceph_msg *msg = con->out_msg;
        unsigned data_len = le32_to_cpu(msg->hdr.data_len);
        size_t len;
-       int crc = con->msgr->nocrc;
+       bool do_datacrc = !con->msgr->nocrc;
        int ret;
        int total_max_write;
        int in_trail = 0;
@@ -779,9 +835,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
        while (data_len > con->out_msg_pos.data_pos) {
                struct page *page = NULL;
-               void *kaddr = NULL;
                int max_write = PAGE_SIZE;
-               int page_shift = 0;
+               int bio_offset = 0;
 
                total_max_write = data_len - trail_len -
                        con->out_msg_pos.data_pos;
@@ -800,54 +855,45 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
                        page = list_first_entry(&msg->trail->head,
                                                struct page, lru);
-                       if (crc)
-                               kaddr = kmap(page);
                        max_write = PAGE_SIZE;
                } else if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
-                       if (crc)
-                               kaddr = kmap(page);
                } else if (msg->pagelist) {
                        page = list_first_entry(&msg->pagelist->head,
                                                struct page, lru);
-                       if (crc)
-                               kaddr = kmap(page);
 #ifdef CONFIG_BLOCK
                } else if (msg->bio) {
                        struct bio_vec *bv;
 
                        bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
                        page = bv->bv_page;
-                       page_shift = bv->bv_offset;
-                       if (crc)
-                               kaddr = kmap(page) + page_shift;
+                       bio_offset = bv->bv_offset;
                        max_write = bv->bv_len;
 #endif
                } else {
-                       page = con->msgr->zero_page;
-                       if (crc)
-                               kaddr = page_address(con->msgr->zero_page);
+                       page = zero_page;
                }
                len = min_t(int, max_write - con->out_msg_pos.page_pos,
                            total_max_write);
 
-               if (crc && !con->out_msg_pos.did_page_crc) {
-                       void *base = kaddr + con->out_msg_pos.page_pos;
+               if (do_datacrc && !con->out_msg_pos.did_page_crc) {
+                       void *base;
+                       u32 crc;
                        u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
+                       char *kaddr;
 
+                       kaddr = kmap(page);
                        BUG_ON(kaddr == NULL);
-                       con->out_msg->footer.data_crc =
-                               cpu_to_le32(crc32c(tmpcrc, base, len));
-                       con->out_msg_pos.did_page_crc = 1;
+                       base = kaddr + con->out_msg_pos.page_pos + bio_offset;
+                       crc = crc32c(tmpcrc, base, len);
+                       con->out_msg->footer.data_crc = cpu_to_le32(crc);
+                       con->out_msg_pos.did_page_crc = true;
                }
-               ret = kernel_sendpage(con->sock, page,
-                                     con->out_msg_pos.page_pos + page_shift,
-                                     len,
-                                     MSG_DONTWAIT | MSG_NOSIGNAL |
-                                     MSG_MORE);
-
-               if (crc &&
-                   (msg->pages || msg->pagelist || msg->bio || in_trail))
+               ret = ceph_tcp_sendpage(con->sock, page,
+                                     con->out_msg_pos.page_pos + bio_offset,
+                                     len, 1);
+
+               if (do_datacrc)
                        kunmap(page);
 
                if (ret <= 0)
@@ -858,7 +904,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                if (ret == len) {
                        con->out_msg_pos.page_pos = 0;
                        con->out_msg_pos.page++;
-                       con->out_msg_pos.did_page_crc = 0;
+                       con->out_msg_pos.did_page_crc = false;
                        if (in_trail)
                                list_move_tail(&page->lru,
                                               &msg->trail->head);
@@ -875,12 +921,10 @@ static int write_partial_msg_pages(struct ceph_connection *con)
        dout("write_partial_msg_pages %p msg %p done\n", con, msg);
 
        /* prepare and queue up footer, too */
-       if (!crc)
+       if (!do_datacrc)
                con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
-       con->out_kvec_bytes = 0;
-       con->out_kvec_left = 0;
-       con->out_kvec_cur = con->out_kvec;
-       prepare_write_message_footer(con, 0);
+       ceph_con_out_kvec_reset(con);
+       prepare_write_message_footer(con);
        ret = 1;
 out:
        return ret;
@@ -894,12 +938,9 @@ static int write_partial_skip(struct ceph_connection *con)
        int ret;
 
        while (con->out_skip > 0) {
-               struct kvec iov = {
-                       .iov_base = page_address(con->msgr->zero_page),
-                       .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
-               };
+               size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
 
-               ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
+               ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
                if (ret <= 0)
                        goto out;
                con->out_skip -= ret;
@@ -1059,12 +1100,109 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)
        switch (ss->ss_family) {
        case AF_INET:
                ((struct sockaddr_in *)ss)->sin_port = htons(p);
+               break;
        case AF_INET6:
                ((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
+               break;
        }
 }
 
 /*
+ * Unlike other *_pton function semantics, zero indicates success.
+ */
+static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
+               char delim, const char **ipend)
+{
+       struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
+       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
+
+       memset(ss, 0, sizeof(*ss));
+
+       if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
+               ss->ss_family = AF_INET;
+               return 0;
+       }
+
+       if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
+               ss->ss_family = AF_INET6;
+               return 0;
+       }
+
+       return -EINVAL;
+}
+
+/*
+ * Extract hostname string and resolve using kernel DNS facility.
+ */
+#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
+static int ceph_dns_resolve_name(const char *name, size_t namelen,
+               struct sockaddr_storage *ss, char delim, const char **ipend)
+{
+       const char *end, *delim_p;
+       char *colon_p, *ip_addr = NULL;
+       int ip_len, ret;
+
+       /*
+        * The end of the hostname occurs immediately preceding the delimiter or
+        * the port marker (':') where the delimiter takes precedence.
+        */
+       delim_p = memchr(name, delim, namelen);
+       colon_p = memchr(name, ':', namelen);
+
+       if (delim_p && colon_p)
+               end = delim_p < colon_p ? delim_p : colon_p;
+       else if (!delim_p && colon_p)
+               end = colon_p;
+       else {
+               end = delim_p;
+               if (!end) /* case: hostname:/ */
+                       end = name + namelen;
+       }
+
+       if (end <= name)
+               return -EINVAL;
+
+       /* do dns_resolve upcall */
+       ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
+       if (ip_len > 0)
+               ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
+       else
+               ret = -ESRCH;
+
+       kfree(ip_addr);
+
+       *ipend = end;
+
+       pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
+                       ret, ret ? "failed" : ceph_pr_addr(ss));
+
+       return ret;
+}
+#else
+static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
+               struct sockaddr_storage *ss, char delim, const char **ipend)
+{
+       return -EINVAL;
+}
+#endif
+
+/*
+ * Parse a server name (IP or hostname). If a valid IP address is not found
+ * then try to extract a hostname to resolve using userspace DNS upcall.
+ */
+static int ceph_parse_server_name(const char *name, size_t namelen,
+                       struct sockaddr_storage *ss, char delim, const char **ipend)
+{
+       int ret;
+
+       ret = ceph_pton(name, namelen, ss, delim, ipend);
+       if (ret)
+               ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
+
+       return ret;
+}
+
+/*
  * Parse an ip[:port] list into an addr array.  Use the default
  * monitor port if a port isn't specified.
  */
@@ -1072,15 +1210,13 @@ int ceph_parse_ips(const char *c, const char *end,
                   struct ceph_entity_addr *addr,
                   int max_count, int *count)
 {
-       int i;
+       int i, ret = -EINVAL;
        const char *p = c;
 
        dout("parse_ips on '%.*s'\n", (int)(end-c), c);
        for (i = 0; i < max_count; i++) {
                const char *ipend;
                struct sockaddr_storage *ss = &addr[i].in_addr;
-               struct sockaddr_in *in4 = (void *)ss;
-               struct sockaddr_in6 *in6 = (void *)ss;
                int port;
                char delim = ',';
 
@@ -1089,15 +1225,11 @@ int ceph_parse_ips(const char *c, const char *end,
                        p++;
                }
 
-               memset(ss, 0, sizeof(*ss));
-               if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr,
-                            delim, &ipend))
-                       ss->ss_family = AF_INET;
-               else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
-                                 delim, &ipend))
-                       ss->ss_family = AF_INET6;
-               else
+               ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
+               if (ret)
                        goto bad;
+               ret = -EINVAL;
+
                p = ipend;
 
                if (delim == ']') {
@@ -1142,7 +1274,7 @@ int ceph_parse_ips(const char *c, const char *end,
 
 bad:
        pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
-       return -EINVAL;
+       return ret;
 }
 EXPORT_SYMBOL(ceph_parse_ips);
 
@@ -1210,6 +1342,7 @@ static int process_connect(struct ceph_connection *con)
        u64 sup_feat = con->msgr->supported_features;
        u64 req_feat = con->msgr->required_features;
        u64 server_feat = le64_to_cpu(con->in_reply.features);
+       int ret;
 
        dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
 
@@ -1241,12 +1374,12 @@ static int process_connect(struct ceph_connection *con)
                     con->auth_retry);
                if (con->auth_retry == 2) {
                        con->error_msg = "connect authorization failure";
-                       reset_connection(con);
-                       set_bit(CLOSED, &con->state);
                        return -1;
                }
                con->auth_retry = 1;
-               prepare_write_connect(con->msgr, con, 0);
+               ret = prepare_write_connect(con->msgr, con, 0);
+               if (ret < 0)
+                       return ret;
                prepare_read_connect(con);
                break;
 
@@ -1273,6 +1406,9 @@ static int process_connect(struct ceph_connection *con)
                if (con->ops->peer_reset)
                        con->ops->peer_reset(con);
                mutex_lock(&con->mutex);
+               if (test_bit(CLOSED, &con->state) ||
+                   test_bit(OPENING, &con->state))
+                       return -EAGAIN;
                break;
 
        case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1337,7 +1473,9 @@ static int process_connect(struct ceph_connection *con)
                 * to WAIT.  This shouldn't happen if we are the
                 * client.
                 */
-               pr_err("process_connect peer connecting WAIT\n");
+               pr_err("process_connect got WAIT as client\n");
+               con->error_msg = "protocol error, got WAIT as client";
+               return -1;
 
        default:
                pr_err("connect protocol error, will retry\n");
@@ -1377,6 +1515,7 @@ static void process_ack(struct ceph_connection *con)
                        break;
                dout("got ack for seq %llu type %d at %p\n", seq,
                     le16_to_cpu(m->hdr.type), m);
+               m->ack_stamp = jiffies;
                ceph_msg_remove(m);
        }
        prepare_read_tag(con);
@@ -1401,10 +1540,9 @@ static int read_partial_message_section(struct ceph_connection *con,
                if (ret <= 0)
                        return ret;
                section->iov_len += ret;
-               if (section->iov_len == sec_len)
-                       *crc = crc32c(0, section->iov_base,
-                                     section->iov_len);
        }
+       if (section->iov_len == sec_len)
+               *crc = crc32c(0, section->iov_base, section->iov_len);
 
        return 1;
 }
@@ -1416,7 +1554,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
 
 static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
-                                     unsigned data_len, int datacrc)
+                                     unsigned data_len, bool do_datacrc)
 {
        void *p;
        int ret;
@@ -1429,7 +1567,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
        p = kmap(pages[con->in_msg_pos.page]);
        ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
                               left);
-       if (ret > 0 && datacrc)
+       if (ret > 0 && do_datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
                                  p + con->in_msg_pos.page_pos, ret);
@@ -1449,7 +1587,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
 #ifdef CONFIG_BLOCK
 static int read_partial_message_bio(struct ceph_connection *con,
                                    struct bio **bio_iter, int *bio_seg,
-                                   unsigned data_len, int datacrc)
+                                   unsigned data_len, bool do_datacrc)
 {
        struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
        void *p;
@@ -1465,7 +1603,7 @@ static int read_partial_message_bio(struct ceph_connection *con,
 
        ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
                               left);
-       if (ret > 0 && datacrc)
+       if (ret > 0 && do_datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
                                  p + con->in_msg_pos.page_pos, ret);
@@ -1491,10 +1629,11 @@ static int read_partial_message(struct ceph_connection *con)
        struct ceph_msg *m = con->in_msg;
        int ret;
        int to, left;
-       unsigned front_len, middle_len, data_len, data_off;
-       int datacrc = con->msgr->nocrc;
+       unsigned front_len, middle_len, data_len;
+       bool do_datacrc = !con->msgr->nocrc;
        int skip;
        u64 seq;
+       u32 crc;
 
        dout("read_partial_message con %p msg %p\n", con, m);
 
@@ -1507,17 +1646,16 @@ static int read_partial_message(struct ceph_connection *con)
                if (ret <= 0)
                        return ret;
                con->in_base_pos += ret;
-               if (con->in_base_pos == sizeof(con->in_hdr)) {
-                       u32 crc = crc32c(0, (void *)&con->in_hdr,
-                                sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
-                       if (crc != le32_to_cpu(con->in_hdr.crc)) {
-                               pr_err("read_partial_message bad hdr "
-                                      " crc %u != expected %u\n",
-                                      crc, con->in_hdr.crc);
-                               return -EBADMSG;
-                       }
-               }
        }
+
+       crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
+       if (cpu_to_le32(crc) != con->in_hdr.crc) {
+               pr_err("read_partial_message bad hdr "
+                      " crc %u != expected %u\n",
+                      crc, con->in_hdr.crc);
+               return -EBADMSG;
+       }
+
        front_len = le32_to_cpu(con->in_hdr.front_len);
        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
                return -EIO;
@@ -1527,19 +1665,17 @@ static int read_partial_message(struct ceph_connection *con)
        data_len = le32_to_cpu(con->in_hdr.data_len);
        if (data_len > CEPH_MSG_MAX_DATA_LEN)
                return -EIO;
-       data_off = le16_to_cpu(con->in_hdr.data_off);
 
        /* verify seq# */
        seq = le64_to_cpu(con->in_hdr.seq);
        if ((s64)seq - (s64)con->in_seq < 1) {
-               pr_info("skipping %s%lld %s seq %lld, expected %lld\n",
+               pr_info("skipping %s%lld %s seq %lld expected %lld\n",
                        ENTITY_NAME(con->peer_name),
                        ceph_pr_addr(&con->peer_addr.in_addr),
                        seq, con->in_seq + 1);
                con->in_base_pos = -front_len - middle_len - data_len -
                        sizeof(m->footer);
                con->in_tag = CEPH_MSGR_TAG_READY;
-               con->in_seq++;
                return 0;
        } else if ((s64)seq - (s64)con->in_seq > 1) {
                pr_err("read_partial_message bad seq %lld expected %lld\n",
@@ -1576,7 +1712,7 @@ static int read_partial_message(struct ceph_connection *con)
 
                con->in_msg_pos.page = 0;
                if (m->pages)
-                       con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+                       con->in_msg_pos.page_pos = m->page_alignment;
                else
                        con->in_msg_pos.page_pos = 0;
                con->in_msg_pos.data_pos = 0;
@@ -1605,7 +1741,7 @@ static int read_partial_message(struct ceph_connection *con)
        while (con->in_msg_pos.data_pos < data_len) {
                if (m->pages) {
                        ret = read_partial_message_pages(con, m->pages,
-                                                data_len, datacrc);
+                                                data_len, do_datacrc);
                        if (ret <= 0)
                                return ret;
 #ifdef CONFIG_BLOCK
@@ -1613,7 +1749,7 @@ static int read_partial_message(struct ceph_connection *con)
 
                        ret = read_partial_message_bio(con,
                                                 &m->bio_iter, &m->bio_seg,
-                                                data_len, datacrc);
+                                                data_len, do_datacrc);
                        if (ret <= 0)
                                return ret;
 #endif
@@ -1648,7 +1784,7 @@ static int read_partial_message(struct ceph_connection *con)
                       m, con->in_middle_crc, m->footer.middle_crc);
                return -EBADMSG;
        }
-       if (datacrc &&
+       if (do_datacrc &&
            (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
            con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
                pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
@@ -1710,15 +1846,6 @@ more:
 
        /* open the socket first? */
        if (con->sock == NULL) {
-               /*
-                * if we were STANDBY and are reconnecting _this_
-                * connection, bump connect_seq now.  Always bump
-                * global_seq.
-                */
-               if (test_and_clear_bit(STANDBY, &con->state))
-                       con->connect_seq++;
-
-               prepare_write_banner(msgr, con);
                prepare_write_connect(msgr, con, 1);
                prepare_read_banner(con);
                set_bit(CONNECTING, &con->state);
@@ -1728,11 +1855,9 @@ more:
                con->in_tag = CEPH_MSGR_TAG_READY;
                dout("try_write initiating connect on %p new state %lu\n",
                     con, con->state);
-               con->sock = ceph_tcp_connect(con);
-               if (IS_ERR(con->sock)) {
-                       con->sock = NULL;
+               ret = ceph_tcp_connect(con);
+               if (ret < 0) {
                        con->error_msg = "connect error";
-                       ret = -1;
                        goto out;
                }
        }
@@ -1742,16 +1867,12 @@ more_kvec:
        if (con->out_skip) {
                ret = write_partial_skip(con);
                if (ret <= 0)
-                       goto done;
-               if (ret < 0) {
-                       dout("try_write write_partial_skip err %d\n", ret);
-                       goto done;
-               }
+                       goto out;
        }
        if (con->out_kvec_left) {
                ret = write_partial_kvec(con);
                if (ret <= 0)
-                       goto done;
+                       goto out;
        }
 
        /* msg pages? */
@@ -1766,11 +1887,11 @@ more_kvec:
                if (ret == 1)
                        goto more_kvec;  /* we need to send the footer, too! */
                if (ret == 0)
-                       goto done;
+                       goto out;
                if (ret < 0) {
                        dout("try_write write_partial_msg_pages err %d\n",
                             ret);
-                       goto done;
+                       goto out;
                }
        }
 
@@ -1794,10 +1915,9 @@ do_next:
        /* Nothing to do! */
        clear_bit(WRITE_PENDING, &con->state);
        dout("try_write nothing else to write.\n");
-done:
        ret = 0;
 out:
-       dout("try_write done on %p\n", con);
+       dout("try_write done on %p ret %d\n", con, ret);
        return ret;
 }
 
@@ -1821,24 +1941,33 @@ static int try_read(struct ceph_connection *con)
 more:
        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
             con->in_base_pos);
+
+       /*
+        * process_connect and process_message drop and re-take
+        * con->mutex.  make sure we handle a racing close or reopen.
+        */
+       if (test_bit(CLOSED, &con->state) ||
+           test_bit(OPENING, &con->state)) {
+               ret = -EAGAIN;
+               goto out;
+       }
+
        if (test_bit(CONNECTING, &con->state)) {
                if (!test_bit(NEGOTIATING, &con->state)) {
                        dout("try_read connecting\n");
                        ret = read_partial_banner(con);
                        if (ret <= 0)
-                               goto done;
-                       if (process_banner(con) < 0) {
-                               ret = -1;
                                goto out;
-                       }
+                       ret = process_banner(con);
+                       if (ret < 0)
+                               goto out;
                }
                ret = read_partial_connect(con);
                if (ret <= 0)
-                       goto done;
-               if (process_connect(con) < 0) {
-                       ret = -1;
                        goto out;
-               }
+               ret = process_connect(con);
+               if (ret < 0)
+                       goto out;
                goto more;
        }
 
@@ -1848,12 +1977,13 @@ more:
                 *
                 * FIXME: there must be a better way to do this!
                 */
-               static char buf[1024];
-               int skip = min(1024, -con->in_base_pos);
+               static char buf[SKIP_BUF_SIZE];
+               int skip = min((int) sizeof (buf), -con->in_base_pos);
+
                dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
                ret = ceph_tcp_recvmsg(con->sock, buf, skip);
                if (ret <= 0)
-                       goto done;
+                       goto out;
                con->in_base_pos += ret;
                if (con->in_base_pos)
                        goto more;
@@ -1864,7 +1994,7 @@ more:
                 */
                ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
                if (ret <= 0)
-                       goto done;
+                       goto out;
                dout("try_read got tag %d\n", (int)con->in_tag);
                switch (con->in_tag) {
                case CEPH_MSGR_TAG_MSG:
@@ -1875,7 +2005,7 @@ more:
                        break;
                case CEPH_MSGR_TAG_CLOSE:
                        set_bit(CLOSED, &con->state);   /* fixme */
-                       goto done;
+                       goto out;
                default:
                        goto bad_tag;
                }
@@ -1887,13 +2017,12 @@ more:
                        case -EBADMSG:
                                con->error_msg = "bad crc";
                                ret = -EIO;
-                               goto out;
+                               break;
                        case -EIO:
                                con->error_msg = "io error";
-                               goto out;
-                       default:
-                               goto done;
+                               break;
                        }
+                       goto out;
                }
                if (con->in_tag == CEPH_MSGR_TAG_READY)
                        goto more;
@@ -1903,15 +2032,13 @@ more:
        if (con->in_tag == CEPH_MSGR_TAG_ACK) {
                ret = read_partial_ack(con);
                if (ret <= 0)
-                       goto done;
+                       goto out;
                process_ack(con);
                goto more;
        }
 
-done:
-       ret = 0;
 out:
-       dout("try_read done on %p\n", con);
+       dout("try_read done on %p ret %d\n", con, ret);
        return ret;
 
 bad_tag:
@@ -1925,20 +2052,6 @@ bad_tag:
 /*
  * Atomically queue work on a connection.  Bump @con reference to
  * avoid races with connection teardown.
- *
- * There is some trickery going on with QUEUED and BUSY because we
- * only want a _single_ thread operating on each connection at any
- * point in time, but we want to use all available CPUs.
- *
- * The worker thread only proceeds if it can atomically set BUSY.  It
- * clears QUEUED and does it's thing.  When it thinks it's done, it
- * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
- * (tries again to set BUSY).
- *
- * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
- * try to queue work.  If that fails (work is already queued, or BUSY)
- * we give up (work also already being done or is queued) but leave QUEUED
- * set so that the worker thread will loop if necessary.
  */
 static void queue_con(struct ceph_connection *con)
 {
@@ -1953,11 +2066,7 @@ static void queue_con(struct ceph_connection *con)
                return;
        }
 
-       set_bit(QUEUED, &con->state);
-       if (test_bit(BUSY, &con->state)) {
-               dout("queue_con %p - already BUSY\n", con);
-               con->ops->put(con);
-       } else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
+       if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
                dout("queue_con %p - already queued\n", con);
                con->ops->put(con);
        } else {
@@ -1972,18 +2081,28 @@ static void con_work(struct work_struct *work)
 {
        struct ceph_connection *con = container_of(work, struct ceph_connection,
                                                   work.work);
-       int backoff = 0;
-
-more:
-       if (test_and_set_bit(BUSY, &con->state) != 0) {
-               dout("con_work %p BUSY already set\n", con);
-               goto out;
-       }
-       dout("con_work %p start, clearing QUEUED\n", con);
-       clear_bit(QUEUED, &con->state);
+       int ret;
 
        mutex_lock(&con->mutex);
+restart:
+       if (test_and_clear_bit(BACKOFF, &con->state)) {
+               dout("con_work %p backing off\n", con);
+               if (queue_delayed_work(ceph_msgr_wq, &con->work,
+                                      round_jiffies_relative(con->delay))) {
+                       dout("con_work %p backoff %lu\n", con, con->delay);
+                       mutex_unlock(&con->mutex);
+                       return;
+               } else {
+                       con->ops->put(con);
+                       dout("con_work %p FAILED to back off %lu\n", con,
+                            con->delay);
+               }
+       }
 
+       if (test_bit(STANDBY, &con->state)) {
+               dout("con_work %p STANDBY\n", con);
+               goto done;
+       }
        if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
                dout("con_work CLOSED\n");
                con_close_socket(con);
@@ -1995,33 +2114,31 @@ more:
                con_close_socket(con);
        }
 
-       if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
-           try_read(con) < 0 ||
-           try_write(con) < 0) {
-               mutex_unlock(&con->mutex);
-               backoff = 1;
-               ceph_fault(con);     /* error/fault path */
-               goto done_unlocked;
-       }
+       if (test_and_clear_bit(SOCK_CLOSED, &con->state))
+               goto fault;
+
+       ret = try_read(con);
+       if (ret == -EAGAIN)
+               goto restart;
+       if (ret < 0)
+               goto fault;
+
+       ret = try_write(con);
+       if (ret == -EAGAIN)
+               goto restart;
+       if (ret < 0)
+               goto fault;
 
 done:
        mutex_unlock(&con->mutex);
-
 done_unlocked:
-       clear_bit(BUSY, &con->state);
-       dout("con->state=%lu\n", con->state);
-       if (test_bit(QUEUED, &con->state)) {
-               if (!backoff || test_bit(OPENING, &con->state)) {
-                       dout("con_work %p QUEUED reset, looping\n", con);
-                       goto more;
-               }
-               dout("con_work %p QUEUED reset, but just faulted\n", con);
-               clear_bit(QUEUED, &con->state);
-       }
-       dout("con_work %p done\n", con);
-
-out:
        con->ops->put(con);
+       return;
+
+fault:
+       mutex_unlock(&con->mutex);
+       ceph_fault(con);     /* error/fault path */
+       goto done_unlocked;
 }
 
 
@@ -2055,10 +2172,12 @@ static void ceph_fault(struct ceph_connection *con)
        /* Requeue anything that hasn't been acked */
        list_splice_init(&con->out_sent, &con->out_queue);
 
-       /* If there are no messages in the queue, place the connection
-        * in a STANDBY state (i.e., don't try to reconnect just yet). */
-       if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
-               dout("fault setting STANDBY\n");
+       /* If there are no messages queued or keepalive pending, place
+        * the connection in a STANDBY state */
+       if (list_empty(&con->out_queue) &&
+           !test_bit(KEEPALIVE_PENDING, &con->state)) {
+               dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
+               clear_bit(WRITE_PENDING, &con->state);
                set_bit(STANDBY, &con->state);
        } else {
                /* retry after a delay. */
@@ -2066,11 +2185,24 @@ static void ceph_fault(struct ceph_connection *con)
                        con->delay = BASE_DELAY_INTERVAL;
                else if (con->delay < MAX_DELAY_INTERVAL)
                        con->delay *= 2;
-               dout("fault queueing %p delay %lu\n", con, con->delay);
                con->ops->get(con);
                if (queue_delayed_work(ceph_msgr_wq, &con->work,
-                                      round_jiffies_relative(con->delay)) == 0)
+                                      round_jiffies_relative(con->delay))) {
+                       dout("fault queued %p delay %lu\n", con, con->delay);
+               } else {
                        con->ops->put(con);
+                       dout("fault failed to queue %p delay %lu, backoff\n",
+                            con, con->delay);
+                       /*
+                        * In many cases we see a socket state change
+                        * while con_work is running and end up
+                        * queuing (non-delayed) work, such that we
+                        * can't backoff with a delay.  Set a flag so
+                        * that when con_work restarts we schedule the
+                        * delay then.
+                        */
+                       set_bit(BACKOFF, &con->state);
+               }
        }
 
 out_unlock:
@@ -2109,15 +2241,6 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
 
        spin_lock_init(&msgr->global_seq_lock);
 
-       /* the zero page is needed if a request is "canceled" while the message
-        * is being written over the socket */
-       msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
-       if (!msgr->zero_page) {
-               kfree(msgr);
-               return ERR_PTR(-ENOMEM);
-       }
-       kmap(msgr->zero_page);
-
        if (myaddr)
                msgr->inst.addr = *myaddr;
 
@@ -2134,13 +2257,24 @@ EXPORT_SYMBOL(ceph_messenger_create);
 void ceph_messenger_destroy(struct ceph_messenger *msgr)
 {
        dout("destroy %p\n", msgr);
-       kunmap(msgr->zero_page);
-       __free_page(msgr->zero_page);
        kfree(msgr);
        dout("destroyed messenger %p\n", msgr);
 }
 EXPORT_SYMBOL(ceph_messenger_destroy);
 
+static void clear_standby(struct ceph_connection *con)
+{
+       /* come back from STANDBY? */
+       if (test_and_clear_bit(STANDBY, &con->state)) {
+               mutex_lock(&con->mutex);
+               dout("clear_standby %p and ++connect_seq\n", con);
+               con->connect_seq++;
+               WARN_ON(test_bit(WRITE_PENDING, &con->state));
+               WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
+               mutex_unlock(&con->mutex);
+       }
+}
+
 /*
  * Queue up an outgoing message on the given connection.
  */
@@ -2173,6 +2307,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 
        /* if there wasn't anything waiting to send before, queue
         * new work */
+       clear_standby(con);
        if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
                queue_con(con);
 }
@@ -2238,6 +2373,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
  */
 void ceph_con_keepalive(struct ceph_connection *con)
 {
+       dout("con_keepalive %p\n", con);
+       clear_standby(con);
        if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
            test_and_set_bit(WRITE_PENDING, &con->state) == 0)
                queue_con(con);
@@ -2249,7 +2386,8 @@ EXPORT_SYMBOL(ceph_con_keepalive);
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
  */
-struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
+struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
+                             bool can_fail)
 {
        struct ceph_msg *m;
 
@@ -2275,8 +2413,22 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
        m->front_max = front_len;
        m->front_is_vmalloc = false;
        m->more_to_follow = false;
+       m->ack_stamp = 0;
        m->pool = NULL;
 
+       /* middle */
+       m->middle = NULL;
+
+       /* data */
+       m->nr_pages = 0;
+       m->page_alignment = 0;
+       m->pages = NULL;
+       m->pagelist = NULL;
+       m->bio = NULL;
+       m->bio_iter = NULL;
+       m->bio_seg = 0;
+       m->trail = NULL;
+
        /* front */
        if (front_len) {
                if (front_len > PAGE_CACHE_SIZE) {
@@ -2287,7 +2439,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
                        m->front.iov_base = kmalloc(front_len, flags);
                }
                if (m->front.iov_base == NULL) {
-                       pr_err("msg_new can't allocate %d bytes\n",
+                       dout("ceph_msg_new can't allocate %d bytes\n",
                             front_len);
                        goto out2;
                }
@@ -2296,25 +2448,20 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
        }
        m->front.iov_len = front_len;
 
-       /* middle */
-       m->middle = NULL;
-
-       /* data */
-       m->nr_pages = 0;
-       m->pages = NULL;
-       m->pagelist = NULL;
-       m->bio = NULL;
-       m->bio_iter = NULL;
-       m->bio_seg = 0;
-       m->trail = NULL;
-
        dout("ceph_msg_new %p front %d\n", m, front_len);
        return m;
 
 out2:
        ceph_msg_put(m);
 out:
-       pr_err("msg_new can't create type %d front %d\n", type, front_len);
+       if (!can_fail) {
+               pr_err("msg_new can't create type %d front %d\n", type,
+                      front_len);
+               WARN_ON(1);
+       } else {
+               dout("msg_new can't create type %d front %d\n", type,
+                    front_len);
+       }
        return NULL;
 }
 EXPORT_SYMBOL(ceph_msg_new);
@@ -2364,12 +2511,13 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
        }
        if (!msg) {
                *skip = 0;
-               msg = ceph_msg_new(type, front_len, GFP_NOFS);
+               msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
                if (!msg) {
                        pr_err("unable to allocate msg type %d len %d\n",
                               type, front_len);
                        return NULL;
                }
+               msg->page_alignment = le16_to_cpu(hdr->data_off);
        }
        memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));