From 6670e2a73db8f59e2a6bac024afe43cd8a8a6715 Mon Sep 17 00:00:00 2001
From: Samuel Just
Date: Tue, 14 May 2013 14:44:06 -0700
Subject: os/: Add WBThrottle

Signed-off-by: Samuel Just
---
 src/Makefile.am          |   4 +-
 src/common/config_opts.h |  14 ++++
 src/os/WBThrottle.cc     | 209 +++++++++++++++++++++++++++++++++++++++++++++++
 src/os/WBThrottle.h      | 155 +++++++++++++++++++++++++++++++++++
 src/os/hobject.h         |   3 +
 5 files changed, 384 insertions(+), 1 deletion(-)
 create mode 100644 src/os/WBThrottle.cc
 create mode 100644 src/os/WBThrottle.h

diff --git a/src/Makefile.am b/src/Makefile.am
index 9525fe63f4f..5e10c9eed25 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1490,7 +1490,8 @@ libos_a_SOURCES = \
 	os/IndexManager.cc \
 	os/FlatIndex.cc \
 	os/DBObjectMap.cc \
-	os/LevelDBStore.cc
+	os/LevelDBStore.cc \
+	os/WBThrottle.cc
 libos_a_CXXFLAGS= ${AM_CXXFLAGS}
 noinst_LIBRARIES += libos.a
 
@@ -1974,6 +1975,7 @@ noinst_HEADERS = \
 	os/FlatIndex.h\
 	os/HashIndex.h\
 	os/FDCache.h\
+	os/WBThrottle.h\
 	os/IndexManager.h\
 	os/Journal.h\
 	os/JournalingObjectStore.h\
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 68c7bef08b3..27593cb9f9d 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -474,6 +474,20 @@ OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5)
 
 OPTION(filestore, OPT_BOOL, false)
 
+/// filestore wb throttle limits
+OPTION(filestore_wbthrottle_btrfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_btrfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_btrfs_ios_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_ios_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_btrfs_inodes_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_inodes_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_xfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_xfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_xfs_ios_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_ios_hard_limit, OPT_U64, 100)
+OPTION(filestore_wbthrottle_xfs_inodes_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_inodes_hard_limit, OPT_U64, 100)
+
 // Tests index failure paths
 OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0)
 
diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc
new file mode 100644
index 00000000000..24f7730c47f
--- /dev/null
+++ b/src/os/WBThrottle.cc
@@ -0,0 +1,209 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "os/WBThrottle.h"
+
+WBThrottle::WBThrottle(CephContext *cct) :
+  cur_ios(0), cur_size(0),
+  cct(cct),
+  stopping(false),
+  lock("WBThrottle::lock", false, true, false, cct),
+  fs(XFS)
+{
+  {
+    Mutex::Locker l(lock);
+    set_from_conf();
+  }
+  assert(cct);
+  cct->_conf->add_observer(this);
+  create();
+}
+
+WBThrottle::~WBThrottle() {
+  assert(cct);
+  {
+    Mutex::Locker l(lock);
+    stopping = true;
+    cond.Signal();
+  }
+  join();
+  cct->_conf->remove_observer(this);
+}
+
+const char** WBThrottle::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "filestore_wbthrottle_btrfs_bytes_start_flusher",
+    "filestore_wbthrottle_btrfs_bytes_hard_limit",
+    "filestore_wbthrottle_btrfs_ios_start_flusher",
+    "filestore_wbthrottle_btrfs_ios_hard_limit",
+    "filestore_wbthrottle_btrfs_inodes_start_flusher",
+    "filestore_wbthrottle_btrfs_inodes_hard_limit",
+    "filestore_wbthrottle_xfs_bytes_start_flusher",
+    "filestore_wbthrottle_xfs_bytes_hard_limit",
+    "filestore_wbthrottle_xfs_ios_start_flusher",
+    "filestore_wbthrottle_xfs_ios_hard_limit",
+    "filestore_wbthrottle_xfs_inodes_start_flusher",
+    "filestore_wbthrottle_xfs_inodes_hard_limit",
+    NULL
+  };
+  return KEYS;
+}
+
+void WBThrottle::set_from_conf()
+{
+  assert(lock.is_locked());
+  if (fs == BTRFS) {
+    size_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
+    size_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit;
+    io_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher;
+    io_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit;
+    fd_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher;
+    fd_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit;
+  } else if (fs == XFS) {
+    size_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher;
+    size_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit;
+    io_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher;
+    io_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit;
+    fd_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher;
+    fd_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit;
+  } else {
+    assert(0 == "invalid value for fs");
+  }
+  cond.Signal();
+}
+
+void WBThrottle::handle_conf_change(const md_config_t *conf,
+                                    const std::set<std::string> &changed)
+{
+  Mutex::Locker l(lock);
+  for (const char** i = get_tracked_conf_keys(); *i; ++i) {
+    if (changed.count(*i)) {
+      set_from_conf();
+      return;
+    }
+  }
+}
+
+bool WBThrottle::get_next_should_flush(
+  boost::tuple<hobject_t, FDRef, PendingWB> *next)
+{
+  assert(lock.is_locked());
+  assert(next);
+  while (!stopping &&
+         cur_ios < io_limits.first &&
+         pending_wbs.size() < fd_limits.first &&
+         cur_size < size_limits.first)
+    cond.Wait(lock);
+  if (stopping)
+    return false;
+  assert(!pending_wbs.empty());
+  hobject_t obj(pop_object());
+
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator i =
+    pending_wbs.find(obj);
+  *next = boost::make_tuple(obj, i->second.second, i->second.first);
+  pending_wbs.erase(i);
+  return true;
+}
+
+
+void *WBThrottle::entry()
+{
+  Mutex::Locker l(lock);
+  boost::tuple<hobject_t, FDRef, PendingWB> wb;
+  while (get_next_should_flush(&wb)) {
+    clearing = wb.get<0>();
+    lock.Unlock();
+    ::fsync(**wb.get<1>());
+    if (wb.get<2>().replica)
+      posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
+    lock.Lock();
+    clearing = hobject_t();
+    cur_ios -= wb.get<2>().ios;
+    cur_size -= wb.get<2>().size;
+    cond.Signal();
+    wb = boost::tuple<hobject_t, FDRef, PendingWB>();
+  }
+  return 0;
+}
+
+void WBThrottle::queue_wb(
+  FDRef fd, const hobject_t &hoid, uint64_t offset, uint64_t len,
+  bool replica)
+{
+  Mutex::Locker l(lock);
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator wbiter =
+    pending_wbs.find(hoid);
+  if (wbiter == pending_wbs.end()) {
+    wbiter = pending_wbs.insert(
+      make_pair(hoid,
+        make_pair(
+          PendingWB(),
+          fd))).first;
+  } else {
+    remove_object(hoid);
+  }
+
+  cur_ios++;
+  cur_size += len;
+  wbiter->second.first.add(replica, len, 1);
+  insert_object(hoid);
+  cond.Signal();
+}
+
+void WBThrottle::clear()
+{
+  Mutex::Locker l(lock);
+  for (map<hobject_t, pair<PendingWB, FDRef> >::iterator i =
+         pending_wbs.begin();
+       i != pending_wbs.end();
+       ++i) {
+    cur_ios -= i->second.first.ios;
+    cur_size -= i->second.first.size;
+  }
+  pending_wbs.clear();
+  lru.clear();
+  rev_lru.clear();
+  cond.Signal();
+}
+
+void WBThrottle::clear_object(const hobject_t &hoid)
+{
+  Mutex::Locker l(lock);
+  while (clearing == hoid)
+    cond.Wait(lock);
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator i =
+    pending_wbs.find(hoid);
+  if (i == pending_wbs.end())
+    return;
+
+  cur_ios -= i->second.first.ios;
+  cur_size -= i->second.first.size;
+
+  pending_wbs.erase(i);
+  remove_object(hoid);
+}
+
+void WBThrottle::throttle()
+{
+  Mutex::Locker l(lock);
+  while (!stopping && !(
+           cur_ios < io_limits.second &&
+           pending_wbs.size() < fd_limits.second &&
+           cur_size < size_limits.second)) {
+    cond.Wait(lock);
+  }
+}
diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h
new file mode 100644
index 00000000000..2ac7234bd44
--- /dev/null
+++ b/src/os/WBThrottle.h
@@ -0,0 +1,155 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef WBTHROTTLE_H
+#define WBTHROTTLE_H
+
+#include <map>
+#include <boost/tuple/tuple.hpp>
+#include <tr1/memory>
+#include "include/buffer.h"
+#include "common/Formatter.h"
+#include "os/hobject.h"
+#include "include/interval_set.h"
+#include "FDCache.h"
+#include "common/Thread.h"
+#include "common/ceph_context.h"
+
+
+/**
+ * WBThrottle
+ *
+ * Tracks, throttles, and flushes outstanding IO
+ */
+class WBThrottle : Thread, public md_config_obs_t {
+  hobject_t clearing;
+
+  /// Limits on unflushed bytes
+  pair<uint64_t, uint64_t> size_limits;
+
+  /// Limits on unflushed ios
+  pair<uint64_t, uint64_t> io_limits;
+
+  /// Limits on unflushed objects
+  pair<uint64_t, uint64_t> fd_limits;
+
+  uint64_t cur_ios;  /// Currently unflushed IOs
+  uint64_t cur_size; /// Currently unflushed bytes
+
+  /**
+   * PendingWB tracks the ios pending on an object.
+   */
+  class PendingWB {
+  public:
+    bool replica;
+    uint64_t size;
+    uint64_t ios;
+    PendingWB() : replica(true), size(0), ios(0) {}
+    void add(bool _replica, uint64_t _size, uint64_t _ios) {
+      if (!_replica)
+        replica = false; // only replica if all writes are replica
+      size += _size;
+      ios += _ios;
+    }
+  };
+
+  CephContext *cct;
+  bool stopping;
+  Mutex lock;
+  Cond cond;
+
+
+  /**
+   * Flush objects in lru order
+   */
+  list<hobject_t> lru;
+  map<hobject_t, list<hobject_t>::iterator> rev_lru;
+  void remove_object(const hobject_t &hoid) {
+    assert(lock.is_locked());
+    map<hobject_t, list<hobject_t>::iterator>::iterator iter =
+      rev_lru.find(hoid);
+    if (iter == rev_lru.end())
+      return;
+
+    lru.erase(iter->second);
+    rev_lru.erase(iter);
+  }
+  hobject_t pop_object() {
+    assert(!lru.empty());
+    hobject_t hoid(lru.front());
+    lru.pop_front();
+    rev_lru.erase(hoid);
+    return hoid;
+  }
+  void insert_object(const hobject_t &hoid) {
+    assert(rev_lru.find(hoid) == rev_lru.end());
+    lru.push_back(hoid);
+    rev_lru.insert(make_pair(hoid, --lru.end()));
+  }
+
+  map<hobject_t, pair<PendingWB, FDRef> > pending_wbs;
+
+  /// get next flush to perform
+  bool get_next_should_flush(
+    boost::tuple<hobject_t, FDRef, PendingWB> *next ///< [out] next to flush
+    ); ///< @return false if we are shutting down
+public:
+  enum FS {
+    BTRFS,
+    XFS
+  };
+
+private:
+  FS fs;
+
+  void set_from_conf();
+public:
+  WBThrottle(CephContext *cct);
+  ~WBThrottle();
+
+  /// Set fs as XFS or BTRFS
+  void set_fs(FS new_fs) {
+    Mutex::Locker l(lock);
+    fs = new_fs;
+    set_from_conf();
+  }
+
+  /// Queue wb on hoid, fd taking throttle (does not block)
+  void queue_wb(
+    FDRef fd,              ///< [in] FDRef to hoid
+    const hobject_t &hoid, ///< [in] object
+    uint64_t offset,       ///< [in] offset written
+    uint64_t len,          ///< [in] length written
+    bool replica           ///< [in] write is for replica
+    );
+
+  /// Clear all wb (probably due to sync)
+  void clear();
+
+  /// Clear object
+  void clear_object(const hobject_t &hoid);
+
+  /// Block until there is throttle available
+  void throttle();
+
+  /// md_config_obs_t
+  const char** get_tracked_conf_keys() const;
+  void handle_conf_change(const md_config_t *conf,
+                          const std::set<std::string> &changed);
+
+  /// Thread
+  void *entry();
+};
+
+#endif
diff --git a/src/os/hobject.h b/src/os/hobject.h
index 28e0da0d82a..47fcb3dda39 100644
--- a/src/os/hobject.h
+++ b/src/os/hobject.h
@@ -15,6 +15,9 @@
 #ifndef __CEPH_OS_HOBJECT_H
 #define __CEPH_OS_HOBJECT_H
 
+#include <string.h>
+#include "include/types.h"
+#include "include/rados.h"
 #include "include/object.h"
 #include "include/cmp.h"
 
-- 
cgit v1.2.1