summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/dev/osd_internals/osd_throttles.rst21
-rw-r--r--doc/dev/osd_internals/wbthrottle.rst28
-rw-r--r--src/Makefile.am5
-rw-r--r--src/common/config_opts.h19
-rw-r--r--src/common/shared_cache.hpp13
-rw-r--r--src/os/FDCache.h95
-rw-r--r--src/os/FileStore.cc364
-rw-r--r--src/os/FileStore.h41
-rw-r--r--src/os/WBThrottle.cc239
-rw-r--r--src/os/WBThrottle.h171
-rw-r--r--src/os/hobject.h3
-rw-r--r--src/osd/ReplicatedPG.cc1
12 files changed, 720 insertions, 280 deletions
diff --git a/doc/dev/osd_internals/osd_throttles.rst b/doc/dev/osd_internals/osd_throttles.rst
new file mode 100644
index 00000000000..4fa3044f986
--- /dev/null
+++ b/doc/dev/osd_internals/osd_throttles.rst
@@ -0,0 +1,21 @@
+ Messenger throttle (number and size)
+ |-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+ FileStore op_queue throttle (number and size)
+ |--------------------------------------------------------|
+ WBThrottle
+ |---------------------------------------------------------------------------------------------------------|
+ Journal (size)
+ |-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
+ |----------------------------------------------------------------------------------------------------> flushed ----------------> synced
+ |
+Op: Read Header --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> op_applied -------------------------------------------------------------> Complete
+ | |
+SubOp: --Messenger--> ReadHeader --DispatchQ--> OSD::_dispatch --OpWQ--> PG::do_request --journalq--> Journal --FileStore::OpWQ--> Apply Thread --Finisher--> sub_op_applied -
+ |
+ |-----------------------------> flushed ----------------> synced
+ |------------------------------------------------------------------------------------------|
+ Journal (size)
+ |---------------------------------|
+ WBThrottle
+ |-----------------------------------------------------|
+ FileStore op_queue throttle (number and size)
diff --git a/doc/dev/osd_internals/wbthrottle.rst b/doc/dev/osd_internals/wbthrottle.rst
new file mode 100644
index 00000000000..14ba0140d4d
--- /dev/null
+++ b/doc/dev/osd_internals/wbthrottle.rst
@@ -0,0 +1,28 @@
+==================
+Writeback Throttle
+==================
+
+Previously, the filestore had a problem when handling large numbers of
+small ios. We throttle dirty data implicitly via the journal, but
+a large number of inodes can be dirtied without filling the journal,
+resulting in a very long sync time when the sync finally does happen.
+The flusher was not an adequate solution to this problem since it
+forced writeback of small writes too eagerly, killing performance.
+
+WBThrottle tracks unflushed io per hobject_t and ::fsyncs in lru
+order once the start_flusher threshold is exceeded for any of
+dirty bytes, dirty ios, or dirty inodes. While any of these exceed
+the hard_limit, we block on throttle() in _do_op.
+
+See src/os/WBThrottle.h, src/os/WBThrottle.cc
+
+To track the open FDs through the writeback process, there is now an
+fdcache to cache open fds. lfn_open now returns a cached FDRef which
+implicitly closes the fd once all references have expired.
+
+Filestore syncs have a side effect of flushing all outstanding objects
+in the wbthrottle.
+
+lfn_unlink clears the cached FDRef and wbthrottle entries for the
+unlinked object when the last link is removed and asserts that all
+outstanding FDRefs for that object are dead.
diff --git a/src/Makefile.am b/src/Makefile.am
index 451548eb42f..5e176874b11 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1490,7 +1490,8 @@ libos_a_SOURCES = \
os/IndexManager.cc \
os/FlatIndex.cc \
os/DBObjectMap.cc \
- os/LevelDBStore.cc
+ os/LevelDBStore.cc \
+ os/WBThrottle.cc
libos_a_CXXFLAGS= ${AM_CXXFLAGS}
noinst_LIBRARIES += libos.a
@@ -1975,6 +1976,8 @@ noinst_HEADERS = \
os/FileStore.h\
os/FlatIndex.h\
os/HashIndex.h\
+ os/FDCache.h\
+ os/WBThrottle.h\
os/IndexManager.h\
os/Journal.h\
os/JournalingObjectStore.h\
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index dd2b1dcba1f..285f4d52335 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -478,6 +478,20 @@ OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5)
OPTION(filestore, OPT_BOOL, false)
+/// filestore wb throttle limits
+OPTION(filestore_wbthrottle_btrfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_btrfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_btrfs_ios_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_ios_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_btrfs_inodes_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_inodes_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_xfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_xfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_xfs_ios_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_ios_hard_limit, OPT_U64, 100)
+OPTION(filestore_wbthrottle_xfs_inodes_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_inodes_hard_limit, OPT_U64, 100)
+
// Tests index failure paths
OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0)
@@ -498,10 +512,6 @@ OPTION(filestore_btrfs_snap, OPT_BOOL, true)
OPTION(filestore_btrfs_clone_range, OPT_BOOL, true)
OPTION(filestore_fsync_flushes_journal_data, OPT_BOOL, false)
OPTION(filestore_fiemap, OPT_BOOL, false) // (try to) use fiemap
-OPTION(filestore_flusher, OPT_BOOL, true)
-OPTION(filestore_flusher_max_fds, OPT_INT, 512)
-OPTION(filestore_flush_min, OPT_INT, 65536)
-OPTION(filestore_sync_flush, OPT_BOOL, false)
OPTION(filestore_journal_parallel, OPT_BOOL, false)
OPTION(filestore_journal_writeahead, OPT_BOOL, false)
OPTION(filestore_journal_trailing, OPT_BOOL, false)
@@ -518,6 +528,7 @@ OPTION(filestore_merge_threshold, OPT_INT, 10)
OPTION(filestore_split_multiple, OPT_INT, 2)
OPTION(filestore_update_to, OPT_INT, 1000)
OPTION(filestore_blackhole, OPT_BOOL, false) // drop any new transactions on the floor
+OPTION(filestore_fd_cache_size, OPT_INT, 128) // FD lru size
OPTION(filestore_dump_file, OPT_STR, "") // file onto which store transaction dumps
OPTION(filestore_kill_at, OPT_INT, 0) // inject a failure at the n'th opportunity
OPTION(filestore_inject_stall, OPT_INT, 0) // artificially stall for N seconds in op queue thread
diff --git a/src/common/shared_cache.hpp b/src/common/shared_cache.hpp
index 69a4c06dfbf..178d1001be3 100644
--- a/src/common/shared_cache.hpp
+++ b/src/common/shared_cache.hpp
@@ -85,12 +85,23 @@ public:
assert(weak_refs.empty());
}
+ void clear(K key) {
+ VPtr val; // release any ref we have after we drop the lock
+ {
+ Mutex::Locker l(lock);
+ if (weak_refs.count(key)) {
+ val = weak_refs[key].lock();
+ }
+ lru_remove(key);
+ }
+ }
+
void set_size(size_t new_size) {
list<VPtr> to_release;
{
Mutex::Locker l(lock);
max_size = new_size;
- trim_cache(to_release);
+ trim_cache(&to_release);
}
}
diff --git a/src/os/FDCache.h b/src/os/FDCache.h
new file mode 100644
index 00000000000..cf07f860aa5
--- /dev/null
+++ b/src/os/FDCache.h
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_FDCACHE_H
+#define CEPH_FDCACHE_H
+
+#include <memory>
+#include <errno.h>
+#include <cstdio>
+#include "hobject.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/shared_cache.hpp"
+#include "include/compat.h"
+
+/**
+ * FD Cache: SharedLRU registry of open fds keyed by hobject_t; capacity follows filestore_fd_cache_size
+ */
+class FDCache : public md_config_obs_t {
+ /**
+ * FD
+ *
+ * Wrapper for an open fd (asserted >= 0 on construction). Destructor closes the fd.
+ */
+ class FD {
+ public:
+ const int fd;
+ FD(int _fd) : fd(_fd) {
+ assert(_fd >= 0);
+ }
+ int operator*() const { // yields the raw fd; with an FDRef callers write **ref
+ return fd;
+ }
+ ~FD() {
+ TEMP_FAILURE_RETRY(::close(fd));
+ }
+ };
+
+ SharedLRU<hobject_t, FD> registry;
+ CephContext *cct;
+public:
+ FDCache(CephContext *cct) : cct(cct) {
+ assert(cct);
+ cct->_conf->add_observer(this); // paired with remove_observer() in the destructor
+ registry.set_size(cct->_conf->filestore_fd_cache_size); // initial size taken from config
+ }
+ ~FDCache() {
+ cct->_conf->remove_observer(this);
+ }
+ typedef std::tr1::shared_ptr<FD> FDRef;
+
+ FDRef lookup(const hobject_t &hoid) {
+ return registry.lookup(hoid);
+ }
+
+ FDRef add(const hobject_t &hoid, int fd) {
+ return registry.add(hoid, new FD(fd));
+ }
+
+ /// clear cached fd for hoid; subsequent lookups return an empty FDRef (asserted below)
+ void clear(const hobject_t &hoid) {
+ registry.clear(hoid);
+ assert(!registry.lookup(hoid)); // the entry must no longer be reachable through the registry
+ }
+
+ /// md_config_obs_t: observe filestore_fd_cache_size and resize the registry when it changes
+ const char** get_tracked_conf_keys() const {
+ static const char* KEYS[] = {
+ "filestore_fd_cache_size",
+ NULL
+ };
+ return KEYS;
+ }
+ void handle_conf_change(const md_config_t *conf,
+ const std::set<std::string> &changed) {
+ if (changed.count("filestore_fd_cache_size")) {
+ registry.set_size(conf->filestore_fd_cache_size);
+ }
+ }
+
+};
+typedef FDCache::FDRef FDRef;
+
+#endif
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index b32f2875f71..dca8a1bbfea 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -196,10 +196,22 @@ int FileStore::lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf)
return r;
}
-int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode,
+int FileStore::lfn_open(coll_t cid,
+ const hobject_t& oid,
+ bool create,
+ FDRef *outfd,
IndexedPath *path,
Index *index)
{
+ assert(outfd);
+ int flags = O_RDWR;
+ if (create)
+ flags |= O_CREAT;
+ Mutex::Locker l(fdcache_lock);
+ *outfd = fdcache.lookup(oid);
+ if (*outfd) {
+ return 0;
+ }
Index index2;
IndexedPath path2;
if (!path)
@@ -224,16 +236,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode
goto fail;
}
- r = ::open((*path)->path(), flags, mode);
+ r = ::open((*path)->path(), flags, 0644);
if (r < 0) {
r = -errno;
dout(10) << "error opening file " << (*path)->path() << " with flags="
- << flags << " and mode=" << mode << ": " << cpp_strerror(-r) << dendl;
+ << flags << ": " << cpp_strerror(-r) << dendl;
goto fail;
}
fd = r;
- if ((flags & O_CREAT) && (!exist)) {
+ if (create && (!exist)) {
r = (*index)->created(oid, (*path)->path());
if (r < 0) {
TEMP_FAILURE_RETRY(::close(fd));
@@ -242,31 +254,16 @@ int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode
goto fail;
}
}
- return fd;
+ *outfd = fdcache.add(oid, fd);
+ return 0;
fail:
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
-int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode, IndexedPath *path)
+void FileStore::lfn_close(FDRef fd)
{
- return lfn_open(cid, oid, flags, mode, path, 0);
-}
-
-int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode)
-{
- return lfn_open(cid, oid, flags, mode, 0, 0);
-}
-
-int FileStore::lfn_open(coll_t cid, const hobject_t& oid, int flags)
-{
- return lfn_open(cid, oid, flags, 0);
-}
-
-void FileStore::lfn_close(int fd)
-{
- TEMP_FAILURE_RETRY(::close(fd));
}
int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
@@ -324,6 +321,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o)
int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
const SequencerPosition &spos)
{
+ Mutex::Locker l(fdcache_lock);
Index index;
int r = get_index(cid, &index);
if (r < 0)
@@ -355,6 +353,8 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
if (g_conf->filestore_debug_inject_read_err) {
debug_obj_on_delete(o);
}
+ wbthrottle.clear_object(o); // should be only non-cache ref
+ fdcache.clear(o);
} else {
/* Ensure that replay of this op doesn't result in the object_map
* going away.
@@ -387,6 +387,9 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha
sync_entry_timeo_lock("sync_entry_timeo_lock"),
timer(g_ceph_context, sync_entry_timeo_lock),
stop(false), sync_thread(this),
+ fdcache_lock("fdcache_lock"),
+ fdcache(g_ceph_context),
+ wbthrottle(g_ceph_context),
default_osr("default"),
op_queue_len(0), op_queue_bytes(0),
op_throttle_lock("FileStore::op_throttle_lock"),
@@ -394,22 +397,17 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha
op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
op_wq(this, g_conf->filestore_op_thread_timeout,
g_conf->filestore_op_thread_suicide_timeout, &op_tp),
- flusher_queue_len(0), flusher_thread(this),
logger(NULL),
read_error_lock("FileStore::read_error_lock"),
m_filestore_btrfs_clone_range(g_conf->filestore_btrfs_clone_range),
m_filestore_btrfs_snap (g_conf->filestore_btrfs_snap ),
m_filestore_commit_timeout(g_conf->filestore_commit_timeout),
m_filestore_fiemap(g_conf->filestore_fiemap),
- m_filestore_flusher (g_conf->filestore_flusher ),
m_filestore_fsync_flushes_journal_data(g_conf->filestore_fsync_flushes_journal_data),
m_filestore_journal_parallel(g_conf->filestore_journal_parallel ),
m_filestore_journal_trailing(g_conf->filestore_journal_trailing),
m_filestore_journal_writeahead(g_conf->filestore_journal_writeahead),
m_filestore_fiemap_threshold(g_conf->filestore_fiemap_threshold),
- m_filestore_sync_flush(g_conf->filestore_sync_flush),
- m_filestore_flusher_max_fds(g_conf->filestore_flusher_max_fds),
- m_filestore_flush_min(g_conf->filestore_flush_min),
m_filestore_max_sync_interval(g_conf->filestore_max_sync_interval),
m_filestore_min_sync_interval(g_conf->filestore_min_sync_interval),
m_filestore_fail_eio(g_conf->filestore_fail_eio),
@@ -1067,6 +1065,7 @@ int FileStore::_detect_fs()
#if defined(__linux__)
if (st.f_type == BTRFS_SUPER_MAGIC) {
dout(0) << "mount detected btrfs" << dendl;
+ wbthrottle.set_fs(WBThrottle::BTRFS);
btrfs = true;
btrfs_stable_commits = btrfs && m_filestore_btrfs_snap;
@@ -1778,7 +1777,6 @@ int FileStore::mount()
journal_start();
op_tp.start();
- flusher_thread.create();
op_finisher.start();
ondisk_finisher.start();
@@ -1816,11 +1814,9 @@ int FileStore::umount()
lock.Lock();
stop = true;
sync_cond.Signal();
- flusher_cond.Signal();
lock.Unlock();
sync_thread.join();
op_tp.stop();
- flusher_thread.join();
journal_stop();
@@ -1968,6 +1964,7 @@ void FileStore::op_queue_release_throttle(Op *o)
void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
{
+ wbthrottle.throttle();
// inject a stall?
if (g_conf->filestore_inject_stall) {
int orig = g_conf->filestore_inject_stall;
@@ -2263,12 +2260,13 @@ int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPos
if (!replaying || btrfs_stable_commits)
return 1;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
dout(10) << "_check_replay_guard " << cid << " " << oid << " dne" << dendl;
return 1; // if file does not exist, there is no guard, and we can replay.
}
- int ret = _check_replay_guard(fd, spos);
+ int ret = _check_replay_guard(**fd, spos);
lfn_close(fd);
return ret;
}
@@ -2762,22 +2760,24 @@ int FileStore::read(
dout(15) << "read " << cid << "/" << oid << " " << offset << "~" << len << dendl;
- int fd = lfn_open(cid, oid, O_RDONLY);
- if (fd < 0) {
- dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: " << cpp_strerror(fd) << dendl;
- return fd;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
+ dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: "
+ << cpp_strerror(r) << dendl;
+ return r;
}
if (len == 0) {
struct stat st;
memset(&st, 0, sizeof(struct stat));
- int r = ::fstat(fd, &st);
+ int r = ::fstat(**fd, &st);
assert(r == 0);
len = st.st_size;
}
bufferptr bptr(len); // prealloc space for entire read
- got = safe_pread(fd, bptr.c_str(), len, offset);
+ got = safe_pread(**fd, bptr.c_str(), len, offset);
if (got < 0) {
dout(10) << "FileStore::read(" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
lfn_close(fd);
@@ -2815,15 +2815,14 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid,
dout(15) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << dendl;
- int r;
- int fd = lfn_open(cid, oid, O_RDONLY);
- if (fd < 0) {
- r = fd;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
} else {
uint64_t i;
- r = do_fiemap(fd, offset, len, &fiemap);
+ r = do_fiemap(**fd, offset, len, &fiemap);
if (r < 0)
goto done;
@@ -2865,10 +2864,10 @@ int FileStore::fiemap(coll_t cid, const hobject_t& oid,
}
done:
- if (fd >= 0)
+ if (r >= 0) {
lfn_close(fd);
- if (r >= 0)
::encode(exomap, bl);
+ }
dout(10) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << exomap.size() << " " << exomap << dendl;
free(fiemap);
@@ -2899,14 +2898,13 @@ int FileStore::_touch(coll_t cid, const hobject_t& oid)
{
dout(15) << "touch " << cid << "/" << oid << dendl;
- int flags = O_WRONLY|O_CREAT;
- int fd = lfn_open(cid, oid, flags, 0644);
- int r;
- if (fd >= 0) {
+ FDRef fd;
+ int r = lfn_open(cid, oid, true, &fd);
+ if (r < 0) {
+ return r;
+ } else {
lfn_close(fd);
- r = 0;
- } else
- r = fd;
+ }
dout(10) << "touch " << cid << "/" << oid << " = " << r << dendl;
return r;
}
@@ -2920,17 +2918,17 @@ int FileStore::_write(coll_t cid, const hobject_t& oid,
int64_t actual;
- int flags = O_WRONLY|O_CREAT;
- int fd = lfn_open(cid, oid, flags, 0644);
- if (fd < 0) {
- r = fd;
- dout(0) << "write couldn't open " << cid << "/" << oid << " flags " << flags << ": "
+ FDRef fd;
+ r = lfn_open(cid, oid, true, &fd);
+ if (r < 0) {
+ dout(0) << "write couldn't open " << cid << "/"
+ << oid << ": "
<< cpp_strerror(r) << dendl;
goto out;
}
// seek
- actual = ::lseek64(fd, offset, SEEK_SET);
+ actual = ::lseek64(**fd, offset, SEEK_SET);
if (actual < 0) {
r = -errno;
dout(0) << "write lseek64 to " << offset << " failed: " << cpp_strerror(r) << dendl;
@@ -2945,43 +2943,13 @@ int FileStore::_write(coll_t cid, const hobject_t& oid,
}
// write
- r = bl.write_fd(fd);
+ r = bl.write_fd(**fd);
if (r == 0)
r = bl.length();
// flush?
- {
- bool should_flush = (ssize_t)len >= m_filestore_flush_min;
- bool local_flush = false;
-#ifdef HAVE_SYNC_FILE_RANGE
- bool async_done = false;
- if (!should_flush ||
- !m_filestore_flusher ||
- !(async_done = queue_flusher(fd, offset, len, replica))) {
- if (should_flush && m_filestore_sync_flush) {
- ::sync_file_range(fd, offset, len, SYNC_FILE_RANGE_WRITE);
- local_flush = true;
- }
- }
- //Both lfn_close() and possible posix_fadvise() done by flusher
- if (async_done) fd = -1;
-#else
- // no sync_file_range; (maybe) flush inline and close.
- if (should_flush && m_filestore_sync_flush) {
- ::fdatasync(fd);
- local_flush = true;
- }
-#endif
- if (local_flush && replica && m_filestore_replica_fadvise) {
- int fa_r = posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED);
- if (fa_r) {
- dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl;
- } else {
- dout(10) << "posix_fadvise performed after local flush" << dendl;
- }
- }
- }
- if (fd >= 0) lfn_close(fd);
+ wbthrottle.queue_wb(fd, oid, offset, len, replica);
+ lfn_close(fd);
out:
dout(10) << "write " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
@@ -2996,14 +2964,14 @@ int FileStore::_zero(coll_t cid, const hobject_t& oid, uint64_t offset, size_t l
#ifdef CEPH_HAVE_FALLOCATE
# if !defined(DARWIN) && !defined(__FreeBSD__)
// first try to punch a hole.
- int fd = lfn_open(cid, oid, O_RDONLY);
- if (fd < 0) {
- ret = -errno;
+ FDRef fd;
+ ret = lfn_open(cid, oid, false, &fd);
+ if (ret < 0) {
goto out;
}
// first try fallocate
- ret = fallocate(fd, FALLOC_FL_PUNCH_HOLE, offset, len);
+ ret = fallocate(**fd, FALLOC_FL_PUNCH_HOLE, offset, len);
if (ret < 0)
ret = -errno;
lfn_close(fd);
@@ -3039,23 +3007,26 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo
if (_check_replay_guard(cid, newoid, spos) < 0)
return 0;
- int o, n, r;
+ int r;
+ FDRef o, n;
{
Index index;
IndexedPath from, to;
- o = lfn_open(cid, oldoid, O_RDONLY, 0, &from, &index);
- if (o < 0) {
- r = o;
+ r = lfn_open(cid, oldoid, false, &o, &from, &index);
+ if (r < 0) {
goto out2;
}
- n = lfn_open(cid, newoid, O_CREAT|O_TRUNC|O_WRONLY, 0644, &to, &index);
- if (n < 0) {
- r = n;
+ r = lfn_open(cid, newoid, true, &n, &to, &index);
+ if (r < 0) {
+ goto out;
+ }
+ r = ::ftruncate(**n, 0);
+ if (r < 0) {
goto out;
}
struct stat st;
- ::fstat(o, &st);
- r = _do_clone_range(o, n, 0, st.st_size, 0);
+ ::fstat(**o, &st);
+ r = _do_clone_range(**o, **n, 0, st.st_size, 0);
if (r < 0) {
r = -errno;
goto out3;
@@ -3068,17 +3039,17 @@ int FileStore::_clone(coll_t cid, const hobject_t& oldoid, const hobject_t& newo
{
map<string, bufferptr> aset;
- r = _fgetattrs(o, aset, false);
+ r = _fgetattrs(**o, aset, false);
if (r < 0)
goto out3;
- r = _fsetattrs(n, aset);
+ r = _fsetattrs(**n, aset);
if (r < 0)
goto out3;
}
// clone is non-idempotent; record our work.
- _set_replay_guard(n, spos, &newoid);
+ _set_replay_guard(**n, spos, &newoid);
out3:
lfn_close(n);
@@ -3248,21 +3219,19 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t
return 0;
int r;
- int o, n;
- o = lfn_open(cid, oldoid, O_RDONLY);
- if (o < 0) {
- r = o;
+ FDRef o, n;
+ r = lfn_open(cid, oldoid, false, &o);
+ if (r < 0) {
goto out2;
}
- n = lfn_open(cid, newoid, O_CREAT|O_WRONLY, 0644);
- if (n < 0) {
- r = n;
+ r = lfn_open(cid, newoid, true, &n);
+ if (r < 0) {
goto out;
}
- r = _do_clone_range(o, n, srcoff, len, dstoff);
+ r = _do_clone_range(**o, **n, srcoff, len, dstoff);
// clone is non-idempotent; record our work.
- _set_replay_guard(n, spos, &newoid);
+ _set_replay_guard(**n, spos, &newoid);
lfn_close(n);
out:
@@ -3273,89 +3242,6 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t
return r;
}
-
-bool FileStore::queue_flusher(int fd, uint64_t off, uint64_t len, bool replica)
-{
- bool queued;
- lock.Lock();
- if (flusher_queue_len < m_filestore_flusher_max_fds) {
- flusher_queue.push_back(sync_epoch);
- flusher_queue.push_back(fd);
- flusher_queue.push_back(off);
- flusher_queue.push_back(len);
- flusher_queue.push_back(replica);
- flusher_queue_len++;
- flusher_cond.Signal();
- dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
- << " qlen " << flusher_queue_len
- << dendl;
- queued = true;
- } else {
- dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
- << " qlen " << flusher_queue_len
- << " hit flusher_max_fds " << m_filestore_flusher_max_fds
- << ", skipping async flush" << dendl;
- queued = false;
- }
- lock.Unlock();
- return queued;
-}
-
-void FileStore::flusher_entry()
-{
- lock.Lock();
- dout(20) << "flusher_entry start" << dendl;
- while (true) {
- if (!flusher_queue.empty()) {
-#ifdef HAVE_SYNC_FILE_RANGE
- list<uint64_t> q;
- q.swap(flusher_queue);
-
- int num = flusher_queue_len; // see how many we're taking, here
-
- lock.Unlock();
- while (!q.empty()) {
- uint64_t ep = q.front();
- q.pop_front();
- int fd = q.front();
- q.pop_front();
- uint64_t off = q.front();
- q.pop_front();
- uint64_t len = q.front();
- q.pop_front();
- bool replica = q.front();
- q.pop_front();
- if (!stop && ep == sync_epoch) {
- dout(10) << "flusher_entry flushing+closing " << fd << " ep " << ep << dendl;
- ::sync_file_range(fd, off, len, SYNC_FILE_RANGE_WRITE);
- if (replica && m_filestore_replica_fadvise) {
- int fa_r = posix_fadvise(fd, off, len, POSIX_FADV_DONTNEED);
- if (fa_r) {
- dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl;
- } else {
- dout(10) << "posix_fadvise performed after local flush" << dendl;
- }
- }
- } else
- dout(10) << "flusher_entry JUST closing " << fd << " (stop=" << stop << ", ep=" << ep
- << ", sync_epoch=" << sync_epoch << ")" << dendl;
- lfn_close(fd);
- }
- lock.Lock();
- flusher_queue_len -= num; // they're definitely closed, forget
-#endif
- } else {
- if (stop)
- break;
- dout(20) << "flusher_entry sleeping" << dendl;
- flusher_cond.Wait(lock);
- dout(20) << "flusher_entry awoke" << dendl;
- }
- }
- dout(20) << "flusher_entry finish" << dendl;
- lock.Unlock();
-}
-
class SyncEntryTimeout : public Context {
public:
SyncEntryTimeout(int commit_timeo)
@@ -3548,6 +3434,7 @@ void FileStore::sync_entry()
logger->tinc(l_os_commit_len, dur);
apply_manager.commit_finish();
+ wbthrottle.clear();
logger->set(l_os_committing, 0);
@@ -3856,15 +3743,14 @@ bool FileStore::debug_mdata_eio(const hobject_t &oid) {
int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, bufferptr &bp)
{
dout(15) << "getattr " << cid << "/" << oid << " '" << name << "'" << dendl;
- int r;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
char n[CHAIN_XATTR_MAX_NAME_LEN];
get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
- r = _fgetattr(fd, n, bp);
+ r = _fgetattr(**fd, n, bp);
lfn_close(fd);
if (r == -ENODATA && g_conf->filestore_xattr_use_omap) {
map<string, bufferlist> got;
@@ -3903,13 +3789,12 @@ int FileStore::getattr(coll_t cid, const hobject_t& oid, const char *name, buffe
int FileStore::getattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>& aset, bool user_only)
{
dout(15) << "getattrs " << cid << "/" << oid << dendl;
- int r;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
- r = _fgetattrs(fd, aset, user_only);
+ r = _fgetattrs(**fd, aset, user_only);
lfn_close(fd);
if (g_conf->filestore_xattr_use_omap) {
set<string> omap_attrs;
@@ -3967,14 +3852,13 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
set<string> omap_remove;
map<string, bufferptr> inline_set;
map<string, bufferptr> inline_to_set;
- int r = 0;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
if (g_conf->filestore_xattr_use_omap) {
- r = _fgetattrs(fd, inline_set, false);
+ r = _fgetattrs(**fd, inline_set, false);
assert(!m_filestore_fail_eio || r != -EIO);
}
dout(15) << "setattrs " << cid << "/" << oid << dendl;
@@ -3988,7 +3872,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
if (p->second.length() > g_conf->filestore_max_inline_xattr_size) {
if (inline_set.count(p->first)) {
inline_set.erase(p->first);
- r = chain_fremovexattr(fd, n);
+ r = chain_fremovexattr(**fd, n);
if (r < 0)
goto out_close;
}
@@ -4000,7 +3884,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
inline_set.size() >= g_conf->filestore_max_inline_xattrs) {
if (inline_set.count(p->first)) {
inline_set.erase(p->first);
- r = chain_fremovexattr(fd, n);
+ r = chain_fremovexattr(**fd, n);
if (r < 0)
goto out_close;
}
@@ -4015,7 +3899,7 @@ int FileStore::_setattrs(coll_t cid, const hobject_t& oid, map<string,bufferptr>
}
- r = _fsetattrs(fd, inline_to_set);
+ r = _fsetattrs(**fd, inline_to_set);
if (r < 0)
goto out_close;
@@ -4050,15 +3934,14 @@ int FileStore::_rmattr(coll_t cid, const hobject_t& oid, const char *name,
const SequencerPosition &spos)
{
dout(15) << "rmattr " << cid << "/" << oid << " '" << name << "'" << dendl;
- int r = 0;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
char n[CHAIN_XATTR_MAX_NAME_LEN];
get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
- r = chain_fremovexattr(fd, n);
+ r = chain_fremovexattr(**fd, n);
if (r == -ENODATA && g_conf->filestore_xattr_use_omap) {
Index index;
r = get_index(cid, &index);
@@ -4088,18 +3971,17 @@ int FileStore::_rmattrs(coll_t cid, const hobject_t& oid,
dout(15) << "rmattrs " << cid << "/" << oid << dendl;
map<string,bufferptr> aset;
- int r = 0;
- int fd = lfn_open(cid, oid, 0);
- if (fd < 0) {
- r = -errno;
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd);
+ if (r < 0) {
goto out;
}
- r = _fgetattrs(fd, aset, false);
+ r = _fgetattrs(**fd, aset, false);
if (r >= 0) {
for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
char n[CHAIN_XATTR_MAX_NAME_LEN];
get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
- r = chain_fremovexattr(fd, n);
+ r = chain_fremovexattr(**fd, n);
if (r < 0)
break;
}
@@ -4687,21 +4569,21 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
// open guard on object so we don't any previous operations on the
// new name that will modify the source inode.
- int fd = lfn_open(oldcid, o, 0);
- if (fd < 0) {
+ FDRef fd;
+ int r = lfn_open(oldcid, o, 0, &fd);
+ if (r < 0) {
// the source collection/object does not exist. If we are replaying, we
// should be safe, so just return 0 and move on.
assert(replaying);
dout(10) << "collection_add " << c << "/" << o << " from "
- << oldcid << "/" << o << " (dne, continue replay) " << dendl;
+ << oldcid << "/" << o << " (dne, continue replay) " << dendl;
return 0;
}
- assert(fd >= 0);
if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
- _set_replay_guard(fd, spos, &o, true);
+ _set_replay_guard(**fd, spos, &o, true);
}
- int r = lfn_link(oldcid, c, o);
+ r = lfn_link(oldcid, c, o);
if (replaying && !btrfs_stable_commits &&
r == -EEXIST) // crashed between link() and set_replay_guard()
r = 0;
@@ -4710,7 +4592,7 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
// close guard on object so we don't do this again
if (r == 0) {
- _close_replay_guard(fd, spos);
+ _close_replay_guard(**fd, spos);
}
lfn_close(fd);
@@ -4919,9 +4801,6 @@ const char** FileStore::get_tracked_conf_keys() const
"filestore_queue_max_bytes",
"filestore_queue_committing_max_ops",
"filestore_queue_committing_max_bytes",
- "filestore_flusher",
- "filestore_flusher_max_fds",
- "filestore_sync_flush",
"filestore_commit_timeout",
"filestore_dump_file",
"filestore_kill_at",
@@ -4941,9 +4820,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
changed.count("filestore_queue_max_bytes") ||
changed.count("filestore_queue_committing_max_ops") ||
changed.count("filestore_queue_committing_max_bytes") ||
- changed.count("filestore_flusher") ||
- changed.count("filestore_flusher_max_fds") ||
- changed.count("filestore_flush_min") ||
changed.count("filestore_kill_at") ||
changed.count("filestore_fail_eio") ||
changed.count("filestore_replica_fadvise")) {
@@ -4954,10 +4830,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes;
m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops;
m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes;
- m_filestore_flusher = conf->filestore_flusher;
- m_filestore_flusher_max_fds = conf->filestore_flusher_max_fds;
- m_filestore_flush_min = conf->filestore_flush_min;
- m_filestore_sync_flush = conf->filestore_sync_flush;
m_filestore_kill_at.set(conf->filestore_kill_at);
m_filestore_fail_eio = conf->filestore_fail_eio;
m_filestore_replica_fadvise = conf->filestore_replica_fadvise;
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index d5ca2a4c237..78668dd92a4 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -40,6 +40,8 @@ using namespace __gnu_cxx;
#include "IndexManager.h"
#include "ObjectMap.h"
#include "SequencerPosition.h"
+#include "FDCache.h"
+#include "WBThrottle.h"
#include "include/uuid.h"
@@ -198,6 +200,10 @@ private:
friend ostream& operator<<(ostream& out, const OpSequencer& s);
+ Mutex fdcache_lock;
+ FDCache fdcache;
+ WBThrottle wbthrottle;
+
Sequencer default_osr;
deque<OpSequencer*> op_queue;
uint64_t op_queue_len, op_queue_bytes;
@@ -250,37 +256,22 @@ private:
void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
friend class C_JournaledAhead;
- // flusher thread
- Cond flusher_cond;
- list<uint64_t> flusher_queue;
- int flusher_queue_len;
- void flusher_entry();
- struct FlusherThread : public Thread {
- FileStore *fs;
- FlusherThread(FileStore *f) : fs(f) {}
- void *entry() {
- fs->flusher_entry();
- return 0;
- }
- } flusher_thread;
- bool queue_flusher(int fd, uint64_t off, uint64_t len, bool replica);
-
int open_journal();
-
PerfCounters *logger;
public:
int lfn_find(coll_t cid, const hobject_t& oid, IndexedPath *path);
int lfn_truncate(coll_t cid, const hobject_t& oid, off_t length);
int lfn_stat(coll_t cid, const hobject_t& oid, struct stat *buf);
- int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode,
- IndexedPath *path);
- int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode,
- IndexedPath *path, Index *index);
- int lfn_open(coll_t cid, const hobject_t& oid, int flags, mode_t mode);
- int lfn_open(coll_t cid, const hobject_t& oid, int flags);
- void lfn_close(int fd);
+ int lfn_open(
+ coll_t cid,
+ const hobject_t& oid,
+ bool create,
+ FDRef *outfd,
+ IndexedPath *path = 0,
+ Index *index = 0);
+ void lfn_close(FDRef fd);
int lfn_link(coll_t c, coll_t cid, const hobject_t& o) ;
int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos);
@@ -510,15 +501,11 @@ private:
bool m_filestore_btrfs_snap;
float m_filestore_commit_timeout;
bool m_filestore_fiemap;
- bool m_filestore_flusher;
bool m_filestore_fsync_flushes_journal_data;
bool m_filestore_journal_parallel;
bool m_filestore_journal_trailing;
bool m_filestore_journal_writeahead;
int m_filestore_fiemap_threshold;
- bool m_filestore_sync_flush;
- int m_filestore_flusher_max_fds;
- int m_filestore_flush_min;
double m_filestore_max_sync_interval;
double m_filestore_min_sync_interval;
bool m_filestore_fail_eio;
diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc
new file mode 100644
index 00000000000..4673488f833
--- /dev/null
+++ b/src/os/WBThrottle.cc
@@ -0,0 +1,239 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "os/WBThrottle.h"
+#include "common/perf_counters.h"
+
+/**
+ * Build a WBThrottle bound to the given context.
+ *
+ * Loads the limits from configuration (fs starts out as XFS), registers
+ * the "WBThrottle" perf counters and zeroes them, registers this object
+ * as a config observer and finally starts the thread via create().
+ *
+ * @param cct context providing configuration and the perf counter
+ *            collection; must not be NULL
+ */
+WBThrottle::WBThrottle(CephContext *cct) :
+  cur_ios(0), cur_size(0),
+  cct(cct),
+  logger(NULL),
+  stopping(false),
+  lock("WBThrottle::lock", false, true, false, cct),
+  fs(XFS)
+{
+  // Check before first use: set_from_conf() below reads cct->_conf.
+  assert(cct);
+  {
+    // set_from_conf() asserts that the lock is held.
+    Mutex::Locker l(lock);
+    set_from_conf();
+  }
+  PerfCountersBuilder b(
+    cct, string("WBThrottle"),
+    l_wbthrottle_first, l_wbthrottle_last);
+  b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied");
+  b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb");
+  b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied");
+  b.add_u64(l_wbthrottle_ios_wb, "ios_wb");
+  b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied");
+  b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb");
+  logger = b.create_perf_counters();
+  cct->get_perfcounters_collection()->add(logger);
+  // Zero every counter strictly between the first/last sentinels.
+  for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i)
+    logger->set(i, 0);
+
+  cct->_conf->add_observer(this);
+  // Started last so that entry() only ever sees fully initialised state.
+  create();
+}
+
+/// Stop the flusher thread, then unregister the perf counters and the
+/// config observer.
+WBThrottle::~WBThrottle() {
+ assert(cct);
+ {
+ Mutex::Locker l(lock);
+ // get_next_should_flush() returns false once stopping is set, which
+ // ends the loop in entry().
+ stopping = true;
+ cond.Signal();
+ }
+ // Wait for the thread before destroying the logger it updates.
+ join();
+ cct->get_perfcounters_collection()->remove(logger);
+ delete logger;
+ cct->_conf->remove_observer(this);
+}
+
+/// md_config_obs_t hook: NULL-terminated list of the config options whose
+/// change should trigger handle_conf_change().
+const char** WBThrottle::get_tracked_conf_keys() const
+{
+  static const char* tracked_keys[] = {
+    // btrfs limits
+    "filestore_wbthrottle_btrfs_bytes_start_flusher",
+    "filestore_wbthrottle_btrfs_bytes_hard_limit",
+    "filestore_wbthrottle_btrfs_ios_start_flusher",
+    "filestore_wbthrottle_btrfs_ios_hard_limit",
+    "filestore_wbthrottle_btrfs_inodes_start_flusher",
+    "filestore_wbthrottle_btrfs_inodes_hard_limit",
+    // xfs limits
+    "filestore_wbthrottle_xfs_bytes_start_flusher",
+    "filestore_wbthrottle_xfs_bytes_hard_limit",
+    "filestore_wbthrottle_xfs_ios_start_flusher",
+    "filestore_wbthrottle_xfs_ios_hard_limit",
+    "filestore_wbthrottle_xfs_inodes_start_flusher",
+    "filestore_wbthrottle_xfs_inodes_hard_limit",
+    NULL  // terminator
+  };
+  return tracked_keys;
+}
+
+/// Reload the (start_flusher, hard_limit) pairs for bytes, ios and inodes
+/// from the config options matching the current fs, then wake waiters so
+/// they re-evaluate against the new limits.  Caller must hold lock.
+void WBThrottle::set_from_conf()
+{
+  assert(lock.is_locked());
+  switch (fs) {
+  case BTRFS:
+    size_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
+    size_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit;
+    io_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher;
+    io_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit;
+    fd_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher;
+    fd_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit;
+    break;
+  case XFS:
+    size_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher;
+    size_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit;
+    io_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher;
+    io_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit;
+    fd_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher;
+    fd_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit;
+    break;
+  default:
+    assert(0 == "invalid value for fs");
+    break;
+  }
+  cond.Signal();
+}
+
+/// md_config_obs_t hook: reload the limits if any tracked key is in the
+/// changed set.  The limits are read through cct in set_from_conf(); the
+/// conf argument itself is not consulted.
+void WBThrottle::handle_conf_change(const md_config_t *conf,
+                                    const std::set<std::string> &changed)
+{
+  Mutex::Locker l(lock);
+  // Advance to the first tracked key that changed (or the NULL terminator).
+  const char **key = get_tracked_conf_keys();
+  while (*key && changed.count(*key) == 0)
+    ++key;
+  if (*key)
+    set_from_conf();
+}
+
+/// Block until at least one start_flusher threshold (ios, inodes or bytes)
+/// is reached, then hand back the object at the front of the lru together
+/// with its FDRef and accumulated PendingWB, removing it from pending_wbs.
+/// Caller must hold lock.
+///
+/// @param next [out] (object, fd, pending writeback) to flush
+/// @return false if we are shutting down, true if *next was filled in
+bool WBThrottle::get_next_should_flush(
+  boost::tuple<hobject_t, FDRef, PendingWB> *next)
+{
+  assert(lock.is_locked());
+  assert(next);
+  for (;;) {
+    if (stopping)
+      return false;
+    const bool reached_start_threshold =
+      !(cur_ios < io_limits.first &&
+        pending_wbs.size() < fd_limits.first &&
+        cur_size < size_limits.first);
+    if (reached_start_threshold)
+      break;
+    cond.Wait(lock);
+  }
+  assert(!pending_wbs.empty());
+
+  hobject_t oldest(pop_object());
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator pos =
+    pending_wbs.find(oldest);
+  *next = boost::make_tuple(oldest, pos->second.second, pos->second.first);
+  pending_wbs.erase(pos);
+  return true;
+}
+
+
+/**
+ * Thread body: repeatedly take the next object due for writeback, fsync
+ * it with the lock dropped, then release its share of the throttle.
+ *
+ * While the lock is dropped, `clearing` names the object being flushed so
+ * that clear_object() can wait for the flush to finish.  On completion the
+ * "dirtied" gauges are lowered and the cumulative "wb" counters (bytes,
+ * ios, inodes written back) are raised.
+ *
+ * @return always 0; returns once get_next_should_flush() reports shutdown
+ */
+void *WBThrottle::entry()
+{
+  Mutex::Locker l(lock);
+  boost::tuple<hobject_t, FDRef, PendingWB> wb;
+  while (get_next_should_flush(&wb)) {
+    clearing = wb.get<0>();
+    lock.Unlock();
+    // NOTE(review): the results of fsync()/posix_fadvise() are not checked.
+    ::fsync(**wb.get<1>());
+    if (wb.get<2>().nocache)
+      posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
+    lock.Lock();
+    clearing = hobject_t();
+    cur_ios -= wb.get<2>().ios;
+    logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios);
+    logger->inc(l_wbthrottle_ios_wb, wb.get<2>().ios);
+    cur_size -= wb.get<2>().size;
+    logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size);
+    logger->inc(l_wbthrottle_bytes_wb, wb.get<2>().size);
+    logger->dec(l_wbthrottle_inodes_dirtied);
+    logger->inc(l_wbthrottle_inodes_wb);
+    // Wake throttle()/clear_object() waiters: throttle was released and
+    // `clearing` was reset.
+    cond.Signal();
+    // Drop the FDRef held in wb before blocking for the next object.
+    wb = boost::tuple<hobject_t, FDRef, PendingWB>();
+  }
+  return 0;
+}
+
+/// Record one more unflushed write of len bytes against hoid and move the
+/// object to the back of the lru.  Does not block on the limits; callers
+/// use throttle() for that.  The offset argument is currently unused.
+///
+/// @param fd      FDRef stored with the object if it has no pending entry
+/// @param hoid    object written
+/// @param offset  offset written (unused here)
+/// @param len     length written, added to the unflushed byte count
+/// @param nocache whether this write asked to be dropped from cache
+void WBThrottle::queue_wb(
+  FDRef fd, const hobject_t &hoid, uint64_t offset, uint64_t len,
+  bool nocache)
+{
+  typedef map<hobject_t, pair<PendingWB, FDRef> > wb_map_t;
+
+  Mutex::Locker l(lock);
+  // insert() leaves an existing entry untouched and reports whether a new
+  // one was created.
+  pair<wb_map_t::iterator, bool> slot =
+    pending_wbs.insert(make_pair(hoid, make_pair(PendingWB(), fd)));
+  if (slot.second) {
+    logger->inc(l_wbthrottle_inodes_dirtied);
+  } else {
+    // Already pending: take it out of the lru so it can be re-queued below.
+    remove_object(hoid);
+  }
+
+  ++cur_ios;
+  logger->inc(l_wbthrottle_ios_dirtied);
+  cur_size += len;
+  logger->inc(l_wbthrottle_bytes_dirtied, len);
+
+  PendingWB &pending = slot.first->second.first;
+  pending.add(nocache, len, 1);
+  insert_object(hoid);
+  cond.Signal();
+}
+
+/// Forget every pending writeback without flushing: give back the ios and
+/// bytes each entry holds, lower the "dirtied" gauges to match, empty the
+/// lru bookkeeping and wake waiters.
+///
+/// NOTE(review): entry() removes an object from pending_wbs before it
+/// subtracts that object's ios/size, so the asserts below appear to rely
+/// on no flush being in flight when clear() runs -- confirm with callers.
+void WBThrottle::clear()
+{
+  typedef map<hobject_t, pair<PendingWB, FDRef> > wb_map_t;
+
+  Mutex::Locker l(lock);
+  wb_map_t::iterator cur = pending_wbs.begin();
+  while (cur != pending_wbs.end()) {
+    const PendingWB &held = cur->second.first;
+    cur_ios -= held.ios;
+    logger->dec(l_wbthrottle_ios_dirtied, held.ios);
+    cur_size -= held.size;
+    logger->dec(l_wbthrottle_bytes_dirtied, held.size);
+    logger->dec(l_wbthrottle_inodes_dirtied);
+    ++cur;
+  }
+  pending_wbs.clear();
+  lru.clear();
+  rev_lru.clear();
+  cond.Signal();
+  assert(cur_ios == 0);
+  assert(cur_size == 0);
+}
+
+/**
+ * Drop any pending writeback for hoid without flushing it.
+ *
+ * Waits while the flusher thread is in the middle of flushing hoid.  If
+ * the object has a pending entry, its ios and bytes are returned to the
+ * throttle, the "dirtied" perf counters are lowered to stay in step with
+ * cur_ios/cur_size/pending_wbs, and waiters are woken because throttle()
+ * may now be able to proceed.
+ *
+ * @param hoid object whose pending writeback should be forgotten
+ */
+void WBThrottle::clear_object(const hobject_t &hoid)
+{
+  Mutex::Locker l(lock);
+  while (clearing == hoid)
+    cond.Wait(lock);
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator i =
+    pending_wbs.find(hoid);
+  if (i == pending_wbs.end())
+    return;
+
+  cur_ios -= i->second.first.ios;
+  logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios);
+  cur_size -= i->second.first.size;
+  logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size);
+  logger->dec(l_wbthrottle_inodes_dirtied);
+
+  pending_wbs.erase(i);
+  remove_object(hoid);
+  // Throttle was released; let blocked throttle() callers re-check.
+  cond.Signal();
+}
+
+/// Block the caller while any hard limit (ios, inodes or bytes) is reached,
+/// returning as soon as all three are below their limit or the throttle is
+/// shutting down.
+void WBThrottle::throttle()
+{
+  Mutex::Locker l(lock);
+  for (;;) {
+    if (stopping)
+      break;
+    const bool at_hard_limit =
+      cur_ios >= io_limits.second ||
+      pending_wbs.size() >= fd_limits.second ||
+      cur_size >= size_limits.second;
+    if (!at_hard_limit)
+      break;
+    cond.Wait(lock);
+  }
+}
diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h
new file mode 100644
index 00000000000..070de08e123
--- /dev/null
+++ b/src/os/WBThrottle.h
@@ -0,0 +1,171 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef WBTHROTTLE_H
+#define WBTHROTTLE_H
+
+#include <map>
+#include <boost/tuple/tuple.hpp>
+#include <tr1/memory>
+#include "include/buffer.h"
+#include "common/Formatter.h"
+#include "os/hobject.h"
+#include "include/interval_set.h"
+#include "FDCache.h"
+#include "common/Thread.h"
+#include "common/ceph_context.h"
+
+class PerfCounters;
+/// Perf counter indices for WBThrottle.  l_wbthrottle_first and
+/// l_wbthrottle_last are the bounds handed to PerfCountersBuilder; only
+/// the entries strictly between them are registered as counters.
+enum {
+ l_wbthrottle_first = 999090,
+ l_wbthrottle_bytes_dirtied, ///< registered as "bytes_dirtied"
+ l_wbthrottle_bytes_wb, ///< registered as "bytes_wb"
+ l_wbthrottle_ios_dirtied, ///< registered as "ios_dirtied"
+ l_wbthrottle_ios_wb, ///< registered as "ios_wb"
+ l_wbthrottle_inodes_dirtied, ///< registered as "inodes_dirtied"
+ l_wbthrottle_inodes_wb, ///< registered as "inodes_wb"
+ l_wbthrottle_last
+};
+
+/**
+ * WBThrottle
+ *
+ * Tracks, throttles, and flushes outstanding IO
+ */
+class WBThrottle : Thread, public md_config_obs_t {
+ /// Object that entry() is flushing while the lock is dropped; reset to a
+ /// default-constructed hobject_t once the flush is done.  clear_object()
+ /// waits on cond while this equals its argument.
+ hobject_t clearing;
+
+ /* *_limits.first is the start_flusher limit and
+ * *_limits.second is the hard limit
+ */
+
+ /// Limits on unflushed bytes
+ pair<uint64_t, uint64_t> size_limits;
+
+ /// Limits on unflushed ios
+ pair<uint64_t, uint64_t> io_limits;
+
+ /// Limits on unflushed objects
+ pair<uint64_t, uint64_t> fd_limits;
+
+ uint64_t cur_ios; /// Currently unflushed IOs
+ uint64_t cur_size; /// Currently unflushed bytes
+
+ /**
+ * PendingWB tracks the ios pending on an object.
+ */
+ class PendingWB {
+ public:
+ bool nocache; ///< true only while every added write was nocache
+ uint64_t size; ///< sum of the sizes passed to add()
+ uint64_t ios; ///< sum of the io counts passed to add()
+ PendingWB() : nocache(true), size(0), ios(0) {}
+ /// Fold one more write into this record.
+ void add(bool _nocache, uint64_t _size, uint64_t _ios) {
+ if (!_nocache)
+ nocache = false; // only nocache if all writes are nocache
+ size += _size;
+ ios += _ios;
+ }
+ };
+
+ CephContext *cct; ///< source of configuration and the perf counter collection
+ PerfCounters *logger; ///< perf counters created in the constructor
+ bool stopping; ///< set by the destructor to make entry() return
+ Mutex lock; ///< held around all accesses to the state in this class
+ Cond cond; ///< signalled when limits, counts or stopping change
+
+
+ /**
+ * Flush objects in lru order
+ */
+ list<hobject_t> lru;
+ /// Position of each object inside lru, for removal by object.
+ map<hobject_t, list<hobject_t>::iterator> rev_lru;
+ /// Remove hoid from lru/rev_lru; no-op if it is not present.
+ void remove_object(const hobject_t &hoid) {
+ assert(lock.is_locked());
+ map<hobject_t, list<hobject_t>::iterator>::iterator iter =
+ rev_lru.find(hoid);
+ if (iter == rev_lru.end())
+ return;
+
+ lru.erase(iter->second);
+ rev_lru.erase(iter);
+ }
+ /// Remove and return the object at the front of lru; lru must be non-empty.
+ hobject_t pop_object() {
+ assert(!lru.empty());
+ hobject_t hoid(lru.front());
+ lru.pop_front();
+ rev_lru.erase(hoid);
+ return hoid;
+ }
+ /// Append hoid to the back of lru; hoid must not already be tracked.
+ void insert_object(const hobject_t &hoid) {
+ assert(rev_lru.find(hoid) == rev_lru.end());
+ lru.push_back(hoid);
+ rev_lru.insert(make_pair(hoid, --lru.end()));
+ }
+
+ /// Per-object accumulated pending writeback and the FDRef used to flush it.
+ map<hobject_t, pair<PendingWB, FDRef> > pending_wbs;
+
+ /// get next flush to perform
+ bool get_next_should_flush(
+ boost::tuple<hobject_t, FDRef, PendingWB> *next ///< [out] next to flush
+ ); ///< @return false if we are shutting down
+public:
+ /// Selects which set of filestore_wbthrottle_* options supplies the limits.
+ enum FS {
+ BTRFS,
+ XFS
+ };
+
+private:
+ FS fs; ///< current option set; the constructor initialises it to XFS
+
+ /// Reload all limits from configuration for fs; caller must hold lock.
+ void set_from_conf();
+public:
+ WBThrottle(CephContext *cct);
+ ~WBThrottle();
+
+ /// Set fs as XFS or BTRFS
+ void set_fs(FS new_fs) {
+ Mutex::Locker l(lock);
+ fs = new_fs;
+ set_from_conf();
+ }
+
+ /// Queue wb on hoid, fd taking throttle (does not block)
+ void queue_wb(
+ FDRef fd, ///< [in] FDRef to hoid
+ const hobject_t &hoid, ///< [in] object
+ uint64_t offset, ///< [in] offset written
+ uint64_t len, ///< [in] length written
+ bool nocache ///< [in] try to clear out of cache after write
+ );
+
+ /// Clear all wb (probably due to sync)
+ void clear();
+
+ /// Clear object
+ void clear_object(const hobject_t &hoid);
+
+ /// Block until there is throttle available
+ void throttle();
+
+ /// md_config_obs_t
+ const char** get_tracked_conf_keys() const;
+ void handle_conf_change(const md_config_t *conf,
+ const std::set<std::string> &changed);
+
+ /// Thread
+ void *entry();
+};
+
+#endif
diff --git a/src/os/hobject.h b/src/os/hobject.h
index 28e0da0d82a..47fcb3dda39 100644
--- a/src/os/hobject.h
+++ b/src/os/hobject.h
@@ -15,6 +15,9 @@
#ifndef __CEPH_OS_HOBJECT_H
#define __CEPH_OS_HOBJECT_H
+#include <string.h>
+#include "include/types.h"
+#include "include/rados.h"
#include "include/object.h"
#include "include/cmp.h"
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 761a77cd69c..019d6b8d99b 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -5288,7 +5288,6 @@ void ReplicatedPG::submit_push_data(
void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t)
{
- remove_snap_mapped_object(*t, recovery_info.soid);
t->collection_move(coll, get_temp_coll(t), recovery_info.soid);
for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
recovery_info.clone_subset.begin();