summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-01-02 18:13:25 -0800
committerSage Weil <sage@inktank.com>2013-01-02 18:13:25 -0800
commit6b5a89d2373549497878d841d9b5aab3c79af099 (patch)
tree5b55e26adbef85dd5eafafc75c917ae32dd8dcc4
parent29ff87a5735cbae4bf7802ebe904fd7f5cb55bfb (diff)
parent43cba617aa0247d714632bddf31b9271ef3a1b50 (diff)
downloadceph-6b5a89d2373549497878d841d9b5aab3c79af099.tar.gz
Merge remote-tracking branch 'gh/next'
-rw-r--r--src/ceph_fuse.cc5
-rw-r--r--src/log/Log.cc2
-rw-r--r--src/os/FileJournal.cc75
-rw-r--r--src/test/test_filejournal.cc47
4 files changed, 93 insertions, 36 deletions
diff --git a/src/ceph_fuse.cc b/src/ceph_fuse.cc
index de513bcf0a8..f7a8d1a2c73 100644
--- a/src/ceph_fuse.cc
+++ b/src/ceph_fuse.cc
@@ -155,14 +155,15 @@ int main(int argc, const char **argv, const char *envp[]) {
}
r = cfuse.init(newargc, newargv);
- if (r < 0) {
+ if (r != 0) {
cerr << "ceph-fuse[" << getpid() << "]: fuse failed to initialize" << std::endl;
- goto out_shutdown;
+ goto out_client_unmount;
}
cerr << "ceph-fuse[" << getpid() << "]: starting fuse" << std::endl;
r = cfuse.loop();
cerr << "ceph-fuse[" << getpid() << "]: fuse finished with error " << r << std::endl;
+ out_client_unmount:
client->unmount();
//cout << "unmounted" << std::endl;
diff --git a/src/log/Log.cc b/src/log/Log.cc
index 2912463f6b6..e06afbfe1e2 100644
--- a/src/log/Log.cc
+++ b/src/log/Log.cc
@@ -252,7 +252,7 @@ void Log::_log_message(const char *s, bool crash)
void Log::dump_recent()
{
- pthread_mutex_unlock(&m_flush_mutex);
+ pthread_mutex_lock(&m_flush_mutex);
pthread_mutex_lock(&m_queue_mutex);
EntryQueue t;
diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc
index 5f31406db40..20324f37a3d 100644
--- a/src/os/FileJournal.cc
+++ b/src/os/FileJournal.cc
@@ -1147,8 +1147,10 @@ void FileJournal::write_thread_entry()
}
assert(r == 0);
- logger->inc(l_os_j_wr);
- logger->inc(l_os_j_wr_bytes, bl.length());
+ if (logger) {
+ logger->inc(l_os_j_wr);
+ logger->inc(l_os_j_wr_bytes, bl.length());
+ }
#ifdef HAVE_LIBAIO
if (aio)
@@ -1249,40 +1251,51 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
- aio_queue.push_back(aio_info(bl, pos, seq));
- aio_info& aio = aio_queue.back();
+ while (bl.length() > 0) {
+ int max = MIN(bl.buffers().size(), IOV_MAX-1);
+ iovec *iov = new iovec[max];
+ int n = 0;
+ unsigned len = 0;
+ for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
+ n < max;
+ ++p, ++n) {
+ assert(p != bl.buffers().end());
+ iov[n].iov_base = (void *)p->c_str();
+ iov[n].iov_len = p->length();
+ len += p->length();
+ }
- aio.iov = new iovec[aio.bl.buffers().size()];
- int n = 0;
- for (std::list<buffer::ptr>::const_iterator p = aio.bl.buffers().begin();
- p != aio.bl.buffers().end();
- ++p, ++n) {
- aio.iov[n].iov_base = (void *)p->c_str();
- aio.iov[n].iov_len = p->length();
- }
- io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
+ bufferlist tbl;
+ bl.splice(0, len, &tbl); // move bytes from bl -> tbl
- dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
- << " in " << n << dendl;
+ aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
+ aio_info& aio = aio_queue.back();
+ aio.iov = iov;
- aio_num++;
- aio_bytes += aio.len;
+ io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
- iocb *piocb = &aio.iocb;
- int attempts = 10;
- do {
- int r = io_submit(aio_ctx, 1, &piocb);
- if (r < 0) {
- derr << "io_submit to " << aio.off << "~" << aio.len
- << " got " << cpp_strerror(r) << dendl;
- if (r == -EAGAIN && attempts-- > 0) {
- usleep(500);
- continue;
+ dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
+ << " in " << n << dendl;
+
+ aio_num++;
+ aio_bytes += aio.len;
+
+ iocb *piocb = &aio.iocb;
+ int attempts = 10;
+ do {
+ int r = io_submit(aio_ctx, 1, &piocb);
+ if (r < 0) {
+ derr << "io_submit to " << aio.off << "~" << aio.len
+ << " got " << cpp_strerror(r) << dendl;
+ if (r == -EAGAIN && attempts-- > 0) {
+ usleep(500);
+ continue;
+ }
+ assert(0 == "io_submit got unexpected error");
}
- assert(0 == "io_submit got unexpected error");
- }
- } while (false);
- pos += aio.len;
+ } while (false);
+ pos += aio.len;
+ }
write_finish_cond.Signal();
return 0;
}
diff --git a/src/test/test_filejournal.cc b/src/test/test_filejournal.cc
index c3af26bc3e4..5b7576dea39 100644
--- a/src/test/test_filejournal.cc
+++ b/src/test/test_filejournal.cc
@@ -1,5 +1,6 @@
#include <gtest/gtest.h>
#include <stdlib.h>
+#include <limits.h>
#include "common/ceph_argparse.h"
#include "common/common_init.h"
@@ -69,8 +70,13 @@ int main(int argc, char **argv) {
finisher = new Finisher(g_ceph_context);
- srand(getpid()+time(0));
- snprintf(path, sizeof(path), "/tmp/test_filejournal.tmp.%d", rand());
+ if (args.size()) {
+ strcpy(path, args[0]);
+ } else {
+ srand(getpid()+time(0));
+ snprintf(path, sizeof(path), "/tmp/test_filejournal.tmp.%d", rand());
+ }
+ cout << "path " << path << std::endl;
::testing::InitGoogleTest(&argc, argv);
@@ -160,6 +166,43 @@ TEST(TestFileJournal, WriteMany) {
j.close();
}
+TEST(TestFileJournal, WriteManyVecs) {
+ fsid.generate_random();
+ FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);
+ ASSERT_EQ(0, j.create());
+ j.make_writeable();
+
+ C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&lock, &cond, &done));
+
+ bufferlist first;
+ first.append("small");
+ j.submit_entry(1, first, 0, gb.new_sub());
+
+ bufferlist bl;
+ for (int i=0; i<IOV_MAX * 2; i++) {
+ bufferptr bp = buffer::create_page_aligned(4096);
+ memset(bp.c_str(), (char)i, 4096);
+ bl.append(bp);
+ }
+ bufferlist origbl = bl;
+ j.submit_entry(2, bl, 0, gb.new_sub());
+ gb.activate();
+ wait();
+
+ j.close();
+
+ j.open(1);
+ bufferlist inbl;
+ string v;
+ uint64_t seq = 0;
+ ASSERT_EQ(true, j.read_entry(inbl, seq));
+ ASSERT_EQ(seq, 2ull);
+ ASSERT_TRUE(inbl.contents_equal(origbl));
+ j.make_writeable();
+ j.close();
+
+}
+
TEST(TestFileJournal, ReplaySmall) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);