inetpeer: fix a race in inetpeer_gc_worker()
[linux-2.6.git] / net / ceph / messenger.c
index 204e229..f0993af 100644 (file)
@@ -38,6 +38,11 @@ static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
 static struct lock_class_key socket_class;
 #endif
 
+/*
+ * When skipping (ignoring) a block of input we read it into a "skip
+ * buffer," which is this many bytes in size.
+ */
+#define SKIP_BUF_SIZE  1024
 
 static void queue_con(struct ceph_connection *con);
 static void con_work(struct work_struct *);
@@ -56,7 +61,6 @@ static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
 static atomic_t addr_str_seq = ATOMIC_INIT(0);
 
 static struct page *zero_page;         /* used in certain error cases */
-static void *zero_page_address;                /* kernel virtual addr of zero_page */
 
 const char *ceph_pr_addr(const struct sockaddr_storage *ss)
 {
@@ -106,9 +110,6 @@ void _ceph_msgr_exit(void)
                ceph_msgr_wq = NULL;
        }
 
-       BUG_ON(zero_page_address == NULL);
-       zero_page_address = NULL;
-
        BUG_ON(zero_page == NULL);
        kunmap(zero_page);
        page_cache_release(zero_page);
@@ -121,9 +122,6 @@ int ceph_msgr_init(void)
        zero_page = ZERO_PAGE(0);
        page_cache_get(zero_page);
 
-       BUG_ON(zero_page_address != NULL);
-       zero_page_address = kmap(zero_page);
-
        ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
        if (ceph_msgr_wq)
                return 0;
@@ -316,6 +314,19 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
        return r;
 }
 
+/* Pass page data to kernel_sendpage(); -EAGAIN is returned as 0. */
+static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
+                    int offset, size_t size, int more)
+{
+       int flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+       int ret;
+
+       flags |= more ? MSG_MORE : MSG_EOR;
+       ret = kernel_sendpage(sock, page, offset, size, flags);
+
+       return ret == -EAGAIN ? 0 : ret;
+}
+
 
 /*
  * Shutdown/close the socket for the given connection.
@@ -521,6 +532,7 @@ static void prepare_write_message_footer(struct ceph_connection *con)
 static void prepare_write_message(struct ceph_connection *con)
 {
        struct ceph_msg *m;
+       u32 crc;
 
        ceph_con_out_kvec_reset(con);
        con->out_kvec_is_msg = true;
@@ -569,17 +581,17 @@ static void prepare_write_message(struct ceph_connection *con)
                        m->middle->vec.iov_base);
 
        /* fill in crc (except data pages), footer */
-       con->out_msg->hdr.crc =
-               cpu_to_le32(crc32c(0, &m->hdr,
-                                     sizeof(m->hdr) - sizeof(m->hdr.crc)));
+       crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
+       con->out_msg->hdr.crc = cpu_to_le32(crc);
        con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
-       con->out_msg->footer.front_crc =
-               cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
-       if (m->middle)
-               con->out_msg->footer.middle_crc =
-                       cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
-                                          m->middle->vec.iov_len));
-       else
+
+       crc = crc32c(0, m->front.iov_base, m->front.iov_len);
+       con->out_msg->footer.front_crc = cpu_to_le32(crc);
+       if (m->middle) {
+               crc = crc32c(0, m->middle->vec.iov_base,
+                               m->middle->vec.iov_len);
+               con->out_msg->footer.middle_crc = cpu_to_le32(crc);
+       } else
                con->out_msg->footer.middle_crc = 0;
        con->out_msg->footer.data_crc = 0;
        dout("prepare_write_message front_crc %u data_crc %u\n",
@@ -746,17 +758,18 @@ static int write_partial_kvec(struct ceph_connection *con)
                con->out_kvec_bytes -= ret;
                if (con->out_kvec_bytes == 0)
                        break;            /* done */
-               while (ret > 0) {
-                       if (ret >= con->out_kvec_cur->iov_len) {
-                               ret -= con->out_kvec_cur->iov_len;
-                               con->out_kvec_cur++;
-                               con->out_kvec_left--;
-                       } else {
-                               con->out_kvec_cur->iov_len -= ret;
-                               con->out_kvec_cur->iov_base += ret;
-                               ret = 0;
-                               break;
-                       }
+
+               /* account for full iov entries consumed */
+               while (ret >= con->out_kvec_cur->iov_len) {
+                       BUG_ON(!con->out_kvec_left);
+                       ret -= con->out_kvec_cur->iov_len;
+                       con->out_kvec_cur++;
+                       con->out_kvec_left--;
+               }
+               /* and for a partially-consumed entry */
+               if (ret) {
+                       con->out_kvec_cur->iov_len -= ret;
+                       con->out_kvec_cur->iov_base += ret;
                }
        }
        con->out_kvec_left = 0;
@@ -805,7 +818,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
        struct ceph_msg *msg = con->out_msg;
        unsigned data_len = le32_to_cpu(msg->hdr.data_len);
        size_t len;
-       bool do_crc = con->msgr->nocrc;
+       bool do_datacrc = !con->msgr->nocrc;
        int ret;
        int total_max_write;
        int in_trail = 0;
@@ -822,9 +835,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
        while (data_len > con->out_msg_pos.data_pos) {
                struct page *page = NULL;
-               void *kaddr = NULL;
                int max_write = PAGE_SIZE;
-               int page_shift = 0;
+               int bio_offset = 0;
 
                total_max_write = data_len - trail_len -
                        con->out_msg_pos.data_pos;
@@ -843,58 +855,47 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
                        page = list_first_entry(&msg->trail->head,
                                                struct page, lru);
-                       if (do_crc)
-                               kaddr = kmap(page);
                        max_write = PAGE_SIZE;
                } else if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
-                       if (do_crc)
-                               kaddr = kmap(page);
                } else if (msg->pagelist) {
                        page = list_first_entry(&msg->pagelist->head,
                                                struct page, lru);
-                       if (do_crc)
-                               kaddr = kmap(page);
 #ifdef CONFIG_BLOCK
                } else if (msg->bio) {
                        struct bio_vec *bv;
 
                        bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
                        page = bv->bv_page;
-                       page_shift = bv->bv_offset;
-                       if (do_crc)
-                               kaddr = kmap(page) + page_shift;
+                       bio_offset = bv->bv_offset;
                        max_write = bv->bv_len;
 #endif
                } else {
                        page = zero_page;
-                       if (do_crc)
-                               kaddr = zero_page_address;
                }
                len = min_t(int, max_write - con->out_msg_pos.page_pos,
                            total_max_write);
 
