summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am4
-rw-r--r--src/common/config_opts.h14
-rw-r--r--src/os/WBThrottle.cc209
-rw-r--r--src/os/WBThrottle.h155
-rw-r--r--src/os/hobject.h3
5 files changed, 384 insertions, 1 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 9525fe63f4f..5e10c9eed25 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1490,7 +1490,8 @@ libos_a_SOURCES = \
os/IndexManager.cc \
os/FlatIndex.cc \
os/DBObjectMap.cc \
- os/LevelDBStore.cc
+ os/LevelDBStore.cc \
+ os/WBThrottle.cc
libos_a_CXXFLAGS= ${AM_CXXFLAGS}
noinst_LIBRARIES += libos.a
@@ -1974,6 +1975,7 @@ noinst_HEADERS = \
os/FlatIndex.h\
os/HashIndex.h\
os/FDCache.h\
+ os/WBThrottle.h\
os/IndexManager.h\
os/Journal.h\
os/JournalingObjectStore.h\
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 68c7bef08b3..27593cb9f9d 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -474,6 +474,20 @@ OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5)
OPTION(filestore, OPT_BOOL, false)
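+/// filestore writeback throttle limits, one set per backing filesystem.
+/// *_start_flusher: the WBThrottle flusher thread begins syncing objects once
+/// any of unflushed bytes / ios / inodes reaches this value.
+/// *_hard_limit: WBThrottle::throttle() blocks its caller while any of them
+/// is at or above this value.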
+OPTION(filestore_wbthrottle_btrfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_btrfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_btrfs_ios_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_ios_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_btrfs_inodes_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_inodes_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_xfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_xfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_xfs_ios_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_ios_hard_limit, OPT_U64, 100)
+OPTION(filestore_wbthrottle_xfs_inodes_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_inodes_hard_limit, OPT_U64, 100)
+
// Tests index failure paths
OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0)
diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc
new file mode 100644
index 00000000000..24f7730c47f
--- /dev/null
+++ b/src/os/WBThrottle.cc
@@ -0,0 +1,209 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "os/WBThrottle.h"
+
+/**
+ * Build a throttle bound to the given context.
+ *
+ * Loads the limits from the configuration (treating the filesystem as XFS
+ * until set_fs() is called), registers this object as a config observer,
+ * and calls create(), which is expected to start the thread running entry().
+ *
+ * @param cct context providing the configuration; must not be NULL
+ */
+WBThrottle::WBThrottle(CephContext *cct) :
+  cur_ios(0), cur_size(0),
+  cct(cct),
+  stopping(false),
+  lock("WBThrottle::lock", false, true, false, cct),
+  fs(XFS)
+{
+  // Validate before set_from_conf() dereferences cct->_conf; checking
+  // afterwards could never catch a NULL context.
+  assert(cct);
+  {
+    // set_from_conf() asserts that the lock is held.
+    Mutex::Locker l(lock);
+    set_from_conf();
+  }
+  cct->_conf->add_observer(this);
+  create();
+}
+
+/**
+ * Tear down the throttle: raise the stop flag, wake waiters, join() the
+ * thread, and finally unregister from config change notifications.
+ */
+WBThrottle::~WBThrottle()
+{
+  assert(cct);
+
+  // Flip the flag and signal while holding the lock so that waiters in
+  // get_next_should_flush() and throttle() re-check `stopping`.
+  lock.Lock();
+  stopping = true;
+  cond.Signal();
+  lock.Unlock();
+
+  join();
+  cct->_conf->remove_observer(this);
+}
+
+/**
+ * md_config_obs_t hook: names of the config options this object reacts to.
+ *
+ * @return NULL-terminated array of option names with static storage
+ */
+const char** WBThrottle::get_tracked_conf_keys() const
+{
+  // One start_flusher/hard_limit pair per counter (bytes, ios, inodes),
+  // for each supported filesystem.
+  static const char* tracked[] = {
+    // btrfs
+    "filestore_wbthrottle_btrfs_bytes_start_flusher",
+    "filestore_wbthrottle_btrfs_bytes_hard_limit",
+    "filestore_wbthrottle_btrfs_ios_start_flusher",
+    "filestore_wbthrottle_btrfs_ios_hard_limit",
+    "filestore_wbthrottle_btrfs_inodes_start_flusher",
+    "filestore_wbthrottle_btrfs_inodes_hard_limit",
+    // xfs
+    "filestore_wbthrottle_xfs_bytes_start_flusher",
+    "filestore_wbthrottle_xfs_bytes_hard_limit",
+    "filestore_wbthrottle_xfs_ios_start_flusher",
+    "filestore_wbthrottle_xfs_ios_hard_limit",
+    "filestore_wbthrottle_xfs_inodes_start_flusher",
+    "filestore_wbthrottle_xfs_inodes_hard_limit",
+    NULL
+  };
+  return tracked;
+}
+
+/**
+ * Reload the (start_flusher, hard_limit) pairs for bytes, ios and inodes
+ * from the configuration, using the option set matching the current `fs`.
+ *
+ * The caller must hold `lock`. Waiters on `cond` are signalled afterwards
+ * so they re-evaluate against the new limits.
+ */
+void WBThrottle::set_from_conf()
+{
+  assert(lock.is_locked());
+  switch (fs) {
+  case BTRFS:
+    size_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
+    size_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit;
+    io_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher;
+    io_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit;
+    fd_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher;
+    fd_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit;
+    break;
+  case XFS:
+    size_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher;
+    size_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit;
+    io_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher;
+    io_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit;
+    fd_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher;
+    fd_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit;
+    break;
+  default:
+    assert(0 == "invalid value for fs");
+  }
+  cond.Signal();
+}
+
+/**
+ * md_config_obs_t hook: reload the limits if any tracked option changed.
+ *
+ * @param conf    configuration object (not read here; limits are re-read
+ *                through cct in set_from_conf())
+ * @param changed names of the options that changed
+ */
+void WBThrottle::handle_conf_change(const md_config_t *conf,
+                                    const std::set<std::string> &changed)
+{
+  Mutex::Locker l(lock);
+
+  // Advance to the first tracked key present in `changed`, or to the
+  // terminating NULL when none is.
+  const char** key = get_tracked_conf_keys();
+  while (*key && !changed.count(*key))
+    ++key;
+
+  // A single reload covers every option, so one hit is enough.
+  if (*key)
+    set_from_conf();
+}
+
+/**
+ * Wait until a start_flusher threshold is reached (or shutdown begins) and
+ * hand back the object at the front of the LRU for flushing.
+ *
+ * The caller must hold `lock`; it is released while waiting on `cond`.
+ * The returned object is removed from both the LRU and pending_wbs, but
+ * cur_ios/cur_size are left for the caller to adjust.
+ *
+ * @param next [out] (object, fd, pending counters) of the object to flush
+ * @return false if we are shutting down, true if *next was filled in
+ */
+bool WBThrottle::get_next_should_flush(
+  boost::tuple<hobject_t, FDRef, PendingWB> *next)
+{
+  assert(lock.is_locked());
+  assert(next);
+
+  for (;;) {
+    if (stopping)
+      return false;
+    // Any one counter reaching its first limit is enough to start flushing.
+    if (cur_ios >= io_limits.first ||
+        pending_wbs.size() >= fd_limits.first ||
+        cur_size >= size_limits.first)
+      break;
+    cond.Wait(lock);
+  }
+
+  assert(!pending_wbs.empty());
+  hobject_t oldest(pop_object());
+
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator pos =
+    pending_wbs.find(oldest);
+  *next = boost::make_tuple(oldest, pos->second.second, pos->second.first);
+  pending_wbs.erase(pos);
+  return true;
+}
+
+
+/**
+ * Thread body: repeatedly take the next object from
+ * get_next_should_flush(), fsync it with the lock dropped, then give its
+ * ios/bytes back to the throttle. Returns once get_next_should_flush()
+ * reports shutdown.
+ *
+ * @return always 0
+ */
+void *WBThrottle::entry()
+{
+  Mutex::Locker l(lock);
+  boost::tuple<hobject_t, FDRef, PendingWB> wb;
+  for (;;) {
+    if (!get_next_should_flush(&wb))
+      break;
+
+    // Publish the object being flushed so clear_object() can wait for it.
+    clearing = wb.get<0>();
+
+    // Do the blocking I/O without holding the lock.
+    lock.Unlock();
+    ::fsync(**wb.get<1>());
+    if (wb.get<2>().replica)
+      posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
+    lock.Lock();
+
+    clearing = hobject_t();
+    cur_size -= wb.get<2>().size;
+    cur_ios -= wb.get<2>().ios;
+    cond.Signal();
+
+    // Reset the tuple so the FDRef is not held until the next iteration.
+    wb = boost::tuple<hobject_t, FDRef, PendingWB>();
+  }
+  return 0;
+}
+
+/**
+ * Record one unflushed write against an object and move the object to the
+ * back of the flush LRU. Does not wait on the throttle.
+ *
+ * @param fd      FDRef for hoid; stored only when hoid has no pending entry
+ * @param hoid    object written
+ * @param offset  offset written (not used by this function)
+ * @param len     length written, added to the unflushed byte count
+ * @param replica true if the write is for a replica
+ */
+void WBThrottle::queue_wb(
+  FDRef fd, const hobject_t &hoid, uint64_t offset, uint64_t len,
+  bool replica)
+{
+  Mutex::Locker l(lock);
+  typedef map<hobject_t, pair<PendingWB, FDRef> >::iterator pending_iter;
+
+  pending_iter pos = pending_wbs.find(hoid);
+  if (pos != pending_wbs.end()) {
+    // Already tracked: drop its LRU slot so it can be re-queued below.
+    remove_object(hoid);
+  } else {
+    pos = pending_wbs.insert(
+      make_pair(hoid, make_pair(PendingWB(), fd))).first;
+  }
+
+  pos->second.first.add(replica, len, 1);
+  ++cur_ios;
+  cur_size += len;
+
+  insert_object(hoid);
+  cond.Signal();
+}
+
+/**
+ * Forget every pending writeback: subtract each entry's ios/bytes from the
+ * running totals, empty pending_wbs and the LRU, and signal waiters.
+ */
+void WBThrottle::clear()
+{
+  Mutex::Locker l(lock);
+
+  typedef map<hobject_t, pair<PendingWB, FDRef> >::iterator pending_iter;
+  pending_iter it = pending_wbs.begin();
+  while (it != pending_wbs.end()) {
+    const PendingWB &wb = it->second.first;
+    cur_ios -= wb.ios;
+    cur_size -= wb.size;
+    ++it;
+  }
+
+  pending_wbs.clear();
+  lru.clear();
+  rev_lru.clear();
+  cond.Signal();
+}
+
+/**
+ * Drop any pending writeback state for a single object.
+ *
+ * If entry() is currently flushing this object (`clearing == hoid`), wait
+ * for that flush to finish first. Then, if the object has a pending entry,
+ * subtract its ios/bytes from the running totals and remove it from
+ * pending_wbs and the LRU.
+ *
+ * @param hoid object to forget
+ */
+void WBThrottle::clear_object(const hobject_t &hoid)
+{
+  Mutex::Locker l(lock);
+  while (clearing == hoid)
+    cond.Wait(lock);
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator i =
+    pending_wbs.find(hoid);
+  if (i == pending_wbs.end())
+    return;
+
+  cur_ios -= i->second.first.ios;
+  cur_size -= i->second.first.size;
+
+  pending_wbs.erase(i);
+  remove_object(hoid);
+
+  // The counters just dropped, so wake anyone blocked in throttle();
+  // clear() and entry() signal in the same situation.
+  cond.Signal();
+}
+
+/**
+ * Block the caller while any of unflushed ios, pending objects or
+ * unflushed bytes is at or above its hard limit. Returns immediately once
+ * shutdown has started.
+ */
+void WBThrottle::throttle()
+{
+  Mutex::Locker l(lock);
+  while (!stopping &&
+         (cur_ios >= io_limits.second ||
+          pending_wbs.size() >= fd_limits.second ||
+          cur_size >= size_limits.second)) {
+    cond.Wait(lock);
+  }
+}
diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h
new file mode 100644
index 00000000000..2ac7234bd44
--- /dev/null
+++ b/src/os/WBThrottle.h
@@ -0,0 +1,155 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef WBTHROTTLE_H
+#define WBTHROTTLE_H
+
+#include <map>
+#include <boost/tuple/tuple.hpp>
+#include <tr1/memory>
+#include "include/buffer.h"
+#include "common/Formatter.h"
+#include "os/hobject.h"
+#include "include/interval_set.h"
+#include "FDCache.h"
+#include "common/Thread.h"
+#include "common/ceph_context.h"
+
+
+/**
+ * WBThrottle
+ *
+ * Tracks, throttles, and flushes outstanding IO.
+ *
+ * Writers report each write with queue_wb() and call throttle() to block
+ * while the hard limits are exceeded. A thread (entry()) fsyncs objects in
+ * LRU order once a start_flusher limit is reached. All state below is
+ * guarded by `lock`; `cond` is signalled whenever counters or limits change.
+ */
+class WBThrottle : Thread, public md_config_obs_t {
+  /// Object entry() is flushing with the lock dropped; clear_object()
+  /// waits while this equals its target
+  hobject_t clearing;
+
+  /// Limits on unflushed bytes: (start flusher, hard limit)
+  pair<uint64_t, uint64_t> size_limits;
+
+  /// Limits on unflushed ios: (start flusher, hard limit)
+  pair<uint64_t, uint64_t> io_limits;
+
+  /// Limits on unflushed objects: (start flusher, hard limit)
+  pair<uint64_t, uint64_t> fd_limits;
+
+  uint64_t cur_ios;  ///< Currently unflushed IOs
+  uint64_t cur_size; ///< Currently unflushed bytes
+
+  /**
+   * PendingWB tracks the ios pending on an object.
+   */
+  class PendingWB {
+  public:
+    bool replica;   ///< true only while every queued write was a replica write
+    uint64_t size;  ///< bytes accumulated via add()
+    uint64_t ios;   ///< io count accumulated via add()
+    PendingWB() : replica(true), size(0), ios(0) {}
+
+    /// Fold one more write into the pending totals
+    void add(bool _replica, uint64_t _size, uint64_t _ios) {
+      if (!_replica)
+        replica = false; // only replica if all writes are replica
+      size += _size;
+      ios += _ios;
+    }
+  };
+
+  CephContext *cct;
+  bool stopping;  ///< set by the destructor to stop entry() and throttle()
+  Mutex lock;
+  Cond cond;
+
+
+  /**
+   * Flush objects in lru order
+   *
+   * lru holds objects oldest-first; rev_lru maps each object to its
+   * position in lru so it can be removed without a scan.
+   */
+  list<hobject_t> lru;
+  map<hobject_t, list<hobject_t>::iterator> rev_lru;
+
+  /// Remove hoid from the lru if present (caller holds lock)
+  void remove_object(const hobject_t &hoid) {
+    assert(lock.is_locked());
+    map<hobject_t, list<hobject_t>::iterator>::iterator iter =
+      rev_lru.find(hoid);
+    if (iter == rev_lru.end())
+      return;
+
+    lru.erase(iter->second);
+    rev_lru.erase(iter);
+  }
+
+  /// Remove and return the oldest object (caller holds lock, lru non-empty)
+  hobject_t pop_object() {
+    // Same locking contract as remove_object(); the only caller,
+    // get_next_should_flush(), already holds the lock.
+    assert(lock.is_locked());
+    assert(!lru.empty());
+    hobject_t hoid(lru.front());
+    lru.pop_front();
+    rev_lru.erase(hoid);
+    return hoid;
+  }
+
+  /// Append hoid as the newest object (caller holds lock, hoid not in lru)
+  void insert_object(const hobject_t &hoid) {
+    // Same locking contract as remove_object(); the only caller,
+    // queue_wb(), already holds the lock.
+    assert(lock.is_locked());
+    assert(rev_lru.find(hoid) == rev_lru.end());
+    lru.push_back(hoid);
+    rev_lru.insert(make_pair(hoid, --lru.end()));
+  }
+
+  /// Per-object pending counters and the fd to fsync
+  map<hobject_t, pair<PendingWB, FDRef> > pending_wbs;
+
+  /// get next flush to perform
+  bool get_next_should_flush(
+    boost::tuple<hobject_t, FDRef, PendingWB> *next ///< [out] next to flush
+    ); ///< @return false if we are shutting down
+public:
+  /// Backing filesystem; selects which set of config limits applies
+  enum FS {
+    BTRFS,
+    XFS
+  };
+
+private:
+  FS fs;
+
+  /// Reload the limits for the current fs from config (caller holds lock)
+  void set_from_conf();
+public:
+  WBThrottle(CephContext *cct);
+  ~WBThrottle();
+
+  /// Set fs as XFS or BTRFS
+  void set_fs(FS new_fs) {
+    Mutex::Locker l(lock);
+    fs = new_fs;
+    set_from_conf();
+  }
+
+  /// Queue wb on hoid, fd taking throttle (does not block)
+  void queue_wb(
+    FDRef fd,              ///< [in] FDRef to hoid
+    const hobject_t &hoid, ///< [in] object
+    uint64_t offset,       ///< [in] offset written
+    uint64_t len,          ///< [in] length written
+    bool replica           ///< [in] write is for replica
+    );
+
+  /// Clear all wb (probably due to sync)
+  void clear();
+
+  /// Clear object
+  void clear_object(const hobject_t &hoid);
+
+  /// Block until there is throttle available
+  void throttle();
+
+  /// md_config_obs_t
+  const char** get_tracked_conf_keys() const;
+  void handle_conf_change(const md_config_t *conf,
+                          const std::set<std::string> &changed);
+
+  /// Thread
+  void *entry();
+};
+
+#endif
diff --git a/src/os/hobject.h b/src/os/hobject.h
index 28e0da0d82a..47fcb3dda39 100644
--- a/src/os/hobject.h
+++ b/src/os/hobject.h
@@ -15,6 +15,9 @@
#ifndef __CEPH_OS_HOBJECT_H
#define __CEPH_OS_HOBJECT_H
+#include <string.h>
+#include "include/types.h"
+#include "include/rados.h"
#include "include/object.h"
#include "include/cmp.h"