summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Samuel Just <sam.just@inktank.com> 2013-05-20 13:44:57 -0700
committer: Samuel Just <sam.just@inktank.com> 2013-05-21 16:37:44 -0700
commit: 489cd5c441d865f48f40acf9d4d9181e0bcc7247 (patch)
tree: c7c1b5270ddf632ca99b38a5304d19f009627dd1
parent: 6670e2a73db8f59e2a6bac024afe43cd8a8a6715 (diff)
download: ceph-489cd5c441d865f48f40acf9d4d9181e0bcc7247.tar.gz
FileStore: integrate WBThrottle
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- src/common/config_opts.h 4
-rw-r--r-- src/os/FileStore.cc 140
-rw-r--r-- src/os/FileStore.h 23
3 files changed, 9 insertions, 158 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 27593cb9f9d..27e2daceb31 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -508,10 +508,6 @@ OPTION(filestore_btrfs_snap, OPT_BOOL, true)
OPTION(filestore_btrfs_clone_range, OPT_BOOL, true)
OPTION(filestore_fsync_flushes_journal_data, OPT_BOOL, false)
OPTION(filestore_fiemap, OPT_BOOL, false) // (try to) use fiemap
-OPTION(filestore_flusher, OPT_BOOL, true)
-OPTION(filestore_flusher_max_fds, OPT_INT, 512)
-OPTION(filestore_flush_min, OPT_INT, 65536)
-OPTION(filestore_sync_flush, OPT_BOOL, false)
OPTION(filestore_journal_parallel, OPT_BOOL, false)
OPTION(filestore_journal_writeahead, OPT_BOOL, false)
OPTION(filestore_journal_trailing, OPT_BOOL, false)
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 24f7f1eff08..dca8a1bbfea 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -353,6 +353,7 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
if (g_conf->filestore_debug_inject_read_err) {
debug_obj_on_delete(o);
}
+ wbthrottle.clear_object(o); // should be only non-cache ref
fdcache.clear(o);
} else {
/* Ensure that replay of this op doesn't result in the object_map
@@ -388,6 +389,7 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha
stop(false), sync_thread(this),
fdcache_lock("fdcache_lock"),
fdcache(g_ceph_context),
+ wbthrottle(g_ceph_context),
default_osr("default"),
op_queue_len(0), op_queue_bytes(0),
op_throttle_lock("FileStore::op_throttle_lock"),
@@ -395,22 +397,17 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha
op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
op_wq(this, g_conf->filestore_op_thread_timeout,
g_conf->filestore_op_thread_suicide_timeout, &op_tp),
- flusher_queue_len(0), flusher_thread(this),
logger(NULL),
read_error_lock("FileStore::read_error_lock"),
m_filestore_btrfs_clone_range(g_conf->filestore_btrfs_clone_range),
m_filestore_btrfs_snap (g_conf->filestore_btrfs_snap ),
m_filestore_commit_timeout(g_conf->filestore_commit_timeout),
m_filestore_fiemap(g_conf->filestore_fiemap),
- m_filestore_flusher (g_conf->filestore_flusher ),
m_filestore_fsync_flushes_journal_data(g_conf->filestore_fsync_flushes_journal_data),
m_filestore_journal_parallel(g_conf->filestore_journal_parallel ),
m_filestore_journal_trailing(g_conf->filestore_journal_trailing),
m_filestore_journal_writeahead(g_conf->filestore_journal_writeahead),
m_filestore_fiemap_threshold(g_conf->filestore_fiemap_threshold),
- m_filestore_sync_flush(g_conf->filestore_sync_flush),
- m_filestore_flusher_max_fds(g_conf->filestore_flusher_max_fds),
- m_filestore_flush_min(g_conf->filestore_flush_min),
m_filestore_max_sync_interval(g_conf->filestore_max_sync_interval),
m_filestore_min_sync_interval(g_conf->filestore_min_sync_interval),
m_filestore_fail_eio(g_conf->filestore_fail_eio),
@@ -1068,6 +1065,7 @@ int FileStore::_detect_fs()
#if defined(__linux__)
if (st.f_type == BTRFS_SUPER_MAGIC) {
dout(0) << "mount detected btrfs" << dendl;
+ wbthrottle.set_fs(WBThrottle::BTRFS);
btrfs = true;
btrfs_stable_commits = btrfs && m_filestore_btrfs_snap;
@@ -1779,7 +1777,6 @@ int FileStore::mount()
journal_start();
op_tp.start();
- flusher_thread.create();
op_finisher.start();
ondisk_finisher.start();
@@ -1817,11 +1814,9 @@ int FileStore::umount()
lock.Lock();
stop = true;
sync_cond.Signal();
- flusher_cond.Signal();
lock.Unlock();
sync_thread.join();
op_tp.stop();
- flusher_thread.join();
journal_stop();
@@ -1969,6 +1964,7 @@ void FileStore::op_queue_release_throttle(Op *o)
void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
{
+ wbthrottle.throttle();
// inject a stall?
if (g_conf->filestore_inject_stall) {
int orig = g_conf->filestore_inject_stall;
@@ -2952,38 +2948,7 @@ int FileStore::_write(coll_t cid, const hobject_t& oid,
r = bl.length();
// flush?
- {
- bool should_flush = (ssize_t)len >= m_filestore_flush_min;
- bool local_flush = false;
-#ifdef HAVE_SYNC_FILE_RANGE
- bool async_done = false;
- if (!should_flush ||
- !m_filestore_flusher ||
- !(async_done = queue_flusher(**fd, offset, len, replica))) {
- if (should_flush && m_filestore_sync_flush) {
- ::sync_file_range(**fd, offset, len, SYNC_FILE_RANGE_WRITE);
- local_flush = true;
- }
- }
- // TODOSAM: this will be fixed in a subsequent patch
- //Both lfn_close() and possible posix_fadvise() done by flusher
- //if (async_done) fd = -1;
-#else
- // no sync_file_range; (maybe) flush inline and close.
- if (should_flush && m_filestore_sync_flush) {
- ::fdatasync(**fd);
- local_flush = true;
- }
-#endif
- if (local_flush && replica && m_filestore_replica_fadvise) {
- int fa_r = posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
- if (fa_r) {
- dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl;
- } else {
- dout(10) << "posix_fadvise performed after local flush" << dendl;
- }
- }
- }
+ wbthrottle.queue_wb(fd, oid, offset, len, replica);
lfn_close(fd);
out:
@@ -3277,90 +3242,6 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t
return r;
}
-
-bool FileStore::queue_flusher(int fd, uint64_t off, uint64_t len, bool replica)
-{
- bool queued;
- lock.Lock();
- if (flusher_queue_len < m_filestore_flusher_max_fds) {
- flusher_queue.push_back(sync_epoch);
- flusher_queue.push_back(fd);
- flusher_queue.push_back(off);
- flusher_queue.push_back(len);
- flusher_queue.push_back(replica);
- flusher_queue_len++;
- flusher_cond.Signal();
- dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
- << " qlen " << flusher_queue_len
- << dendl;
- queued = true;
- } else {
- dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
- << " qlen " << flusher_queue_len
- << " hit flusher_max_fds " << m_filestore_flusher_max_fds
- << ", skipping async flush" << dendl;
- queued = false;
- }
- lock.Unlock();
- return queued;
-}
-
-void FileStore::flusher_entry()
-{
- lock.Lock();
- dout(20) << "flusher_entry start" << dendl;
- while (true) {
- if (!flusher_queue.empty()) {
-#ifdef HAVE_SYNC_FILE_RANGE
- list<uint64_t> q;
- q.swap(flusher_queue);
-
- int num = flusher_queue_len; // see how many we're taking, here
-
- lock.Unlock();
- while (!q.empty()) {
- uint64_t ep = q.front();
- q.pop_front();
- int fd = q.front();
- q.pop_front();
- uint64_t off = q.front();
- q.pop_front();
- uint64_t len = q.front();
- q.pop_front();
- bool replica = q.front();
- q.pop_front();
- if (!stop && ep == sync_epoch) {
- dout(10) << "flusher_entry flushing+closing " << fd << " ep " << ep << dendl;
- ::sync_file_range(fd, off, len, SYNC_FILE_RANGE_WRITE);
- if (replica && m_filestore_replica_fadvise) {
- int fa_r = posix_fadvise(fd, off, len, POSIX_FADV_DONTNEED);
- if (fa_r) {
- dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl;
- } else {
- dout(10) << "posix_fadvise performed after local flush" << dendl;
- }
- }
- } else
- dout(10) << "flusher_entry JUST closing " << fd << " (stop=" << stop << ", ep=" << ep
- << ", sync_epoch=" << sync_epoch << ")" << dendl;
- // TODOSAM: this will be replaced in a subsequent patch
- //lfn_close(fd);
- }
- lock.Lock();
- flusher_queue_len -= num; // they're definitely closed, forget
-#endif
- } else {
- if (stop)
- break;
- dout(20) << "flusher_entry sleeping" << dendl;
- flusher_cond.Wait(lock);
- dout(20) << "flusher_entry awoke" << dendl;
- }
- }
- dout(20) << "flusher_entry finish" << dendl;
- lock.Unlock();
-}
-
class SyncEntryTimeout : public Context {
public:
SyncEntryTimeout(int commit_timeo)
@@ -3553,6 +3434,7 @@ void FileStore::sync_entry()
logger->tinc(l_os_commit_len, dur);
apply_manager.commit_finish();
+ wbthrottle.clear();
logger->set(l_os_committing, 0);
@@ -4919,9 +4801,6 @@ const char** FileStore::get_tracked_conf_keys() const
"filestore_queue_max_bytes",
"filestore_queue_committing_max_ops",
"filestore_queue_committing_max_bytes",
- "filestore_flusher",
- "filestore_flusher_max_fds",
- "filestore_sync_flush",
"filestore_commit_timeout",
"filestore_dump_file",
"filestore_kill_at",
@@ -4941,9 +4820,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
changed.count("filestore_queue_max_bytes") ||
changed.count("filestore_queue_committing_max_ops") ||
changed.count("filestore_queue_committing_max_bytes") ||
- changed.count("filestore_flusher") ||
- changed.count("filestore_flusher_max_fds") ||
- changed.count("filestore_flush_min") ||
changed.count("filestore_kill_at") ||
changed.count("filestore_fail_eio") ||
changed.count("filestore_replica_fadvise")) {
@@ -4954,10 +4830,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes;
m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops;
m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes;
- m_filestore_flusher = conf->filestore_flusher;
- m_filestore_flusher_max_fds = conf->filestore_flusher_max_fds;
- m_filestore_flush_min = conf->filestore_flush_min;
- m_filestore_sync_flush = conf->filestore_sync_flush;
m_filestore_kill_at.set(conf->filestore_kill_at);
m_filestore_fail_eio = conf->filestore_fail_eio;
m_filestore_replica_fadvise = conf->filestore_replica_fadvise;
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index 00249f274c1..78668dd92a4 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -41,6 +41,7 @@ using namespace __gnu_cxx;
#include "ObjectMap.h"
#include "SequencerPosition.h"
#include "FDCache.h"
+#include "WBThrottle.h"
#include "include/uuid.h"
@@ -201,6 +202,8 @@ private:
Mutex fdcache_lock;
FDCache fdcache;
+ WBThrottle wbthrottle;
+
Sequencer default_osr;
deque<OpSequencer*> op_queue;
uint64_t op_queue_len, op_queue_bytes;
@@ -253,24 +256,8 @@ private:
void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
friend class C_JournaledAhead;
- // flusher thread
- Cond flusher_cond;
- list<uint64_t> flusher_queue;
- int flusher_queue_len;
- void flusher_entry();
- struct FlusherThread : public Thread {
- FileStore *fs;
- FlusherThread(FileStore *f) : fs(f) {}
- void *entry() {
- fs->flusher_entry();
- return 0;
- }
- } flusher_thread;
- bool queue_flusher(int fd, uint64_t off, uint64_t len, bool replica);
-
int open_journal();
-
PerfCounters *logger;
public:
@@ -514,15 +501,11 @@ private:
bool m_filestore_btrfs_snap;
float m_filestore_commit_timeout;
bool m_filestore_fiemap;
- bool m_filestore_flusher;
bool m_filestore_fsync_flushes_journal_data;
bool m_filestore_journal_parallel;
bool m_filestore_journal_trailing;
bool m_filestore_journal_writeahead;
int m_filestore_fiemap_threshold;
- bool m_filestore_sync_flush;
- int m_filestore_flusher_max_fds;
- int m_filestore_flush_min;
double m_filestore_max_sync_interval;
double m_filestore_min_sync_interval;
bool m_filestore_fail_eio;