diff options
author | Sage Weil <sage@inktank.com> | 2013-01-02 18:13:25 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-01-02 18:13:25 -0800 |
commit | 6b5a89d2373549497878d841d9b5aab3c79af099 (patch) | |
tree | 5b55e26adbef85dd5eafafc75c917ae32dd8dcc4 | |
parent | 29ff87a5735cbae4bf7802ebe904fd7f5cb55bfb (diff) | |
parent | 43cba617aa0247d714632bddf31b9271ef3a1b50 (diff) | |
download | ceph-6b5a89d2373549497878d841d9b5aab3c79af099.tar.gz |
Merge remote-tracking branch 'gh/next'
-rw-r--r-- | src/ceph_fuse.cc | 5 | ||||
-rw-r--r-- | src/log/Log.cc | 2 | ||||
-rw-r--r-- | src/os/FileJournal.cc | 75 | ||||
-rw-r--r-- | src/test/test_filejournal.cc | 47 |
4 files changed, 93 insertions, 36 deletions
diff --git a/src/ceph_fuse.cc b/src/ceph_fuse.cc index de513bcf0a8..f7a8d1a2c73 100644 --- a/src/ceph_fuse.cc +++ b/src/ceph_fuse.cc @@ -155,14 +155,15 @@ int main(int argc, const char **argv, const char *envp[]) { } r = cfuse.init(newargc, newargv); - if (r < 0) { + if (r != 0) { cerr << "ceph-fuse[" << getpid() << "]: fuse failed to initialize" << std::endl; - goto out_shutdown; + goto out_client_unmount; } cerr << "ceph-fuse[" << getpid() << "]: starting fuse" << std::endl; r = cfuse.loop(); cerr << "ceph-fuse[" << getpid() << "]: fuse finished with error " << r << std::endl; + out_client_unmount: client->unmount(); //cout << "unmounted" << std::endl; diff --git a/src/log/Log.cc b/src/log/Log.cc index 2912463f6b6..e06afbfe1e2 100644 --- a/src/log/Log.cc +++ b/src/log/Log.cc @@ -252,7 +252,7 @@ void Log::_log_message(const char *s, bool crash) void Log::dump_recent() { - pthread_mutex_unlock(&m_flush_mutex); + pthread_mutex_lock(&m_flush_mutex); pthread_mutex_lock(&m_queue_mutex); EntryQueue t; diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc index 5f31406db40..20324f37a3d 100644 --- a/src/os/FileJournal.cc +++ b/src/os/FileJournal.cc @@ -1147,8 +1147,10 @@ void FileJournal::write_thread_entry() } assert(r == 0); - logger->inc(l_os_j_wr); - logger->inc(l_os_j_wr_bytes, bl.length()); + if (logger) { + logger->inc(l_os_j_wr); + logger->inc(l_os_j_wr_bytes, bl.length()); + } #ifdef HAVE_LIBAIO if (aio) @@ -1249,40 +1251,51 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq) dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl; - aio_queue.push_back(aio_info(bl, pos, seq)); - aio_info& aio = aio_queue.back(); + while (bl.length() > 0) { + int max = MIN(bl.buffers().size(), IOV_MAX-1); + iovec *iov = new iovec[max]; + int n = 0; + unsigned len = 0; + for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin(); + n < max; + ++p, ++n) { + assert(p != bl.buffers().end()); + iov[n].iov_base = (void *)p->c_str(); + iov[n].iov_len = p->length(); + len += p->length(); + } - aio.iov = new iovec[aio.bl.buffers().size()]; - int n = 0; - for (std::list<buffer::ptr>::const_iterator p = aio.bl.buffers().begin(); - p != aio.bl.buffers().end(); - ++p, ++n) { - aio.iov[n].iov_base = (void *)p->c_str(); - aio.iov[n].iov_len = p->length(); - } - io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos); + bufferlist tbl; + bl.splice(0, len, &tbl); // move bytes from bl -> tbl - dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len - << " in " << n << dendl; + aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq)); + aio_info& aio = aio_queue.back(); + aio.iov = iov; - aio_num++; - aio_bytes += aio.len; + io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos); - iocb *piocb = &aio.iocb; - int attempts = 10; - do { - int r = io_submit(aio_ctx, 1, &piocb); - if (r < 0) { - derr << "io_submit to " << aio.off << "~" << aio.len - << " got " << cpp_strerror(r) << dendl; - if (r == -EAGAIN && attempts-- > 0) { - usleep(500); - continue; + dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len + << " in " << n << dendl; + + aio_num++; + aio_bytes += aio.len; + + iocb *piocb = &aio.iocb; + int attempts = 10; + do { + int r = io_submit(aio_ctx, 1, &piocb); + if (r < 0) { + derr << "io_submit to " << aio.off << "~" << aio.len + << " got " << cpp_strerror(r) << dendl; + if (r == -EAGAIN && attempts-- > 0) { + usleep(500); + continue; + } + assert(0 == "io_submit got unexpected error"); } - assert(0 == "io_submit got unexpected error"); - } - } while (false); - pos += aio.len; + } while (false); + pos += aio.len; + } write_finish_cond.Signal(); return 0; } diff --git a/src/test/test_filejournal.cc b/src/test/test_filejournal.cc index c3af26bc3e4..5b7576dea39 100644 --- a/src/test/test_filejournal.cc +++ b/src/test/test_filejournal.cc @@ -1,5 +1,6 @@ #include <gtest/gtest.h> #include <stdlib.h> +#include <limits.h> #include "common/ceph_argparse.h" #include "common/common_init.h" @@ -69,8 +70,13 @@ int main(int argc, char **argv) { finisher = new Finisher(g_ceph_context); - srand(getpid()+time(0)); - snprintf(path, sizeof(path), "/tmp/test_filejournal.tmp.%d", rand()); + if (args.size()) { + strcpy(path, args[0]); + } else { + srand(getpid()+time(0)); + snprintf(path, sizeof(path), "/tmp/test_filejournal.tmp.%d", rand()); + } + cout << "path " << path << std::endl; ::testing::InitGoogleTest(&argc, argv); @@ -160,6 +166,43 @@ TEST(TestFileJournal, WriteMany) { j.close(); } +TEST(TestFileJournal, WriteManyVecs) { + fsid.generate_random(); + FileJournal j(fsid, finisher, &sync_cond, path, directio, aio); + ASSERT_EQ(0, j.create()); + j.make_writeable(); + + C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&lock, &cond, &done)); + + bufferlist first; + first.append("small"); + j.submit_entry(1, first, 0, gb.new_sub()); + + bufferlist bl; + for (int i=0; i<IOV_MAX * 2; i++) { + bufferptr bp = buffer::create_page_aligned(4096); + memset(bp.c_str(), (char)i, 4096); + bl.append(bp); + } + bufferlist origbl = bl; + j.submit_entry(2, bl, 0, gb.new_sub()); + gb.activate(); + wait(); + + j.close(); + + j.open(1); + bufferlist inbl; + string v; + uint64_t seq = 0; + ASSERT_EQ(true, j.read_entry(inbl, seq)); + ASSERT_EQ(seq, 2ull); + ASSERT_TRUE(inbl.contents_equal(origbl)); + j.make_writeable(); + j.close(); + +} + TEST(TestFileJournal, ReplaySmall) { fsid.generate_random(); FileJournal j(fsid, finisher, &sync_cond, path, directio, aio); |