-               if (do_crc && !con->out_msg_pos.did_page_crc) {
-                       void *base = kaddr + con->out_msg_pos.page_pos;
+               if (do_datacrc && !con->out_msg_pos.did_page_crc) {
+                       void *base;
+                       u32 crc;
                        u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
+                       char *kaddr;
 
+                       kaddr = kmap(page);
                        BUG_ON(kaddr == NULL);
-                       con->out_msg->footer.data_crc =
-                               cpu_to_le32(crc32c(tmpcrc, base, len));
+                       base = kaddr + con->out_msg_pos.page_pos + bio_offset;
+                       crc = crc32c(tmpcrc, base, len);
+                       con->out_msg->footer.data_crc = cpu_to_le32(crc);
                        con->out_msg_pos.did_page_crc = true;
                }
-               ret = kernel_sendpage(con->sock, page,
-                                     con->out_msg_pos.page_pos + page_shift,
-                                     len,
-                                     MSG_DONTWAIT | MSG_NOSIGNAL |
-                                     MSG_MORE);
-
-               if (do_crc &&
-                   (msg->pages || msg->pagelist || msg->bio || in_trail))
+               ret = ceph_tcp_sendpage(con->sock, page,
+                                     con->out_msg_pos.page_pos + bio_offset,
+                                     len, 1);
+
+               if (do_datacrc)
                        kunmap(page);
 
-               if (ret == -EAGAIN)
-                       ret = 0;
                if (ret <= 0)
                        goto out;
 
@@ -920,7 +921,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
        dout("write_partial_msg_pages %p msg %p done\n", con, msg);
 
        /* prepare and queue up footer, too */
-       if (!do_crc)
+       if (!do_datacrc)
                con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
        ceph_con_out_kvec_reset(con);
        prepare_write_message_footer(con);
@@ -937,12 +938,9 @@ static int write_partial_skip(struct ceph_connection *con)
        int ret;
 
        while (con->out_skip > 0) {
-               struct kvec iov = {
-                       .iov_base = zero_page_address,
-                       .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
-               };
+               size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
 
-               ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
+               ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
                if (ret <= 0)
                        goto out;
                con->out_skip -= ret;
@@ -1542,10 +1540,9 @@ static int read_partial_message_section(struct ceph_connection *con,
                if (ret <= 0)
                        return ret;
                section->iov_len += ret;
-               if (section->iov_len == sec_len)
-                       *crc = crc32c(0, section->iov_base,
-                                     section->iov_len);
        }
+       if (section->iov_len == sec_len)
+               *crc = crc32c(0, section->iov_base, section->iov_len);
 
        return 1;
 }
@@ -1633,9 +1630,10 @@ static int read_partial_message(struct ceph_connection *con)
        int ret;
        int to, left;
        unsigned front_len, middle_len, data_len;
-       bool do_datacrc = con->msgr->nocrc;
+       bool do_datacrc = !con->msgr->nocrc;
        int skip;
        u64 seq;
+       u32 crc;
 
        dout("read_partial_message con %p msg %p\n", con, m);
 
@@ -1648,17 +1646,16 @@ static int read_partial_message(struct ceph_connection *con)
                if (ret <= 0)
                        return ret;
                con->in_base_pos += ret;
-               if (con->in_base_pos == sizeof(con->in_hdr)) {
-                       u32 crc = crc32c(0, &con->in_hdr,
-                                sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
-                       if (crc != le32_to_cpu(con->in_hdr.crc)) {
-                               pr_err("read_partial_message bad hdr "
-                                      " crc %u != expected %u\n",
-                                      crc, con->in_hdr.crc);
-                               return -EBADMSG;
-                       }
-               }
        }
+
+       crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
+       if (cpu_to_le32(crc) != con->in_hdr.crc) {
+               pr_err("read_partial_message bad hdr "
+                      " crc %u != expected %u\n",
+                      crc, con->in_hdr.crc);
+               return -EBADMSG;
+       }
+
        front_len = le32_to_cpu(con->in_hdr.front_len);
        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
                return -EIO;
@@ -1980,8 +1977,9 @@ more:
                 *
                 * FIXME: there must be a better way to do this!
                 */
-               static char buf[1024];
-               int skip = min(1024, -con->in_base_pos);
+               static char buf[SKIP_BUF_SIZE];
+               int skip = min((int) sizeof (buf), -con->in_base_pos);
+
                dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
                ret = ceph_tcp_recvmsg(con->sock, buf, skip);
                if (ret <= 0)