diff options
-rw-r--r-- | PendingReleaseNotes | 34 | ||||
-rw-r--r-- | doc/release-notes.rst | 53 | ||||
-rw-r--r-- | src/common/TrackedOp.cc | 265 | ||||
-rw-r--r-- | src/common/TrackedOp.h | 154 | ||||
-rw-r--r-- | src/common/bloom_filter.cc | 89 | ||||
-rw-r--r-- | src/common/bloom_filter.hpp | 181 | ||||
-rw-r--r-- | src/include/Makefile.am | 1 | ||||
-rw-r--r-- | src/include/histogram.h | 76 | ||||
-rw-r--r-- | src/mon/PGMonitor.h | 1 | ||||
-rw-r--r-- | src/objclass/class_api.cc | 2 | ||||
-rw-r--r-- | src/os/Makefile.am | 3 | ||||
-rw-r--r-- | src/osd/Makefile.am | 1 | ||||
-rw-r--r-- | src/osd/OSD.cc | 103 | ||||
-rw-r--r-- | src/osd/OpRequest.cc | 263 | ||||
-rw-r--r-- | src/osd/OpRequest.h | 106 | ||||
-rw-r--r-- | src/osd/PG.cc | 48 | ||||
-rw-r--r-- | src/osd/ReplicatedBackend.cc | 6 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 88 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 2 | ||||
-rw-r--r-- | src/osd/osd_types.h | 62 | ||||
-rw-r--r-- | src/test/common/test_bloom_filter.cc | 71 | ||||
-rw-r--r-- | src/test/encoding/types.h | 5 |
22 files changed, 977 insertions, 637 deletions
diff --git a/PendingReleaseNotes b/PendingReleaseNotes index 779a081480f..a3ec73290f3 100644 --- a/PendingReleaseNotes +++ b/PendingReleaseNotes @@ -1,37 +1,3 @@ -v0.70 -~~~~~ - -* librados::Rados::pool_create_async() and librados::Rados::pool_delete_async() - don't drop a reference to the completion object on error, caller needs to take - care of that. This has never really worked correctly and we were leaking an - object - -* 'ceph osd crush set <id> <weight> <loc..>' no longer adds the osd to the - specified location, as that's a job for 'ceph osd crush add'. It will - however continue to work just the same as long as the osd already exists - in the crush map. - -* The OSD now enforces that class write methods cannot both mutate an - object and return data. The rbd.assign_bid method, the lone - offender, has been removed. This breaks compatibility with - pre-bobtail librbd clients by preventing them from creating new - images. - -* librados now returns on commit instead of ack for synchronous calls. - This is a bit safer in the case where both OSDs and the client crash, and - is probably how it should have been acting from the beginning. Users are - unlikely to notice but it could result in lower performance in some - circumstances. Those who care should switch to using the async interfaces, - which let you specify safety semantics precisely. - -* The C++ librados AioComplete::get_version() method was incorrectly - returning an int (usually 32-bits). To avoid breaking library - compatibility, a get_version64() method is added that returns the - full-width value. The old method is deprecated and will be removed - in a future release. Users of the C++ librados API that make use of - the get_version() method should modify their code to avoid getting a - value that is truncated from 64 to to 32 bits. 
- v0.71 ~~~~~ diff --git a/doc/release-notes.rst b/doc/release-notes.rst index bb1dfe4bfec..2b566baa0ea 100644 --- a/doc/release-notes.rst +++ b/doc/release-notes.rst @@ -2,6 +2,37 @@ Release Notes =============== +v0.70 +----- + +Upgrading +~~~~~~~~~ + +* librados::Rados::pool_create_async() and librados::Rados::pool_delete_async() + don't drop a reference to the completion object on error, caller needs to take + care of that. This has never really worked correctly and we were leaking an + object + +* 'ceph osd crush set <id> <weight> <loc..>' no longer adds the osd to the + specified location, as that's a job for 'ceph osd crush add'. It will + however continue to work just the same as long as the osd already exists + in the crush map. + +Notable Changes +~~~~~~~~~~~~~~~ + +* mon: a few 'ceph mon add' races fixed (command is now idempotent) (Joao Luis) +* crush: fix name caching +* rgw: fix a few minor memory leaks (Yehuda Sadeh) +* ceph: improve parsing of CEPH_ARGS (Benoit Knecht) +* mon: avoid rewriting full osdmaps on restart (Joao Luis) +* crc32c: fix optimized crc32c code (it now detects arch support properly) +* mon: fix 'ceph osd crush reweight ...' (Joao Luis) +* osd: revert xattr size limit (fixes large rgw uploads) +* mds: fix heap profiler commands (Joao Luis) +* rgw: fix inefficient use of std::list::size() (Yehuda Sadeh) + + v0.69 ----- @@ -19,6 +50,28 @@ Upgrading the because the server-side behavior has changed it is possible that an application misusing the interface may now get errors. +* The OSD now enforces that class write methods cannot both mutate an + object and return data. The rbd.assign_bid method, the lone + offender, has been removed. This breaks compatibility with + pre-bobtail librbd clients by preventing them from creating new + images. + +* librados now returns on commit instead of ack for synchronous calls. 
+ This is a bit safer in the case where both OSDs and the client crash, and + is probably how it should have been acting from the beginning. Users are + unlikely to notice but it could result in lower performance in some + circumstances. Those who care should switch to using the async interfaces, + which let you specify safety semantics precisely. + +* The C++ librados AioComplete::get_version() method was incorrectly + returning an int (usually 32-bits). To avoid breaking library + compatibility, a get_version64() method is added that returns the + full-width value. The old method is deprecated and will be removed + in a future release. Users of the C++ librados API that make use of + the get_version() method should modify their code to avoid getting a + value that is truncated from 64 to 32 bits. + + Notable Changes ~~~~~~~~~~~~~~~ diff --git a/src/common/TrackedOp.cc b/src/common/TrackedOp.cc new file mode 100644 index 00000000000..d1dbc1e7135 --- /dev/null +++ b/src/common/TrackedOp.cc @@ -0,0 +1,265 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * Copyright 2013 Inktank + */ + +#include "TrackedOp.h" +#include "common/Formatter.h" +#include <iostream> +#include <vector> +#include "common/debug.h" +#include "common/config.h" +#include "msg/Message.h" +#include "include/assert.h" + +#define dout_subsys ceph_subsys_optracker +#undef dout_prefix +#define dout_prefix _prefix(_dout) + +static ostream& _prefix(std::ostream* _dout) +{ + return *_dout << "-- op tracker -- "; +} + +void OpHistory::on_shutdown() +{ + arrived.clear(); + duration.clear(); + shutdown = true; +} + +void OpHistory::insert(utime_t now, TrackedOpRef op) +{ + if (shutdown) + return; + duration.insert(make_pair(op->get_duration(), op)); + arrived.insert(make_pair(op->get_arrived(), op)); + cleanup(now); +} + +void OpHistory::cleanup(utime_t now) +{ + while (arrived.size() && + (now - arrived.begin()->first > + (double)(history_duration))) { + duration.erase(make_pair( + arrived.begin()->second->get_duration(), + arrived.begin()->second)); + arrived.erase(arrived.begin()); + } + + while (duration.size() > history_size) { + arrived.erase(make_pair( + duration.begin()->second->get_arrived(), + duration.begin()->second)); + duration.erase(duration.begin()); + } +} + +void OpHistory::dump_ops(utime_t now, Formatter *f) +{ + cleanup(now); + f->open_object_section("OpHistory"); + f->dump_int("num to keep", history_size); + f->dump_int("duration to keep", history_duration); + { + f->open_array_section("Ops"); + for (set<pair<utime_t, TrackedOpRef> >::const_iterator i = + arrived.begin(); + i != arrived.end(); + ++i) { + f->open_object_section("Op"); + i->second->dump(now, f); + f->close_section(); + } + f->close_section(); + } + f->close_section(); +} + +void OpTracker::dump_historic_ops(Formatter *f) +{ + Mutex::Locker locker(ops_in_flight_lock); + utime_t now = ceph_clock_now(cct); + history.dump_ops(now, f); +} + +void OpTracker::dump_ops_in_flight(Formatter *f) +{ + Mutex::Locker locker(ops_in_flight_lock); + 
f->open_object_section("ops_in_flight"); // overall dump + f->dump_int("num_ops", ops_in_flight.size()); + f->open_array_section("ops"); // list of TrackedOps + utime_t now = ceph_clock_now(cct); + for (xlist<TrackedOp*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) { + f->open_object_section("op"); + (*p)->dump(now, f); + f->close_section(); // this TrackedOp + } + f->close_section(); // list of TrackedOps + f->close_section(); // overall dump +} + +void OpTracker::register_inflight_op(xlist<TrackedOp*>::item *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + ops_in_flight.push_back(i); + ops_in_flight.back()->seq = seq++; +} + +void OpTracker::unregister_inflight_op(TrackedOp *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + assert(i->xitem.get_list() == &ops_in_flight); + utime_t now = ceph_clock_now(cct); + i->xitem.remove_myself(); + i->request->clear_data(); + history.insert(now, TrackedOpRef(i)); +} + +bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector) +{ + Mutex::Locker locker(ops_in_flight_lock); + if (!ops_in_flight.size()) + return false; + + utime_t now = ceph_clock_now(cct); + utime_t too_old = now; + too_old -= complaint_time; + + utime_t oldest_secs = now - ops_in_flight.front()->get_arrived(); + + dout(10) << "ops_in_flight.size: " << ops_in_flight.size() + << "; oldest is " << oldest_secs + << " seconds old" << dendl; + + if (oldest_secs < complaint_time) + return false; + + xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); + warning_vector.reserve(log_threshold + 1); + + int slow = 0; // total slow + int warned = 0; // total logged + while (!i.end() && (*i)->get_arrived() < too_old) { + slow++; + + // exponential backoff of warning intervals + if (((*i)->get_arrived() + + (complaint_time * (*i)->warn_interval_multiplier)) < now) { + // will warn + if (warning_vector.empty()) + warning_vector.push_back(""); + warned++; + if (warned > log_threshold) + break; + + utime_t age = now - (*i)->get_arrived(); + 
stringstream ss; + ss << "slow request " << age << " seconds old, received at " << (*i)->get_arrived() + << ": " << *((*i)->request) << " currently " + << ((*i)->current.size() ? (*i)->current : (*i)->state_string()); + warning_vector.push_back(ss.str()); + + // only those that have been shown will backoff + (*i)->warn_interval_multiplier *= 2; + } + ++i; + } + + // only summarize if we warn about any. if everything has backed + // off, we will stay silent. + if (warned > 0) { + stringstream ss; + ss << slow << " slow requests, " << warned << " included below; oldest blocked for > " + << oldest_secs << " secs"; + warning_vector[0] = ss.str(); + } + + return warning_vector.size(); +} + +void OpTracker::get_age_ms_histogram(pow2_hist_t *h) +{ + Mutex::Locker locker(ops_in_flight_lock); + + h->clear(); + + utime_t now = ceph_clock_now(NULL); + unsigned bin = 30; + uint32_t lb = 1 << (bin-1); // lower bound for this bin + int count = 0; + for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) { + utime_t age = now - (*i)->get_arrived(); + uint32_t ms = (long)(age * 1000.0); + if (ms >= lb) { + count++; + continue; + } + if (count) + h->set(bin, count); + while (lb > ms) { + bin--; + lb >>= 1; + } + count = 1; + } + if (count) + h->set(bin, count); +} + +void OpTracker::mark_event(TrackedOp *op, const string &dest) +{ + utime_t now = ceph_clock_now(cct); + return _mark_event(op, dest, now); +} + +void OpTracker::_mark_event(TrackedOp *op, const string &evt, + utime_t time) +{ + Mutex::Locker locker(ops_in_flight_lock); + dout(5) << //"reqid: " << op->get_reqid() << + ", seq: " << op->seq + << ", time: " << time << ", event: " << evt + << ", request: " << *op->request << dendl; +} + +void OpTracker::RemoveOnDelete::operator()(TrackedOp *op) { + op->mark_event("done"); + tracker->unregister_inflight_op(op); + // Do not delete op, unregister_inflight_op took control +} + +void TrackedOp::mark_event(const string &event) +{ + utime_t now = 
ceph_clock_now(g_ceph_context); + { + Mutex::Locker l(lock); + events.push_back(make_pair(now, event)); + } + tracker->mark_event(this, event); + _event_marked(); +} + +void TrackedOp::dump(utime_t now, Formatter *f) const +{ + Message *m = request; + stringstream name; + m->print(name); + f->dump_string("description", name.str().c_str()); // this TrackedOp + f->dump_stream("received_at") << get_arrived(); + f->dump_float("age", now - get_arrived()); + f->dump_float("duration", get_duration()); + { + f->open_array_section("type_data"); + _dump(now, f); + f->close_section(); + } +} diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h index 753331df7f3..44e03905759 100644 --- a/src/common/TrackedOp.h +++ b/src/common/TrackedOp.h @@ -17,15 +17,163 @@ #include <stdint.h> #include <include/utime.h> #include "common/Mutex.h" +#include "include/histogram.h" #include "include/xlist.h" #include "msg/Message.h" #include <tr1/memory> +class TrackedOp; +typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef; + +class OpTracker; +class OpHistory { + set<pair<utime_t, TrackedOpRef> > arrived; + set<pair<double, TrackedOpRef> > duration; + void cleanup(utime_t now); + bool shutdown; + OpTracker *tracker; + uint32_t history_size; + uint32_t history_duration; + +public: + OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_), + history_size(0), history_duration(0) {} + ~OpHistory() { + assert(arrived.empty()); + assert(duration.empty()); + } + void insert(utime_t now, TrackedOpRef op); + void dump_ops(utime_t now, Formatter *f); + void on_shutdown(); + void set_size_and_duration(uint32_t new_size, uint32_t new_duration) { + history_size = new_size; + history_duration = new_duration; + } +}; + +class OpTracker { + class RemoveOnDelete { + OpTracker *tracker; + public: + RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {} + void operator()(TrackedOp *op); + }; + friend class RemoveOnDelete; + friend class OpHistory; + uint64_t seq; + Mutex ops_in_flight_lock; 
+ xlist<TrackedOp *> ops_in_flight; + OpHistory history; + float complaint_time; + int log_threshold; + +public: + CephContext *cct; + OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), + history(this), complaint_time(0), log_threshold(0), cct(cct_) {} + void set_complaint_and_threshold(float time, int threshold) { + complaint_time = time; + log_threshold = threshold; + } + void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) { + history.set_size_and_duration(new_size, new_duration); + } + void dump_ops_in_flight(Formatter *f); + void dump_historic_ops(Formatter *f); + void register_inflight_op(xlist<TrackedOp*>::item *i); + void unregister_inflight_op(TrackedOp *i); + + void get_age_ms_histogram(pow2_hist_t *h); + + /** + * Look for Ops which are too old, and insert warning + * strings for each Op that is too old. + * + * @param warning_strings A vector<string> reference which is filled + * with a warning string for each old Op. + * @return True if there are any Ops to warn on, false otherwise. 
+ */ + bool check_ops_in_flight(std::vector<string> &warning_strings); + void mark_event(TrackedOp *op, const string &evt); + void _mark_event(TrackedOp *op, const string &evt, utime_t now); + + void on_shutdown() { + Mutex::Locker l(ops_in_flight_lock); + history.on_shutdown(); + } + ~OpTracker() { + assert(ops_in_flight.empty()); + } + + template <typename T> + typename T::Ref create_request(Message *ref) + { + typename T::Ref retval(new T(ref, this), + RemoveOnDelete(this)); + + _mark_event(retval.get(), "header_read", ref->get_recv_stamp()); + _mark_event(retval.get(), "throttled", ref->get_throttle_stamp()); + _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp()); + _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp()); + + retval->init_from_message(); + + return retval; + } +}; + class TrackedOp { +private: + friend class OpHistory; + friend class OpTracker; + xlist<TrackedOp*>::item xitem; +protected: + Message *request; /// the logical request we are tracking + OpTracker *tracker; /// the tracker we are associated with + + list<pair<utime_t, string> > events; /// list of events and their times + Mutex lock; /// to protect the events list + string current; /// the current state the event is in + uint64_t seq; /// a unique value set by the OpTracker + + uint32_t warn_interval_multiplier; // limits output of a given op warning + + TrackedOp(Message *req, OpTracker *_tracker) : + xitem(this), + request(req), + tracker(_tracker), + lock("TrackedOp::lock"), + seq(0), + warn_interval_multiplier(1) + { + tracker->register_inflight_op(&xitem); + } + + virtual void init_from_message() {} + /// output any type-specific data you want to get when dump() is called + virtual void _dump(utime_t now, Formatter *f) const {} + /// if you want something else to happen when events are marked, implement + virtual void _event_marked() {} + public: - virtual void mark_event(const string &event) = 0; - virtual ~TrackedOp() {} + virtual ~TrackedOp() { 
assert(request); request->put(); } + + utime_t get_arrived() const { + return request->get_recv_stamp(); + } + // This function maybe needs some work; assumes last event is completion time + double get_duration() const { + return events.size() ? + (events.rbegin()->first - get_arrived()) : + 0.0; + } + Message *get_req() const { return request; } + + void mark_event(const string &event); + virtual const char *state_string() const { + return events.rbegin()->second.c_str(); + } + void dump(utime_t now, Formatter *f) const; }; -typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef; #endif diff --git a/src/common/bloom_filter.cc b/src/common/bloom_filter.cc index f602b80149e..68875e925bf 100644 --- a/src/common/bloom_filter.cc +++ b/src/common/bloom_filter.cc @@ -6,26 +6,26 @@ void bloom_filter::encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 2, bl); ::encode((uint64_t)salt_count_, bl); - ::encode((uint64_t)table_size_, bl); - ::encode((uint64_t)inserted_element_count_, bl); + ::encode((uint64_t)insert_count_, bl); + ::encode((uint64_t)target_element_count_, bl); ::encode((uint64_t)random_seed_, bl); - bufferptr bp((const char*)bit_table_, raw_table_size_); + bufferptr bp((const char*)bit_table_, table_size_); ::encode(bp, bl); ENCODE_FINISH(bl); } void bloom_filter::decode(bufferlist::iterator& p) { - DECODE_START(1, p); + DECODE_START(2, p); uint64_t v; ::decode(v, p); salt_count_ = v; ::decode(v, p); - table_size_ = v; + insert_count_ = v; ::decode(v, p); - inserted_element_count_ = v; + target_element_count_ = v; ::decode(v, p); random_seed_ = v; bufferlist t; @@ -33,11 +33,14 @@ void bloom_filter::decode(bufferlist::iterator& p) salt_.clear(); generate_unique_salt(); - raw_table_size_ = t.length(); - assert(raw_table_size_ == table_size_ / bits_per_char); + table_size_ = t.length(); delete bit_table_; - bit_table_ = new cell_type[raw_table_size_]; - t.copy(0, raw_table_size_, (char *)bit_table_); + if (table_size_) { + bit_table_ = new 
cell_type[table_size_]; + t.copy(0, table_size_, (char *)bit_table_); + } else { + bit_table_ = NULL; + } DECODE_FINISH(p); } @@ -46,8 +49,8 @@ void bloom_filter::dump(Formatter *f) const { f->dump_unsigned("salt_count", salt_count_); f->dump_unsigned("table_size", table_size_); - f->dump_unsigned("raw_table_size", raw_table_size_); - f->dump_unsigned("insert_count", inserted_element_count_); + f->dump_unsigned("insert_count", insert_count_); + f->dump_unsigned("target_element_count", target_element_count_); f->dump_unsigned("random_seed", random_seed_); f->open_array_section("salt_table"); @@ -56,7 +59,7 @@ void bloom_filter::dump(Formatter *f) const f->close_section(); f->open_array_section("bit_table"); - for (unsigned i = 0; i < raw_table_size_; ++i) + for (unsigned i = 0; i < table_size_; ++i) f->dump_unsigned("byte", (unsigned)bit_table_[i]); f->close_section(); } @@ -74,3 +77,61 @@ void bloom_filter::generate_test_instances(list<bloom_filter*>& ls) ls.back()->insert("boof"); ls.back()->insert("boogggg"); } + + +void compressible_bloom_filter::encode(bufferlist& bl) const +{ + ENCODE_START(2, 2, bl); + bloom_filter::encode(bl); + + uint32_t s = size_list.size(); + ::encode(s, bl); + for (vector<size_t>::const_iterator p = size_list.begin(); + p != size_list.end(); ++p) + ::encode((uint64_t)*p, bl); + + ENCODE_FINISH(bl); +} + +void compressible_bloom_filter::decode(bufferlist::iterator& p) +{ + DECODE_START(2, p); + bloom_filter::decode(p); + + uint32_t s; + ::decode(s, p); + size_list.resize(s); + for (unsigned i = 0; i < s; i++) { + uint64_t v; + ::decode(v, p); + size_list[i] = v; + } + + DECODE_FINISH(p); +} + +void compressible_bloom_filter::dump(Formatter *f) const +{ + bloom_filter::dump(f); + + f->open_array_section("table_sizes"); + for (vector<size_t>::const_iterator p = size_list.begin(); + p != size_list.end(); ++p) + f->dump_unsigned("size", (uint64_t)*p); + f->close_section(); +} + +void 
compressible_bloom_filter::generate_test_instances(list<compressible_bloom_filter*>& ls) +{ + ls.push_back(new compressible_bloom_filter(10, .5, 1)); + ls.push_back(new compressible_bloom_filter(10, .5, 1)); + ls.back()->insert("foo"); + ls.back()->insert("bar"); + ls.push_back(new compressible_bloom_filter(50, .5, 1)); + ls.back()->insert("foo"); + ls.back()->insert("bar"); + ls.back()->insert("baz"); + ls.back()->insert("boof"); + ls.back()->compress(20); + ls.back()->insert("boogggg"); +} diff --git a/src/common/bloom_filter.hpp b/src/common/bloom_filter.hpp index 6216c7fb34d..93787a89a60 100644 --- a/src/common/bloom_filter.hpp +++ b/src/common/bloom_filter.hpp @@ -53,14 +53,22 @@ protected: typedef unsigned int bloom_type; typedef unsigned char cell_type; + unsigned char* bit_table_; ///< pointer to bit map + std::vector<bloom_type> salt_; ///< vector of salts + std::size_t salt_count_; ///< number of salts + std::size_t table_size_; ///< bit table size in bytes + std::size_t insert_count_; ///< insertion count + std::size_t target_element_count_; ///< target number of unique insertions + std::size_t random_seed_; ///< random seed + public: bloom_filter() : bit_table_(0), salt_count_(0), table_size_(0), - raw_table_size_(0), - inserted_element_count_(0), + insert_count_(0), + target_element_count_(0), random_seed_(0) {} @@ -68,7 +76,8 @@ public: const double& false_positive_probability, const std::size_t& random_seed) : bit_table_(0), - inserted_element_count_(0), + insert_count_(0), + target_element_count_(predicted_inserted_element_count), random_seed_((random_seed) ? 
random_seed : 0xA5A5A5A5) { find_optimal_parameters(predicted_inserted_element_count, false_positive_probability, @@ -76,12 +85,15 @@ public: init(); } - bloom_filter(const std::size_t& salt_count, std::size_t table_size, - const std::size_t& random_seed) + bloom_filter(const std::size_t& salt_count, + std::size_t table_size, + const std::size_t& random_seed, + std::size_t target_element_count) : bit_table_(0), salt_count_(salt_count), table_size_(table_size), - inserted_element_count_(0), + insert_count_(0), + target_element_count_(target_element_count), random_seed_((random_seed) ? random_seed : 0xA5A5A5A5) { init(); @@ -89,9 +101,12 @@ public: void init() { generate_unique_salt(); - raw_table_size_ = table_size_ / bits_per_char; - bit_table_ = new cell_type[raw_table_size_]; - std::fill_n(bit_table_,raw_table_size_,0x00); + if (table_size_) { + bit_table_ = new cell_type[table_size_]; + std::fill_n(bit_table_, table_size_, 0x00); + } else { + bit_table_ = NULL; + } } bloom_filter(const bloom_filter& filter) @@ -104,12 +119,11 @@ public: if (this != &filter) { salt_count_ = filter.salt_count_; table_size_ = filter.table_size_; - raw_table_size_ = filter.raw_table_size_; - inserted_element_count_ = filter.inserted_element_count_; + insert_count_ = filter.insert_count_; random_seed_ = filter.random_seed_; delete[] bit_table_; - bit_table_ = new cell_type[raw_table_size_]; - std::copy(filter.bit_table_,filter.bit_table_ + raw_table_size_,bit_table_); + bit_table_ = new cell_type[table_size_]; + std::copy(filter.bit_table_, filter.bit_table_ + table_size_, bit_table_); salt_ = filter.salt_; } return *this; @@ -127,8 +141,9 @@ public: inline void clear() { - std::fill_n(bit_table_,raw_table_size_,0x00); - inserted_element_count_ = 0; + if (bit_table_) + std::fill_n(bit_table_, table_size_, 0x00); + insert_count_ = 0; } /** @@ -141,26 +156,28 @@ public: * @param val integer value to insert */ inline void insert(uint32_t val) { + assert(bit_table_); std::size_t 
bit_index = 0; std::size_t bit = 0; for (std::size_t i = 0; i < salt_.size(); ++i) { compute_indices(hash_ap(val,salt_[i]),bit_index,bit); - bit_table_[bit_index / bits_per_char] |= bit_mask[bit]; + bit_table_[bit_index >> 3] |= bit_mask[bit]; } - ++inserted_element_count_; + ++insert_count_; } inline void insert(const unsigned char* key_begin, const std::size_t& length) { + assert(bit_table_); std::size_t bit_index = 0; std::size_t bit = 0; for (std::size_t i = 0; i < salt_.size(); ++i) { compute_indices(hash_ap(key_begin,length,salt_[i]),bit_index,bit); - bit_table_[bit_index / bits_per_char] |= bit_mask[bit]; + bit_table_[bit_index >> 3] |= bit_mask[bit]; } - ++inserted_element_count_; + ++insert_count_; } template<typename T> @@ -202,12 +219,14 @@ public: */ inline virtual bool contains(uint32_t val) const { + if (!bit_table_) + return false; std::size_t bit_index = 0; std::size_t bit = 0; for (std::size_t i = 0; i < salt_.size(); ++i) { compute_indices(hash_ap(val,salt_[i]),bit_index,bit); - if ((bit_table_[bit_index / bits_per_char] & bit_mask[bit]) != bit_mask[bit]) + if ((bit_table_[bit_index >> 3] & bit_mask[bit]) != bit_mask[bit]) { return false; } @@ -217,12 +236,14 @@ public: inline virtual bool contains(const unsigned char* key_begin, const std::size_t length) const { + if (!bit_table_) + return false; std::size_t bit_index = 0; std::size_t bit = 0; for (std::size_t i = 0; i < salt_.size(); ++i) { compute_indices(hash_ap(key_begin,length,salt_[i]),bit_index,bit); - if ((bit_table_[bit_index / bits_per_char] & bit_mask[bit]) != bit_mask[bit]) + if ((bit_table_[bit_index >> 3] & bit_mask[bit]) != bit_mask[bit]) { return false; } @@ -278,12 +299,41 @@ public: inline virtual std::size_t size() const { - return table_size_; + return table_size_ * bits_per_char; } inline std::size_t element_count() const { - return inserted_element_count_; + return insert_count_; + } + + /* + * density of bits set. 
inconvenient units, but: + * .3 = ~50% target insertions + * .5 = 100% target insertions, "perfectly full" + * .75 = 200% target insertions + * 1.0 = all bits set... infinite insertions + */ + inline double density() const + { + if (!bit_table_) + return 0.0; + size_t set = 0; + uint8_t *p = bit_table_; + size_t left = table_size_; + while (left-- > 0) { + uint8_t c = *p; + for (; c; ++set) + c &= c - 1; + ++p; + } + return (double)set / (double)(table_size_ << 3); + } + + virtual inline double approx_unique_element_count() const { + // this is not a very good estimate; a better solution should have + // some asymptotic behavior as density() approaches 1.0. + return (double)target_element_count_ * 2.0 * density(); } inline double effective_fpp() const @@ -295,7 +345,7 @@ public: the current number of inserted elements - not the user defined predicated/expected number of inserted elements. */ - return std::pow(1.0 - std::exp(-1.0 * salt_.size() * inserted_element_count_ / size()), 1.0 * salt_.size()); + return std::pow(1.0 - std::exp(-1.0 * salt_.size() * insert_count_ / size()), 1.0 * salt_.size()); } inline bloom_filter& operator &= (const bloom_filter& filter) @@ -306,7 +356,7 @@ public: (table_size_ == filter.table_size_) && (random_seed_ == filter.random_seed_) ) { - for (std::size_t i = 0; i < raw_table_size_; ++i) { + for (std::size_t i = 0; i < table_size_; ++i) { bit_table_[i] &= filter.bit_table_[i]; } } @@ -321,7 +371,7 @@ public: (table_size_ == filter.table_size_) && (random_seed_ == filter.random_seed_) ) { - for (std::size_t i = 0; i < raw_table_size_; ++i) { + for (std::size_t i = 0; i < table_size_; ++i) { bit_table_[i] |= filter.bit_table_[i]; } } @@ -336,7 +386,7 @@ public: (table_size_ == filter.table_size_) && (random_seed_ == filter.random_seed_) ) { - for (std::size_t i = 0; i < raw_table_size_; ++i) { + for (std::size_t i = 0; i < table_size_; ++i) { bit_table_[i] ^= filter.bit_table_[i]; } } @@ -352,8 +402,8 @@ protected: inline virtual void 
compute_indices(const bloom_type& hash, std::size_t& bit_index, std::size_t& bit) const { - bit_index = hash % table_size_; - bit = bit_index % bits_per_char; + bit_index = hash % (table_size_ << 3); + bit = bit_index & 7; } void generate_unique_salt() @@ -418,7 +468,8 @@ protected: } else { - std::copy(predef_salt,predef_salt + predef_salt_count,std::back_inserter(salt_)); + std::copy(predef_salt,predef_salt + predef_salt_count, + std::back_inserter(salt_)); srand(static_cast<unsigned int>(random_seed_)); while (salt_.size() < salt_count_) { @@ -466,8 +517,8 @@ protected: *salt_count = static_cast<std::size_t>(min_k); size_t t = static_cast<std::size_t>(min_m); - t += (((t % bits_per_char) != 0) ? (bits_per_char - (t % bits_per_char)) : 0); - *table_size = t; + t += (((t & 7) != 0) ? (bits_per_char - (t & 7)) : 0); + *table_size = t >> 3; } inline bloom_type hash_ap(uint32_t val, bloom_type hash) const @@ -507,14 +558,6 @@ protected: return hash; } - std::vector<bloom_type> salt_; - unsigned char* bit_table_; - std::size_t salt_count_; - std::size_t table_size_; - std::size_t raw_table_size_; - std::size_t inserted_element_count_; - std::size_t random_seed_; - public: void encode(bufferlist& bl) const; void decode(bufferlist::iterator& bl); @@ -549,53 +592,77 @@ class compressible_bloom_filter : public bloom_filter { public: + compressible_bloom_filter() : bloom_filter() {} + compressible_bloom_filter(const std::size_t& predicted_element_count, const double& false_positive_probability, const std::size_t& random_seed) - : bloom_filter(predicted_element_count,false_positive_probability,random_seed) + : bloom_filter(predicted_element_count, false_positive_probability, random_seed) + { + size_list.push_back(table_size_); + } + + compressible_bloom_filter(const std::size_t& salt_count, + std::size_t table_size, + const std::size_t& random_seed, + std::size_t target_count) + : bloom_filter(salt_count, table_size, random_seed, target_count) { 
size_list.push_back(table_size_); } inline virtual std::size_t size() const { - return size_list.back(); + return size_list.back() * bits_per_char; } - inline bool compress(const double& percentage) + inline bool compress(const double& target_ratio) { - if ((0.0 >= percentage) || (percentage >= 100.0)) + if (!bit_table_) + return false; + + if ((0.0 >= target_ratio) || (target_ratio >= 1.0)) { return false; } std::size_t original_table_size = size_list.back(); - std::size_t new_table_size = static_cast<std::size_t>((size_list.back() * (1.0 - (percentage / 100.0)))); - new_table_size -= (((new_table_size % bits_per_char) != 0) ? (new_table_size % bits_per_char) : 0); + std::size_t new_table_size = static_cast<std::size_t>(size_list.back() * target_ratio); - if ((bits_per_char > new_table_size) || (new_table_size >= original_table_size)) + if ((!new_table_size) || (new_table_size >= original_table_size)) { return false; } - cell_type* tmp = new cell_type[new_table_size / bits_per_char]; - std::copy(bit_table_, bit_table_ + (new_table_size / bits_per_char), tmp); - cell_type* itr = bit_table_ + (new_table_size / bits_per_char); - cell_type* end = bit_table_ + (original_table_size / bits_per_char); + cell_type* tmp = new cell_type[new_table_size]; + std::copy(bit_table_, bit_table_ + (new_table_size), tmp); + cell_type* itr = bit_table_ + (new_table_size); + cell_type* end = bit_table_ + (original_table_size); cell_type* itr_tmp = tmp; - + cell_type* itr_end = tmp + (new_table_size); while (end != itr) { *(itr_tmp++) |= (*itr++); + if (itr_tmp == itr_end) + itr_tmp = tmp; } delete[] bit_table_; bit_table_ = tmp; size_list.push_back(new_table_size); + table_size_ = new_table_size; return true; } + virtual inline double approx_unique_element_count() const { + // this is not a very good estimate; a better solution should have + // some asymptotic behavior as density() approaches 1.0. + // + // the compress() correction is also bad; it tends to under-estimate. 
+ return (double)target_element_count_ * 2.0 * density() * (double)size_list.back() / (double)size_list.front(); + } + private: inline virtual void compute_indices(const bloom_type& hash, std::size_t& bit_index, std::size_t& bit) const @@ -603,13 +670,19 @@ private: bit_index = hash; for (std::size_t i = 0; i < size_list.size(); ++i) { - bit_index %= size_list[i]; + bit_index %= size_list[i] << 3; } - bit = bit_index % bits_per_char; + bit = bit_index & 7; } std::vector<std::size_t> size_list; +public: + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& bl); + void dump(Formatter *f) const; + static void generate_test_instances(std::list<compressible_bloom_filter*>& ls); }; +WRITE_CLASS_ENCODER(compressible_bloom_filter) #endif diff --git a/src/include/Makefile.am b/src/include/Makefile.am index 2d98e777f00..c8823ce523d 100644 --- a/src/include/Makefile.am +++ b/src/include/Makefile.am @@ -43,6 +43,7 @@ noinst_HEADERS += \ include/filepath.h \ include/frag.h \ include/hash.h \ + include/histogram.h \ include/intarith.h \ include/interval_set.h \ include/int_types.h \ diff --git a/src/include/histogram.h b/src/include/histogram.h new file mode 100644 index 00000000000..c817b1ec175 --- /dev/null +++ b/src/include/histogram.h @@ -0,0 +1,76 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * Copyright 2013 Inktank + */ + +#ifndef HISTOGRAM_H_ +#define HISTOGRAM_H_ + +/** + * power of 2 histogram + */ +struct pow2_hist_t { // + /** + * histogram + * + * bin size is 2^index + * value is count of elements that are <= the current bin but > the previous bin. 
+ */ + vector<int32_t> h; + +private: + /// expand to at least another's size + void _expand_to(unsigned s) { + if (s > h.size()) + h.resize(s, 0); + } + /// drop useless trailing 0's + void _contract() { + unsigned p = h.size(); + while (p > 0 && h[p-1] == 0) + --p; + h.resize(p); + } + +public: + void clear() { + h.clear(); + } + void set(int bin, int32_t v) { + _expand_to(bin + 1); + h[bin] = v; + _contract(); + } + + void add(const pow2_hist_t& o) { + _expand_to(o.h.size()); + for (unsigned p = 0; p < o.h.size(); ++p) + h[p] += o.h[p]; + _contract(); + } + void sub(const pow2_hist_t& o) { + _expand_to(o.h.size()); + for (unsigned p = 0; p < o.h.size(); ++p) + h[p] -= o.h[p]; + _contract(); + } + + int32_t upper_bound() const { + return 1 << h.size(); + } + + void dump(Formatter *f) const; + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &bl); + static void generate_test_instances(std::list<pow2_hist_t*>& o); +}; +WRITE_CLASS_ENCODER(pow2_hist_t) + +#endif /* HISTOGRAM_H_ */ diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h index 44015395e94..d29f47c1c43 100644 --- a/src/mon/PGMonitor.h +++ b/src/mon/PGMonitor.h @@ -28,6 +28,7 @@ using namespace std; #include "PaxosService.h" #include "include/types.h" #include "include/utime.h" +#include "include/histogram.h" #include "msg/Messenger.h" #include "common/config.h" #include "mon/MonitorDBStore.h" diff --git a/src/objclass/class_api.cc b/src/objclass/class_api.cc index 1ac224cdfe7..bb26c752f9b 100644 --- a/src/objclass/class_api.cc +++ b/src/objclass/class_api.cc @@ -177,7 +177,7 @@ int cls_read(cls_method_context_t hctx, int ofs, int len, int cls_get_request_origin(cls_method_context_t hctx, entity_inst_t *origin) { ReplicatedPG::OpContext **pctx = static_cast<ReplicatedPG::OpContext **>(hctx); - *origin = (*pctx)->op->request->get_orig_source_inst(); + *origin = (*pctx)->op->get_req()->get_orig_source_inst(); return 0; } diff --git a/src/os/Makefile.am b/src/os/Makefile.am index 
b7fef8dd209..4f12a6a3278 100644 --- a/src/os/Makefile.am +++ b/src/os/Makefile.am @@ -13,7 +13,8 @@ libos_la_SOURCES = \ os/WBThrottle.cc \ os/BtrfsFileStoreBackend.cc \ os/GenericFileStoreBackend.cc \ - os/ZFSFileStoreBackend.cc + os/ZFSFileStoreBackend.cc \ + common/TrackedOp.cc noinst_LTLIBRARIES += libos.la noinst_HEADERS += \ diff --git a/src/osd/Makefile.am b/src/osd/Makefile.am index 9d3bc1d5e47..cae02015fce 100644 --- a/src/osd/Makefile.am +++ b/src/osd/Makefile.am @@ -16,6 +16,7 @@ libosd_la_SOURCES = \ osd/Watch.cc \ osd/ClassHandler.cc \ osd/OpRequest.cc \ + common/TrackedOp.cc \ osd/SnapMapper.cc \ osd/osd_types.cc \ objclass/class_api.cc diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 8ce11bb558c..b2aa2ebbcd2 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -907,6 +907,10 @@ OSD::OSD(CephContext *cct_, int id, Messenger *internal_messenger, Messenger *ex service(this) { monc->set_messenger(client_messenger); + op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time, + cct->_conf->osd_op_log_threshold); + op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size, + cct->_conf->osd_op_history_duration); } OSD::~OSD() @@ -4539,7 +4543,7 @@ void OSD::do_waiters() void OSD::dispatch_op(OpRequestRef op) { - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case MSG_OSD_PG_CREATE: handle_pg_create(op); @@ -4665,7 +4669,7 @@ void OSD::_dispatch(Message *m) default: { - OpRequestRef op = op_tracker.create_request(m); + OpRequestRef op = op_tracker.create_request<OpRequest>(m); op->mark_event("waiting_for_osdmap"); // no map? starting up? 
if (!osdmap) { @@ -5711,9 +5715,9 @@ bool OSD::require_mon_peer(Message *m) bool OSD::require_osd_peer(OpRequestRef op) { - if (!op->request->get_connection()->peer_is_osd()) { - dout(0) << "require_osd_peer received from non-osd " << op->request->get_connection()->get_peer_addr() - << " " << *op->request << dendl; + if (!op->get_req()->get_connection()->peer_is_osd()) { + dout(0) << "require_osd_peer received from non-osd " << op->get_req()->get_connection()->get_peer_addr() + << " " << *op->get_req() << dendl; return false; } return true; @@ -5725,7 +5729,7 @@ bool OSD::require_osd_peer(OpRequestRef op) */ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch) { - Message *m = op->request; + Message *m = op->get_req(); dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl; assert(osd_lock.is_locked()); @@ -5837,7 +5841,7 @@ void OSD::split_pgs( */ void OSD::handle_pg_create(OpRequestRef op) { - MOSDPGCreate *m = (MOSDPGCreate*)op->request; + MOSDPGCreate *m = (MOSDPGCreate*)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_CREATE); dout(10) << "handle_pg_create " << *m << dendl; @@ -5857,11 +5861,16 @@ void OSD::handle_pg_create(OpRequestRef op) } } - if (!require_mon_peer(op->request)) { - // we have to hack around require_mon_peer's interface limits - op->request = NULL; + /* we have to hack around require_mon_peer's interface limits, so + * grab an extra reference before going in. If the peer isn't + * a Monitor, the reference is put for us (and then cleared + * up automatically by our OpTracker infrastructure). Otherwise, + * we put the extra ref ourself. 
+ */ + if (!require_mon_peer(op->get_req()->get())) { return; } + op->get_req()->put(); if (!require_same_or_newer_map(op, m->epoch)) return; @@ -6166,7 +6175,7 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info */ void OSD::handle_pg_notify(OpRequestRef op) { - MOSDPGNotify *m = (MOSDPGNotify*)op->request; + MOSDPGNotify *m = (MOSDPGNotify*)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_NOTIFY); dout(7) << "handle_pg_notify from " << m->get_source() << dendl; @@ -6201,7 +6210,7 @@ void OSD::handle_pg_notify(OpRequestRef op) void OSD::handle_pg_log(OpRequestRef op) { - MOSDPGLog *m = (MOSDPGLog*) op->request; + MOSDPGLog *m = (MOSDPGLog*) op->get_req(); assert(m->get_header().type == MSG_OSD_PG_LOG); dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl; @@ -6229,7 +6238,7 @@ void OSD::handle_pg_log(OpRequestRef op) void OSD::handle_pg_info(OpRequestRef op) { - MOSDPGInfo *m = static_cast<MOSDPGInfo *>(op->request); + MOSDPGInfo *m = static_cast<MOSDPGInfo *>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_INFO); dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl; @@ -6262,7 +6271,7 @@ void OSD::handle_pg_info(OpRequestRef op) void OSD::handle_pg_trim(OpRequestRef op) { - MOSDPGTrim *m = (MOSDPGTrim *)op->request; + MOSDPGTrim *m = (MOSDPGTrim *)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_TRIM); dout(7) << "handle_pg_trim " << *m << " from " << m->get_source() << dendl; @@ -6315,7 +6324,7 @@ void OSD::handle_pg_trim(OpRequestRef op) void OSD::handle_pg_scan(OpRequestRef op) { - MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request); + MOSDPGScan *m = static_cast<MOSDPGScan*>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_SCAN); dout(10) << "handle_pg_scan " << *m << " from " << m->get_source() << dendl; @@ -6343,7 +6352,7 @@ void OSD::handle_pg_scan(OpRequestRef op) void OSD::handle_pg_backfill(OpRequestRef op) { - MOSDPGBackfill *m = 
static_cast<MOSDPGBackfill*>(op->request); + MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_BACKFILL); dout(10) << "handle_pg_backfill " << *m << " from " << m->get_source() << dendl; @@ -6371,7 +6380,7 @@ void OSD::handle_pg_backfill(OpRequestRef op) void OSD::handle_pg_backfill_reserve(OpRequestRef op) { - MBackfillReserve *m = static_cast<MBackfillReserve*>(op->request); + MBackfillReserve *m = static_cast<MBackfillReserve*>(op->get_req()); assert(m->get_header().type == MSG_OSD_BACKFILL_RESERVE); if (!require_osd_peer(op)) @@ -6415,7 +6424,7 @@ void OSD::handle_pg_backfill_reserve(OpRequestRef op) void OSD::handle_pg_recovery_reserve(OpRequestRef op) { - MRecoveryReserve *m = static_cast<MRecoveryReserve*>(op->request); + MRecoveryReserve *m = static_cast<MRecoveryReserve*>(op->get_req()); assert(m->get_header().type == MSG_OSD_RECOVERY_RESERVE); if (!require_osd_peer(op)) @@ -6467,7 +6476,7 @@ void OSD::handle_pg_query(OpRequestRef op) { assert(osd_lock.is_locked()); - MOSDPGQuery *m = (MOSDPGQuery*)op->request; + MOSDPGQuery *m = (MOSDPGQuery*)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_QUERY); if (!require_osd_peer(op)) @@ -6554,7 +6563,7 @@ void OSD::handle_pg_query(OpRequestRef op) void OSD::handle_pg_remove(OpRequestRef op) { - MOSDPGRemove *m = (MOSDPGRemove *)op->request; + MOSDPGRemove *m = (MOSDPGRemove *)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_REMOVE); assert(osd_lock.is_locked()); @@ -6827,7 +6836,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err) void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv) { - MOSDOp *m = static_cast<MOSDOp*>(op->request); + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); int flags; flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK); @@ -6839,7 +6848,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v, 
void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) { - MOSDOp *m = static_cast<MOSDOp*>(op->request); + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); if (m->get_map_epoch() < pg->info.history.same_primary_since) { @@ -6858,7 +6867,7 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) void OSD::handle_op(OpRequestRef op) { - MOSDOp *m = static_cast<MOSDOp*>(op->request); + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); if (op_is_discardable(m)) { dout(10) << " discardable " << *m << dendl; @@ -6993,7 +7002,7 @@ void OSD::handle_op(OpRequestRef op) template<typename T, int MSGTYPE> void OSD::handle_replica_op(OpRequestRef op) { - T *m = static_cast<T *>(op->request); + T *m = static_cast<T *>(op->get_req()); assert(m->get_header().type == MSGTYPE); dout(10) << __func__ << *m << " epoch " << m->map_epoch << dendl; @@ -7047,24 +7056,24 @@ bool OSD::op_is_discardable(MOSDOp *op) */ void OSD::enqueue_op(PG *pg, OpRequestRef op) { - utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp(); - dout(15) << "enqueue_op " << op << " prio " << op->request->get_priority() - << " cost " << op->request->get_cost() + utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp(); + dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority() + << " cost " << op->get_req()->get_cost() << " latency " << latency - << " " << *(op->request) << dendl; + << " " << *(op->get_req()) << dendl; pg->queue_op(op); } void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item) { - unsigned priority = item.second->request->get_priority(); - unsigned cost = item.second->request->get_cost(); + unsigned priority = item.second->get_req()->get_priority(); + unsigned cost = item.second->get_req()->get_cost(); if (priority >= CEPH_MSG_PRIO_LOW) pqueue.enqueue_strict( - item.second->request->get_source_inst(), + 
item.second->get_req()->get_source_inst(), priority, item); else - pqueue.enqueue(item.second->request->get_source_inst(), + pqueue.enqueue(item.second->get_req()->get_source_inst(), priority, cost, item); osd->logger->set(l_osd_opq, pqueue.length()); } @@ -7079,14 +7088,14 @@ void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) pg_for_processing[&*(item.first)].pop_back(); } } - unsigned priority = item.second->request->get_priority(); - unsigned cost = item.second->request->get_cost(); + unsigned priority = item.second->get_req()->get_priority(); + unsigned cost = item.second->get_req()->get_cost(); if (priority >= CEPH_MSG_PRIO_LOW) pqueue.enqueue_strict_front( - item.second->request->get_source_inst(), + item.second->get_req()->get_source_inst(), priority, item); else - pqueue.enqueue_front(item.second->request->get_source_inst(), + pqueue.enqueue_front(item.second->get_req()->get_source_inst(), priority, cost, item); osd->logger->set(l_osd_opq, pqueue.length()); } @@ -7138,11 +7147,11 @@ void OSD::dequeue_op( PGRef pg, OpRequestRef op, ThreadPool::TPHandle &handle) { - utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp(); - dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority() - << " cost " << op->request->get_cost() + utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp(); + dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority() + << " cost " << op->get_req()->get_cost() << " latency " << latency - << " " << *(op->request) + << " " << *(op->get_req()) << " pg " << *pg << dendl; if (pg->deleting) return; @@ -7243,6 +7252,8 @@ const char** OSD::get_tracked_conf_keys() const { static const char* KEYS[] = { "osd_max_backfills", + "osd_op_complaint_time", "osd_op_log_threshold", + "osd_op_history_size", "osd_op_history_duration", NULL }; return KEYS; @@ -7255,13 +7266,23 @@ void OSD::handle_conf_change(const struct md_config_t *conf, 
service.local_reserver.set_max(cct->_conf->osd_max_backfills); service.remote_reserver.set_max(cct->_conf->osd_max_backfills); } + if (changed.count("osd_op_complaint_time") || + changed.count("osd_op_log_threshold")) { + op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time, + cct->_conf->osd_op_log_threshold); + } + if (changed.count("osd_op_history_size") || + changed.count("osd_op_history_duration")) { + op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size, + cct->_conf->osd_op_history_duration); + } } // -------------------------------- int OSD::init_op_flags(OpRequestRef op) { - MOSDOp *m = static_cast<MOSDOp*>(op->request); + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); vector<OSDOp>::iterator iter; // client flags have no bearing on whether an op is a read, write, etc. diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index 1ffe3073051..2ed7a23086f 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -11,229 +11,21 @@ #include "messages/MOSDSubOp.h" #include "include/assert.h" -#define dout_subsys ceph_subsys_optracker -#undef dout_prefix -#define dout_prefix _prefix(_dout) -static ostream& _prefix(std::ostream* _dout) -{ - return *_dout << "--OSD::tracker-- "; -} OpRequest::OpRequest(Message *req, OpTracker *tracker) : - request(req), xitem(this), + TrackedOp(req, tracker), rmw_flags(0), - warn_interval_multiplier(1), - lock("OpRequest::lock"), - tracker(tracker), - hit_flag_points(0), latest_flag_point(0), - seq(0) { - received_time = request->get_recv_stamp(); - tracker->register_inflight_op(&xitem); + hit_flag_points(0), latest_flag_point(0) { if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) { // don't warn as quickly for low priority ops warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple; } } -void OpHistory::on_shutdown() -{ - arrived.clear(); - duration.clear(); - shutdown = true; -} - -void OpHistory::insert(utime_t now, OpRequestRef op) -{ 
- if (shutdown) - return; - duration.insert(make_pair(op->get_duration(), op)); - arrived.insert(make_pair(op->get_arrived(), op)); - cleanup(now); -} - -void OpHistory::cleanup(utime_t now) -{ - while (arrived.size() && - (now - arrived.begin()->first > - (double)(tracker->cct->_conf->osd_op_history_duration))) { - duration.erase(make_pair( - arrived.begin()->second->get_duration(), - arrived.begin()->second)); - arrived.erase(arrived.begin()); - } - - while (duration.size() > tracker->cct->_conf->osd_op_history_size) { - arrived.erase(make_pair( - duration.begin()->second->get_arrived(), - duration.begin()->second)); - duration.erase(duration.begin()); - } -} - -void OpHistory::dump_ops(utime_t now, Formatter *f) -{ - cleanup(now); - f->open_object_section("OpHistory"); - f->dump_int("num to keep", tracker->cct->_conf->osd_op_history_size); - f->dump_int("duration to keep", tracker->cct->_conf->osd_op_history_duration); - { - f->open_array_section("Ops"); - for (set<pair<utime_t, OpRequestRef> >::const_iterator i = - arrived.begin(); - i != arrived.end(); - ++i) { - f->open_object_section("Op"); - i->second->dump(now, f); - f->close_section(); - } - f->close_section(); - } - f->close_section(); -} - -void OpTracker::dump_historic_ops(Formatter *f) -{ - Mutex::Locker locker(ops_in_flight_lock); - utime_t now = ceph_clock_now(cct); - history.dump_ops(now, f); -} - -void OpTracker::dump_ops_in_flight(Formatter *f) -{ - Mutex::Locker locker(ops_in_flight_lock); - f->open_object_section("ops_in_flight"); // overall dump - f->dump_int("num_ops", ops_in_flight.size()); - f->open_array_section("ops"); // list of OpRequests - utime_t now = ceph_clock_now(cct); - for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) { - f->open_object_section("op"); - (*p)->dump(now, f); - f->close_section(); // this OpRequest - } - f->close_section(); // list of OpRequests - f->close_section(); // overall dump -} - -void 
OpTracker::register_inflight_op(xlist<OpRequest*>::item *i) -{ - Mutex::Locker locker(ops_in_flight_lock); - ops_in_flight.push_back(i); - ops_in_flight.back()->seq = seq++; -} - -void OpTracker::unregister_inflight_op(OpRequest *i) -{ - Mutex::Locker locker(ops_in_flight_lock); - assert(i->xitem.get_list() == &ops_in_flight); - utime_t now = ceph_clock_now(cct); - i->xitem.remove_myself(); - i->request->clear_data(); - history.insert(now, OpRequestRef(i)); -} - -bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector) -{ - Mutex::Locker locker(ops_in_flight_lock); - if (!ops_in_flight.size()) - return false; - - utime_t now = ceph_clock_now(cct); - utime_t too_old = now; - too_old -= cct->_conf->osd_op_complaint_time; - - utime_t oldest_secs = now - ops_in_flight.front()->received_time; - - dout(10) << "ops_in_flight.size: " << ops_in_flight.size() - << "; oldest is " << oldest_secs - << " seconds old" << dendl; - - if (oldest_secs < cct->_conf->osd_op_complaint_time) - return false; - - xlist<OpRequest*>::iterator i = ops_in_flight.begin(); - warning_vector.reserve(cct->_conf->osd_op_log_threshold + 1); - - int slow = 0; // total slow - int warned = 0; // total logged - while (!i.end() && (*i)->received_time < too_old) { - slow++; - - // exponential backoff of warning intervals - if (((*i)->received_time + - (cct->_conf->osd_op_complaint_time * - (*i)->warn_interval_multiplier)) < now) { - // will warn - if (warning_vector.empty()) - warning_vector.push_back(""); - warned++; - if (warned > cct->_conf->osd_op_log_threshold) - break; - - utime_t age = now - (*i)->received_time; - stringstream ss; - ss << "slow request " << age << " seconds old, received at " << (*i)->received_time - << ": " << *((*i)->request) << " currently " - << ((*i)->current.size() ? 
(*i)->current : (*i)->state_string()); - warning_vector.push_back(ss.str()); - - // only those that have been shown will backoff - (*i)->warn_interval_multiplier *= 2; - } - ++i; - } - - // only summarize if we warn about any. if everything has backed - // off, we will stay silent. - if (warned > 0) { - stringstream ss; - ss << slow << " slow requests, " << warned << " included below; oldest blocked for > " - << oldest_secs << " secs"; - warning_vector[0] = ss.str(); - } - - return warning_vector.size(); -} - -void OpTracker::get_age_ms_histogram(pow2_hist_t *h) -{ - Mutex::Locker locker(ops_in_flight_lock); - - h->clear(); - - utime_t now = ceph_clock_now(NULL); - unsigned bin = 30; - uint32_t lb = 1 << (bin-1); // lower bound for this bin - int count = 0; - for (xlist<OpRequest*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) { - utime_t age = now - (*i)->received_time; - uint32_t ms = (long)(age * 1000.0); - if (ms >= lb) { - count++; - continue; - } - if (count) - h->set(bin, count); - while (lb > ms) { - bin--; - lb >>= 1; - } - count = 1; - } - if (count) - h->set(bin, count); -} - -void OpRequest::dump(utime_t now, Formatter *f) const +void OpRequest::_dump(utime_t now, Formatter *f) const { Message *m = request; - stringstream name; - m->print(name); - f->dump_string("description", name.str().c_str()); // this OpRequest - f->dump_unsigned("rmw_flags", rmw_flags); - f->dump_stream("received_at") << received_time; - f->dump_float("age", now - received_time); - f->dump_float("duration", get_duration()); f->dump_string("flag_point", state_string()); if (m->get_orig_source().is_client()) { f->open_object_section("client_info"); @@ -257,50 +49,11 @@ void OpRequest::dump(utime_t now, Formatter *f) const } } -void OpTracker::mark_event(OpRequest *op, const string &dest) -{ - utime_t now = ceph_clock_now(cct); - return _mark_event(op, dest, now); -} - -void OpTracker::_mark_event(OpRequest *op, const string &evt, - utime_t time) -{ - Mutex::Locker 
locker(ops_in_flight_lock); - dout(5) << "reqid: " << op->get_reqid() << ", seq: " << op->seq - << ", time: " << time << ", event: " << evt - << ", request: " << *op->request << dendl; -} - -void OpTracker::RemoveOnDelete::operator()(OpRequest *op) { - op->mark_event("done"); - tracker->unregister_inflight_op(op); - // Do not delete op, unregister_inflight_op took control -} - -OpRequestRef OpTracker::create_request(Message *ref) -{ - OpRequestRef retval(new OpRequest(ref, this), - RemoveOnDelete(this)); - - if (ref->get_type() == CEPH_MSG_OSD_OP) { - retval->reqid = static_cast<MOSDOp*>(ref)->get_reqid(); - } else if (ref->get_type() == MSG_OSD_SUBOP) { - retval->reqid = static_cast<MOSDSubOp*>(ref)->reqid; - } - _mark_event(retval.get(), "header_read", ref->get_recv_stamp()); - _mark_event(retval.get(), "throttled", ref->get_throttle_stamp()); - _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp()); - _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp()); - return retval; -} - -void OpRequest::mark_event(const string &event) +void OpRequest::init_from_message() { - utime_t now = ceph_clock_now(tracker->cct); - { - Mutex::Locker l(lock); - events.push_back(make_pair(now, event)); + if (request->get_type() == CEPH_MSG_OSD_OP) { + reqid = static_cast<MOSDOp*>(request)->get_reqid(); + } else if (request->get_type() == MSG_OSD_SUBOP) { + reqid = static_cast<MOSDSubOp*>(request)->reqid; } - tracker->mark_event(this, event); } diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index 9634be87846..87571f58787 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -25,87 +25,12 @@ #include "common/TrackedOp.h" #include "osd/osd_types.h" -struct OpRequest; -class OpTracker; -typedef std::tr1::shared_ptr<OpRequest> OpRequestRef; -class OpHistory { - set<pair<utime_t, OpRequestRef> > arrived; - set<pair<double, OpRequestRef> > duration; - void cleanup(utime_t now); - bool shutdown; - OpTracker *tracker; - -public: - OpHistory(OpTracker 
*tracker_) : shutdown(false), tracker(tracker_) {} - ~OpHistory() { - assert(arrived.empty()); - assert(duration.empty()); - } - void insert(utime_t now, OpRequestRef op); - void dump_ops(utime_t now, Formatter *f); - void on_shutdown(); -}; - -class OpTracker { - class RemoveOnDelete { - OpTracker *tracker; - public: - RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {} - void operator()(OpRequest *op); - }; - friend class RemoveOnDelete; - friend class OpRequest; - friend class OpHistory; - uint64_t seq; - Mutex ops_in_flight_lock; - xlist<OpRequest *> ops_in_flight; - OpHistory history; - -protected: - CephContext *cct; - -public: - OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), history(this), cct(cct_) {} - void dump_ops_in_flight(Formatter *f); - void dump_historic_ops(Formatter *f); - void register_inflight_op(xlist<OpRequest*>::item *i); - void unregister_inflight_op(OpRequest *i); - - void get_age_ms_histogram(pow2_hist_t *h); - - /** - * Look for Ops which are too old, and insert warning - * strings for each Op that is too old. - * - * @param warning_strings A vector<string> reference which is filled - * with a warning string for each old Op. - * @return True if there are any Ops to warn on, false otherwise. - */ - bool check_ops_in_flight(std::vector<string> &warning_strings); - void mark_event(OpRequest *op, const string &evt); - void _mark_event(OpRequest *op, const string &evt, utime_t now); - OpRequestRef create_request(Message *req); - void on_shutdown() { - Mutex::Locker l(ops_in_flight_lock); - history.on_shutdown(); - } - ~OpTracker() { - assert(ops_in_flight.empty()); - } -}; - /** * The OpRequest takes in a Message* and takes over a single reference * to it, which it puts() when destroyed. - * OpRequest is itself ref-counted. The expectation is that you get a Message - * you want to track, create an OpRequest with it, and then pass around that OpRequest - * the way you used to pass around the Message. 
*/ struct OpRequest : public TrackedOp { friend class OpTracker; - friend class OpHistory; - Message *request; - xlist<OpRequest*>::item xitem; // rmw flags int rmw_flags; @@ -134,28 +59,12 @@ struct OpRequest : public TrackedOp { void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; } void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; } - utime_t received_time; - uint32_t warn_interval_multiplier; - utime_t get_arrived() const { - return received_time; - } - double get_duration() const { - return events.size() ? - (events.rbegin()->first - received_time) : - 0.0; - } - - void dump(utime_t now, Formatter *f) const; + void _dump(utime_t now, Formatter *f) const; private: - list<pair<utime_t, string> > events; - string current; - Mutex lock; - OpTracker *tracker; osd_reqid_t reqid; uint8_t hit_flag_points; uint8_t latest_flag_point; - uint64_t seq; static const uint8_t flag_queued_for_pg=1 << 0; static const uint8_t flag_reached_pg = 1 << 1; static const uint8_t flag_delayed = 1 << 2; @@ -164,12 +73,8 @@ private: static const uint8_t flag_commit_sent = 1 << 5; OpRequest(Message *req, OpTracker *tracker); -public: - ~OpRequest() { - assert(request); - request->put(); - } +public: bool been_queued_for_pg() { return hit_flag_points & flag_queued_for_pg; } bool been_reached_pg() { return hit_flag_points & flag_reached_pg; } bool been_delayed() { return hit_flag_points & flag_delayed; } @@ -233,10 +138,15 @@ public: latest_flag_point = flag_commit_sent; } - void mark_event(const string &event); osd_reqid_t get_reqid() const { return reqid; } + + void init_from_message(); + + typedef std::tr1::shared_ptr<OpRequest> Ref; }; +typedef OpRequest::Ref OpRequestRef; + #endif /* OPREQUEST_H_ */ diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 1d9ed5f6a31..8f7d3ccb684 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1332,10 +1332,10 @@ void PG::do_pending_flush() bool PG::op_has_sufficient_caps(OpRequestRef op) { // only check MOSDOp - if 
(op->request->get_type() != CEPH_MSG_OSD_OP) + if (op->get_req()->get_type() != CEPH_MSG_OSD_OP) return true; - MOSDOp *req = static_cast<MOSDOp*>(op->request); + MOSDOp *req = static_cast<MOSDOp*>(op->get_req()); OSD::Session *session = (OSD::Session *)req->get_connection()->get_priv(); if (!session) { @@ -1417,7 +1417,7 @@ void PG::replay_queued_ops() c = p->first; } dout(10) << "activate replay " << p->first << " " - << *p->second->request << dendl; + << *p->second->get_req() << dendl; replay.push_back(p->second); } replay_queue.clear(); @@ -2618,7 +2618,7 @@ void PG::unreg_next_scrub() void PG::sub_op_scrub_map(OpRequestRef op) { - MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request); + MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_map" << dendl; @@ -2804,7 +2804,7 @@ void PG::_request_scrub_map(int replica, eversion_t version, void PG::sub_op_scrub_reserve(OpRequestRef op) { - MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request); + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_reserve" << dendl; @@ -2824,7 +2824,7 @@ void PG::sub_op_scrub_reserve(OpRequestRef op) void PG::sub_op_scrub_reserve_reply(OpRequestRef op) { - MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request); + MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req()); assert(reply->get_header().type == MSG_OSD_SUBOPREPLY); dout(7) << "sub_op_scrub_reserve_reply" << dendl; @@ -2857,7 +2857,7 @@ void PG::sub_op_scrub_reserve_reply(OpRequestRef op) void PG::sub_op_scrub_unreserve(OpRequestRef op) { - assert(op->request->get_header().type == MSG_OSD_SUBOP); + assert(op->get_req()->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_unreserve" << dendl; op->mark_started(); @@ -2869,7 +2869,7 @@ void PG::sub_op_scrub_stop(OpRequestRef op) { op->mark_started(); - MOSDSubOp *m = 
static_cast<MOSDSubOp*>(op->request); + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_stop" << dendl; @@ -4732,7 +4732,7 @@ ostream& operator<<(ostream& out, const PG& pg) bool PG::can_discard_op(OpRequestRef op) { - MOSDOp *m = static_cast<MOSDOp*>(op->request); + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); if (OSD::op_is_discardable(m)) { dout(20) << " discard " << *m << dendl; return true; @@ -4760,7 +4760,7 @@ bool PG::can_discard_op(OpRequestRef op) template<typename T, int MSGTYPE> bool PG::can_discard_replica_op(OpRequestRef op) { - T *m = static_cast<T *>(op->request); + T *m = static_cast<T *>(op->get_req()); assert(m->get_header().type == MSGTYPE); // same pg? @@ -4776,7 +4776,7 @@ bool PG::can_discard_replica_op(OpRequestRef op) bool PG::can_discard_scan(OpRequestRef op) { - MOSDPGScan *m = static_cast<MOSDPGScan *>(op->request); + MOSDPGScan *m = static_cast<MOSDPGScan *>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_SCAN); if (old_peering_msg(m->map_epoch, m->query_epoch)) { @@ -4788,7 +4788,7 @@ bool PG::can_discard_scan(OpRequestRef op) bool PG::can_discard_backfill(OpRequestRef op) { - MOSDPGBackfill *m = static_cast<MOSDPGBackfill *>(op->request); + MOSDPGBackfill *m = static_cast<MOSDPGBackfill *>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_BACKFILL); if (old_peering_msg(m->map_epoch, m->query_epoch)) { @@ -4802,7 +4802,7 @@ bool PG::can_discard_backfill(OpRequestRef op) bool PG::can_discard_request(OpRequestRef op) { - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case CEPH_MSG_OSD_OP: return can_discard_op(op); case MSG_OSD_SUBOP: @@ -4827,55 +4827,55 @@ bool PG::can_discard_request(OpRequestRef op) bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits) { unsigned mask = ~((~0)<<bits); - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case CEPH_MSG_OSD_OP: - return 
(static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match; + return (static_cast<MOSDOp*>(op->get_req())->get_pg().m_seed & mask) == match; } return false; } bool PG::op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op) { - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case CEPH_MSG_OSD_OP: return !have_same_or_newer_map( curmap, - static_cast<MOSDOp*>(op->request)->get_map_epoch()); + static_cast<MOSDOp*>(op->get_req())->get_map_epoch()); case MSG_OSD_SUBOP: return !have_same_or_newer_map( curmap, - static_cast<MOSDSubOp*>(op->request)->map_epoch); + static_cast<MOSDSubOp*>(op->get_req())->map_epoch); case MSG_OSD_SUBOPREPLY: return !have_same_or_newer_map( curmap, - static_cast<MOSDSubOpReply*>(op->request)->map_epoch); + static_cast<MOSDSubOpReply*>(op->get_req())->map_epoch); case MSG_OSD_PG_SCAN: return !have_same_or_newer_map( curmap, - static_cast<MOSDPGScan*>(op->request)->map_epoch); + static_cast<MOSDPGScan*>(op->get_req())->map_epoch); case MSG_OSD_PG_BACKFILL: return !have_same_or_newer_map( curmap, - static_cast<MOSDPGBackfill*>(op->request)->map_epoch); + static_cast<MOSDPGBackfill*>(op->get_req())->map_epoch); case MSG_OSD_PG_PUSH: return !have_same_or_newer_map( curmap, - static_cast<MOSDPGPush*>(op->request)->map_epoch); + static_cast<MOSDPGPush*>(op->get_req())->map_epoch); case MSG_OSD_PG_PULL: return !have_same_or_newer_map( curmap, - static_cast<MOSDPGPull*>(op->request)->map_epoch); + static_cast<MOSDPGPull*>(op->get_req())->map_epoch); case MSG_OSD_PG_PUSH_REPLY: return !have_same_or_newer_map( curmap, - static_cast<MOSDPGPushReply*>(op->request)->map_epoch); + static_cast<MOSDPGPushReply*>(op->get_req())->map_epoch); } assert(0); return false; diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index ddc39d70372..9529e15ae77 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -96,7 +96,7 @@ bool ReplicatedBackend::handle_message( ) { dout(10) << 
__func__ << ": " << op << dendl; - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case MSG_OSD_PG_PUSH: // TODOXXX: needs to be active possibly do_push(op); @@ -111,7 +111,7 @@ bool ReplicatedBackend::handle_message( return true; case MSG_OSD_SUBOP: { - MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request); + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); if (m->ops.size() >= 1) { OSDOp *first = &m->ops[0]; switch (first->op.op) { @@ -130,7 +130,7 @@ bool ReplicatedBackend::handle_message( } case MSG_OSD_SUBOPREPLY: { - MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request); + MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req()); if (r->ops.size() >= 1) { OSDOp &first = r->ops[0]; switch (first.op.op) { diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 6c8b092ca01..f466eb8ccdc 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -86,9 +86,9 @@ static void log_subop_stats( { utime_t now = ceph_clock_now(g_ceph_context); utime_t latency = now; - latency -= op->request->get_recv_stamp(); + latency -= op->get_req()->get_recv_stamp(); - uint64_t inb = op->request->get_data().length(); + uint64_t inb = op->get_req()->get_data().length(); osd->logger->inc(l_osd_sop); @@ -583,7 +583,7 @@ bool ReplicatedPG::pg_op_must_wait(MOSDOp *op) void ReplicatedPG::do_pg_op(OpRequestRef op) { - MOSDOp *m = static_cast<MOSDOp *>(op->request); + MOSDOp *m = static_cast<MOSDOp *>(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); dout(10) << "do_pg_op " << *m << dendl; @@ -828,7 +828,7 @@ void ReplicatedPG::do_request( if (pgbackend->handle_message(op)) return; - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case CEPH_MSG_OSD_OP: if (is_replay() || !is_active()) { dout(20) << " replay, waiting for active on " << op << dendl; @@ -866,7 +866,7 @@ void ReplicatedPG::do_request( */ void ReplicatedPG::do_op(OpRequestRef op) { - MOSDOp *m = 
static_cast<MOSDOp*>(op->request); + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); if (op->includes_pg_op()) { if (pg_op_must_wait(m)) { @@ -1172,7 +1172,7 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc, void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc) { - MOSDOp *m = static_cast<MOSDOp*>(op->request); + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK); MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT, get_osdmap()->get_epoch(), flags); @@ -1188,7 +1188,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) { dout(10) << __func__ << " " << ctx << dendl; OpRequestRef op = ctx->op; - MOSDOp *m = static_cast<MOSDOp*>(op->request); + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); ObjectContextRef obc = ctx->obc; const hobject_t& soid = obc->obs.oi.soid; map<hobject_t,ObjectContextRef>& src_obc = ctx->src_obc; @@ -1412,16 +1412,16 @@ void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv) void ReplicatedPG::log_op_stats(OpContext *ctx) { OpRequestRef op = ctx->op; - MOSDOp *m = static_cast<MOSDOp*>(op->request); + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); utime_t now = ceph_clock_now(cct); utime_t latency = now; - latency -= ctx->op->request->get_recv_stamp(); + latency -= ctx->op->get_req()->get_recv_stamp(); utime_t rlatency; if (ctx->readable_stamp != utime_t()) { rlatency = ctx->readable_stamp; - rlatency -= ctx->op->request->get_recv_stamp(); + rlatency -= ctx->op->get_req()->get_recv_stamp(); } uint64_t inb = ctx->bytes_written; @@ -1460,10 +1460,10 @@ void ReplicatedPG::log_op_stats(OpContext *ctx) void ReplicatedPG::do_sub_op(OpRequestRef op) { - MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request); + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); assert(have_same_or_newer_map(m->map_epoch)); assert(m->get_header().type == MSG_OSD_SUBOP); - dout(15) 
<< "do_sub_op " << *op->request << dendl; + dout(15) << "do_sub_op " << *op->get_req() << dendl; OSDOp *first = NULL; if (m->ops.size() >= 1) { @@ -1501,7 +1501,7 @@ void ReplicatedPG::do_sub_op(OpRequestRef op) void ReplicatedPG::do_sub_op_reply(OpRequestRef op) { - MOSDSubOpReply *r = static_cast<MOSDSubOpReply *>(op->request); + MOSDSubOpReply *r = static_cast<MOSDSubOpReply *>(op->get_req()); assert(r->get_header().type == MSG_OSD_SUBOPREPLY); if (r->ops.size() >= 1) { OSDOp& first = r->ops[0]; @@ -1519,7 +1519,7 @@ void ReplicatedPG::do_scan( OpRequestRef op, ThreadPool::TPHandle &handle) { - MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request); + MOSDPGScan *m = static_cast<MOSDPGScan*>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_SCAN); dout(10) << "do_scan " << *m << dendl; @@ -1594,7 +1594,7 @@ void ReplicatedPG::do_scan( void ReplicatedBackend::_do_push(OpRequestRef op) { - MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request); + MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PUSH); int from = m->get_source().num(); @@ -1646,7 +1646,7 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> { void ReplicatedBackend::_do_pull_response(OpRequestRef op) { - MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request); + MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PUSH); int from = m->get_source().num(); @@ -1691,7 +1691,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op) void ReplicatedBackend::do_pull(OpRequestRef op) { - MOSDPGPull *m = static_cast<MOSDPGPull *>(op->request); + MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PULL); int from = m->get_source().num(); @@ -1707,7 +1707,7 @@ void ReplicatedBackend::do_pull(OpRequestRef op) void ReplicatedBackend::do_push_reply(OpRequestRef op) { - MOSDPGPushReply *m = static_cast<MOSDPGPushReply 
*>(op->request); + MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY); int from = m->get_source().num(); @@ -1728,7 +1728,7 @@ void ReplicatedBackend::do_push_reply(OpRequestRef op) void ReplicatedPG::do_backfill(OpRequestRef op) { - MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->request); + MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_BACKFILL); dout(10) << "do_backfill " << *m << dendl; @@ -2392,7 +2392,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ObjectContextRef src_obc; if (ceph_osd_op_type_multi(op.op)) { - MOSDOp *m = static_cast<MOSDOp *>(ctx->op->request); + MOSDOp *m = static_cast<MOSDOp *>(ctx->op->get_req()); object_locator_t src_oloc; get_src_oloc(soid.oid, m->get_object_locator(), src_oloc); hobject_t src_oid(osd_op.soid, src_oloc.key, soid.hash, @@ -3190,10 +3190,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) << " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl; dout(10) << "watch: oi.user_version=" << oi.user_version<< dendl; dout(10) << "watch: peer_addr=" - << ctx->op->request->get_connection()->get_peer_addr() << dendl; + << ctx->op->get_req()->get_connection()->get_peer_addr() << dendl; watch_info_t w(cookie, cct->_conf->osd_client_watch_timeout, - ctx->op->request->get_connection()->get_peer_addr()); + ctx->op->get_req()->get_connection()->get_peer_addr()); if (do_watch) { if (oi.watchers.count(make_pair(cookie, entity))) { dout(10) << " found existing watch " << w << " by " << entity << dendl; @@ -4038,7 +4038,7 @@ void ReplicatedPG::add_interval_usage(interval_set<uint64_t>& s, object_stat_sum void ReplicatedPG::do_osd_op_effects(OpContext *ctx) { - ConnectionRef conn(ctx->op->request->get_connection()); + ConnectionRef conn(ctx->op->get_req()->get_connection()); boost::intrusive_ptr<OSD::Session> session( (OSD::Session 
*)conn->get_priv()); session->put(); // get_priv() takes a ref, and so does the intrusive_ptr @@ -4697,7 +4697,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) { MOSDOp *m = NULL; if (repop->ctx->op) - m = static_cast<MOSDOp *>(repop->ctx->op->request); + m = static_cast<MOSDOp *>(repop->ctx->op->get_req()); if (m) dout(10) << "eval_repop " << *repop @@ -4773,7 +4773,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin(); i != waiting_for_ack[repop->v].end(); ++i) { - MOSDOp *m = (MOSDOp*)(*i)->request; + MOSDOp *m = (MOSDOp*)(*i)->get_req(); MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); reply->set_reply_versions(repop->ctx->at_version, repop->ctx->user_at_version); @@ -4869,7 +4869,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now) get_osdmap()->get_epoch(), repop->rep_tid, repop->ctx->at_version); if (ctx->op && - ((static_cast<MOSDOp *>(ctx->op->request))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) { + ((static_cast<MOSDOp *>(ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) { // replicate original op for parallel execution on replica assert(0 == "broken implementation, do not use"); } @@ -4910,7 +4910,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe tid_t rep_tid) { if (ctx->op) - dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->request << dendl; + dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->get_req() << dendl; else dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl; @@ -4941,7 +4941,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type, MOSDOp *m = NULL; if (repop->ctx->op) - m = static_cast<MOSDOp *>(repop->ctx->op->request); + m = static_cast<MOSDOp *>(repop->ctx->op->get_req()); if (m) dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m @@ -5487,7 +5487,7 @@ void 
ReplicatedPG::put_snapset_context(SnapSetContext *ssc) void ReplicatedPG::sub_op_modify(OpRequestRef op) { - MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request); + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); const hobject_t& soid = m->poid; @@ -5606,8 +5606,8 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) rm->applied = true; if (!pg_has_reset_since(rm->epoch_started)) { - dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl; - MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->request); + dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << dendl; + MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); if (!rm->committed) { @@ -5629,7 +5629,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) } } } else { - dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request + dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << " from epoch " << rm->epoch_started << " < last_peering_reset " << last_peering_reset << dendl; } @@ -5651,19 +5651,19 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) if (!pg_has_reset_since(rm->epoch_started)) { // send commit. - dout(10) << "sub_op_modify_commit on op " << *rm->op->request + dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req() << ", sending commit to osd." << rm->ackerosd << dendl; if (get_osdmap()->is_up(rm->ackerosd)) { last_complete_ondisk = rm->last_complete; - MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->request), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); commit->set_last_complete_ondisk(rm->last_complete); commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! 
osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch()); } } else { - dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->request + dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->get_req() << " from epoch " << rm->epoch_started << " < last_peering_reset " << last_peering_reset << dendl; } @@ -5680,7 +5680,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) void ReplicatedPG::sub_op_modify_reply(OpRequestRef op) { - MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request); + MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req()); assert(r->get_header().type == MSG_OSD_SUBOPREPLY); op->mark_started(); @@ -6630,7 +6630,7 @@ void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op) void ReplicatedBackend::sub_op_push_reply(OpRequestRef op) { - MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request); + MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req()); const hobject_t& soid = reply->get_poid(); assert(reply->get_header().type == MSG_OSD_SUBOPREPLY); dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl; @@ -6643,7 +6643,7 @@ void ReplicatedBackend::sub_op_push_reply(OpRequestRef op) PushOp pop; bool more = handle_push_reply(peer, rop, &pop); if (more) - send_push_op_legacy(op->request->get_priority(), peer, pop); + send_push_op_legacy(op->get_req()->get_priority(), peer, pop); } bool ReplicatedBackend::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) @@ -6724,7 +6724,7 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid) */ void ReplicatedBackend::sub_op_pull(OpRequestRef op) { - MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request); + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); op->mark_started(); @@ -6917,7 +6917,7 @@ void ReplicatedBackend::trim_pushed_data( void ReplicatedBackend::sub_op_push(OpRequestRef op) { op->mark_started(); - 
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request); + MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req()); PushOp pop; pop.soid = m->recovery_info.soid; @@ -6949,14 +6949,14 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op) C_ReplicatedBackend_OnPullComplete *c = new C_ReplicatedBackend_OnPullComplete( this, - op->request->get_priority()); + op->get_req()->get_priority()); c->to_continue.swap(to_continue); t->register_on_complete( new C_QueueInWQ( &osd->push_wq, get_parent()->bless_gencontext(c))); } - run_recovery_op(h, op->request->get_priority()); + run_recovery_op(h, op->get_req()->get_priority()); } else { PushReplyOp resp; MOSDSubOpReply *reply = new MOSDSubOpReply( @@ -7001,7 +7001,7 @@ void ReplicatedBackend::_failed_push(int from, const hobject_t &soid) void ReplicatedPG::sub_op_remove(OpRequestRef op) { - MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request); + MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_remove " << m->poid << dendl; @@ -7224,7 +7224,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) if (requeue) { if (repop->ctx->op) { - dout(10) << " requeuing " << *repop->ctx->op->request << dendl; + dout(10) << " requeuing " << *repop->ctx->op->get_req() << dendl; rq.push_back(repop->ctx->op); repop->ctx->op = OpRequestRef(); } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index c277c0d3f86..27c9d1bb605 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -993,7 +993,7 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) //<< " wfnvram=" << repop.waitfor_nvram << " wfdisk=" << repop.waitfor_disk; if (repop.ctx->op) - out << " op=" << *(repop.ctx->op->request); + out << " op=" << *(repop.ctx->op->get_req()); out << ")"; return out; } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 59b71cc6f67..a54fc65f375 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -23,6 +23,7 @@ #include 
"include/types.h" #include "include/utime.h" #include "include/CompatSet.h" +#include "include/histogram.h" #include "include/interval_set.h" #include "common/snap_types.h" #include "common/Formatter.h" @@ -555,67 +556,6 @@ inline ostream& operator<<(ostream& out, const eversion_t e) { return out << e.epoch << "'" << e.version; } - -/** - * power of 2 histogram - */ -struct pow2_hist_t { - /** - * histogram - * - * bin size is 2^index - * value is count of elements that are <= the current bin but > the previous bin. - */ - vector<int32_t> h; - -private: - /// expand to at least another's size - void _expand_to(unsigned s) { - if (s > h.size()) - h.resize(s, 0); - } - /// drop useless trailing 0's - void _contract() { - unsigned p = h.size(); - while (p > 0 && h[p-1] == 0) - --p; - h.resize(p); - } - -public: - void clear() { - h.clear(); - } - void set(int bin, int32_t v) { - _expand_to(bin + 1); - h[bin] = v; - _contract(); - } - - void add(const pow2_hist_t& o) { - _expand_to(o.h.size()); - for (unsigned p = 0; p < o.h.size(); ++p) - h[p] += o.h[p]; - _contract(); - } - void sub(const pow2_hist_t& o) { - _expand_to(o.h.size()); - for (unsigned p = 0; p < o.h.size(); ++p) - h[p] -= o.h[p]; - _contract(); - } - - int32_t upper_bound() const { - return 1 << h.size(); - } - - void dump(Formatter *f) const; - void encode(bufferlist &bl) const; - void decode(bufferlist::iterator &bl); - static void generate_test_instances(std::list<pow2_hist_t*>& o); -}; -WRITE_CLASS_ENCODER(pow2_hist_t) - /** * filestore_perf_stat_t * diff --git a/src/test/common/test_bloom_filter.cc b/src/test/common/test_bloom_filter.cc index 8e3661b2cc1..cfd41305caa 100644 --- a/src/test/common/test_bloom_filter.cc +++ b/src/test/common/test_bloom_filter.cc @@ -23,7 +23,17 @@ TEST(BloomFilter, Basic) { ASSERT_TRUE(bf.contains("bar")); } +TEST(BloomFilter, Empty) { + bloom_filter bf; + for (int i=0; i<100; ++i) { + ASSERT_FALSE(bf.contains(i)); + ASSERT_FALSE(bf.contains(stringify(i))); + } +} + 
TEST(BloomFilter, Sweep) { + std::cout.setf(std::ios_base::fixed, std::ios_base::floatfield); + std::cout.precision(5); std::cout << "# max\tfpp\tactual\tsize\tB/insert" << std::endl; for (int ex = 3; ex < 12; ex += 2) { for (float fpp = .001; fpp < .5; fpp *= 4.0) { @@ -62,7 +72,9 @@ TEST(BloomFilter, Sweep) { } TEST(BloomFilter, SweepInt) { - std::cout << "# max\tfpp\tactual\tsize\tB/insert" << std::endl; + std::cout.setf(std::ios_base::fixed, std::ios_base::floatfield); + std::cout.precision(5); + std::cout << "# max\tfpp\tactual\tsize\tB/insert\tdensity\tapprox_element_count" << std::endl; for (int ex = 3; ex < 12; ex += 2) { for (float fpp = .001; fpp < .5; fpp *= 4.0) { int max = 2 << ex; @@ -92,15 +104,70 @@ TEST(BloomFilter, SweepInt) { double byte_per_insert = (double)bl.length() / (double)max; - std::cout << max << "\t" << fpp << "\t" << actual << "\t" << bl.length() << "\t" << byte_per_insert << std::endl; + std::cout << max << "\t" << fpp << "\t" << actual << "\t" << bl.length() << "\t" << byte_per_insert + << "\t" << bf.density() << "\t" << bf.approx_unique_element_count() << std::endl; ASSERT_TRUE(actual < fpp * 10); ASSERT_TRUE(actual > fpp / 10); + ASSERT_TRUE(bf.density() > 0.40); + ASSERT_TRUE(bf.density() < 0.60); } } } +TEST(BloomFilter, CompressibleSweep) { + std::cout.setf(std::ios_base::fixed, std::ios_base::floatfield); + std::cout.precision(5); + std::cout << "# max\tins\test ins\tafter\ttgtfpp\tactual\tsize\tb/elem\n"; + float fpp = .01; + int max = 1024; + for (int div = 1; div < 10; div++) { + compressible_bloom_filter bf(max, fpp, 1); + int t = max/div; + for (int n = 0; n < t; n++) + bf.insert(n); + + unsigned est = bf.approx_unique_element_count(); + if (div > 1) + bf.compress(1.0 / div); + + for (int n = 0; n < t; n++) + ASSERT_TRUE(bf.contains(n)); + + int test = max * 100; + int hit = 0; + for (int n = 0; n < test; n++) + if (bf.contains(100000 + n)) + hit++; + + double actual = (double)hit / (double)test; + + bufferlist bl; + 
::encode(bf, bl); + + double byte_per_insert = (double)bl.length() / (double)max; + unsigned est_after = bf.approx_unique_element_count(); + std::cout << max + << "\t" << t + << "\t" << est + << "\t" << est_after + << "\t" << fpp + << "\t" << actual + << "\t" << bl.length() << "\t" << byte_per_insert + << std::endl; + + ASSERT_TRUE(actual < fpp * 2.0); + ASSERT_TRUE(actual > fpp / 2.0); + ASSERT_TRUE(est_after < est * 2); + ASSERT_TRUE(est_after > est / 2); + } +} + + + TEST(BloomFilter, BinSweep) { + std::cout.setf(std::ios_base::fixed, std::ios_base::floatfield); + std::cout.precision(5); int total_max = 16384; float total_fpp = .01; std::cout << "total_inserts " << total_max << " target-fpp " << total_fpp << std::endl; diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index 59e55a11b23..18ed795c3ef 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -6,6 +6,7 @@ TYPE(filepath) #include "common/bloom_filter.hpp" TYPE(bloom_filter) +TYPE(compressible_bloom_filter) #include "common/snap_types.h" TYPE(SnapContext) @@ -35,13 +36,15 @@ TYPEWITHSTRAYDATA(OSDMap::Incremental) #include "crush/CrushWrapper.h" TYPE(CrushWrapper) +#include "include/histogram.h" +TYPE(pow2_hist_t) + #include "osd/osd_types.h" TYPE(osd_reqid_t) TYPE(object_locator_t) TYPE(request_redirect_t) TYPE(pg_t) TYPE(coll_t) -TYPE(pow2_hist_t) TYPE(filestore_perf_stat_t) TYPE(osd_stat_t) TYPE(OSDSuperblock) |