diff options
-rw-r--r-- | doc/dev/osd_internals/osd_throttles.rst | 21 | ||||
-rw-r--r-- | doc/dev/osd_internals/wbthrottle.rst | 28 | ||||
-rw-r--r-- | src/Makefile.am | 5 | ||||
-rw-r--r-- | src/common/config_opts.h | 19 | ||||
-rw-r--r-- | src/common/shared_cache.hpp | 13 | ||||
-rw-r--r-- | src/os/FDCache.h | 95 | ||||
-rw-r--r-- | src/os/FileStore.cc | 364 | ||||
-rw-r--r-- | src/os/FileStore.h | 41 | ||||
-rw-r--r-- | src/os/WBThrottle.cc | 239 | ||||
-rw-r--r-- | src/os/WBThrottle.h | 171 | ||||
-rw-r--r-- | src/os/hobject.h | 3 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 1 |
12 files changed, 720 insertions, 280 deletions
diff --git a/doc/dev/osd_internals/osd_throttles.rst b/doc/dev/osd_internals/osd_throttles.rst new file mode 100644 index 00000000000..4fa3044f986 --- /dev/null +++ b/doc/dev/osd_internals/osd_throttles.rst @@ -0,0 +1,21 @@ + Messenger throttle (number and size) + |-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| + FileStore op_queue throttle (number and size) + |--------------------------------------------------------| + WBThrottle + |---------------------------------------------------------------------------------------------------------| + Journal (size) + |-----------------------------------------------------------------------------------------------------------------------------------------------------------------| + |----------------------------------------------------------------------------------------------------> flushed ----------------> synced + | +Op: Read Header --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> op_applied -------------------------------------------------------------> Complete + | | +SubOp: --Messenger--> ReadHeader --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> sub_op_applied - + | + |-----------------------------> flushed ----------------> synced + |------------------------------------------------------------------------------------------| + Journal (size) + |---------------------------------| + WBThrottle + |-----------------------------------------------------| + FileStore op_queue throttle (number and size) diff --git a/doc/dev/osd_internals/wbthrottle.rst b/doc/dev/osd_internals/wbthrottle.rst new file mode 100644 index 00000000000..14ba0140d4d --- /dev/null +++ b/doc/dev/osd_internals/wbthrottle.rst @@ -0,0 +1,28 @@ +================== +Writeback Throttle +================== + +Previously, the filestore had a problem when handling large numbers of +small ios. We throttle dirty data implicitely via the journal, but +a large number of inodes can be dirtied without filling the journal +resulting in a very long sync time when the sync finally does happen. +The flusher was not an adequate solution to this problem since it +forced writeback of small writes too eagerly killing performance. + +WBThrottle tracks unflushed io per hobject_t and ::fsyncs in lru +order once the start_flusher threshhold is exceeded for any of +dirty bytes, dirty ios, or dirty inodes. While any of these exceed +the hard_limit, we block on throttle() in _do_op. + +See src/os/WBThrottle.h, src/osd/WBThrottle.cc + +To track the open FDs through the writeback process, there is now an +fdcache to cache open fds. lfn_open now returns a cached FDRef which +implicitely closes the fd once all references have expired. + +Filestore syncs have a sideeffect of flushing all outstanding objects +in the wbthrottle. + +lfn_unlink clears the cached FDRef and wbthrottle entries for the +unlinked object when then last link is removed and asserts that all +outstanding FDRefs for that object are dead. diff --git a/src/Makefile.am b/src/Makefile.am index 451548eb42f..5e176874b11 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1490,7 +1490,8 @@ libos_a_SOURCES = \ os/IndexManager.cc \ os/FlatIndex.cc \ os/DBObjectMap.cc \ - os/LevelDBStore.cc + os/LevelDBStore.cc \ + os/WBThrottle.cc libos_a_CXXFLAGS= ${AM_CXXFLAGS} noinst_LIBRARIES += libos.a @@ -1975,6 +1976,8 @@ noinst_HEADERS = \ os/FileStore.h\ os/FlatIndex.h\ os/HashIndex.h\ + os/FDCache.h\ + os/WBThrottle.h\ os/IndexManager.h\ os/Journal.h\ os/JournalingObjectStore.h\ diff --git a/src/common/config_opts.h b/src/common/config_opts.h index dd2b1dcba1f..285f4d52335 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -478,6 +478,20 @@ OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5) OPTION(filestore, OPT_BOOL, false) +/// filestore wb throttle limits +OPTION(filestore_wbthrottle_btrfs_bytes_start_flusher, OPT_U64, 10<<20) +OPTION(filestore_wbthrottle_btrfs_bytes_hard_limit, OPT_U64, 100<<20) +OPTION(filestore_wbthrottle_btrfs_ios_start_flusher, OPT_U64, 100) +OPTION(filestore_wbthrottle_btrfs_ios_hard_limit, OPT_U64, 1000) +OPTION(filestore_wbthrottle_btrfs_inodes_start_flusher, OPT_U64, 100) +OPTION(filestore_wbthrottle_btrfs_inodes_hard_limit, OPT_U64, 1000) +OPTION(filestore_wbthrottle_xfs_bytes_start_flusher, OPT_U64, 10<<20) +OPTION(filestore_wbthrottle_xfs_bytes_hard_limit, OPT_U64, 100<<20) +OPTION(filestore_wbthrottle_xfs_ios_start_flusher, OPT_U64, 10) +OPTION(filestore_wbthrottle_xfs_ios_hard_limit, OPT_U64, 100) +OPTION(filestore_wbthrottle_xfs_inodes_start_flusher, OPT_U64, 10) +OPTION(filestore_wbthrottle_xfs_inodes_hard_limit, OPT_U64, 100) + // Tests index failure paths OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0) @@ -498,10 +512,6 @@ OPTION(filestore_btrfs_snap, OPT_BOOL, true) OPTION(filestore_btrfs_clone_range, OPT_BOOL, true) OPTION(filestore_fsync_flushes_journal_data, OPT_BOOL, false) OPTION(filestore_fiemap, OPT_BOOL, false) // (try to) use fiemap -OPTION(filestore_flusher, OPT_BOOL, true) -OPTION(filestore_flusher_max_fds, OPT_INT, 512) -OPTION(filestore_flush_min, OPT_INT, 65536) -OPTION(filestore_sync_flush, OPT_BOOL, false) OPTION(filestore_journal_parallel, OPT_BOOL, false) OPTION(filestore_journal_writeahead, OPT_BOOL, false) OPTION(filestore_journal_trailing, OPT_BOOL, false) @@ -518,6 +528,7 @@ OPTION(filestore_merge_threshold, OPT_INT, 10) OPTION(filestore_split_multiple, OPT_INT, 2) OPTION(filestore_update_to, OPT_INT, 1000) OPTION(filestore_blackhole, OPT_BOOL, false) // drop any new transactions on the floor +OPTION(filestore_fd_cache_size, OPT_INT, 128) // FD lru size OPTION(filestore_dump_file, OPT_STR, "") // file onto which store transaction dumps OPTION(filestore_kill_at, OPT_INT, 0) // inject a failure at the n'th opportunity OPTION(filestore_inject_stall, OPT_INT, 0) // artificially stall for N seconds in op queue thread diff --git a/src/common/shared_cache.hpp b/src/common/shared_cache.hpp index 69a4c06dfbf..178d1001be3 100644 --- a/src/common/shared_cache.hpp +++ b/src/common/shared_cache.hpp @@ -85,12 +85,23 @@ public: assert(weak_refs.empty()); } + void clear(K key) { + VPtr val; // release any ref we have after we drop the lock + { + Mutex::Locker l(lock); + if (weak_refs.count(key)) { + val = weak_refs[key].lock(); + } + lru_remove(key); + } + } + void set_size(size_t new_size) { list<VPtr> to_release; { Mutex::Locker l(lock); max_size = new_size; - trim_cache(to_release); + trim_cache(&to_release); } } diff --git a/src/os/FDCache.h b/src/os/FDCache.h new file mode 100644 index 00000000000..cf07f860aa5 --- /dev/null +++ b/src/os/FDCache.h @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_FDCACHE_H +#define CEPH_FDCACHE_H + +#include <memory> +#include <errno.h> +#include <cstdio> +#include "hobject.h" +#include "common/Mutex.h" +#include "common/Cond.h" +#include "common/shared_cache.hpp" +#include "include/compat.h" + +/** + * FD Cache + */ +class FDCache : public md_config_obs_t { + /** + * FD + * + * Wrapper for an fd. Destructor closes the fd. + */ + class FD { + public: + const int fd; + FD(int _fd) : fd(_fd) { + assert(_fd >= 0); + } + int operator*() const { + return fd; + } + ~FD() { + TEMP_FAILURE_RETRY(::close(fd)); + } + }; + + SharedLRU<hobject_t, FD> registry; + CephContext *cct; +public: + FDCache(CephContext *cct) : cct(cct) { + assert(cct); + cct->_conf->add_observer(this); + registry.set_size(cct->_conf->filestore_fd_cache_size); + } + ~FDCache() { + cct->_conf->remove_observer(this); + } + typedef std::tr1::shared_ptr<FD> FDRef; + + FDRef lookup(const hobject_t &hoid) { + return registry.lookup(hoid); + } + + FDRef add(const hobject_t &hoid, int fd) { + return registry.add(hoid, new FD(fd)); + } + + /// clear cached fd for hoid, subsequent lookups will get an empty FD + void clear(const hobject_t &hoid) { + registry.clear(hoid); + assert(!registry.lookup(hoid)); + } + + /// md_config_obs_t + const char** get_tracked_conf_keys() const { + static const char* KEYS[] = { + "filestore_fd_cache_size", + NULL + }; + return KEYS; + } + void handle_conf_change(const md_config_t *conf, + const std::set<std::string> &changed) { + if (changed.count("filestore_fd_cache_size")) { + registry.set_size(conf->filestore_fd_cache_size); + } + } + +}; +typedef FDCache::FDRef FDRef; + +#endif diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index b32f2875f71..dca8a1bbfea 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -196,10 +196,22 @@ int FileStore::lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf) return r; } -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, +int FileStore::lfn_open(coll_t cid, + const hobject_t& oid, + bool create, + FDRef *outfd, IndexedPath *path, Index *index) { + assert(outfd); + int flags = O_RDWR; + if (create) + flags |= O_CREAT; + Mutex::Locker l(fdcache_lock); + *outfd = fdcache.lookup(oid); + if (*outfd) { + return 0; + } Index index2; IndexedPath path2; if (!path) @@ -224,16 +236,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode goto fail; } - r = ::open((*path)->path(), flags, mode); + r = ::open((*path)->path(), flags, 0644); if (r < 0) { r = -errno; dout(10) << "error opening file " << (*path)->path() << " with flags=" - << flags << " and mode=" << mode << ": " << cpp_strerror(-r) << dendl; + << flags << ": " << cpp_strerror(-r) << dendl; goto fail; } fd = r; - if ((flags & O_CREAT) && (!exist)) { + if (create && (!exist)) { r = (*index)->created(oid, (*path)->path()); if (r < 0) { TEMP_FAILURE_RETRY(::close(fd)); @@ -242,31 +254,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode goto fail; } } - return fd; + *outfd = fdcache.add(oid, fd); + return 0; fail: assert(!m_filestore_fail_eio || r != -EIO); return r; } -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, IndexedPath *path) +void FileStore::lfn_close(FDRef fd) { - return lfn_open(cid, oid, flags, mode, path, 0); -} - -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode) -{ - return lfn_open(cid, oid, flags, mode, 0, 0); -} - -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags) -{ - return lfn_open(cid, oid, flags, 0); -} - -void FileStore::lfn_close(int fd) -{ - TEMP_FAILURE_RETRY(::close(fd)); } int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) @@ -324,6 +321,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos) { + Mutex::Locker l(fdcache_lock); Index index; int r = get_index(cid, &index); if (r < 0) @@ -355,6 +353,8 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, if (g_conf->filestore_debug_inject_read_err) { debug_obj_on_delete(o); } + wbthrottle.clear_object(o); // should be only non-cache ref + fdcache.clear(o); } else { /* Ensure that replay of this op doesn't result in the object_map * going away. @@ -387,6 +387,9 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha sync_entry_timeo_lock("sync_entry_timeo_lock"), timer(g_ceph_context, sync_entry_timeo_lock), stop(false), sync_thread(this), + fdcache_lock("fdcache_lock"), + fdcache(g_ceph_context), + wbthrottle(g_ceph_context), default_osr("default"), op_queue_len(0), op_queue_bytes(0), op_throttle_lock("FileStore::op_throttle_lock"), @@ -394,22 +397,17 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"), op_wq(this, g_conf->filestore_op_thread_timeout, g_conf->filestore_op_thread_suicide_timeout, &op_tp), - flusher_queue_len(0), flusher_thread(this), logger(NULL), read_error_lock("FileStore::read_error_lock"), m_filestore_btrfs_clone_range(g_conf->filestore_btrfs_clone_range), m_filestore_btrfs_snap (g_conf->filestore_btrfs_snap ), m_filestore_commit_timeout(g_conf->filestore_commit_timeout), m_filestore_fiemap(g_conf->filestore_fiemap), - m_filestore_flusher (g_conf->filestore_flusher ), m_filestore_fsync_flushes_journal_data(g_conf->filestore_fsync_flushes_journal_data), m_filestore_journal_parallel(g_conf->filestore_journal_parallel ), m_filestore_journal_trailing(g_conf->filestore_journal_trailing), m_filestore_journal_writeahead(g_conf->filestore_journal_writeahead), m_filestore_fiemap_threshold(g_conf->filestore_fiemap_threshold), - m_filestore_sync_flush(g_conf->filestore_sync_flush), - m_filestore_flusher_max_fds(g_conf->filestore_flusher_max_fds), - m_filestore_flush_min(g_conf->filestore_flush_min), m_filestore_max_sync_interval(g_conf->filestore_max_sync_interval), m_filestore_min_sync_interval(g_conf->filestore_min_sync_interval), m_filestore_fail_eio(g_conf->filestore_fail_eio), @@ -1067,6 +1065,7 @@ int FileStore::_detect_fs() #if defined(__linux__) if (st.f_type == BTRFS_SUPER_MAGIC) { dout(0) << "mount detected btrfs" << dendl; + wbthrottle.set_fs(WBThrottle::BTRFS); btrfs = true; btrfs_stable_commits = btrfs && m_filestore_btrfs_snap; @@ -1778,7 +1777,6 @@ int FileStore::mount() journal_start(); op_tp.start(); - flusher_thread.create(); op_finisher.start(); ondisk_finisher.start(); @@ -1816,11 +1814,9 @@ int FileStore::umount() lock.Lock(); stop = true; sync_cond.Signal(); - flusher_cond.Signal(); lock.Unlock(); sync_thread.join(); op_tp.stop(); - flusher_thread.join(); journal_stop(); @@ -1968,6 +1964,7 @@ void FileStore::op_queue_release_throttle(Op *o) void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle) { + wbthrottle.throttle(); // inject a stall? if (g_conf->filestore_inject_stall) { int orig = g_conf->filestore_inject_stall; @@ -2263,12 +2260,13 @@ int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPos if (!replaying || btrfs_stable_commits) return 1; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { dout(10) << "_check_replay_guard " << cid << " " << oid << " dne" << dendl; return 1; // if file does not exist, there is no guard, and we can replay. } - int ret = _check_replay_guard(fd, spos); + int ret = _check_replay_guard(**fd, spos); lfn_close(fd); return ret; } @@ -2762,22 +2760,24 @@ int FileStore::read( dout(15) << "read " << cid << "/" << oid << " " << offset << "~" << len << dendl; - int fd = lfn_open(cid, oid, O_RDONLY); - if (fd < 0) { - dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: " << cpp_strerror(fd) << dendl; - return fd; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { + dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: " + << cpp_strerror(r) << dendl; + return r; } if (len == 0) { struct stat st; memset(&st, 0, sizeof(struct stat)); - int r = ::fstat(fd, &st); + int r = ::fstat(**fd, &st); assert(r == 0); len = st.st_size; } bufferptr bptr(len); // prealloc space for entire read - got = safe_pread(fd, bptr.c_str(), len, offset); + got = safe_pread(**fd, bptr.c_str(), len, offset); if (got < 0) { dout(10) << "FileStore::read(" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl; lfn_close(fd); @@ -2815,15 +2815,14 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid, dout(15) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << dendl; - int r; - int fd = lfn_open(cid, oid, O_RDONLY); - if (fd < 0) { - r = fd; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl; } else { uint64_t i; - r = do_fiemap(fd, offset, len, &fiemap); + r = do_fiemap(**fd, offset, len, &fiemap); if (r < 0) goto done; @@ -2865,10 +2864,10 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid, } done: - if (fd >= 0) + if (r >= 0) { lfn_close(fd); - if (r >= 0) ::encode(exomap, bl); + } dout(10) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << exomap.size() << " " << exomap << dendl; free(fiemap); @@ -2899,14 +2898,13 @@ int FileStore::_touch(coll_t cid, const hobject_t& oid) { dout(15) << "touch " << cid << "/" << oid << dendl; - int flags = O_WRONLY|O_CREAT; - int fd = lfn_open(cid, oid, flags, 0644); - int r; - if (fd >= 0) { + FDRef fd; + int r = lfn_open(cid, oid, true, &fd); + if (r < 0) { + return r; + } else { lfn_close(fd); - r = 0; - } else - r = fd; + } dout(10) << "touch " << cid << "/" << oid << " = " << r << dendl; return r; } @@ -2920,17 +2918,17 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, int64_t actual; - int flags = O_WRONLY|O_CREAT; - int fd = lfn_open(cid, oid, flags, 0644); - if (fd < 0) { - r = fd; - dout(0) << "write couldn't open " << cid << "/" << oid << " flags " << flags << ": " + FDRef fd; + r = lfn_open(cid, oid, true, &fd); + if (r < 0) { + dout(0) << "write couldn't open " << cid << "/" + << oid << ": " << cpp_strerror(r) << dendl; goto out; } // seek - actual = ::lseek64(fd, offset, SEEK_SET); + actual = ::lseek64(**fd, offset, SEEK_SET); if (actual < 0) { r = -errno; dout(0) << "write lseek64 to " << offset << " failed: " << cpp_strerror(r) << dendl; @@ -2945,43 +2943,13 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, } // write - r = bl.write_fd(fd); + r = bl.write_fd(**fd); if (r == 0) r = bl.length(); // flush? - { - bool should_flush = (ssize_t)len >= m_filestore_flush_min; - bool local_flush = false; -#ifdef HAVE_SYNC_FILE_RANGE - bool async_done = false; - if (!should_flush || - !m_filestore_flusher || - !(async_done = queue_flusher(fd, offset, len, replica))) { - if (should_flush && m_filestore_sync_flush) { - ::sync_file_range(fd, offset, len, SYNC_FILE_RANGE_WRITE); - local_flush = true; - } - } - //Both lfn_close() and possible posix_fadvise() done by flusher - if (async_done) fd = -1; -#else - // no sync_file_range; (maybe) flush inline and close. - if (should_flush && m_filestore_sync_flush) { - ::fdatasync(fd); - local_flush = true; - } -#endif - if (local_flush && replica && m_filestore_replica_fadvise) { - int fa_r = posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED); - if (fa_r) { - dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl; - } else { - dout(10) << "posix_fadvise performed after local flush" << dendl; - } - } - } - if (fd >= 0) lfn_close(fd); + wbthrottle.queue_wb(fd, oid, offset, len, replica); + lfn_close(fd); out: dout(10) << "write " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl; @@ -2996,14 +2964,14 @@ int FileStore::_zero(coll_t cid, const hobject_t& oid, uint64_t offset, size_t l #ifdef CEPH_HAVE_FALLOCATE # if !defined(DARWIN) && !defined(__FreeBSD__) // first try to punch a hole. - int fd = lfn_open(cid, oid, O_RDONLY); - if (fd < 0) { - ret = -errno; + FDRef fd; + ret = lfn_open(cid, oid, false, &fd); + if (ret < 0) { goto out; } // first try fallocate - ret = fallocate(fd, FALLOC_FL_PUNCH_HOLE, offset, len); + ret = fallocate(**fd, FALLOC_FL_PUNCH_HOLE, offset, len); if (ret < 0) ret = -errno; lfn_close(fd); @@ -3039,23 +3007,26 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo if (_check_replay_guard(cid, newoid, spos) < 0) return 0; - int o, n, r; + int r; + FDRef o, n; { Index index; IndexedPath from, to; - o = lfn_open(cid, oldoid, O_RDONLY, 0, &from, &index); - if (o < 0) { - r = o; + r = lfn_open(cid, oldoid, false, &o, &from, &index); + if (r < 0) { goto out2; } - n = lfn_open(cid, newoid, O_CREAT|O_TRUNC|O_WRONLY, 0644, &to, &index); - if (n < 0) { - r = n; + r = lfn_open(cid, newoid, true, &n, &to, &index); + if (r < 0) { + goto out; + } + r = ::ftruncate(**n, 0); + if (r < 0) { goto out; } struct stat st; - ::fstat(o, &st); - r = _do_clone_range(o, n, 0, st.st_size, 0); + ::fstat(**o, &st); + r = _do_clone_range(**o, **n, 0, st.st_size, 0); if (r < 0) { r = -errno; goto out3; @@ -3068,17 +3039,17 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo { map<string, bufferptr> aset; - r = _fgetattrs(o, aset, false); + r = _fgetattrs(**o, aset, false); if (r < 0) goto out3; - r = _fsetattrs(n, aset); + r = _fsetattrs(**n, aset); if (r < 0) goto out3; } // clone is non-idempotent; record our work. - _set_replay_guard(n, spos, &newoid); + _set_replay_guard(**n, spos, &newoid); out3: lfn_close(n); @@ -3248,21 +3219,19 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t return 0; int r; - int o, n; - o = lfn_open(cid, oldoid, O_RDONLY); - if (o < 0) { - r = o; + FDRef o, n; + r = lfn_open(cid, oldoid, false, &o); + if (r < 0) { goto out2; } - n = lfn_open(cid, newoid, O_CREAT|O_WRONLY, 0644); - if (n < 0) { - r = n; + r = lfn_open(cid, newoid, true, &n); + if (r < 0) { goto out; } - r = _do_clone_range(o, n, srcoff, len, dstoff); + r = _do_clone_range(**o, **n, srcoff, len, dstoff); // clone is non-idempotent; record our work. - _set_replay_guard(n, spos, &newoid); + _set_replay_guard(**n, spos, &newoid); lfn_close(n); out: @@ -3273,89 +3242,6 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t return r; } - -bool FileStore::queue_flusher(int fd, uint64_t off, uint64_t len, bool replica) -{ - bool queued; - lock.Lock(); - if (flusher_queue_len < m_filestore_flusher_max_fds) { - flusher_queue.push_back(sync_epoch); - flusher_queue.push_back(fd); - flusher_queue.push_back(off); - flusher_queue.push_back(len); - flusher_queue.push_back(replica); - flusher_queue_len++; - flusher_cond.Signal(); - dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len - << " qlen " << flusher_queue_len - << dendl; - queued = true; - } else { - dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len - << " qlen " << flusher_queue_len - << " hit flusher_max_fds " << m_filestore_flusher_max_fds - << ", skipping async flush" << dendl; - queued = false; - } - lock.Unlock(); - return queued; -} - -void FileStore::flusher_entry() -{ - lock.Lock(); - dout(20) << "flusher_entry start" << dendl; - while (true) { - if (!flusher_queue.empty()) { -#ifdef HAVE_SYNC_FILE_RANGE - list<uint64_t> q; - q.swap(flusher_queue); - - int num = flusher_queue_len; // see how many we're taking, here - - lock.Unlock(); - while (!q.empty()) { - uint64_t ep = q.front(); - q.pop_front(); - int fd = q.front(); - q.pop_front(); - uint64_t off = q.front(); - q.pop_front(); - uint64_t len = q.front(); - q.pop_front(); - bool replica = q.front(); - q.pop_front(); - if (!stop && ep == sync_epoch) { - dout(10) << "flusher_entry flushing+closing " << fd << " ep " << ep << dendl; - ::sync_file_range(fd, off, len, SYNC_FILE_RANGE_WRITE); - if (replica && m_filestore_replica_fadvise) { - int fa_r = posix_fadvise(fd, off, len, POSIX_FADV_DONTNEED); - if (fa_r) { - dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl; - } else { - dout(10) << "posix_fadvise performed after local flush" << dendl; - } - } - } else - dout(10) << "flusher_entry JUST closing " << fd << " (stop=" << stop << ", ep=" << ep - << ", sync_epoch=" << sync_epoch << ")" << dendl; - lfn_close(fd); - } - lock.Lock(); - flusher_queue_len -= num; // they're definitely closed, forget -#endif - } else { - if (stop) - break; - dout(20) << "flusher_entry sleeping" << dendl; - flusher_cond.Wait(lock); - dout(20) << "flusher_entry awoke" << dendl; - } - } - dout(20) << "flusher_entry finish" << dendl; - lock.Unlock(); -} - class SyncEntryTimeout : public Context { public: SyncEntryTimeout(int commit_timeo) @@ -3548,6 +3434,7 @@ void FileStore::sync_entry() logger->tinc(l_os_commit_len, dur); apply_manager.commit_finish(); + wbthrottle.clear(); logger->set(l_os_committing, 0); @@ -3856,15 +3743,14 @@ bool FileStore::debug_mdata_eio(const hobject_t &oid) { int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, bufferptr &bp) { dout(15) << "getattr " << cid << "/" << oid << " '" << name << "'" << dendl; - int r; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } char n[CHAIN_XATTR_MAX_NAME_LEN]; get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN); - r = _fgetattr(fd, n, bp); + r = _fgetattr(**fd, n, bp); lfn_close(fd); if (r == -ENODATA && g_conf->filestore_xattr_use_omap) { map<string, bufferlist> got; @@ -3903,13 +3789,12 @@ int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, buffe int FileStore::getattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>& aset, bool user_only) { dout(15) << "getattrs " << cid << "/" << oid << dendl; - int r; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } - r = _fgetattrs(fd, aset, user_only); + r = _fgetattrs(**fd, aset, user_only); lfn_close(fd); if (g_conf->filestore_xattr_use_omap) { set<string> omap_attrs; @@ -3967,14 +3852,13 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr> set<string> omap_remove; map<string, bufferptr> inline_set; map<string, bufferptr> inline_to_set; - int r = 0; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } if (g_conf->filestore_xattr_use_omap) { - r = _fgetattrs(fd, inline_set, false); + r = _fgetattrs(**fd, inline_set, false); assert(!m_filestore_fail_eio || r != -EIO); } dout(15) << "setattrs " << cid << "/" << oid << dendl; @@ -3988,7 +3872,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr> if (p->second.length() > g_conf->filestore_max_inline_xattr_size) { if (inline_set.count(p->first)) { inline_set.erase(p->first); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r < 0) goto out_close; } @@ -4000,7 +3884,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr> inline_set.size() >= g_conf->filestore_max_inline_xattrs) { if (inline_set.count(p->first)) { inline_set.erase(p->first); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r < 0) goto out_close; } @@ -4015,7 +3899,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr> } - r = _fsetattrs(fd, inline_to_set); + r = _fsetattrs(**fd, inline_to_set); if (r < 0) goto out_close; @@ -4050,15 +3934,14 @@ int FileStore::_rmattr(coll_t cid, const hobject_t& oid, const char *name, const SequencerPosition &spos) { dout(15) << "rmattr " << cid << "/" << oid << " '" << name << "'" << dendl; - int r = 0; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } char n[CHAIN_XATTR_MAX_NAME_LEN]; get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r == -ENODATA && g_conf->filestore_xattr_use_omap) { Index index; r = get_index(cid, &index); @@ -4088,18 +3971,17 @@ int FileStore::_rmattrs(coll_t cid, const hobject_t& oid, dout(15) << "rmattrs " << cid << "/" << oid << dendl; map<string,bufferptr> aset; - int r = 0; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } - r = _fgetattrs(fd, aset, false); + r = _fgetattrs(**fd, aset, false); if (r >= 0) { for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) { char n[CHAIN_XATTR_MAX_NAME_LEN]; get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r < 0) break; } @@ -4687,21 +4569,21 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, // open guard on object so we don't any previous operations on the // new name that will modify the source inode. - int fd = lfn_open(oldcid, o, 0); - if (fd < 0) { + FDRef fd; + int r = lfn_open(oldcid, o, 0, &fd); + if (r < 0) { // the source collection/object does not exist. If we are replaying, we // should be safe, so just return 0 and move on. assert(replaying); dout(10) << "collection_add " << c << "/" << o << " from " - << oldcid << "/" << o << " (dne, continue replay) " << dendl; + << oldcid << "/" << o << " (dne, continue replay) " << dendl; return 0; } - assert(fd >= 0); if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress" - _set_replay_guard(fd, spos, &o, true); + _set_replay_guard(**fd, spos, &o, true); } - int r = lfn_link(oldcid, c, o); + r = lfn_link(oldcid, c, o); if (replaying && !btrfs_stable_commits && r == -EEXIST) // crashed between link() and set_replay_guard() r = 0; @@ -4710,7 +4592,7 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, // close guard on object so we don't do this again if (r == 0) { - _close_replay_guard(fd, spos); + _close_replay_guard(**fd, spos); } lfn_close(fd); @@ -4919,9 +4801,6 @@ const char** FileStore::get_tracked_conf_keys() const "filestore_queue_max_bytes", "filestore_queue_committing_max_ops", "filestore_queue_committing_max_bytes", - "filestore_flusher", - "filestore_flusher_max_fds", - "filestore_sync_flush", "filestore_commit_timeout", "filestore_dump_file", "filestore_kill_at", @@ -4941,9 +4820,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf, changed.count("filestore_queue_max_bytes") || changed.count("filestore_queue_committing_max_ops") || changed.count("filestore_queue_committing_max_bytes") || - changed.count("filestore_flusher") || - changed.count("filestore_flusher_max_fds") || - changed.count("filestore_flush_min") || changed.count("filestore_kill_at") || changed.count("filestore_fail_eio") || changed.count("filestore_replica_fadvise")) { @@ -4954,10 +4830,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf, m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes; m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops; m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes; - m_filestore_flusher = conf->filestore_flusher; - m_filestore_flusher_max_fds = conf->filestore_flusher_max_fds; - m_filestore_flush_min = conf->filestore_flush_min; - m_filestore_sync_flush = conf->filestore_sync_flush; m_filestore_kill_at.set(conf->filestore_kill_at); m_filestore_fail_eio = conf->filestore_fail_eio; m_filestore_replica_fadvise = conf->filestore_replica_fadvise; diff --git a/src/os/FileStore.h b/src/os/FileStore.h index d5ca2a4c237..78668dd92a4 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -40,6 +40,8 @@ using namespace __gnu_cxx; #include "IndexManager.h" #include "ObjectMap.h" #include "SequencerPosition.h" +#include "FDCache.h" +#include "WBThrottle.h" #include "include/uuid.h" @@ -198,6 +200,10 @@ private: friend ostream& operator<<(ostream& out, const OpSequencer& s); + Mutex fdcache_lock; + FDCache fdcache; + WBThrottle wbthrottle; + Sequencer default_osr; deque<OpSequencer*> op_queue; uint64_t op_queue_len, op_queue_bytes; @@ -250,37 +256,22 @@ private: void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk); friend class C_JournaledAhead; - // flusher thread - Cond flusher_cond; - list<uint64_t> flusher_queue; - int flusher_queue_len; - void flusher_entry(); - struct FlusherThread : public Thread { - FileStore *fs; - FlusherThread(FileStore *f) : fs(f) {} - void *entry() { - fs->flusher_entry(); - return 0; - } - } flusher_thread; - bool queue_flusher(int fd, uint64_t off, uint64_t len, bool replica); - int open_journal(); - PerfCounters *logger; public: int lfn_find(coll_t cid, const hobject_t& oid, IndexedPath *path); int lfn_truncate(coll_t cid, const hobject_t& oid, off_t length); int lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf); - int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, - IndexedPath *path); - int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, - IndexedPath *path, Index *index); - int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode); - int lfn_open(coll_t cid, const hobject_t& oid, int flags); - void lfn_close(int fd); + int lfn_open( + coll_t cid, + const hobject_t& oid, + bool create, + FDRef *outfd, + IndexedPath *path = 0, + Index *index = 0); + void lfn_close(FDRef fd); int lfn_link(coll_t c, coll_t cid, const hobject_t& o) ; int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos); @@ -510,15 +501,11 @@ private: bool m_filestore_btrfs_snap; float m_filestore_commit_timeout; bool m_filestore_fiemap; - bool m_filestore_flusher; bool m_filestore_fsync_flushes_journal_data; bool m_filestore_journal_parallel; bool m_filestore_journal_trailing; bool m_filestore_journal_writeahead; int m_filestore_fiemap_threshold; - bool m_filestore_sync_flush; - int m_filestore_flusher_max_fds; - int m_filestore_flush_min; double m_filestore_max_sync_interval; double m_filestore_min_sync_interval; bool m_filestore_fail_eio; diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc new file mode 100644 index 00000000000..4673488f833 --- /dev/null +++ b/src/os/WBThrottle.cc @@ -0,0 +1,239 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "os/WBThrottle.h" +#include "common/perf_counters.h" + +WBThrottle::WBThrottle(CephContext *cct) : + cur_ios(0), cur_size(0), + cct(cct), + logger(NULL), + stopping(false), + lock("WBThrottle::lock", false, true, false, cct), + fs(XFS) +{ + { + Mutex::Locker l(lock); + set_from_conf(); + } + assert(cct); + PerfCountersBuilder b( + cct, string("WBThrottle"), + l_wbthrottle_first, l_wbthrottle_last); + b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied"); + b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb"); + b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied"); + b.add_u64(l_wbthrottle_ios_wb, "ios_wb"); + b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied"); + b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb"); + logger = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i) + logger->set(i, 0); + + cct->_conf->add_observer(this); + create(); +} + +WBThrottle::~WBThrottle() { + assert(cct); + { + Mutex::Locker l(lock); + stopping = true; + cond.Signal(); + } + join(); + cct->get_perfcounters_collection()->remove(logger); + delete logger; + cct->_conf->remove_observer(this); +} + +const char** WBThrottle::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "filestore_wbthrottle_btrfs_bytes_start_flusher", + "filestore_wbthrottle_btrfs_bytes_hard_limit", + "filestore_wbthrottle_btrfs_ios_start_flusher", + "filestore_wbthrottle_btrfs_ios_hard_limit", + "filestore_wbthrottle_btrfs_inodes_start_flusher", + "filestore_wbthrottle_btrfs_inodes_hard_limit", + "filestore_wbthrottle_xfs_bytes_start_flusher", + "filestore_wbthrottle_xfs_bytes_hard_limit", + "filestore_wbthrottle_xfs_ios_start_flusher", + "filestore_wbthrottle_xfs_ios_hard_limit", + "filestore_wbthrottle_xfs_inodes_start_flusher", + "filestore_wbthrottle_xfs_inodes_hard_limit", + NULL + }; + return KEYS; +} + +void WBThrottle::set_from_conf() +{ + assert(lock.is_locked()); + if (fs == BTRFS) { + size_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher; + size_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit; + io_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher; + io_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit; + fd_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher; + fd_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit; + } else if (fs == XFS) { + size_limits.first = + cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher; + size_limits.second = + cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit; + io_limits.first = + cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher; + io_limits.second = + cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit; + fd_limits.first = + cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher; + fd_limits.second = + cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit; + } else { + assert(0 == "invalid value for fs"); + } + cond.Signal(); +} + +void WBThrottle::handle_conf_change(const md_config_t *conf, + const std::set<std::string> &changed) +{ + Mutex::Locker l(lock); + for (const char** i = get_tracked_conf_keys(); *i; ++i) { + if (changed.count(*i)) { + set_from_conf(); + return; + } + } +} + +bool WBThrottle::get_next_should_flush( + boost::tuple<hobject_t, FDRef, PendingWB> *next) +{ + assert(lock.is_locked()); + assert(next); + while (!stopping && + cur_ios < io_limits.first && + pending_wbs.size() < fd_limits.first && + cur_size < size_limits.first) + cond.Wait(lock); + if (stopping) + return false; + assert(!pending_wbs.empty()); + hobject_t obj(pop_object()); + + map<hobject_t, pair<PendingWB, FDRef> >::iterator i = + pending_wbs.find(obj); + *next = boost::make_tuple(obj, i->second.second, i->second.first); + pending_wbs.erase(i); + return true; +} + + +void *WBThrottle::entry() +{ + Mutex::Locker l(lock); + boost::tuple<hobject_t, FDRef, PendingWB> wb; + while (get_next_should_flush(&wb)) { + clearing = wb.get<0>(); + lock.Unlock(); + ::fsync(**wb.get<1>()); + if (wb.get<2>().nocache) + posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED); + lock.Lock(); + clearing = hobject_t(); + cur_ios -= wb.get<2>().ios; + logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios); + cur_size -= wb.get<2>().size; + logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size); + logger->dec(l_wbthrottle_inodes_dirtied); + cond.Signal(); + wb = boost::tuple<hobject_t, FDRef, PendingWB>(); + } + return 0; +} + +void WBThrottle::queue_wb( + FDRef fd, const hobject_t &hoid, uint64_t offset, uint64_t len, + bool nocache) +{ + Mutex::Locker l(lock); + map<hobject_t, pair<PendingWB, FDRef> >::iterator wbiter = + pending_wbs.find(hoid); + if (wbiter == pending_wbs.end()) { + wbiter = pending_wbs.insert( + make_pair(hoid, + make_pair( + PendingWB(), + fd))).first; + logger->inc(l_wbthrottle_inodes_dirtied); + } else { + remove_object(hoid); + } + + cur_ios++; + logger->inc(l_wbthrottle_ios_dirtied); + cur_size += len; + logger->inc(l_wbthrottle_bytes_dirtied, len); + + wbiter->second.first.add(nocache, len, 1); + insert_object(hoid); + cond.Signal(); +} + +void WBThrottle::clear() +{ + Mutex::Locker l(lock); + for (map<hobject_t, pair<PendingWB, FDRef> >::iterator i = + pending_wbs.begin(); + i != pending_wbs.end(); + ++i) { + cur_ios -= i->second.first.ios; + logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios); + cur_size -= i->second.first.size; + logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size); + logger->dec(l_wbthrottle_inodes_dirtied); + } + pending_wbs.clear(); + lru.clear(); + rev_lru.clear(); + cond.Signal(); + assert(cur_ios == 0); + assert(cur_size == 0); +} + +void WBThrottle::clear_object(const hobject_t &hoid) +{ + Mutex::Locker l(lock); + while (clearing == hoid) + cond.Wait(lock); + map<hobject_t, pair<PendingWB, FDRef> >::iterator i = + pending_wbs.find(hoid); + if (i == pending_wbs.end()) + return; + + cur_ios -= i->second.first.ios; + cur_size -= i->second.first.size; + + pending_wbs.erase(i); + remove_object(hoid); +} + +void WBThrottle::throttle() +{ + Mutex::Locker l(lock); + while (!stopping && !( + cur_ios < io_limits.second && + pending_wbs.size() < fd_limits.second && + cur_size < size_limits.second)) { + cond.Wait(lock); + } +} diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h new file mode 100644 index 00000000000..070de08e123 --- /dev/null +++ b/src/os/WBThrottle.h @@ -0,0 +1,171 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef WBTHROTTLE_H +#define WBTHROTTLE_H + +#include <map> +#include <boost/tuple/tuple.hpp> +#include <tr1/memory> +#include "include/buffer.h" +#include "common/Formatter.h" +#include "os/hobject.h" +#include "include/interval_set.h" +#include "FDCache.h" +#include "common/Thread.h" +#include "common/ceph_context.h" + +class PerfCounters; +enum { + l_wbthrottle_first = 999090, + l_wbthrottle_bytes_dirtied, + l_wbthrottle_bytes_wb, + l_wbthrottle_ios_dirtied, + l_wbthrottle_ios_wb, + l_wbthrottle_inodes_dirtied, + l_wbthrottle_inodes_wb, + l_wbthrottle_last +}; + +/** + * WBThrottle + * + * Tracks, throttles, and flushes outstanding IO + */ +class WBThrottle : Thread, public md_config_obs_t { + hobject_t clearing; + + /* *_limits.first is the start_flusher limit and + * *_limits.second is the hard limit + */ + + /// Limits on unflushed bytes + pair<uint64_t, uint64_t> size_limits; + + /// Limits on unflushed ios + pair<uint64_t, uint64_t> io_limits; + + /// Limits on unflushed objects + pair<uint64_t, uint64_t> fd_limits; + + uint64_t cur_ios; /// Currently unflushed IOs + uint64_t cur_size; /// Currently unflushed bytes + + /** + * PendingWB tracks the ios pending on an object. + */ + class PendingWB { + public: + bool nocache; + uint64_t size; + uint64_t ios; + PendingWB() : nocache(true), size(0), ios(0) {} + void add(bool _nocache, uint64_t _size, uint64_t _ios) { + if (!_nocache) + nocache = false; // only nocache if all writes are nocache + size += _size; + ios += _ios; + } + }; + + CephContext *cct; + PerfCounters *logger; + bool stopping; + Mutex lock; + Cond cond; + + + /** + * Flush objects in lru order + */ + list<hobject_t> lru; + map<hobject_t, list<hobject_t>::iterator> rev_lru; + void remove_object(const hobject_t &hoid) { + assert(lock.is_locked()); + map<hobject_t, list<hobject_t>::iterator>::iterator iter = + rev_lru.find(hoid); + if (iter == rev_lru.end()) + return; + + lru.erase(iter->second); + rev_lru.erase(iter); + } + hobject_t pop_object() { + assert(!lru.empty()); + hobject_t hoid(lru.front()); + lru.pop_front(); + rev_lru.erase(hoid); + return hoid; + } + void insert_object(const hobject_t &hoid) { + assert(rev_lru.find(hoid) == rev_lru.end()); + lru.push_back(hoid); + rev_lru.insert(make_pair(hoid, --lru.end())); + } + + map<hobject_t, pair<PendingWB, FDRef> > pending_wbs; + + /// get next flush to perform + bool get_next_should_flush( + boost::tuple<hobject_t, FDRef, PendingWB> *next ///< [out] next to flush + ); ///< @return false if we are shutting down +public: + enum FS { + BTRFS, + XFS + }; + +private: + FS fs; + + void set_from_conf(); +public: + WBThrottle(CephContext *cct); + ~WBThrottle(); + + /// Set fs as XFS or BTRFS + void set_fs(FS new_fs) { + Mutex::Locker l(lock); + fs = new_fs; + set_from_conf(); + } + + /// Queue wb on hoid, fd taking throttle (does not block) + void queue_wb( + FDRef fd, ///< [in] FDRef to hoid + const hobject_t &hoid, ///< [in] object + uint64_t offset, ///< [in] offset written + uint64_t len, ///< [in] length written + bool nocache ///< [in] try to clear out of cache after write + ); + + /// Clear all wb (probably due to sync) + void clear(); + + /// Clear object + void clear_object(const hobject_t &hoid); + + /// Block until there is throttle available + void throttle(); + + /// md_config_obs_t + const char** get_tracked_conf_keys() const; + void handle_conf_change(const md_config_t *conf, + const std::set<std::string> &changed); + + /// Thread + void *entry(); +}; + +#endif diff --git a/src/os/hobject.h b/src/os/hobject.h index 28e0da0d82a..47fcb3dda39 100644 --- a/src/os/hobject.h +++ b/src/os/hobject.h @@ -15,6 +15,9 @@ #ifndef __CEPH_OS_HOBJECT_H #define __CEPH_OS_HOBJECT_H +#include <string.h> +#include "include/types.h" +#include "include/rados.h" #include "include/object.h" #include "include/cmp.h" diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 761a77cd69c..019d6b8d99b 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5288,7 +5288,6 @@ void ReplicatedPG::submit_push_data( void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info, ObjectStore::Transaction *t) { - remove_snap_mapped_object(*t, recovery_info.soid); t->collection_move(coll, get_temp_coll(t), recovery_info.soid); for (map<hobject_t, interval_set<uint64_t> >::const_iterator p = recovery_info.clone_subset.begin(); |