From 12eff5d9ff46617f0067a1c57e2b61fc43afb3bb Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 21 May 2013 12:47:05 -0700 Subject: common/shared_cache.hpp: add clear() Clear clears a key/value from the cache. Signed-off-by: Samuel Just --- src/common/shared_cache.hpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/common/shared_cache.hpp b/src/common/shared_cache.hpp index 69a4c06dfbf..50da23a4f2f 100644 --- a/src/common/shared_cache.hpp +++ b/src/common/shared_cache.hpp @@ -85,6 +85,17 @@ public: assert(weak_refs.empty()); } + void clear(K key) { + VPtr val; // release any ref we have after we drop the lock + { + Mutex::Locker l(lock); + if (weak_refs.count(key)) { + val = weak_refs[key].lock(); + } + lru_remove(key); + } + } + void set_size(size_t new_size) { list to_release; { -- cgit v1.2.1 From 17a6e7faa7a512b803f228e26c101ccb45306f7a Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 10 May 2013 12:26:32 -0700 Subject: doc/dev/osd_internals: add some info about throttles Signed-off-by: Samuel Just --- doc/dev/osd_internals/osd_throttles.rst | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 doc/dev/osd_internals/osd_throttles.rst diff --git a/doc/dev/osd_internals/osd_throttles.rst b/doc/dev/osd_internals/osd_throttles.rst new file mode 100644 index 00000000000..4fa3044f986 --- /dev/null +++ b/doc/dev/osd_internals/osd_throttles.rst @@ -0,0 +1,21 @@ + Messenger throttle (number and size) + |-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| + FileStore op_queue throttle (number and size) + |--------------------------------------------------------| + WBThrottle + |---------------------------------------------------------------------------------------------------------| + Journal (size) + 
|-----------------------------------------------------------------------------------------------------------------------------------------------------------------| + |----------------------------------------------------------------------------------------------------> flushed ----------------> synced + | +Op: Read Header --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> op_applied -------------------------------------------------------------> Complete + | | +SubOp: --Messenger--> ReadHeader --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> sub_op_applied - + | + |-----------------------------> flushed ----------------> synced + |------------------------------------------------------------------------------------------| + Journal (size) + |---------------------------------| + WBThrottle + |-----------------------------------------------------| + FileStore op_queue throttle (number and size) -- cgit v1.2.1 From ca28d87ef201cfda5a0ae6eac7bf980a85d3465b Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 20 May 2013 13:38:19 -0700 Subject: common/shared_cache.hpp: fix set_size() Signed-off-by: Samuel Just --- src/common/shared_cache.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/shared_cache.hpp b/src/common/shared_cache.hpp index 50da23a4f2f..178d1001be3 100644 --- a/src/common/shared_cache.hpp +++ b/src/common/shared_cache.hpp @@ -101,7 +101,7 @@ public: { Mutex::Locker l(lock); max_size = new_size; - trim_cache(to_release); + trim_cache(&to_release); } } -- cgit v1.2.1 From 08665176e2fb23815da6154d53132ed3bbbf7eb8 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 17 May 2013 10:38:08 -0700 Subject: FileStore: add fd cache Signed-off-by: Samuel Just --- src/Makefile.am | 1 + src/common/config_opts.h | 1 + src/os/FDCache.h | 95 +++++++++++++++++++ src/os/FileStore.cc | 240 
+++++++++++++++++++++++------------------------ src/os/FileStore.h | 18 ++-- 5 files changed, 228 insertions(+), 127 deletions(-) create mode 100644 src/os/FDCache.h diff --git a/src/Makefile.am b/src/Makefile.am index 7a08e1f5a2a..9525fe63f4f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1973,6 +1973,7 @@ noinst_HEADERS = \ os/FileStore.h\ os/FlatIndex.h\ os/HashIndex.h\ + os/FDCache.h\ os/IndexManager.h\ os/Journal.h\ os/JournalingObjectStore.h\ diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 8a1da07e036..68c7bef08b3 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -514,6 +514,7 @@ OPTION(filestore_merge_threshold, OPT_INT, 10) OPTION(filestore_split_multiple, OPT_INT, 2) OPTION(filestore_update_to, OPT_INT, 1000) OPTION(filestore_blackhole, OPT_BOOL, false) // drop any new transactions on the floor +OPTION(filestore_fd_cache_size, OPT_INT, 128) // FD lru size OPTION(filestore_dump_file, OPT_STR, "") // file onto which store transaction dumps OPTION(filestore_kill_at, OPT_INT, 0) // inject a failure at the n'th opportunity OPTION(filestore_inject_stall, OPT_INT, 0) // artificially stall for N seconds in op queue thread diff --git a/src/os/FDCache.h b/src/os/FDCache.h new file mode 100644 index 00000000000..cf07f860aa5 --- /dev/null +++ b/src/os/FDCache.h @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_FDCACHE_H +#define CEPH_FDCACHE_H + +#include +#include +#include +#include "hobject.h" +#include "common/Mutex.h" +#include "common/Cond.h" +#include "common/shared_cache.hpp" +#include "include/compat.h" + +/** + * FD Cache + */ +class FDCache : public md_config_obs_t { + /** + * FD + * + * Wrapper for an fd. Destructor closes the fd. + */ + class FD { + public: + const int fd; + FD(int _fd) : fd(_fd) { + assert(_fd >= 0); + } + int operator*() const { + return fd; + } + ~FD() { + TEMP_FAILURE_RETRY(::close(fd)); + } + }; + + SharedLRU registry; + CephContext *cct; +public: + FDCache(CephContext *cct) : cct(cct) { + assert(cct); + cct->_conf->add_observer(this); + registry.set_size(cct->_conf->filestore_fd_cache_size); + } + ~FDCache() { + cct->_conf->remove_observer(this); + } + typedef std::tr1::shared_ptr FDRef; + + FDRef lookup(const hobject_t &hoid) { + return registry.lookup(hoid); + } + + FDRef add(const hobject_t &hoid, int fd) { + return registry.add(hoid, new FD(fd)); + } + + /// clear cached fd for hoid, subsequent lookups will get an empty FD + void clear(const hobject_t &hoid) { + registry.clear(hoid); + assert(!registry.lookup(hoid)); + } + + /// md_config_obs_t + const char** get_tracked_conf_keys() const { + static const char* KEYS[] = { + "filestore_fd_cache_size", + NULL + }; + return KEYS; + } + void handle_conf_change(const md_config_t *conf, + const std::set &changed) { + if (changed.count("filestore_fd_cache_size")) { + registry.set_size(conf->filestore_fd_cache_size); + } + } + +}; +typedef FDCache::FDRef FDRef; + +#endif diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index b32f2875f71..24f7f1eff08 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -196,10 +196,22 @@ int FileStore::lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf) return r; } -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, +int FileStore::lfn_open(coll_t cid, + const hobject_t& oid, + 
bool create, + FDRef *outfd, IndexedPath *path, Index *index) { + assert(outfd); + int flags = O_RDWR; + if (create) + flags |= O_CREAT; + Mutex::Locker l(fdcache_lock); + *outfd = fdcache.lookup(oid); + if (*outfd) { + return 0; + } Index index2; IndexedPath path2; if (!path) @@ -224,16 +236,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode goto fail; } - r = ::open((*path)->path(), flags, mode); + r = ::open((*path)->path(), flags, 0644); if (r < 0) { r = -errno; dout(10) << "error opening file " << (*path)->path() << " with flags=" - << flags << " and mode=" << mode << ": " << cpp_strerror(-r) << dendl; + << flags << ": " << cpp_strerror(-r) << dendl; goto fail; } fd = r; - if ((flags & O_CREAT) && (!exist)) { + if (create && (!exist)) { r = (*index)->created(oid, (*path)->path()); if (r < 0) { TEMP_FAILURE_RETRY(::close(fd)); @@ -242,31 +254,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode goto fail; } } - return fd; + *outfd = fdcache.add(oid, fd); + return 0; fail: assert(!m_filestore_fail_eio || r != -EIO); return r; } -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, IndexedPath *path) -{ - return lfn_open(cid, oid, flags, mode, path, 0); -} - -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode) -{ - return lfn_open(cid, oid, flags, mode, 0, 0); -} - -int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags) +void FileStore::lfn_close(FDRef fd) { - return lfn_open(cid, oid, flags, 0); -} - -void FileStore::lfn_close(int fd) -{ - TEMP_FAILURE_RETRY(::close(fd)); } int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) @@ -324,6 +321,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos) { + Mutex::Locker l(fdcache_lock); Index index; int r = get_index(cid, &index); if (r < 0) @@ -355,6 +353,7 @@ 
int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, if (g_conf->filestore_debug_inject_read_err) { debug_obj_on_delete(o); } + fdcache.clear(o); } else { /* Ensure that replay of this op doesn't result in the object_map * going away. @@ -387,6 +386,8 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha sync_entry_timeo_lock("sync_entry_timeo_lock"), timer(g_ceph_context, sync_entry_timeo_lock), stop(false), sync_thread(this), + fdcache_lock("fdcache_lock"), + fdcache(g_ceph_context), default_osr("default"), op_queue_len(0), op_queue_bytes(0), op_throttle_lock("FileStore::op_throttle_lock"), @@ -2263,12 +2264,13 @@ int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPos if (!replaying || btrfs_stable_commits) return 1; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { dout(10) << "_check_replay_guard " << cid << " " << oid << " dne" << dendl; return 1; // if file does not exist, there is no guard, and we can replay. 
} - int ret = _check_replay_guard(fd, spos); + int ret = _check_replay_guard(**fd, spos); lfn_close(fd); return ret; } @@ -2762,22 +2764,24 @@ int FileStore::read( dout(15) << "read " << cid << "/" << oid << " " << offset << "~" << len << dendl; - int fd = lfn_open(cid, oid, O_RDONLY); - if (fd < 0) { - dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: " << cpp_strerror(fd) << dendl; - return fd; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { + dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: " + << cpp_strerror(r) << dendl; + return r; } if (len == 0) { struct stat st; memset(&st, 0, sizeof(struct stat)); - int r = ::fstat(fd, &st); + int r = ::fstat(**fd, &st); assert(r == 0); len = st.st_size; } bufferptr bptr(len); // prealloc space for entire read - got = safe_pread(fd, bptr.c_str(), len, offset); + got = safe_pread(**fd, bptr.c_str(), len, offset); if (got < 0) { dout(10) << "FileStore::read(" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl; lfn_close(fd); @@ -2815,15 +2819,14 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid, dout(15) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << dendl; - int r; - int fd = lfn_open(cid, oid, O_RDONLY); - if (fd < 0) { - r = fd; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl; } else { uint64_t i; - r = do_fiemap(fd, offset, len, &fiemap); + r = do_fiemap(**fd, offset, len, &fiemap); if (r < 0) goto done; @@ -2865,10 +2868,10 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid, } done: - if (fd >= 0) + if (r >= 0) { lfn_close(fd); - if (r >= 0) ::encode(exomap, bl); + } dout(10) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << exomap.size() << " " << exomap << dendl; free(fiemap); @@ -2899,14 +2902,13 @@ int FileStore::_touch(coll_t cid, const 
hobject_t& oid) { dout(15) << "touch " << cid << "/" << oid << dendl; - int flags = O_WRONLY|O_CREAT; - int fd = lfn_open(cid, oid, flags, 0644); - int r; - if (fd >= 0) { + FDRef fd; + int r = lfn_open(cid, oid, true, &fd); + if (r < 0) { + return r; + } else { lfn_close(fd); - r = 0; - } else - r = fd; + } dout(10) << "touch " << cid << "/" << oid << " = " << r << dendl; return r; } @@ -2920,17 +2922,17 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, int64_t actual; - int flags = O_WRONLY|O_CREAT; - int fd = lfn_open(cid, oid, flags, 0644); - if (fd < 0) { - r = fd; - dout(0) << "write couldn't open " << cid << "/" << oid << " flags " << flags << ": " + FDRef fd; + r = lfn_open(cid, oid, true, &fd); + if (r < 0) { + dout(0) << "write couldn't open " << cid << "/" + << oid << ": " << cpp_strerror(r) << dendl; goto out; } // seek - actual = ::lseek64(fd, offset, SEEK_SET); + actual = ::lseek64(**fd, offset, SEEK_SET); if (actual < 0) { r = -errno; dout(0) << "write lseek64 to " << offset << " failed: " << cpp_strerror(r) << dendl; @@ -2945,7 +2947,7 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, } // write - r = bl.write_fd(fd); + r = bl.write_fd(**fd); if (r == 0) r = bl.length(); @@ -2957,23 +2959,24 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, bool async_done = false; if (!should_flush || !m_filestore_flusher || - !(async_done = queue_flusher(fd, offset, len, replica))) { + !(async_done = queue_flusher(**fd, offset, len, replica))) { if (should_flush && m_filestore_sync_flush) { - ::sync_file_range(fd, offset, len, SYNC_FILE_RANGE_WRITE); + ::sync_file_range(**fd, offset, len, SYNC_FILE_RANGE_WRITE); local_flush = true; } } + // TODOSAM: this will be fixed in a subsequent patch //Both lfn_close() and possible posix_fadvise() done by flusher - if (async_done) fd = -1; + //if (async_done) fd = -1; #else // no sync_file_range; (maybe) flush inline and close. 
if (should_flush && m_filestore_sync_flush) { - ::fdatasync(fd); + ::fdatasync(**fd); local_flush = true; } #endif if (local_flush && replica && m_filestore_replica_fadvise) { - int fa_r = posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED); + int fa_r = posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED); if (fa_r) { dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl; } else { @@ -2981,7 +2984,7 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, } } } - if (fd >= 0) lfn_close(fd); + lfn_close(fd); out: dout(10) << "write " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl; @@ -2996,14 +2999,14 @@ int FileStore::_zero(coll_t cid, const hobject_t& oid, uint64_t offset, size_t l #ifdef CEPH_HAVE_FALLOCATE # if !defined(DARWIN) && !defined(__FreeBSD__) // first try to punch a hole. - int fd = lfn_open(cid, oid, O_RDONLY); - if (fd < 0) { - ret = -errno; + FDRef fd; + ret = lfn_open(cid, oid, false, &fd); + if (ret < 0) { goto out; } // first try fallocate - ret = fallocate(fd, FALLOC_FL_PUNCH_HOLE, offset, len); + ret = fallocate(**fd, FALLOC_FL_PUNCH_HOLE, offset, len); if (ret < 0) ret = -errno; lfn_close(fd); @@ -3039,23 +3042,26 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo if (_check_replay_guard(cid, newoid, spos) < 0) return 0; - int o, n, r; + int r; + FDRef o, n; { Index index; IndexedPath from, to; - o = lfn_open(cid, oldoid, O_RDONLY, 0, &from, &index); - if (o < 0) { - r = o; + r = lfn_open(cid, oldoid, false, &o, &from, &index); + if (r < 0) { goto out2; } - n = lfn_open(cid, newoid, O_CREAT|O_TRUNC|O_WRONLY, 0644, &to, &index); - if (n < 0) { - r = n; + r = lfn_open(cid, newoid, true, &n, &to, &index); + if (r < 0) { + goto out; + } + r = ::ftruncate(**n, 0); + if (r < 0) { goto out; } struct stat st; - ::fstat(o, &st); - r = _do_clone_range(o, n, 0, st.st_size, 0); + ::fstat(**o, &st); + r = _do_clone_range(**o, **n, 0, st.st_size, 0); if (r < 0) { r = -errno; goto 
out3; @@ -3068,17 +3074,17 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo { map aset; - r = _fgetattrs(o, aset, false); + r = _fgetattrs(**o, aset, false); if (r < 0) goto out3; - r = _fsetattrs(n, aset); + r = _fsetattrs(**n, aset); if (r < 0) goto out3; } // clone is non-idempotent; record our work. - _set_replay_guard(n, spos, &newoid); + _set_replay_guard(**n, spos, &newoid); out3: lfn_close(n); @@ -3248,21 +3254,19 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t return 0; int r; - int o, n; - o = lfn_open(cid, oldoid, O_RDONLY); - if (o < 0) { - r = o; + FDRef o, n; + r = lfn_open(cid, oldoid, false, &o); + if (r < 0) { goto out2; } - n = lfn_open(cid, newoid, O_CREAT|O_WRONLY, 0644); - if (n < 0) { - r = n; + r = lfn_open(cid, newoid, true, &n); + if (r < 0) { goto out; } - r = _do_clone_range(o, n, srcoff, len, dstoff); + r = _do_clone_range(**o, **n, srcoff, len, dstoff); // clone is non-idempotent; record our work. 
- _set_replay_guard(n, spos, &newoid); + _set_replay_guard(**n, spos, &newoid); lfn_close(n); out: @@ -3339,7 +3343,8 @@ void FileStore::flusher_entry() } else dout(10) << "flusher_entry JUST closing " << fd << " (stop=" << stop << ", ep=" << ep << ", sync_epoch=" << sync_epoch << ")" << dendl; - lfn_close(fd); + // TODOSAM: this will be replaced in a subsequent patch + //lfn_close(fd); } lock.Lock(); flusher_queue_len -= num; // they're definitely closed, forget @@ -3856,15 +3861,14 @@ bool FileStore::debug_mdata_eio(const hobject_t &oid) { int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, bufferptr &bp) { dout(15) << "getattr " << cid << "/" << oid << " '" << name << "'" << dendl; - int r; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } char n[CHAIN_XATTR_MAX_NAME_LEN]; get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN); - r = _fgetattr(fd, n, bp); + r = _fgetattr(**fd, n, bp); lfn_close(fd); if (r == -ENODATA && g_conf->filestore_xattr_use_omap) { map got; @@ -3903,13 +3907,12 @@ int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, buffe int FileStore::getattrs(coll_t cid, const hobject_t& oid, map& aset, bool user_only) { dout(15) << "getattrs " << cid << "/" << oid << dendl; - int r; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } - r = _fgetattrs(fd, aset, user_only); + r = _fgetattrs(**fd, aset, user_only); lfn_close(fd); if (g_conf->filestore_xattr_use_omap) { set omap_attrs; @@ -3967,14 +3970,13 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map set omap_remove; map inline_set; map inline_to_set; - int r = 0; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } if (g_conf->filestore_xattr_use_omap) { - r = 
_fgetattrs(fd, inline_set, false); + r = _fgetattrs(**fd, inline_set, false); assert(!m_filestore_fail_eio || r != -EIO); } dout(15) << "setattrs " << cid << "/" << oid << dendl; @@ -3988,7 +3990,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map if (p->second.length() > g_conf->filestore_max_inline_xattr_size) { if (inline_set.count(p->first)) { inline_set.erase(p->first); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r < 0) goto out_close; } @@ -4000,7 +4002,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map inline_set.size() >= g_conf->filestore_max_inline_xattrs) { if (inline_set.count(p->first)) { inline_set.erase(p->first); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r < 0) goto out_close; } @@ -4015,7 +4017,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map } - r = _fsetattrs(fd, inline_to_set); + r = _fsetattrs(**fd, inline_to_set); if (r < 0) goto out_close; @@ -4050,15 +4052,14 @@ int FileStore::_rmattr(coll_t cid, const hobject_t& oid, const char *name, const SequencerPosition &spos) { dout(15) << "rmattr " << cid << "/" << oid << " '" << name << "'" << dendl; - int r = 0; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } char n[CHAIN_XATTR_MAX_NAME_LEN]; get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r == -ENODATA && g_conf->filestore_xattr_use_omap) { Index index; r = get_index(cid, &index); @@ -4088,18 +4089,17 @@ int FileStore::_rmattrs(coll_t cid, const hobject_t& oid, dout(15) << "rmattrs " << cid << "/" << oid << dendl; map aset; - int r = 0; - int fd = lfn_open(cid, oid, 0); - if (fd < 0) { - r = -errno; + FDRef fd; + int r = lfn_open(cid, oid, false, &fd); + if (r < 0) { goto out; } - r = _fgetattrs(fd, aset, false); + r = _fgetattrs(**fd, aset, false); if (r >= 0) { for 
(map::iterator p = aset.begin(); p != aset.end(); ++p) { char n[CHAIN_XATTR_MAX_NAME_LEN]; get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN); - r = chain_fremovexattr(fd, n); + r = chain_fremovexattr(**fd, n); if (r < 0) break; } @@ -4687,21 +4687,21 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, // open guard on object so we don't any previous operations on the // new name that will modify the source inode. - int fd = lfn_open(oldcid, o, 0); - if (fd < 0) { + FDRef fd; + int r = lfn_open(oldcid, o, 0, &fd); + if (r < 0) { // the source collection/object does not exist. If we are replaying, we // should be safe, so just return 0 and move on. assert(replaying); dout(10) << "collection_add " << c << "/" << o << " from " - << oldcid << "/" << o << " (dne, continue replay) " << dendl; + << oldcid << "/" << o << " (dne, continue replay) " << dendl; return 0; } - assert(fd >= 0); if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress" - _set_replay_guard(fd, spos, &o, true); + _set_replay_guard(**fd, spos, &o, true); } - int r = lfn_link(oldcid, c, o); + r = lfn_link(oldcid, c, o); if (replaying && !btrfs_stable_commits && r == -EEXIST) // crashed between link() and set_replay_guard() r = 0; @@ -4710,7 +4710,7 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, // close guard on object so we don't do this again if (r == 0) { - _close_replay_guard(fd, spos); + _close_replay_guard(**fd, spos); } lfn_close(fd); diff --git a/src/os/FileStore.h b/src/os/FileStore.h index d5ca2a4c237..00249f274c1 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -40,6 +40,7 @@ using namespace __gnu_cxx; #include "IndexManager.h" #include "ObjectMap.h" #include "SequencerPosition.h" +#include "FDCache.h" #include "include/uuid.h" @@ -198,6 +199,8 @@ private: friend ostream& operator<<(ostream& out, const OpSequencer& s); + Mutex fdcache_lock; + FDCache fdcache; Sequencer default_osr; deque op_queue; 
uint64_t op_queue_len, op_queue_bytes; @@ -274,13 +277,14 @@ public: int lfn_find(coll_t cid, const hobject_t& oid, IndexedPath *path); int lfn_truncate(coll_t cid, const hobject_t& oid, off_t length); int lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf); - int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, - IndexedPath *path); - int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, - IndexedPath *path, Index *index); - int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode); - int lfn_open(coll_t cid, const hobject_t& oid, int flags); - void lfn_close(int fd); + int lfn_open( + coll_t cid, + const hobject_t& oid, + bool create, + FDRef *outfd, + IndexedPath *path = 0, + Index *index = 0); + void lfn_close(FDRef fd); int lfn_link(coll_t c, coll_t cid, const hobject_t& o) ; int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos); -- cgit v1.2.1 From 6670e2a73db8f59e2a6bac024afe43cd8a8a6715 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 14 May 2013 14:44:06 -0700 Subject: os/: Add WBThrottle Signed-off-by: Samuel Just --- src/Makefile.am | 4 +- src/common/config_opts.h | 14 ++++ src/os/WBThrottle.cc | 209 +++++++++++++++++++++++++++++++++++++++++++++++ src/os/WBThrottle.h | 155 +++++++++++++++++++++++++++++++++++ src/os/hobject.h | 3 + 5 files changed, 384 insertions(+), 1 deletion(-) create mode 100644 src/os/WBThrottle.cc create mode 100644 src/os/WBThrottle.h diff --git a/src/Makefile.am b/src/Makefile.am index 9525fe63f4f..5e10c9eed25 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1490,7 +1490,8 @@ libos_a_SOURCES = \ os/IndexManager.cc \ os/FlatIndex.cc \ os/DBObjectMap.cc \ - os/LevelDBStore.cc + os/LevelDBStore.cc \ + os/WBThrottle.cc libos_a_CXXFLAGS= ${AM_CXXFLAGS} noinst_LIBRARIES += libos.a @@ -1974,6 +1975,7 @@ noinst_HEADERS = \ os/FlatIndex.h\ os/HashIndex.h\ os/FDCache.h\ + os/WBThrottle.h\ os/IndexManager.h\ os/Journal.h\ os/JournalingObjectStore.h\ 
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 68c7bef08b3..27593cb9f9d 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -474,6 +474,20 @@ OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5) OPTION(filestore, OPT_BOOL, false) +/// filestore wb throttle limits +OPTION(filestore_wbthrottle_btrfs_bytes_start_flusher, OPT_U64, 10<<20) +OPTION(filestore_wbthrottle_btrfs_bytes_hard_limit, OPT_U64, 100<<20) +OPTION(filestore_wbthrottle_btrfs_ios_start_flusher, OPT_U64, 100) +OPTION(filestore_wbthrottle_btrfs_ios_hard_limit, OPT_U64, 1000) +OPTION(filestore_wbthrottle_btrfs_inodes_start_flusher, OPT_U64, 100) +OPTION(filestore_wbthrottle_btrfs_inodes_hard_limit, OPT_U64, 1000) +OPTION(filestore_wbthrottle_xfs_bytes_start_flusher, OPT_U64, 10<<20) +OPTION(filestore_wbthrottle_xfs_bytes_hard_limit, OPT_U64, 100<<20) +OPTION(filestore_wbthrottle_xfs_ios_start_flusher, OPT_U64, 10) +OPTION(filestore_wbthrottle_xfs_ios_hard_limit, OPT_U64, 100) +OPTION(filestore_wbthrottle_xfs_inodes_start_flusher, OPT_U64, 10) +OPTION(filestore_wbthrottle_xfs_inodes_hard_limit, OPT_U64, 100) + // Tests index failure paths OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0) diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc new file mode 100644 index 00000000000..24f7730c47f --- /dev/null +++ b/src/os/WBThrottle.cc @@ -0,0 +1,209 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "os/WBThrottle.h" + +WBThrottle::WBThrottle(CephContext *cct) : + cur_ios(0), cur_size(0), + cct(cct), + stopping(false), + lock("WBThrottle::lock", false, true, false, cct), + fs(XFS) +{ + { + Mutex::Locker l(lock); + set_from_conf(); + } + assert(cct); + cct->_conf->add_observer(this); + create(); +} + +WBThrottle::~WBThrottle() { + assert(cct); + { + Mutex::Locker l(lock); + stopping = true; + cond.Signal(); + } + join(); + cct->_conf->remove_observer(this); +} + +const char** 
WBThrottle::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "filestore_wbthrottle_btrfs_bytes_start_flusher", + "filestore_wbthrottle_btrfs_bytes_hard_limit", + "filestore_wbthrottle_btrfs_ios_start_flusher", + "filestore_wbthrottle_btrfs_ios_hard_limit", + "filestore_wbthrottle_btrfs_inodes_start_flusher", + "filestore_wbthrottle_btrfs_inodes_hard_limit", + "filestore_wbthrottle_xfs_bytes_start_flusher", + "filestore_wbthrottle_xfs_bytes_hard_limit", + "filestore_wbthrottle_xfs_ios_start_flusher", + "filestore_wbthrottle_xfs_ios_hard_limit", + "filestore_wbthrottle_xfs_inodes_start_flusher", + "filestore_wbthrottle_xfs_inodes_hard_limit", + NULL + }; + return KEYS; +} + +void WBThrottle::set_from_conf() +{ + assert(lock.is_locked()); + if (fs == BTRFS) { + size_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher; + size_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit; + io_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher; + io_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit; + fd_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher; + fd_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit; + } else if (fs == XFS) { + size_limits.first = + cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher; + size_limits.second = + cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit; + io_limits.first = + cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher; + io_limits.second = + cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit; + fd_limits.first = + cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher; + fd_limits.second = + cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit; + } else { + assert(0 == "invalid value for fs"); + } + cond.Signal(); +} + +void WBThrottle::handle_conf_change(const md_config_t *conf, + const std::set &changed) +{ + Mutex::Locker l(lock); + for (const char** i = 
get_tracked_conf_keys(); *i; ++i) { + if (changed.count(*i)) { + set_from_conf(); + return; + } + } +} + +bool WBThrottle::get_next_should_flush( + boost::tuple *next) +{ + assert(lock.is_locked()); + assert(next); + while (!stopping && + cur_ios < io_limits.first && + pending_wbs.size() < fd_limits.first && + cur_size < size_limits.first) + cond.Wait(lock); + if (stopping) + return false; + assert(!pending_wbs.empty()); + hobject_t obj(pop_object()); + + map >::iterator i = + pending_wbs.find(obj); + *next = boost::make_tuple(obj, i->second.second, i->second.first); + pending_wbs.erase(i); + return true; +} + + +void *WBThrottle::entry() +{ + Mutex::Locker l(lock); + boost::tuple wb; + while (get_next_should_flush(&wb)) { + clearing = wb.get<0>(); + lock.Unlock(); + ::fsync(**wb.get<1>()); + if (wb.get<2>().replica) + posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED); + lock.Lock(); + clearing = hobject_t(); + cur_ios -= wb.get<2>().ios; + cur_size -= wb.get<2>().size; + cond.Signal(); + wb = boost::tuple(); + } + return 0; +} + +void WBThrottle::queue_wb( + FDRef fd, const hobject_t &hoid, uint64_t offset, uint64_t len, + bool replica) +{ + Mutex::Locker l(lock); + map >::iterator wbiter = + pending_wbs.find(hoid); + if (wbiter == pending_wbs.end()) { + wbiter = pending_wbs.insert( + make_pair(hoid, + make_pair( + PendingWB(), + fd))).first; + } else { + remove_object(hoid); + } + + cur_ios++; + cur_size += len; + wbiter->second.first.add(replica, len, 1); + insert_object(hoid); + cond.Signal(); +} + +void WBThrottle::clear() +{ + Mutex::Locker l(lock); + for (map >::iterator i = + pending_wbs.begin(); + i != pending_wbs.end(); + ++i) { + cur_ios -= i->second.first.ios; + cur_size -= i->second.first.size; + } + pending_wbs.clear(); + lru.clear(); + rev_lru.clear(); + cond.Signal(); +} + +void WBThrottle::clear_object(const hobject_t &hoid) +{ + Mutex::Locker l(lock); + while (clearing == hoid) + cond.Wait(lock); + map >::iterator i = + 
pending_wbs.find(hoid); + if (i == pending_wbs.end()) + return; + + cur_ios -= i->second.first.ios; + cur_size -= i->second.first.size; + + pending_wbs.erase(i); + remove_object(hoid); +} + +void WBThrottle::throttle() +{ + Mutex::Locker l(lock); + while (!stopping && !( + cur_ios < io_limits.second && + pending_wbs.size() < fd_limits.second && + cur_size < size_limits.second)) { + cond.Wait(lock); + } +} diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h new file mode 100644 index 00000000000..2ac7234bd44 --- /dev/null +++ b/src/os/WBThrottle.h @@ -0,0 +1,155 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef WBTHROTTLE_H +#define WBTHROTTLE_H + +#include +#include +#include +#include "include/buffer.h" +#include "common/Formatter.h" +#include "os/hobject.h" +#include "include/interval_set.h" +#include "FDCache.h" +#include "common/Thread.h" +#include "common/ceph_context.h" + + +/** + * WBThrottle + * + * Tracks, throttles, and flushes outstanding IO + */ +class WBThrottle : Thread, public md_config_obs_t { + hobject_t clearing; + + /// Limits on unflushed bytes + pair size_limits; + + /// Limits on unflushed ios + pair io_limits; + + /// Limits on unflushed objects + pair fd_limits; + + uint64_t cur_ios; /// Currently unflushed IOs + uint64_t cur_size; /// Currently unflushed bytes + + /** + * PendingWB tracks the ios pending on an object. 
+ */ + class PendingWB { + public: + bool replica; + uint64_t size; + uint64_t ios; + PendingWB() : replica(true), size(0), ios(0) {} + void add(bool _replica, uint64_t _size, uint64_t _ios) { + if (!_replica) + replica = false; // only replica if all writes are replica + size += _size; + ios += _ios; + } + }; + + CephContext *cct; + bool stopping; + Mutex lock; + Cond cond; + + + /** + * Flush objects in lru order + */ + list lru; + map::iterator> rev_lru; + void remove_object(const hobject_t &hoid) { + assert(lock.is_locked()); + map::iterator>::iterator iter = + rev_lru.find(hoid); + if (iter == rev_lru.end()) + return; + + lru.erase(iter->second); + rev_lru.erase(iter); + } + hobject_t pop_object() { + assert(!lru.empty()); + hobject_t hoid(lru.front()); + lru.pop_front(); + rev_lru.erase(hoid); + return hoid; + } + void insert_object(const hobject_t &hoid) { + assert(rev_lru.find(hoid) == rev_lru.end()); + lru.push_back(hoid); + rev_lru.insert(make_pair(hoid, --lru.end())); + } + + map > pending_wbs; + + /// get next flush to perform + bool get_next_should_flush( + boost::tuple *next ///< [out] next to flush + ); ///< @return false if we are shutting down +public: + enum FS { + BTRFS, + XFS + }; + +private: + FS fs; + + void set_from_conf(); +public: + WBThrottle(CephContext *cct); + ~WBThrottle(); + + /// Set fs as XFS or BTRFS + void set_fs(FS new_fs) { + Mutex::Locker l(lock); + fs = new_fs; + set_from_conf(); + } + + /// Queue wb on hoid, fd taking throttle (does not block) + void queue_wb( + FDRef fd, ///< [in] FDRef to hoid + const hobject_t &hoid, ///< [in] object + uint64_t offset, ///< [in] offset written + uint64_t len, ///< [in] length written + bool replica ///< [in] write is for replica + ); + + /// Clear all wb (probably due to sync) + void clear(); + + /// Clear object + void clear_object(const hobject_t &hoid); + + /// Block until there is throttle available + void throttle(); + + /// md_config_obs_t + const char** get_tracked_conf_keys() 
const; + void handle_conf_change(const md_config_t *conf, + const std::set &changed); + + /// Thread + void *entry(); +}; + +#endif diff --git a/src/os/hobject.h b/src/os/hobject.h index 28e0da0d82a..47fcb3dda39 100644 --- a/src/os/hobject.h +++ b/src/os/hobject.h @@ -15,6 +15,9 @@ #ifndef __CEPH_OS_HOBJECT_H #define __CEPH_OS_HOBJECT_H +#include +#include "include/types.h" +#include "include/rados.h" #include "include/object.h" #include "include/cmp.h" -- cgit v1.2.1 From 489cd5c441d865f48f40acf9d4d9181e0bcc7247 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 20 May 2013 13:44:57 -0700 Subject: FileStore: integrate WBThrottle Signed-off-by: Samuel Just --- src/common/config_opts.h | 4 -- src/os/FileStore.cc | 140 ++--------------------------------------------- src/os/FileStore.h | 23 +------- 3 files changed, 9 insertions(+), 158 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 27593cb9f9d..27e2daceb31 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -508,10 +508,6 @@ OPTION(filestore_btrfs_snap, OPT_BOOL, true) OPTION(filestore_btrfs_clone_range, OPT_BOOL, true) OPTION(filestore_fsync_flushes_journal_data, OPT_BOOL, false) OPTION(filestore_fiemap, OPT_BOOL, false) // (try to) use fiemap -OPTION(filestore_flusher, OPT_BOOL, true) -OPTION(filestore_flusher_max_fds, OPT_INT, 512) -OPTION(filestore_flush_min, OPT_INT, 65536) -OPTION(filestore_sync_flush, OPT_BOOL, false) OPTION(filestore_journal_parallel, OPT_BOOL, false) OPTION(filestore_journal_writeahead, OPT_BOOL, false) OPTION(filestore_journal_trailing, OPT_BOOL, false) diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 24f7f1eff08..dca8a1bbfea 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -353,6 +353,7 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, if (g_conf->filestore_debug_inject_read_err) { debug_obj_on_delete(o); } + wbthrottle.clear_object(o); // should be only non-cache ref fdcache.clear(o); } else { 
/* Ensure that replay of this op doesn't result in the object_map @@ -388,6 +389,7 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha stop(false), sync_thread(this), fdcache_lock("fdcache_lock"), fdcache(g_ceph_context), + wbthrottle(g_ceph_context), default_osr("default"), op_queue_len(0), op_queue_bytes(0), op_throttle_lock("FileStore::op_throttle_lock"), @@ -395,22 +397,17 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"), op_wq(this, g_conf->filestore_op_thread_timeout, g_conf->filestore_op_thread_suicide_timeout, &op_tp), - flusher_queue_len(0), flusher_thread(this), logger(NULL), read_error_lock("FileStore::read_error_lock"), m_filestore_btrfs_clone_range(g_conf->filestore_btrfs_clone_range), m_filestore_btrfs_snap (g_conf->filestore_btrfs_snap ), m_filestore_commit_timeout(g_conf->filestore_commit_timeout), m_filestore_fiemap(g_conf->filestore_fiemap), - m_filestore_flusher (g_conf->filestore_flusher ), m_filestore_fsync_flushes_journal_data(g_conf->filestore_fsync_flushes_journal_data), m_filestore_journal_parallel(g_conf->filestore_journal_parallel ), m_filestore_journal_trailing(g_conf->filestore_journal_trailing), m_filestore_journal_writeahead(g_conf->filestore_journal_writeahead), m_filestore_fiemap_threshold(g_conf->filestore_fiemap_threshold), - m_filestore_sync_flush(g_conf->filestore_sync_flush), - m_filestore_flusher_max_fds(g_conf->filestore_flusher_max_fds), - m_filestore_flush_min(g_conf->filestore_flush_min), m_filestore_max_sync_interval(g_conf->filestore_max_sync_interval), m_filestore_min_sync_interval(g_conf->filestore_min_sync_interval), m_filestore_fail_eio(g_conf->filestore_fail_eio), @@ -1068,6 +1065,7 @@ int FileStore::_detect_fs() #if defined(__linux__) if (st.f_type == BTRFS_SUPER_MAGIC) { dout(0) << "mount detected btrfs" << dendl; + wbthrottle.set_fs(WBThrottle::BTRFS); 
btrfs = true; btrfs_stable_commits = btrfs && m_filestore_btrfs_snap; @@ -1779,7 +1777,6 @@ int FileStore::mount() journal_start(); op_tp.start(); - flusher_thread.create(); op_finisher.start(); ondisk_finisher.start(); @@ -1817,11 +1814,9 @@ int FileStore::umount() lock.Lock(); stop = true; sync_cond.Signal(); - flusher_cond.Signal(); lock.Unlock(); sync_thread.join(); op_tp.stop(); - flusher_thread.join(); journal_stop(); @@ -1969,6 +1964,7 @@ void FileStore::op_queue_release_throttle(Op *o) void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle) { + wbthrottle.throttle(); // inject a stall? if (g_conf->filestore_inject_stall) { int orig = g_conf->filestore_inject_stall; @@ -2952,38 +2948,7 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, r = bl.length(); // flush? - { - bool should_flush = (ssize_t)len >= m_filestore_flush_min; - bool local_flush = false; -#ifdef HAVE_SYNC_FILE_RANGE - bool async_done = false; - if (!should_flush || - !m_filestore_flusher || - !(async_done = queue_flusher(**fd, offset, len, replica))) { - if (should_flush && m_filestore_sync_flush) { - ::sync_file_range(**fd, offset, len, SYNC_FILE_RANGE_WRITE); - local_flush = true; - } - } - // TODOSAM: this will be fixed in a subsequent patch - //Both lfn_close() and possible posix_fadvise() done by flusher - //if (async_done) fd = -1; -#else - // no sync_file_range; (maybe) flush inline and close. 
- if (should_flush && m_filestore_sync_flush) { - ::fdatasync(**fd); - local_flush = true; - } -#endif - if (local_flush && replica && m_filestore_replica_fadvise) { - int fa_r = posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED); - if (fa_r) { - dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl; - } else { - dout(10) << "posix_fadvise performed after local flush" << dendl; - } - } - } + wbthrottle.queue_wb(fd, oid, offset, len, replica); lfn_close(fd); out: @@ -3277,90 +3242,6 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t return r; } - -bool FileStore::queue_flusher(int fd, uint64_t off, uint64_t len, bool replica) -{ - bool queued; - lock.Lock(); - if (flusher_queue_len < m_filestore_flusher_max_fds) { - flusher_queue.push_back(sync_epoch); - flusher_queue.push_back(fd); - flusher_queue.push_back(off); - flusher_queue.push_back(len); - flusher_queue.push_back(replica); - flusher_queue_len++; - flusher_cond.Signal(); - dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len - << " qlen " << flusher_queue_len - << dendl; - queued = true; - } else { - dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len - << " qlen " << flusher_queue_len - << " hit flusher_max_fds " << m_filestore_flusher_max_fds - << ", skipping async flush" << dendl; - queued = false; - } - lock.Unlock(); - return queued; -} - -void FileStore::flusher_entry() -{ - lock.Lock(); - dout(20) << "flusher_entry start" << dendl; - while (true) { - if (!flusher_queue.empty()) { -#ifdef HAVE_SYNC_FILE_RANGE - list q; - q.swap(flusher_queue); - - int num = flusher_queue_len; // see how many we're taking, here - - lock.Unlock(); - while (!q.empty()) { - uint64_t ep = q.front(); - q.pop_front(); - int fd = q.front(); - q.pop_front(); - uint64_t off = q.front(); - q.pop_front(); - uint64_t len = q.front(); - q.pop_front(); - bool replica = q.front(); - q.pop_front(); - if (!stop && 
ep == sync_epoch) { - dout(10) << "flusher_entry flushing+closing " << fd << " ep " << ep << dendl; - ::sync_file_range(fd, off, len, SYNC_FILE_RANGE_WRITE); - if (replica && m_filestore_replica_fadvise) { - int fa_r = posix_fadvise(fd, off, len, POSIX_FADV_DONTNEED); - if (fa_r) { - dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl; - } else { - dout(10) << "posix_fadvise performed after local flush" << dendl; - } - } - } else - dout(10) << "flusher_entry JUST closing " << fd << " (stop=" << stop << ", ep=" << ep - << ", sync_epoch=" << sync_epoch << ")" << dendl; - // TODOSAM: this will be replaced in a subsequent patch - //lfn_close(fd); - } - lock.Lock(); - flusher_queue_len -= num; // they're definitely closed, forget -#endif - } else { - if (stop) - break; - dout(20) << "flusher_entry sleeping" << dendl; - flusher_cond.Wait(lock); - dout(20) << "flusher_entry awoke" << dendl; - } - } - dout(20) << "flusher_entry finish" << dendl; - lock.Unlock(); -} - class SyncEntryTimeout : public Context { public: SyncEntryTimeout(int commit_timeo) @@ -3553,6 +3434,7 @@ void FileStore::sync_entry() logger->tinc(l_os_commit_len, dur); apply_manager.commit_finish(); + wbthrottle.clear(); logger->set(l_os_committing, 0); @@ -4919,9 +4801,6 @@ const char** FileStore::get_tracked_conf_keys() const "filestore_queue_max_bytes", "filestore_queue_committing_max_ops", "filestore_queue_committing_max_bytes", - "filestore_flusher", - "filestore_flusher_max_fds", - "filestore_sync_flush", "filestore_commit_timeout", "filestore_dump_file", "filestore_kill_at", @@ -4941,9 +4820,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf, changed.count("filestore_queue_max_bytes") || changed.count("filestore_queue_committing_max_ops") || changed.count("filestore_queue_committing_max_bytes") || - changed.count("filestore_flusher") || - changed.count("filestore_flusher_max_fds") || - changed.count("filestore_flush_min") || changed.count("filestore_kill_at") || 
changed.count("filestore_fail_eio") || changed.count("filestore_replica_fadvise")) { @@ -4954,10 +4830,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf, m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes; m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops; m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes; - m_filestore_flusher = conf->filestore_flusher; - m_filestore_flusher_max_fds = conf->filestore_flusher_max_fds; - m_filestore_flush_min = conf->filestore_flush_min; - m_filestore_sync_flush = conf->filestore_sync_flush; m_filestore_kill_at.set(conf->filestore_kill_at); m_filestore_fail_eio = conf->filestore_fail_eio; m_filestore_replica_fadvise = conf->filestore_replica_fadvise; diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 00249f274c1..78668dd92a4 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -41,6 +41,7 @@ using namespace __gnu_cxx; #include "ObjectMap.h" #include "SequencerPosition.h" #include "FDCache.h" +#include "WBThrottle.h" #include "include/uuid.h" @@ -201,6 +202,8 @@ private: Mutex fdcache_lock; FDCache fdcache; + WBThrottle wbthrottle; + Sequencer default_osr; deque op_queue; uint64_t op_queue_len, op_queue_bytes; @@ -253,24 +256,8 @@ private: void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk); friend class C_JournaledAhead; - // flusher thread - Cond flusher_cond; - list flusher_queue; - int flusher_queue_len; - void flusher_entry(); - struct FlusherThread : public Thread { - FileStore *fs; - FlusherThread(FileStore *f) : fs(f) {} - void *entry() { - fs->flusher_entry(); - return 0; - } - } flusher_thread; - bool queue_flusher(int fd, uint64_t off, uint64_t len, bool replica); - int open_journal(); - PerfCounters *logger; public: @@ -514,15 +501,11 @@ private: bool m_filestore_btrfs_snap; float m_filestore_commit_timeout; bool m_filestore_fiemap; - bool m_filestore_flusher; bool 
m_filestore_fsync_flushes_journal_data; bool m_filestore_journal_parallel; bool m_filestore_journal_trailing; bool m_filestore_journal_writeahead; int m_filestore_fiemap_threshold; - bool m_filestore_sync_flush; - int m_filestore_flusher_max_fds; - int m_filestore_flush_min; double m_filestore_max_sync_interval; double m_filestore_min_sync_interval; bool m_filestore_fail_eio; -- cgit v1.2.1 From 08c39b84308f17afef776e08c09be3faa2b9eab4 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 22 May 2013 14:37:42 -0700 Subject: ReplicatedPG::submit_push_complete don't remove the head object The object would have had to have been removed already. With fd caching, this extra remove might check the wrong replay_guard since the fd caching mechanism assumes that between any operation on an hobject_t oid and a remove operation, all operations on that hobject_t must refer to the same inode. Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 8f463098790..ab4da3ec314 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5283,7 +5283,6 @@ void ReplicatedPG::submit_push_data( void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info, ObjectStore::Transaction *t) { - remove_snap_mapped_object(*t, recovery_info.soid); t->collection_move(coll, get_temp_coll(t), recovery_info.soid); for (map >::const_iterator p = recovery_info.clone_subset.begin(); -- cgit v1.2.1 From 4d53e9c9409eb512c2e6f326d28fe12f788daaa7 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 28 May 2013 10:41:52 -0700 Subject: WBThrottle: add perfcounters Signed-off-by: Samuel Just --- src/os/WBThrottle.cc | 28 ++++++++++++++++++++++++++++ src/os/WBThrottle.h | 12 ++++++++++++ 2 files changed, 40 insertions(+) diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc index 24f7730c47f..12186445bc1 100644 --- a/src/os/WBThrottle.cc +++ b/src/os/WBThrottle.cc @@ -2,10 +2,12 @@ 
// vim: ts=8 sw=2 smarttab #include "os/WBThrottle.h" +#include "common/perf_counters.h" WBThrottle::WBThrottle(CephContext *cct) : cur_ios(0), cur_size(0), cct(cct), + logger(NULL), stopping(false), lock("WBThrottle::lock", false, true, false, cct), fs(XFS) @@ -15,6 +17,20 @@ WBThrottle::WBThrottle(CephContext *cct) : set_from_conf(); } assert(cct); + PerfCountersBuilder b( + cct, string("WBThrottle"), + l_wbthrottle_first, l_wbthrottle_last); + b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied"); + b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb"); + b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied"); + b.add_u64(l_wbthrottle_ios_wb, "ios_wb"); + b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied"); + b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb"); + logger = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i) + logger->set(i, 0); + cct->_conf->add_observer(this); create(); } @@ -27,6 +43,8 @@ WBThrottle::~WBThrottle() { cond.Signal(); } join(); + cct->get_perfcounters_collection()->remove(logger); + delete logger; cct->_conf->remove_observer(this); } @@ -133,7 +151,10 @@ void *WBThrottle::entry() lock.Lock(); clearing = hobject_t(); cur_ios -= wb.get<2>().ios; + logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios); cur_size -= wb.get<2>().size; + logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size); + logger->dec(l_wbthrottle_inodes_dirtied); cond.Signal(); wb = boost::tuple(); } @@ -153,12 +174,16 @@ void WBThrottle::queue_wb( make_pair( PendingWB(), fd))).first; + logger->inc(l_wbthrottle_inodes_dirtied); } else { remove_object(hoid); } cur_ios++; + logger->inc(l_wbthrottle_ios_dirtied); cur_size += len; + logger->inc(l_wbthrottle_bytes_dirtied, len); + wbiter->second.first.add(replica, len, 1); insert_object(hoid); cond.Signal(); @@ -172,7 +197,10 @@ void WBThrottle::clear() i != pending_wbs.end(); ++i) { cur_ios -= i->second.first.ios; + 
logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios); cur_size -= i->second.first.size; + logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size); + logger->dec(l_wbthrottle_inodes_dirtied); } pending_wbs.clear(); lru.clear(); diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h index 2ac7234bd44..a188855b268 100644 --- a/src/os/WBThrottle.h +++ b/src/os/WBThrottle.h @@ -26,6 +26,17 @@ #include "common/Thread.h" #include "common/ceph_context.h" +class PerfCounters; +enum { + l_wbthrottle_first = 999090, + l_wbthrottle_bytes_dirtied, + l_wbthrottle_bytes_wb, + l_wbthrottle_ios_dirtied, + l_wbthrottle_ios_wb, + l_wbthrottle_inodes_dirtied, + l_wbthrottle_inodes_wb, + l_wbthrottle_last +}; /** * WBThrottle @@ -65,6 +76,7 @@ class WBThrottle : Thread, public md_config_obs_t { }; CephContext *cct; + PerfCounters *logger; bool stopping; Mutex lock; Cond cond; -- cgit v1.2.1 From 1c35556b56dd531b584558e44eff3ea5e650cc30 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 24 May 2013 13:35:14 -0700 Subject: doc/dev/osd_internals: add wbthrottle.rst Signed-off-by: Samuel Just --- doc/dev/osd_internals/wbthrottle.rst | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 doc/dev/osd_internals/wbthrottle.rst diff --git a/doc/dev/osd_internals/wbthrottle.rst b/doc/dev/osd_internals/wbthrottle.rst new file mode 100644 index 00000000000..14ba0140d4d --- /dev/null +++ b/doc/dev/osd_internals/wbthrottle.rst @@ -0,0 +1,28 @@ +================== +Writeback Throttle +================== + +Previously, the filestore had a problem when handling large numbers of +small ios. We throttle dirty data implicitly via the journal, but +a large number of inodes can be dirtied without filling the journal +resulting in a very long sync time when the sync finally does happen. +The flusher was not an adequate solution to this problem since it +forced writeback of small writes too eagerly killing performance.
+ +WBThrottle tracks unflushed io per hobject_t and ::fsyncs in lru +order once the start_flusher threshold is exceeded for any of +dirty bytes, dirty ios, or dirty inodes. While any of these exceed +the hard_limit, we block on throttle() in _do_op. + +See src/os/WBThrottle.h, src/os/WBThrottle.cc + +To track the open FDs through the writeback process, there is now an +fdcache to cache open fds. lfn_open now returns a cached FDRef which +implicitly closes the fd once all references have expired. + +Filestore syncs have a side effect of flushing all outstanding objects +in the wbthrottle. + +lfn_unlink clears the cached FDRef and wbthrottle entries for the +unlinked object when the last link is removed and asserts that all +outstanding FDRefs for that object are dead. -- cgit v1.2.1 From 4b31c7e7929bffa530ded0a695c25f8c1b0f7774 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 29 May 2013 15:05:34 -0700 Subject: WBThrottle: rename replica nocache We may want to influence the caching behavior for other reasons.
Signed-off-by: Samuel Just --- src/os/WBThrottle.cc | 6 +++--- src/os/WBThrottle.h | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc index 12186445bc1..23e24765cc2 100644 --- a/src/os/WBThrottle.cc +++ b/src/os/WBThrottle.cc @@ -146,7 +146,7 @@ void *WBThrottle::entry() clearing = wb.get<0>(); lock.Unlock(); ::fsync(**wb.get<1>()); - if (wb.get<2>().replica) + if (wb.get<2>().nocache) posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED); lock.Lock(); clearing = hobject_t(); @@ -163,7 +163,7 @@ void *WBThrottle::entry() void WBThrottle::queue_wb( FDRef fd, const hobject_t &hoid, uint64_t offset, uint64_t len, - bool replica) + bool nocache) { Mutex::Locker l(lock); map >::iterator wbiter = @@ -184,7 +184,7 @@ void WBThrottle::queue_wb( cur_size += len; logger->inc(l_wbthrottle_bytes_dirtied, len); - wbiter->second.first.add(replica, len, 1); + wbiter->second.first.add(nocache, len, 1); insert_object(hoid); cond.Signal(); } diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h index a188855b268..797a6e78246 100644 --- a/src/os/WBThrottle.h +++ b/src/os/WBThrottle.h @@ -63,13 +63,13 @@ class WBThrottle : Thread, public md_config_obs_t { */ class PendingWB { public: - bool replica; + bool nocache; uint64_t size; uint64_t ios; - PendingWB() : replica(true), size(0), ios(0) {} - void add(bool _replica, uint64_t _size, uint64_t _ios) { - if (!_replica) - replica = false; // only replica if all writes are replica + PendingWB() : nocache(true), size(0), ios(0) {} + void add(bool _nocache, uint64_t _size, uint64_t _ios) { + if (!_nocache) + nocache = false; // only nocache if all writes are nocache size += _size; ios += _ios; } @@ -143,7 +143,7 @@ public: const hobject_t &hoid, ///< [in] object uint64_t offset, ///< [in] offset written uint64_t len, ///< [in] length written - bool replica ///< [in] write is for replica + bool nocache ///< [in] try to clear out of cache after write ); /// Clear all 
wb (probably due to sync) -- cgit v1.2.1 From a55e03cdfe45754b2aff8110aa1a0518404f1218 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 29 May 2013 15:05:51 -0700 Subject: WBThrottle: add some comments and some asserts Signed-off-by: Samuel Just --- src/os/WBThrottle.cc | 2 ++ src/os/WBThrottle.h | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc index 23e24765cc2..4673488f833 100644 --- a/src/os/WBThrottle.cc +++ b/src/os/WBThrottle.cc @@ -206,6 +206,8 @@ void WBThrottle::clear() lru.clear(); rev_lru.clear(); cond.Signal(); + assert(cur_ios == 0); + assert(cur_size == 0); } void WBThrottle::clear_object(const hobject_t &hoid) diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h index 797a6e78246..070de08e123 100644 --- a/src/os/WBThrottle.h +++ b/src/os/WBThrottle.h @@ -46,6 +46,10 @@ enum { class WBThrottle : Thread, public md_config_obs_t { hobject_t clearing; + /* *_limits.first is the start_flusher limit and + * *_limits.second is the hard limit + */ + /// Limits on unflushed bytes pair size_limits; -- cgit v1.2.1