diff options
author | Sage Weil <sage@inktank.com> | 2013-05-28 16:35:55 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-06-02 14:09:51 -0700 |
commit | 87dcba2dd12a673fdc63ad64fb23e6e9f841d74f (patch) | |
tree | 630250c03b7a9d0d49217623c857932b2e233f5d | |
parent | bac5720b2a583e799c6961c733c4a9132a002440 (diff) | |
download | ceph-87dcba2dd12a673fdc63ad64fb23e6e9f841d74f.tar.gz |
os/LevelDBStore: do compact_prefix() work asynchronously
We generally do not want to block while compacting a range of leveldb.
Push the blocking+waiting off to a separate thread. (leveldb will do what
it can to avoid blocking internally; no reason for us to wait explicitly.)
This addresses part of #5176.
Signed-off-by: Sage Weil <sage@inktank.com>
(cherry picked from commit 4af917d4478ec07734a69447420280880d775fa2)
-rw-r--r-- | src/mon/MonitorDBStore.h | 2 | ||||
-rw-r--r-- | src/os/LevelDBStore.cc | 17 | ||||
-rw-r--r-- | src/os/LevelDBStore.h | 43 |
3 files changed, 60 insertions, 2 deletions
diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h index 3b53a8be80d..f496fa0e587 100644 --- a/src/mon/MonitorDBStore.h +++ b/src/mon/MonitorDBStore.h @@ -215,7 +215,7 @@ class MonitorDBStore int r = db->submit_transaction_sync(dbt); if (r >= 0) { while (!compact_prefixes.empty()) { - db->compact_prefix(compact_prefixes.front()); + db->compact_prefix_async(compact_prefixes.front()); compact_prefixes.pop_front(); } } diff --git a/src/os/LevelDBStore.cc b/src/os/LevelDBStore.cc index 612063cf481..5f052a57aff 100644 --- a/src/os/LevelDBStore.cc +++ b/src/os/LevelDBStore.cc @@ -140,3 +140,20 @@ int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key) *key= string(in_prefix, prefix_len + 1); return 0; } + +void LevelDBStore::compact_thread_entry() +{ + compact_queue_lock.Lock(); + while (!compact_queue_stop) { + while (!compact_queue.empty()) { + string prefix = compact_queue.front(); + compact_queue.pop_front(); + compact_queue_lock.Unlock(); + compact_prefix(prefix); + compact_queue_lock.Lock(); + continue; + } + compact_queue_cond.Wait(compact_queue_lock); + } + compact_queue_lock.Unlock(); +} diff --git a/src/os/LevelDBStore.h b/src/os/LevelDBStore.h index 557595181f6..94a69492247 100644 --- a/src/os/LevelDBStore.h +++ b/src/os/LevelDBStore.h @@ -33,6 +33,24 @@ class LevelDBStore : public KeyValueDB { int init(ostream &out, bool create_if_missing); + // manage async compactions + Mutex compact_queue_lock; + Cond compact_queue_cond; + list<string> compact_queue; + bool compact_queue_stop; + class CompactThread : public Thread { + LevelDBStore *db; + public: + CompactThread(LevelDBStore *d) : db(d) {} + void *entry() { + db->compact_thread_entry(); + return NULL; + } + friend class LevelDBStore; + } compact_thread; + + void compact_thread_entry(); + public: /// compact the underlying leveldb store void compact() { @@ -50,6 +68,16 @@ public: db->CompactRange(&cstart, &cend); } + void compact_prefix_async(const string& prefix) { + Mutex::Locker l(compact_queue_lock); + compact_queue.remove(prefix); // prevent unbounded dups + compact_queue.push_back(prefix); + compact_queue_cond.Signal(); + if (!compact_thread.is_started()) { + compact_thread.create(); + } + } + /** * options_t: Holds options which are minimally interpreted * on initialization and then passed through to LevelDB. @@ -94,10 +122,23 @@ public: #ifdef HAVE_LEVELDB_FILTER_POLICY filterpolicy(NULL), #endif + compact_queue_lock("LevelDBStore::compact_thread_lock"), + compact_queue_stop(false), + compact_thread(this), options() {} - ~LevelDBStore() {} + ~LevelDBStore() { + compact_queue_lock.Lock(); + if (compact_thread.is_started()) { + compact_queue_stop = true; + compact_queue_cond.Signal(); + compact_queue_lock.Unlock(); + compact_thread.join(); + } else { + compact_queue_lock.Unlock(); + } + } /// Opens underlying db int open(ostream &out) { |