summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-05-28 16:35:55 -0700
committerSage Weil <sage@inktank.com>2013-06-02 14:09:51 -0700
commit87dcba2dd12a673fdc63ad64fb23e6e9f841d74f (patch)
tree630250c03b7a9d0d49217623c857932b2e233f5d
parentbac5720b2a583e799c6961c733c4a9132a002440 (diff)
downloadceph-87dcba2dd12a673fdc63ad64fb23e6e9f841d74f.tar.gz
os/LevelDBStore: do compact_prefix() work asynchronously
We generally do not want to block while compacting a range of leveldb. Push the blocking+waiting off to a separate thread. (leveldb will do what it can to avoid blocking internally; no reason for us to wait explicitly.) This addresses part of #5176. Signed-off-by: Sage Weil <sage@inktank.com> (cherry picked from commit 4af917d4478ec07734a69447420280880d775fa2)
-rw-r--r--src/mon/MonitorDBStore.h2
-rw-r--r--src/os/LevelDBStore.cc17
-rw-r--r--src/os/LevelDBStore.h43
3 files changed, 60 insertions, 2 deletions
diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h
index 3b53a8be80d..f496fa0e587 100644
--- a/src/mon/MonitorDBStore.h
+++ b/src/mon/MonitorDBStore.h
@@ -215,7 +215,7 @@ class MonitorDBStore
int r = db->submit_transaction_sync(dbt);
if (r >= 0) {
while (!compact_prefixes.empty()) {
- db->compact_prefix(compact_prefixes.front());
+ db->compact_prefix_async(compact_prefixes.front());
compact_prefixes.pop_front();
}
}
diff --git a/src/os/LevelDBStore.cc b/src/os/LevelDBStore.cc
index 612063cf481..5f052a57aff 100644
--- a/src/os/LevelDBStore.cc
+++ b/src/os/LevelDBStore.cc
@@ -140,3 +140,20 @@ int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key)
*key= string(in_prefix, prefix_len + 1);
return 0;
}
+
+void LevelDBStore::compact_thread_entry()
+{
+ compact_queue_lock.Lock();
+ while (!compact_queue_stop) {
+ while (!compact_queue.empty()) {
+ string prefix = compact_queue.front();
+ compact_queue.pop_front();
+ compact_queue_lock.Unlock();
+ compact_prefix(prefix);
+ compact_queue_lock.Lock();
+ continue;
+ }
+ compact_queue_cond.Wait(compact_queue_lock);
+ }
+ compact_queue_lock.Unlock();
+}
diff --git a/src/os/LevelDBStore.h b/src/os/LevelDBStore.h
index 557595181f6..94a69492247 100644
--- a/src/os/LevelDBStore.h
+++ b/src/os/LevelDBStore.h
@@ -33,6 +33,24 @@ class LevelDBStore : public KeyValueDB {
int init(ostream &out, bool create_if_missing);
+ // manage async compactions
+ Mutex compact_queue_lock;
+ Cond compact_queue_cond;
+ list<string> compact_queue;
+ bool compact_queue_stop;
+ class CompactThread : public Thread {
+ LevelDBStore *db;
+ public:
+ CompactThread(LevelDBStore *d) : db(d) {}
+ void *entry() {
+ db->compact_thread_entry();
+ return NULL;
+ }
+ friend class LevelDBStore;
+ } compact_thread;
+
+ void compact_thread_entry();
+
public:
/// compact the underlying leveldb store
void compact() {
@@ -50,6 +68,16 @@ public:
db->CompactRange(&cstart, &cend);
}
+ void compact_prefix_async(const string& prefix) {
+ Mutex::Locker l(compact_queue_lock);
+ compact_queue.remove(prefix); // prevent unbounded dups
+ compact_queue.push_back(prefix);
+ compact_queue_cond.Signal();
+ if (!compact_thread.is_started()) {
+ compact_thread.create();
+ }
+ }
+
/**
* options_t: Holds options which are minimally interpreted
* on initialization and then passed through to LevelDB.
@@ -94,10 +122,23 @@ public:
#ifdef HAVE_LEVELDB_FILTER_POLICY
filterpolicy(NULL),
#endif
+ compact_queue_lock("LevelDBStore::compact_thread_lock"),
+ compact_queue_stop(false),
+ compact_thread(this),
options()
{}
- ~LevelDBStore() {}
+ ~LevelDBStore() {
+ compact_queue_lock.Lock();
+ if (compact_thread.is_started()) {
+ compact_queue_stop = true;
+ compact_queue_cond.Signal();
+ compact_queue_lock.Unlock();
+ compact_thread.join();
+ } else {
+ compact_queue_lock.Unlock();
+ }
+ }
/// Opens underlying db
int open(ostream &out) {