From 6dc18e08315eb3071f68cb7aeb9f99b2b80457a2 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 8 Apr 2009 15:00:02 -0700 Subject: messenger: zero pages infrastructure (not working yet) --- src/kernel/messenger.c | 86 ++++++++++++++++++++++++++++++++++++++++------ src/kernel/messenger.h | 3 +- src/msg/SimpleMessenger.cc | 13 +++++++ 3 files changed, 91 insertions(+), 11 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index dc1bcd1a3d0..f39dbe2ef80 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -646,6 +646,12 @@ static void prepare_write_message(struct ceph_connection *con) { struct ceph_msg *m; int v = 0; + int p; + void *zero_page_addr = page_address(con->msgr->zero_page); + int i = 0; + unsigned len; + unsigned dlen; + __le32 zero_map_len; con->out_kvec_bytes = 0; @@ -673,16 +679,56 @@ static void prepare_write_message(struct ceph_connection *con) m->nr_pages); BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); + if (m->nr_pages) { + m->zero_map = kmalloc(m->nr_pages, GFP_KERNEL); + dlen = le32_to_cpu(m->hdr.data_len); + for (p = 0; p < m->nr_pages; + p++) { + struct page *page = NULL; + void *kaddr = NULL; + char is_zero = 1; + len = min((unsigned)PAGE_SIZE, dlen); + + mutex_lock(&m->page_mutex); + if (m->pages) { + page = m->pages[p]; + kaddr = kmap(page); + is_zero = (memcmp(kaddr, zero_page_addr, len) == 0); + kunmap(page); + } + + mutex_unlock(&m->page_mutex); + + dout(0, "page %d is %s\n", i, (is_zero ? "zero" : "not zero")); + + m->zero_map[i++] = is_zero; + dlen -= len; + } + } + zero_map_len = cpu_to_le32(m->nr_pages); /* tag + hdr + front */ - con->out_kvec[v].iov_base = &tag_msg; - con->out_kvec[v++].iov_len = 1; - con->out_kvec[v].iov_base = &m->hdr; - con->out_kvec[v++].iov_len = sizeof(m->hdr); - con->out_kvec[v++] = m->front; - con->out_kvec_left = v; - con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len; + +#define REGISTER_DATA(base, len) \ + con->out_kvec[v].iov_base = base; \ + con->out_kvec[v++].iov_len = len; \ + con->out_kvec_bytes += len; + +#define REGISTER_KVEC(kvec) \ + con->out_kvec[v++] = kvec; \ + con->out_kvec_bytes += kvec.iov_len; + + + REGISTER_DATA(&tag_msg, 1); + REGISTER_DATA(&m->hdr, sizeof(m->hdr)); + REGISTER_KVEC(m->front); + REGISTER_DATA(&zero_map_len, sizeof(zero_map_len)); + if (m->nr_pages) + REGISTER_DATA(m->zero_map, zero_map_len); + con->out_kvec_cur = con->out_kvec; + con->out_kvec_left = v; + /* fill in crc (except data pages), footer */ con->out_msg->hdr.crc = cpu_to_le32(crc32c(0, (void *)&m->hdr, @@ -893,6 +939,7 @@ static int write_partial_msg_pages(struct ceph_connection *con) while (con->out_msg_pos.page < con->out_msg->nr_pages) { struct page *page = NULL; void *kaddr = NULL; + void *zero_page_addr = NULL; /* * if we are calculating the data crc (the default), we need @@ -906,8 +953,9 @@ static int write_partial_msg_pages(struct ceph_connection *con) kaddr = kmap(page); } else { page = con->msgr->zero_page; - if (crc) - kaddr = page_address(con->msgr->zero_page); + if (crc) { + kaddr = zero_page_addr; + } } len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), (int)(data_len - con->out_msg_pos.data_pos)); @@ -918,14 +966,28 @@ static int write_partial_msg_pages(struct ceph_connection *con) con->out_msg->footer.data_crc = cpu_to_le32(crc32c(tmpcrc, base, len)); con->out_msg_pos.did_page_crc = 1; + } +#if 0 + if (len == PAGE_SIZE && msg->pages) { + void *base; + if (!kaddr) + kaddr = kmap(page); + if (!zero_page_addr) + zero_page_addr = page_address(con->msgr->zero_page); + base = kaddr + con->out_msg_pos.page_pos; + if (memcmp(base, zero_page_addr, PAGE_SIZE) == 0) { + dout(0, "zero page\n"); + } + } +#endif ret = kernel_sendpage(con->sock, page, con->out_msg_pos.page_pos, len, MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE); - if (crc && msg->pages) + if (kaddr && kaddr != zero_page_addr) kunmap(page); mutex_unlock(&msg->page_mutex); @@ -1419,6 +1481,7 @@ static int read_partial_message(struct ceph_connection *con) unsigned front_len, data_len, data_off; struct ceph_client *client = con->msgr->parent; int datacrc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC); + __le32 zero_map_len; dout(20, "read_partial_message con %p msg %p\n", con, m); @@ -1465,6 +1528,9 @@ static int read_partial_message(struct ceph_connection *con) con->in_front_crc = crc32c(0, m->front.iov_base, m->front.iov_len); } + ret = ceph_tcp_recvmsg(con->sock, (char *)&zero_map_len, 4); + if (ret <= 0) + return ret; /* (page) data */ data_len = le32_to_cpu(m->hdr.data_len); diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 7a7224a6693..4905bc20f27 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -118,6 +118,7 @@ struct ceph_msg { atomic_t nref; bool front_is_vmalloc; bool more_to_follow; + char *zero_map; }; struct ceph_msg_pos { @@ -203,7 +204,7 @@ struct ceph_connection { out_sent) */ struct ceph_msg_pos out_msg_pos; - struct kvec out_kvec[6], /* sending header/footer data */ + struct kvec out_kvec[7], /* sending header/footer data */ *out_kvec_cur; int out_kvec_left; /* kvec's left in out_kvec */ int out_kvec_bytes; /* total bytes left */ diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 995b0bf5d2f..aab1aa4a5c1 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1931,6 +1931,16 @@ Message *Rank::Pipe::read_message() dout(20) << "reader got front " << front.length() << dendl; } + int zero_map_len; + bufferptr zero_map_bp; + if (tcp_read( sd, (char *)&zero_map_len, 4 ) < 0) + return 0; + if (zero_map_len) { + zero_map_bp = buffer::create(zero_map_len); + if (tcp_read( sd, zero_map_bp.c_str(), zero_map_len ) < 0) + return 0; + } + // read data bufferlist data; unsigned data_len = le32_to_cpu(header.data_len); @@ -2074,6 +2084,9 @@ int Rank::Pipe::write_message(Message *m) m->calc_header_crc(); bufferlist blist = m->get_payload(); + + int zero_map_len = 0; + blist.append((char *)&zero_map_len, 4); blist.append(m->get_data()); dout(20) << "write_message " << m << " to " << header.dst << dendl; -- cgit v1.2.1