summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--PendingReleaseNotes34
-rw-r--r--doc/release-notes.rst53
-rw-r--r--src/common/TrackedOp.cc265
-rw-r--r--src/common/TrackedOp.h154
-rw-r--r--src/common/bloom_filter.cc89
-rw-r--r--src/common/bloom_filter.hpp181
-rw-r--r--src/include/Makefile.am1
-rw-r--r--src/include/histogram.h76
-rw-r--r--src/mon/PGMonitor.h1
-rw-r--r--src/objclass/class_api.cc2
-rw-r--r--src/os/Makefile.am3
-rw-r--r--src/osd/Makefile.am1
-rw-r--r--src/osd/OSD.cc103
-rw-r--r--src/osd/OpRequest.cc263
-rw-r--r--src/osd/OpRequest.h106
-rw-r--r--src/osd/PG.cc48
-rw-r--r--src/osd/ReplicatedBackend.cc6
-rw-r--r--src/osd/ReplicatedPG.cc88
-rw-r--r--src/osd/ReplicatedPG.h2
-rw-r--r--src/osd/osd_types.h62
-rw-r--r--src/test/common/test_bloom_filter.cc71
-rw-r--r--src/test/encoding/types.h5
22 files changed, 977 insertions, 637 deletions
diff --git a/PendingReleaseNotes b/PendingReleaseNotes
index 779a081480f..a3ec73290f3 100644
--- a/PendingReleaseNotes
+++ b/PendingReleaseNotes
@@ -1,37 +1,3 @@
-v0.70
-~~~~~
-
-* librados::Rados::pool_create_async() and librados::Rados::pool_delete_async()
- don't drop a reference to the completion object on error, caller needs to take
- care of that. This has never really worked correctly and we were leaking an
- object
-
-* 'ceph osd crush set <id> <weight> <loc..>' no longer adds the osd to the
- specified location, as that's a job for 'ceph osd crush add'. It will
- however continue to work just the same as long as the osd already exists
- in the crush map.
-
-* The OSD now enforces that class write methods cannot both mutate an
- object and return data. The rbd.assign_bid method, the lone
- offender, has been removed. This breaks compatibility with
- pre-bobtail librbd clients by preventing them from creating new
- images.
-
-* librados now returns on commit instead of ack for synchronous calls.
- This is a bit safer in the case where both OSDs and the client crash, and
- is probably how it should have been acting from the beginning. Users are
- unlikely to notice but it could result in lower performance in some
- circumstances. Those who care should switch to using the async interfaces,
- which let you specify safety semantics precisely.
-
-* The C++ librados AioComplete::get_version() method was incorrectly
- returning an int (usually 32-bits). To avoid breaking library
- compatibility, a get_version64() method is added that returns the
- full-width value. The old method is deprecated and will be removed
- in a future release. Users of the C++ librados API that make use of
- the get_version() method should modify their code to avoid getting a
- value that is truncated from 64 to to 32 bits.
-
v0.71
~~~~~
diff --git a/doc/release-notes.rst b/doc/release-notes.rst
index bb1dfe4bfec..2b566baa0ea 100644
--- a/doc/release-notes.rst
+++ b/doc/release-notes.rst
@@ -2,6 +2,37 @@
Release Notes
===============
+v0.70
+-----
+
+Upgrading
+~~~~~~~~~
+
+* librados::Rados::pool_create_async() and librados::Rados::pool_delete_async()
+ don't drop a reference to the completion object on error, caller needs to take
+ care of that. This has never really worked correctly and we were leaking an
+ object
+
+* 'ceph osd crush set <id> <weight> <loc..>' no longer adds the osd to the
+ specified location, as that's a job for 'ceph osd crush add'. It will
+ however continue to work just the same as long as the osd already exists
+ in the crush map.
+
+Notable Changes
+~~~~~~~~~~~~~~~
+
+* mon: a few 'ceph mon add' races fixed (command is now idempotent) (Joao Luis)
+* crush: fix name caching
+* rgw: fix a few minor memory leaks (Yehuda Sadeh)
+* ceph: improve parsing of CEPH_ARGS (Benoit Knecht)
+* mon: avoid rewriting full osdmaps on restart (Joao Luis)
+* crc32c: fix optimized crc32c code (it now detects arch support properly)
+* mon: fix 'ceph osd crush reweight ...' (Joao Luis)
+* osd: revert xattr size limit (fixes large rgw uploads)
+* mds: fix heap profiler commands (Joao Luis)
+* rgw: fix inefficient use of std::list::size() (Yehuda Sadeh)
+
+
v0.69
-----
@@ -19,6 +50,28 @@ Upgrading
the because the server-side behavior has changed it is possible that
an application misusing the interface may now get errors.
+* The OSD now enforces that class write methods cannot both mutate an
+ object and return data. The rbd.assign_bid method, the lone
+ offender, has been removed. This breaks compatibility with
+ pre-bobtail librbd clients by preventing them from creating new
+ images.
+
+* librados now returns on commit instead of ack for synchronous calls.
+ This is a bit safer in the case where both OSDs and the client crash, and
+ is probably how it should have been acting from the beginning. Users are
+ unlikely to notice but it could result in lower performance in some
+ circumstances. Those who care should switch to using the async interfaces,
+ which let you specify safety semantics precisely.
+
+* The C++ librados AioComplete::get_version() method was incorrectly
+ returning an int (usually 32-bits). To avoid breaking library
+ compatibility, a get_version64() method is added that returns the
+ full-width value. The old method is deprecated and will be removed
+ in a future release. Users of the C++ librados API that make use of
+ the get_version() method should modify their code to avoid getting a
+ value that is truncated from 64 to to 32 bits.
+
+
Notable Changes
~~~~~~~~~~~~~~~
diff --git a/src/common/TrackedOp.cc b/src/common/TrackedOp.cc
new file mode 100644
index 00000000000..d1dbc1e7135
--- /dev/null
+++ b/src/common/TrackedOp.cc
@@ -0,0 +1,265 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ * Copyright 2013 Inktank
+ */
+
+#include "TrackedOp.h"
+#include "common/Formatter.h"
+#include <iostream>
+#include <vector>
+#include "common/debug.h"
+#include "common/config.h"
+#include "msg/Message.h"
+#include "include/assert.h"
+
+#define dout_subsys ceph_subsys_optracker
+#undef dout_prefix
+#define dout_prefix _prefix(_dout)
+
+static ostream& _prefix(std::ostream* _dout)
+{
+ return *_dout << "-- op tracker -- ";
+}
+
+void OpHistory::on_shutdown()
+{
+ arrived.clear();
+ duration.clear();
+ shutdown = true;
+}
+
+void OpHistory::insert(utime_t now, TrackedOpRef op)
+{
+ if (shutdown)
+ return;
+ duration.insert(make_pair(op->get_duration(), op));
+ arrived.insert(make_pair(op->get_arrived(), op));
+ cleanup(now);
+}
+
+void OpHistory::cleanup(utime_t now)
+{
+ while (arrived.size() &&
+ (now - arrived.begin()->first >
+ (double)(history_duration))) {
+ duration.erase(make_pair(
+ arrived.begin()->second->get_duration(),
+ arrived.begin()->second));
+ arrived.erase(arrived.begin());
+ }
+
+ while (duration.size() > history_size) {
+ arrived.erase(make_pair(
+ duration.begin()->second->get_arrived(),
+ duration.begin()->second));
+ duration.erase(duration.begin());
+ }
+}
+
+void OpHistory::dump_ops(utime_t now, Formatter *f)
+{
+ cleanup(now);
+ f->open_object_section("OpHistory");
+ f->dump_int("num to keep", history_size);
+ f->dump_int("duration to keep", history_duration);
+ {
+ f->open_array_section("Ops");
+ for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
+ arrived.begin();
+ i != arrived.end();
+ ++i) {
+ f->open_object_section("Op");
+ i->second->dump(now, f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void OpTracker::dump_historic_ops(Formatter *f)
+{
+ Mutex::Locker locker(ops_in_flight_lock);
+ utime_t now = ceph_clock_now(cct);
+ history.dump_ops(now, f);
+}
+
+void OpTracker::dump_ops_in_flight(Formatter *f)
+{
+ Mutex::Locker locker(ops_in_flight_lock);
+ f->open_object_section("ops_in_flight"); // overall dump
+ f->dump_int("num_ops", ops_in_flight.size());
+ f->open_array_section("ops"); // list of TrackedOps
+ utime_t now = ceph_clock_now(cct);
+ for (xlist<TrackedOp*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
+ f->open_object_section("op");
+ (*p)->dump(now, f);
+ f->close_section(); // this TrackedOp
+ }
+ f->close_section(); // list of TrackedOps
+ f->close_section(); // overall dump
+}
+
+void OpTracker::register_inflight_op(xlist<TrackedOp*>::item *i)
+{
+ Mutex::Locker locker(ops_in_flight_lock);
+ ops_in_flight.push_back(i);
+ ops_in_flight.back()->seq = seq++;
+}
+
+void OpTracker::unregister_inflight_op(TrackedOp *i)
+{
+ Mutex::Locker locker(ops_in_flight_lock);
+ assert(i->xitem.get_list() == &ops_in_flight);
+ utime_t now = ceph_clock_now(cct);
+ i->xitem.remove_myself();
+ i->request->clear_data();
+ history.insert(now, TrackedOpRef(i));
+}
+
+bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
+{
+ Mutex::Locker locker(ops_in_flight_lock);
+ if (!ops_in_flight.size())
+ return false;
+
+ utime_t now = ceph_clock_now(cct);
+ utime_t too_old = now;
+ too_old -= complaint_time;
+
+ utime_t oldest_secs = now - ops_in_flight.front()->get_arrived();
+
+ dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
+ << "; oldest is " << oldest_secs
+ << " seconds old" << dendl;
+
+ if (oldest_secs < complaint_time)
+ return false;
+
+ xlist<TrackedOp*>::iterator i = ops_in_flight.begin();
+ warning_vector.reserve(log_threshold + 1);
+
+ int slow = 0; // total slow
+ int warned = 0; // total logged
+ while (!i.end() && (*i)->get_arrived() < too_old) {
+ slow++;
+
+ // exponential backoff of warning intervals
+ if (((*i)->get_arrived() +
+ (complaint_time * (*i)->warn_interval_multiplier)) < now) {
+ // will warn
+ if (warning_vector.empty())
+ warning_vector.push_back("");
+ warned++;
+ if (warned > log_threshold)
+ break;
+
+ utime_t age = now - (*i)->get_arrived();
+ stringstream ss;
+ ss << "slow request " << age << " seconds old, received at " << (*i)->get_arrived()
+ << ": " << *((*i)->request) << " currently "
+ << ((*i)->current.size() ? (*i)->current : (*i)->state_string());
+ warning_vector.push_back(ss.str());
+
+ // only those that have been shown will backoff
+ (*i)->warn_interval_multiplier *= 2;
+ }
+ ++i;
+ }
+
+ // only summarize if we warn about any. if everything has backed
+ // off, we will stay silent.
+ if (warned > 0) {
+ stringstream ss;
+ ss << slow << " slow requests, " << warned << " included below; oldest blocked for > "
+ << oldest_secs << " secs";
+ warning_vector[0] = ss.str();
+ }
+
+ return warning_vector.size();
+}
+
+void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
+{
+ Mutex::Locker locker(ops_in_flight_lock);
+
+ h->clear();
+
+ utime_t now = ceph_clock_now(NULL);
+ unsigned bin = 30;
+ uint32_t lb = 1 << (bin-1); // lower bound for this bin
+ int count = 0;
+ for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) {
+ utime_t age = now - (*i)->get_arrived();
+ uint32_t ms = (long)(age * 1000.0);
+ if (ms >= lb) {
+ count++;
+ continue;
+ }
+ if (count)
+ h->set(bin, count);
+ while (lb > ms) {
+ bin--;
+ lb >>= 1;
+ }
+ count = 1;
+ }
+ if (count)
+ h->set(bin, count);
+}
+
+void OpTracker::mark_event(TrackedOp *op, const string &dest)
+{
+ utime_t now = ceph_clock_now(cct);
+ return _mark_event(op, dest, now);
+}
+
+void OpTracker::_mark_event(TrackedOp *op, const string &evt,
+ utime_t time)
+{
+ Mutex::Locker locker(ops_in_flight_lock);
+ dout(5) << //"reqid: " << op->get_reqid() <<
+ ", seq: " << op->seq
+ << ", time: " << time << ", event: " << evt
+ << ", request: " << *op->request << dendl;
+}
+
+void OpTracker::RemoveOnDelete::operator()(TrackedOp *op) {
+ op->mark_event("done");
+ tracker->unregister_inflight_op(op);
+ // Do not delete op, unregister_inflight_op took control
+}
+
+void TrackedOp::mark_event(const string &event)
+{
+ utime_t now = ceph_clock_now(g_ceph_context);
+ {
+ Mutex::Locker l(lock);
+ events.push_back(make_pair(now, event));
+ }
+ tracker->mark_event(this, event);
+ _event_marked();
+}
+
+void TrackedOp::dump(utime_t now, Formatter *f) const
+{
+ Message *m = request;
+ stringstream name;
+ m->print(name);
+ f->dump_string("description", name.str().c_str()); // this TrackedOp
+ f->dump_stream("received_at") << get_arrived();
+ f->dump_float("age", now - get_arrived());
+ f->dump_float("duration", get_duration());
+ {
+ f->open_array_section("type_data");
+ _dump(now, f);
+ f->close_section();
+ }
+}
diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h
index 753331df7f3..44e03905759 100644
--- a/src/common/TrackedOp.h
+++ b/src/common/TrackedOp.h
@@ -17,15 +17,163 @@
#include <stdint.h>
#include <include/utime.h>
#include "common/Mutex.h"
+#include "include/histogram.h"
#include "include/xlist.h"
#include "msg/Message.h"
#include <tr1/memory>
+class TrackedOp;
+typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef;
+
+class OpTracker;
+class OpHistory {
+ set<pair<utime_t, TrackedOpRef> > arrived;
+ set<pair<double, TrackedOpRef> > duration;
+ void cleanup(utime_t now);
+ bool shutdown;
+ OpTracker *tracker;
+ uint32_t history_size;
+ uint32_t history_duration;
+
+public:
+ OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_),
+ history_size(0), history_duration(0) {}
+ ~OpHistory() {
+ assert(arrived.empty());
+ assert(duration.empty());
+ }
+ void insert(utime_t now, TrackedOpRef op);
+ void dump_ops(utime_t now, Formatter *f);
+ void on_shutdown();
+ void set_size_and_duration(uint32_t new_size, uint32_t new_duration) {
+ history_size = new_size;
+ history_duration = new_duration;
+ }
+};
+
+class OpTracker {
+ class RemoveOnDelete {
+ OpTracker *tracker;
+ public:
+ RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {}
+ void operator()(TrackedOp *op);
+ };
+ friend class RemoveOnDelete;
+ friend class OpHistory;
+ uint64_t seq;
+ Mutex ops_in_flight_lock;
+ xlist<TrackedOp *> ops_in_flight;
+ OpHistory history;
+ float complaint_time;
+ int log_threshold;
+
+public:
+ CephContext *cct;
+ OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"),
+ history(this), complaint_time(0), log_threshold(0), cct(cct_) {}
+ void set_complaint_and_threshold(float time, int threshold) {
+ complaint_time = time;
+ log_threshold = threshold;
+ }
+ void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
+ history.set_size_and_duration(new_size, new_duration);
+ }
+ void dump_ops_in_flight(Formatter *f);
+ void dump_historic_ops(Formatter *f);
+ void register_inflight_op(xlist<TrackedOp*>::item *i);
+ void unregister_inflight_op(TrackedOp *i);
+
+ void get_age_ms_histogram(pow2_hist_t *h);
+
+ /**
+ * Look for Ops which are too old, and insert warning
+ * strings for each Op that is too old.
+ *
+ * @param warning_strings A vector<string> reference which is filled
+ * with a warning string for each old Op.
+ * @return True if there are any Ops to warn on, false otherwise.
+ */
+ bool check_ops_in_flight(std::vector<string> &warning_strings);
+ void mark_event(TrackedOp *op, const string &evt);
+ void _mark_event(TrackedOp *op, const string &evt, utime_t now);
+
+ void on_shutdown() {
+ Mutex::Locker l(ops_in_flight_lock);
+ history.on_shutdown();
+ }
+ ~OpTracker() {
+ assert(ops_in_flight.empty());
+ }
+
+ template <typename T>
+ typename T::Ref create_request(Message *ref)
+ {
+ typename T::Ref retval(new T(ref, this),
+ RemoveOnDelete(this));
+
+ _mark_event(retval.get(), "header_read", ref->get_recv_stamp());
+ _mark_event(retval.get(), "throttled", ref->get_throttle_stamp());
+ _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp());
+ _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp());
+
+ retval->init_from_message();
+
+ return retval;
+ }
+};
+
class TrackedOp {
+private:
+ friend class OpHistory;
+ friend class OpTracker;
+ xlist<TrackedOp*>::item xitem;
+protected:
+ Message *request; /// the logical request we are tracking
+ OpTracker *tracker; /// the tracker we are associated with
+
+ list<pair<utime_t, string> > events; /// list of events and their times
+ Mutex lock; /// to protect the events list
+ string current; /// the current state the event is in
+ uint64_t seq; /// a unique value set by the OpTracker
+
+ uint32_t warn_interval_multiplier; // limits output of a given op warning
+
+ TrackedOp(Message *req, OpTracker *_tracker) :
+ xitem(this),
+ request(req),
+ tracker(_tracker),
+ lock("TrackedOp::lock"),
+ seq(0),
+ warn_interval_multiplier(1)
+ {
+ tracker->register_inflight_op(&xitem);
+ }
+
+ virtual void init_from_message() {}
+ /// output any type-specific data you want to get when dump() is called
+ virtual void _dump(utime_t now, Formatter *f) const {}
+ /// if you want something else to happen when events are marked, implement
+ virtual void _event_marked() {}
+
public:
- virtual void mark_event(const string &event) = 0;
- virtual ~TrackedOp() {}
+ virtual ~TrackedOp() { assert(request); request->put(); }
+
+ utime_t get_arrived() const {
+ return request->get_recv_stamp();
+ }
+ // This function maybe needs some work; assumes last event is completion time
+ double get_duration() const {
+ return events.size() ?
+ (events.rbegin()->first - get_arrived()) :
+ 0.0;
+ }
+ Message *get_req() const { return request; }
+
+ void mark_event(const string &event);
+ virtual const char *state_string() const {
+ return events.rbegin()->second.c_str();
+ }
+ void dump(utime_t now, Formatter *f) const;
};
-typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef;
#endif
diff --git a/src/common/bloom_filter.cc b/src/common/bloom_filter.cc
index f602b80149e..68875e925bf 100644
--- a/src/common/bloom_filter.cc
+++ b/src/common/bloom_filter.cc
@@ -6,26 +6,26 @@
void bloom_filter::encode(bufferlist& bl) const
{
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 2, bl);
::encode((uint64_t)salt_count_, bl);
- ::encode((uint64_t)table_size_, bl);
- ::encode((uint64_t)inserted_element_count_, bl);
+ ::encode((uint64_t)insert_count_, bl);
+ ::encode((uint64_t)target_element_count_, bl);
::encode((uint64_t)random_seed_, bl);
- bufferptr bp((const char*)bit_table_, raw_table_size_);
+ bufferptr bp((const char*)bit_table_, table_size_);
::encode(bp, bl);
ENCODE_FINISH(bl);
}
void bloom_filter::decode(bufferlist::iterator& p)
{
- DECODE_START(1, p);
+ DECODE_START(2, p);
uint64_t v;
::decode(v, p);
salt_count_ = v;
::decode(v, p);
- table_size_ = v;
+ insert_count_ = v;
::decode(v, p);
- inserted_element_count_ = v;
+ target_element_count_ = v;
::decode(v, p);
random_seed_ = v;
bufferlist t;
@@ -33,11 +33,14 @@ void bloom_filter::decode(bufferlist::iterator& p)
salt_.clear();
generate_unique_salt();
- raw_table_size_ = t.length();
- assert(raw_table_size_ == table_size_ / bits_per_char);
+ table_size_ = t.length();
delete bit_table_;
- bit_table_ = new cell_type[raw_table_size_];
- t.copy(0, raw_table_size_, (char *)bit_table_);
+ if (table_size_) {
+ bit_table_ = new cell_type[table_size_];
+ t.copy(0, table_size_, (char *)bit_table_);
+ } else {
+ bit_table_ = NULL;
+ }
DECODE_FINISH(p);
}
@@ -46,8 +49,8 @@ void bloom_filter::dump(Formatter *f) const
{
f->dump_unsigned("salt_count", salt_count_);
f->dump_unsigned("table_size", table_size_);
- f->dump_unsigned("raw_table_size", raw_table_size_);
- f->dump_unsigned("insert_count", inserted_element_count_);
+ f->dump_unsigned("insert_count", insert_count_);
+ f->dump_unsigned("target_element_count", target_element_count_);
f->dump_unsigned("random_seed", random_seed_);
f->open_array_section("salt_table");
@@ -56,7 +59,7 @@ void bloom_filter::dump(Formatter *f) const
f->close_section();
f->open_array_section("bit_table");
- for (unsigned i = 0; i < raw_table_size_; ++i)
+ for (unsigned i = 0; i < table_size_; ++i)
f->dump_unsigned("byte", (unsigned)bit_table_[i]);
f->close_section();
}
@@ -74,3 +77,61 @@ void bloom_filter::generate_test_instances(list<bloom_filter*>& ls)
ls.back()->insert("boof");
ls.back()->insert("boogggg");
}
+
+
+void compressible_bloom_filter::encode(bufferlist& bl) const
+{
+ ENCODE_START(2, 2, bl);
+ bloom_filter::encode(bl);
+
+ uint32_t s = size_list.size();
+ ::encode(s, bl);
+ for (vector<size_t>::const_iterator p = size_list.begin();
+ p != size_list.end(); ++p)
+ ::encode((uint64_t)*p, bl);
+
+ ENCODE_FINISH(bl);
+}
+
+void compressible_bloom_filter::decode(bufferlist::iterator& p)
+{
+ DECODE_START(2, p);
+ bloom_filter::decode(p);
+
+ uint32_t s;
+ ::decode(s, p);
+ size_list.resize(s);
+ for (unsigned i = 0; i < s; i++) {
+ uint64_t v;
+ ::decode(v, p);
+ size_list[i] = v;
+ }
+
+ DECODE_FINISH(p);
+}
+
+void compressible_bloom_filter::dump(Formatter *f) const
+{
+ bloom_filter::dump(f);
+
+ f->open_array_section("table_sizes");
+ for (vector<size_t>::const_iterator p = size_list.begin();
+ p != size_list.end(); ++p)
+ f->dump_unsigned("size", (uint64_t)*p);
+ f->close_section();
+}
+
+void compressible_bloom_filter::generate_test_instances(list<compressible_bloom_filter*>& ls)
+{
+ ls.push_back(new compressible_bloom_filter(10, .5, 1));
+ ls.push_back(new compressible_bloom_filter(10, .5, 1));
+ ls.back()->insert("foo");
+ ls.back()->insert("bar");
+ ls.push_back(new compressible_bloom_filter(50, .5, 1));
+ ls.back()->insert("foo");
+ ls.back()->insert("bar");
+ ls.back()->insert("baz");
+ ls.back()->insert("boof");
+ ls.back()->compress(20);
+ ls.back()->insert("boogggg");
+}
diff --git a/src/common/bloom_filter.hpp b/src/common/bloom_filter.hpp
index 6216c7fb34d..93787a89a60 100644
--- a/src/common/bloom_filter.hpp
+++ b/src/common/bloom_filter.hpp
@@ -53,14 +53,22 @@ protected:
typedef unsigned int bloom_type;
typedef unsigned char cell_type;
+ unsigned char* bit_table_; ///< pointer to bit map
+ std::vector<bloom_type> salt_; ///< vector of salts
+ std::size_t salt_count_; ///< number of salts
+ std::size_t table_size_; ///< bit table size in bytes
+ std::size_t insert_count_; ///< insertion count
+ std::size_t target_element_count_; ///< target number of unique insertions
+ std::size_t random_seed_; ///< random seed
+
public:
bloom_filter()
: bit_table_(0),
salt_count_(0),
table_size_(0),
- raw_table_size_(0),
- inserted_element_count_(0),
+ insert_count_(0),
+ target_element_count_(0),
random_seed_(0)
{}
@@ -68,7 +76,8 @@ public:
const double& false_positive_probability,
const std::size_t& random_seed)
: bit_table_(0),
- inserted_element_count_(0),
+ insert_count_(0),
+ target_element_count_(predicted_inserted_element_count),
random_seed_((random_seed) ? random_seed : 0xA5A5A5A5)
{
find_optimal_parameters(predicted_inserted_element_count, false_positive_probability,
@@ -76,12 +85,15 @@ public:
init();
}
- bloom_filter(const std::size_t& salt_count, std::size_t table_size,
- const std::size_t& random_seed)
+ bloom_filter(const std::size_t& salt_count,
+ std::size_t table_size,
+ const std::size_t& random_seed,
+ std::size_t target_element_count)
: bit_table_(0),
salt_count_(salt_count),
table_size_(table_size),
- inserted_element_count_(0),
+ insert_count_(0),
+ target_element_count_(target_element_count),
random_seed_((random_seed) ? random_seed : 0xA5A5A5A5)
{
init();
@@ -89,9 +101,12 @@ public:
void init() {
generate_unique_salt();
- raw_table_size_ = table_size_ / bits_per_char;
- bit_table_ = new cell_type[raw_table_size_];
- std::fill_n(bit_table_,raw_table_size_,0x00);
+ if (table_size_) {
+ bit_table_ = new cell_type[table_size_];
+ std::fill_n(bit_table_, table_size_, 0x00);
+ } else {
+ bit_table_ = NULL;
+ }
}
bloom_filter(const bloom_filter& filter)
@@ -104,12 +119,11 @@ public:
if (this != &filter) {
salt_count_ = filter.salt_count_;
table_size_ = filter.table_size_;
- raw_table_size_ = filter.raw_table_size_;
- inserted_element_count_ = filter.inserted_element_count_;
+ insert_count_ = filter.insert_count_;
random_seed_ = filter.random_seed_;
delete[] bit_table_;
- bit_table_ = new cell_type[raw_table_size_];
- std::copy(filter.bit_table_,filter.bit_table_ + raw_table_size_,bit_table_);
+ bit_table_ = new cell_type[table_size_];
+ std::copy(filter.bit_table_, filter.bit_table_ + table_size_, bit_table_);
salt_ = filter.salt_;
}
return *this;
@@ -127,8 +141,9 @@ public:
inline void clear()
{
- std::fill_n(bit_table_,raw_table_size_,0x00);
- inserted_element_count_ = 0;
+ if (bit_table_)
+ std::fill_n(bit_table_, table_size_, 0x00);
+ insert_count_ = 0;
}
/**
@@ -141,26 +156,28 @@ public:
* @param val integer value to insert
*/
inline void insert(uint32_t val) {
+ assert(bit_table_);
std::size_t bit_index = 0;
std::size_t bit = 0;
for (std::size_t i = 0; i < salt_.size(); ++i)
{
compute_indices(hash_ap(val,salt_[i]),bit_index,bit);
- bit_table_[bit_index / bits_per_char] |= bit_mask[bit];
+ bit_table_[bit_index >> 3] |= bit_mask[bit];
}
- ++inserted_element_count_;
+ ++insert_count_;
}
inline void insert(const unsigned char* key_begin, const std::size_t& length)
{
+ assert(bit_table_);
std::size_t bit_index = 0;
std::size_t bit = 0;
for (std::size_t i = 0; i < salt_.size(); ++i)
{
compute_indices(hash_ap(key_begin,length,salt_[i]),bit_index,bit);
- bit_table_[bit_index / bits_per_char] |= bit_mask[bit];
+ bit_table_[bit_index >> 3] |= bit_mask[bit];
}
- ++inserted_element_count_;
+ ++insert_count_;
}
template<typename T>
@@ -202,12 +219,14 @@ public:
*/
inline virtual bool contains(uint32_t val) const
{
+ if (!bit_table_)
+ return false;
std::size_t bit_index = 0;
std::size_t bit = 0;
for (std::size_t i = 0; i < salt_.size(); ++i)
{
compute_indices(hash_ap(val,salt_[i]),bit_index,bit);
- if ((bit_table_[bit_index / bits_per_char] & bit_mask[bit]) != bit_mask[bit])
+ if ((bit_table_[bit_index >> 3] & bit_mask[bit]) != bit_mask[bit])
{
return false;
}
@@ -217,12 +236,14 @@ public:
inline virtual bool contains(const unsigned char* key_begin, const std::size_t length) const
{
+ if (!bit_table_)
+ return false;
std::size_t bit_index = 0;
std::size_t bit = 0;
for (std::size_t i = 0; i < salt_.size(); ++i)
{
compute_indices(hash_ap(key_begin,length,salt_[i]),bit_index,bit);
- if ((bit_table_[bit_index / bits_per_char] & bit_mask[bit]) != bit_mask[bit])
+ if ((bit_table_[bit_index >> 3] & bit_mask[bit]) != bit_mask[bit])
{
return false;
}
@@ -278,12 +299,41 @@ public:
inline virtual std::size_t size() const
{
- return table_size_;
+ return table_size_ * bits_per_char;
}
inline std::size_t element_count() const
{
- return inserted_element_count_;
+ return insert_count_;
+ }
+
+ /*
+ * density of bits set. inconvenient units, but:
+ * .3 = ~50% target insertions
+ * .5 = 100% target insertions, "perfectly full"
+ * .75 = 200% target insertions
+ * 1.0 = all bits set... infinite insertions
+ */
+ inline double density() const
+ {
+ if (!bit_table_)
+ return 0.0;
+ size_t set = 0;
+ uint8_t *p = bit_table_;
+ size_t left = table_size_;
+ while (left-- > 0) {
+ uint8_t c = *p;
+ for (; c; ++set)
+ c &= c - 1;
+ ++p;
+ }
+ return (double)set / (double)(table_size_ << 3);
+ }
+
+ virtual inline double approx_unique_element_count() const {
+ // this is not a very good estimate; a better solution should have
+ // some asymptotic behavior as density() approaches 1.0.
+ return (double)target_element_count_ * 2.0 * density();
}
inline double effective_fpp() const
@@ -295,7 +345,7 @@ public:
the current number of inserted elements - not the user defined
predicated/expected number of inserted elements.
*/
- return std::pow(1.0 - std::exp(-1.0 * salt_.size() * inserted_element_count_ / size()), 1.0 * salt_.size());
+ return std::pow(1.0 - std::exp(-1.0 * salt_.size() * insert_count_ / size()), 1.0 * salt_.size());
}
inline bloom_filter& operator &= (const bloom_filter& filter)
@@ -306,7 +356,7 @@ public:
(table_size_ == filter.table_size_) &&
(random_seed_ == filter.random_seed_)
) {
- for (std::size_t i = 0; i < raw_table_size_; ++i) {
+ for (std::size_t i = 0; i < table_size_; ++i) {
bit_table_[i] &= filter.bit_table_[i];
}
}
@@ -321,7 +371,7 @@ public:
(table_size_ == filter.table_size_) &&
(random_seed_ == filter.random_seed_)
) {
- for (std::size_t i = 0; i < raw_table_size_; ++i) {
+ for (std::size_t i = 0; i < table_size_; ++i) {
bit_table_[i] |= filter.bit_table_[i];
}
}
@@ -336,7 +386,7 @@ public:
(table_size_ == filter.table_size_) &&
(random_seed_ == filter.random_seed_)
) {
- for (std::size_t i = 0; i < raw_table_size_; ++i) {
+ for (std::size_t i = 0; i < table_size_; ++i) {
bit_table_[i] ^= filter.bit_table_[i];
}
}
@@ -352,8 +402,8 @@ protected:
inline virtual void compute_indices(const bloom_type& hash, std::size_t& bit_index, std::size_t& bit) const
{
- bit_index = hash % table_size_;
- bit = bit_index % bits_per_char;
+ bit_index = hash % (table_size_ << 3);
+ bit = bit_index & 7;
}
void generate_unique_salt()
@@ -418,7 +468,8 @@ protected:
}
else
{
- std::copy(predef_salt,predef_salt + predef_salt_count,std::back_inserter(salt_));
+ std::copy(predef_salt,predef_salt + predef_salt_count,
+ std::back_inserter(salt_));
srand(static_cast<unsigned int>(random_seed_));
while (salt_.size() < salt_count_)
{
@@ -466,8 +517,8 @@ protected:
*salt_count = static_cast<std::size_t>(min_k);
size_t t = static_cast<std::size_t>(min_m);
- t += (((t % bits_per_char) != 0) ? (bits_per_char - (t % bits_per_char)) : 0);
- *table_size = t;
+ t += (((t & 7) != 0) ? (bits_per_char - (t & 7)) : 0);
+ *table_size = t >> 3;
}
inline bloom_type hash_ap(uint32_t val, bloom_type hash) const
@@ -507,14 +558,6 @@ protected:
return hash;
}
- std::vector<bloom_type> salt_;
- unsigned char* bit_table_;
- std::size_t salt_count_;
- std::size_t table_size_;
- std::size_t raw_table_size_;
- std::size_t inserted_element_count_;
- std::size_t random_seed_;
-
public:
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& bl);
@@ -549,53 +592,77 @@ class compressible_bloom_filter : public bloom_filter
{
public:
+ compressible_bloom_filter() : bloom_filter() {}
+
compressible_bloom_filter(const std::size_t& predicted_element_count,
const double& false_positive_probability,
const std::size_t& random_seed)
- : bloom_filter(predicted_element_count,false_positive_probability,random_seed)
+ : bloom_filter(predicted_element_count, false_positive_probability, random_seed)
+ {
+ size_list.push_back(table_size_);
+ }
+
+ compressible_bloom_filter(const std::size_t& salt_count,
+ std::size_t table_size,
+ const std::size_t& random_seed,
+ std::size_t target_count)
+ : bloom_filter(salt_count, table_size, random_seed, target_count)
{
size_list.push_back(table_size_);
}
inline virtual std::size_t size() const
{
- return size_list.back();
+ return size_list.back() * bits_per_char;
}
- inline bool compress(const double& percentage)
+ inline bool compress(const double& target_ratio)
{
- if ((0.0 >= percentage) || (percentage >= 100.0))
+ if (!bit_table_)
+ return false;
+
+ if ((0.0 >= target_ratio) || (target_ratio >= 1.0))
{
return false;
}
std::size_t original_table_size = size_list.back();
- std::size_t new_table_size = static_cast<std::size_t>((size_list.back() * (1.0 - (percentage / 100.0))));
- new_table_size -= (((new_table_size % bits_per_char) != 0) ? (new_table_size % bits_per_char) : 0);
+ std::size_t new_table_size = static_cast<std::size_t>(size_list.back() * target_ratio);
- if ((bits_per_char > new_table_size) || (new_table_size >= original_table_size))
+ if ((!new_table_size) || (new_table_size >= original_table_size))
{
return false;
}
- cell_type* tmp = new cell_type[new_table_size / bits_per_char];
- std::copy(bit_table_, bit_table_ + (new_table_size / bits_per_char), tmp);
- cell_type* itr = bit_table_ + (new_table_size / bits_per_char);
- cell_type* end = bit_table_ + (original_table_size / bits_per_char);
+ cell_type* tmp = new cell_type[new_table_size];
+ std::copy(bit_table_, bit_table_ + (new_table_size), tmp);
+ cell_type* itr = bit_table_ + (new_table_size);
+ cell_type* end = bit_table_ + (original_table_size);
cell_type* itr_tmp = tmp;
-
+ cell_type* itr_end = tmp + (new_table_size);
while (end != itr)
{
*(itr_tmp++) |= (*itr++);
+ if (itr_tmp == itr_end)
+ itr_tmp = tmp;
}
delete[] bit_table_;
bit_table_ = tmp;
size_list.push_back(new_table_size);
+ table_size_ = new_table_size;
return true;
}
+ virtual inline double approx_unique_element_count() const {
+ // this is not a very good estimate; a better solution should have
+ // some asymptotic behavior as density() approaches 1.0.
+ //
+ // the compress() correction is also bad; it tends to under-estimate.
+ return (double)target_element_count_ * 2.0 * density() * (double)size_list.back() / (double)size_list.front();
+ }
+
private:
inline virtual void compute_indices(const bloom_type& hash, std::size_t& bit_index, std::size_t& bit) const
@@ -603,13 +670,19 @@ private:
bit_index = hash;
for (std::size_t i = 0; i < size_list.size(); ++i)
{
- bit_index %= size_list[i];
+ bit_index %= size_list[i] << 3;
}
- bit = bit_index % bits_per_char;
+ bit = bit_index & 7;
}
std::vector<std::size_t> size_list;
+public:
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& bl);
+ void dump(Formatter *f) const;
+ static void generate_test_instances(std::list<compressible_bloom_filter*>& ls);
};
+WRITE_CLASS_ENCODER(compressible_bloom_filter)
#endif
diff --git a/src/include/Makefile.am b/src/include/Makefile.am
index 2d98e777f00..c8823ce523d 100644
--- a/src/include/Makefile.am
+++ b/src/include/Makefile.am
@@ -43,6 +43,7 @@ noinst_HEADERS += \
include/filepath.h \
include/frag.h \
include/hash.h \
+ include/histogram.h \
include/intarith.h \
include/interval_set.h \
include/int_types.h \
diff --git a/src/include/histogram.h b/src/include/histogram.h
new file mode 100644
index 00000000000..c817b1ec175
--- /dev/null
+++ b/src/include/histogram.h
@@ -0,0 +1,76 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ * Copyright 2013 Inktank
+ */
+
+#ifndef HISTOGRAM_H_
+#define HISTOGRAM_H_
+
+/**
+ * power of 2 histogram
+ */
+struct pow2_hist_t { //
+ /**
+ * histogram
+ *
+ * bin size is 2^index
+ * value is count of elements that are <= the current bin but > the previous bin.
+ */
+ vector<int32_t> h;
+
+private:
+ /// expand to at least another's size
+ void _expand_to(unsigned s) {
+ if (s > h.size())
+ h.resize(s, 0);
+ }
+ /// drop useless trailing 0's
+ void _contract() {
+ unsigned p = h.size();
+ while (p > 0 && h[p-1] == 0)
+ --p;
+ h.resize(p);
+ }
+
+public:
+ void clear() {
+ h.clear();
+ }
+ void set(int bin, int32_t v) {
+ _expand_to(bin + 1);
+ h[bin] = v;
+ _contract();
+ }
+
+ void add(const pow2_hist_t& o) {
+ _expand_to(o.h.size());
+ for (unsigned p = 0; p < o.h.size(); ++p)
+ h[p] += o.h[p];
+ _contract();
+ }
+ void sub(const pow2_hist_t& o) {
+ _expand_to(o.h.size());
+ for (unsigned p = 0; p < o.h.size(); ++p)
+ h[p] -= o.h[p];
+ _contract();
+ }
+
+ int32_t upper_bound() const {
+ return 1 << h.size();
+ }
+
+ void dump(Formatter *f) const;
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::iterator &bl);
+ static void generate_test_instances(std::list<pow2_hist_t*>& o);
+};
+WRITE_CLASS_ENCODER(pow2_hist_t)
+
+#endif /* HISTOGRAM_H_ */
diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h
index 44015395e94..d29f47c1c43 100644
--- a/src/mon/PGMonitor.h
+++ b/src/mon/PGMonitor.h
@@ -28,6 +28,7 @@ using namespace std;
#include "PaxosService.h"
#include "include/types.h"
#include "include/utime.h"
+#include "include/histogram.h"
#include "msg/Messenger.h"
#include "common/config.h"
#include "mon/MonitorDBStore.h"
diff --git a/src/objclass/class_api.cc b/src/objclass/class_api.cc
index 1ac224cdfe7..bb26c752f9b 100644
--- a/src/objclass/class_api.cc
+++ b/src/objclass/class_api.cc
@@ -177,7 +177,7 @@ int cls_read(cls_method_context_t hctx, int ofs, int len,
int cls_get_request_origin(cls_method_context_t hctx, entity_inst_t *origin)
{
ReplicatedPG::OpContext **pctx = static_cast<ReplicatedPG::OpContext **>(hctx);
- *origin = (*pctx)->op->request->get_orig_source_inst();
+ *origin = (*pctx)->op->get_req()->get_orig_source_inst();
return 0;
}
diff --git a/src/os/Makefile.am b/src/os/Makefile.am
index b7fef8dd209..4f12a6a3278 100644
--- a/src/os/Makefile.am
+++ b/src/os/Makefile.am
@@ -13,7 +13,8 @@ libos_la_SOURCES = \
os/WBThrottle.cc \
os/BtrfsFileStoreBackend.cc \
os/GenericFileStoreBackend.cc \
- os/ZFSFileStoreBackend.cc
+ os/ZFSFileStoreBackend.cc \
+ common/TrackedOp.cc
noinst_LTLIBRARIES += libos.la
noinst_HEADERS += \
diff --git a/src/osd/Makefile.am b/src/osd/Makefile.am
index 9d3bc1d5e47..cae02015fce 100644
--- a/src/osd/Makefile.am
+++ b/src/osd/Makefile.am
@@ -16,6 +16,7 @@ libosd_la_SOURCES = \
osd/Watch.cc \
osd/ClassHandler.cc \
osd/OpRequest.cc \
+ common/TrackedOp.cc \
osd/SnapMapper.cc \
osd/osd_types.cc \
objclass/class_api.cc
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 8ce11bb558c..b2aa2ebbcd2 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -907,6 +907,10 @@ OSD::OSD(CephContext *cct_, int id, Messenger *internal_messenger, Messenger *ex
service(this)
{
monc->set_messenger(client_messenger);
+ op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time,
+ cct->_conf->osd_op_log_threshold);
+ op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size,
+ cct->_conf->osd_op_history_duration);
}
OSD::~OSD()
@@ -4539,7 +4543,7 @@ void OSD::do_waiters()
void OSD::dispatch_op(OpRequestRef op)
{
- switch (op->request->get_type()) {
+ switch (op->get_req()->get_type()) {
case MSG_OSD_PG_CREATE:
handle_pg_create(op);
@@ -4665,7 +4669,7 @@ void OSD::_dispatch(Message *m)
default:
{
- OpRequestRef op = op_tracker.create_request(m);
+ OpRequestRef op = op_tracker.create_request<OpRequest>(m);
op->mark_event("waiting_for_osdmap");
// no map? starting up?
if (!osdmap) {
@@ -5711,9 +5715,9 @@ bool OSD::require_mon_peer(Message *m)
bool OSD::require_osd_peer(OpRequestRef op)
{
- if (!op->request->get_connection()->peer_is_osd()) {
- dout(0) << "require_osd_peer received from non-osd " << op->request->get_connection()->get_peer_addr()
- << " " << *op->request << dendl;
+ if (!op->get_req()->get_connection()->peer_is_osd()) {
+ dout(0) << "require_osd_peer received from non-osd " << op->get_req()->get_connection()->get_peer_addr()
+ << " " << *op->get_req() << dendl;
return false;
}
return true;
@@ -5725,7 +5729,7 @@ bool OSD::require_osd_peer(OpRequestRef op)
*/
bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
{
- Message *m = op->request;
+ Message *m = op->get_req();
dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
assert(osd_lock.is_locked());
@@ -5837,7 +5841,7 @@ void OSD::split_pgs(
*/
void OSD::handle_pg_create(OpRequestRef op)
{
- MOSDPGCreate *m = (MOSDPGCreate*)op->request;
+ MOSDPGCreate *m = (MOSDPGCreate*)op->get_req();
assert(m->get_header().type == MSG_OSD_PG_CREATE);
dout(10) << "handle_pg_create " << *m << dendl;
@@ -5857,11 +5861,16 @@ void OSD::handle_pg_create(OpRequestRef op)
}
}
- if (!require_mon_peer(op->request)) {
- // we have to hack around require_mon_peer's interface limits
- op->request = NULL;
+ /* we have to hack around require_mon_peer's interface limits, so
+ * grab an extra reference before going in. If the peer isn't
+ * a Monitor, the reference is put for us (and then cleared
+ * up automatically by our OpTracker infrastructure). Otherwise,
+ * we put the extra ref ourself.
+ */
+ if (!require_mon_peer(op->get_req()->get())) {
return;
}
+ op->get_req()->put();
if (!require_same_or_newer_map(op, m->epoch)) return;
@@ -6166,7 +6175,7 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
*/
void OSD::handle_pg_notify(OpRequestRef op)
{
- MOSDPGNotify *m = (MOSDPGNotify*)op->request;
+ MOSDPGNotify *m = (MOSDPGNotify*)op->get_req();
assert(m->get_header().type == MSG_OSD_PG_NOTIFY);
dout(7) << "handle_pg_notify from " << m->get_source() << dendl;
@@ -6201,7 +6210,7 @@ void OSD::handle_pg_notify(OpRequestRef op)
void OSD::handle_pg_log(OpRequestRef op)
{
- MOSDPGLog *m = (MOSDPGLog*) op->request;
+ MOSDPGLog *m = (MOSDPGLog*) op->get_req();
assert(m->get_header().type == MSG_OSD_PG_LOG);
dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
@@ -6229,7 +6238,7 @@ void OSD::handle_pg_log(OpRequestRef op)
void OSD::handle_pg_info(OpRequestRef op)
{
- MOSDPGInfo *m = static_cast<MOSDPGInfo *>(op->request);
+ MOSDPGInfo *m = static_cast<MOSDPGInfo *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_INFO);
dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl;
@@ -6262,7 +6271,7 @@ void OSD::handle_pg_info(OpRequestRef op)
void OSD::handle_pg_trim(OpRequestRef op)
{
- MOSDPGTrim *m = (MOSDPGTrim *)op->request;
+ MOSDPGTrim *m = (MOSDPGTrim *)op->get_req();
assert(m->get_header().type == MSG_OSD_PG_TRIM);
dout(7) << "handle_pg_trim " << *m << " from " << m->get_source() << dendl;
@@ -6315,7 +6324,7 @@ void OSD::handle_pg_trim(OpRequestRef op)
void OSD::handle_pg_scan(OpRequestRef op)
{
- MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request);
+ MOSDPGScan *m = static_cast<MOSDPGScan*>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_SCAN);
dout(10) << "handle_pg_scan " << *m << " from " << m->get_source() << dendl;
@@ -6343,7 +6352,7 @@ void OSD::handle_pg_scan(OpRequestRef op)
void OSD::handle_pg_backfill(OpRequestRef op)
{
- MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->request);
+ MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
dout(10) << "handle_pg_backfill " << *m << " from " << m->get_source() << dendl;
@@ -6371,7 +6380,7 @@ void OSD::handle_pg_backfill(OpRequestRef op)
void OSD::handle_pg_backfill_reserve(OpRequestRef op)
{
- MBackfillReserve *m = static_cast<MBackfillReserve*>(op->request);
+ MBackfillReserve *m = static_cast<MBackfillReserve*>(op->get_req());
assert(m->get_header().type == MSG_OSD_BACKFILL_RESERVE);
if (!require_osd_peer(op))
@@ -6415,7 +6424,7 @@ void OSD::handle_pg_backfill_reserve(OpRequestRef op)
void OSD::handle_pg_recovery_reserve(OpRequestRef op)
{
- MRecoveryReserve *m = static_cast<MRecoveryReserve*>(op->request);
+ MRecoveryReserve *m = static_cast<MRecoveryReserve*>(op->get_req());
assert(m->get_header().type == MSG_OSD_RECOVERY_RESERVE);
if (!require_osd_peer(op))
@@ -6467,7 +6476,7 @@ void OSD::handle_pg_query(OpRequestRef op)
{
assert(osd_lock.is_locked());
- MOSDPGQuery *m = (MOSDPGQuery*)op->request;
+ MOSDPGQuery *m = (MOSDPGQuery*)op->get_req();
assert(m->get_header().type == MSG_OSD_PG_QUERY);
if (!require_osd_peer(op))
@@ -6554,7 +6563,7 @@ void OSD::handle_pg_query(OpRequestRef op)
void OSD::handle_pg_remove(OpRequestRef op)
{
- MOSDPGRemove *m = (MOSDPGRemove *)op->request;
+ MOSDPGRemove *m = (MOSDPGRemove *)op->get_req();
assert(m->get_header().type == MSG_OSD_PG_REMOVE);
assert(osd_lock.is_locked());
@@ -6827,7 +6836,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err)
void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
version_t uv)
{
- MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
assert(m->get_header().type == CEPH_MSG_OSD_OP);
int flags;
flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
@@ -6839,7 +6848,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
{
- MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
assert(m->get_header().type == CEPH_MSG_OSD_OP);
if (m->get_map_epoch() < pg->info.history.same_primary_since) {
@@ -6858,7 +6867,7 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
void OSD::handle_op(OpRequestRef op)
{
- MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
assert(m->get_header().type == CEPH_MSG_OSD_OP);
if (op_is_discardable(m)) {
dout(10) << " discardable " << *m << dendl;
@@ -6993,7 +7002,7 @@ void OSD::handle_op(OpRequestRef op)
template<typename T, int MSGTYPE>
void OSD::handle_replica_op(OpRequestRef op)
{
- T *m = static_cast<T *>(op->request);
+ T *m = static_cast<T *>(op->get_req());
assert(m->get_header().type == MSGTYPE);
dout(10) << __func__ << *m << " epoch " << m->map_epoch << dendl;
@@ -7047,24 +7056,24 @@ bool OSD::op_is_discardable(MOSDOp *op)
*/
void OSD::enqueue_op(PG *pg, OpRequestRef op)
{
- utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp();
- dout(15) << "enqueue_op " << op << " prio " << op->request->get_priority()
- << " cost " << op->request->get_cost()
+ utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp();
+ dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority()
+ << " cost " << op->get_req()->get_cost()
<< " latency " << latency
- << " " << *(op->request) << dendl;
+ << " " << *(op->get_req()) << dendl;
pg->queue_op(op);
}
void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
{
- unsigned priority = item.second->request->get_priority();
- unsigned cost = item.second->request->get_cost();
+ unsigned priority = item.second->get_req()->get_priority();
+ unsigned cost = item.second->get_req()->get_cost();
if (priority >= CEPH_MSG_PRIO_LOW)
pqueue.enqueue_strict(
- item.second->request->get_source_inst(),
+ item.second->get_req()->get_source_inst(),
priority, item);
else
- pqueue.enqueue(item.second->request->get_source_inst(),
+ pqueue.enqueue(item.second->get_req()->get_source_inst(),
priority, cost, item);
osd->logger->set(l_osd_opq, pqueue.length());
}
@@ -7079,14 +7088,14 @@ void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
pg_for_processing[&*(item.first)].pop_back();
}
}
- unsigned priority = item.second->request->get_priority();
- unsigned cost = item.second->request->get_cost();
+ unsigned priority = item.second->get_req()->get_priority();
+ unsigned cost = item.second->get_req()->get_cost();
if (priority >= CEPH_MSG_PRIO_LOW)
pqueue.enqueue_strict_front(
- item.second->request->get_source_inst(),
+ item.second->get_req()->get_source_inst(),
priority, item);
else
- pqueue.enqueue_front(item.second->request->get_source_inst(),
+ pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
priority, cost, item);
osd->logger->set(l_osd_opq, pqueue.length());
}
@@ -7138,11 +7147,11 @@ void OSD::dequeue_op(
PGRef pg, OpRequestRef op,
ThreadPool::TPHandle &handle)
{
- utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp();
- dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority()
- << " cost " << op->request->get_cost()
+ utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp();
+ dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority()
+ << " cost " << op->get_req()->get_cost()
<< " latency " << latency
- << " " << *(op->request)
+ << " " << *(op->get_req())
<< " pg " << *pg << dendl;
if (pg->deleting)
return;
@@ -7243,6 +7252,8 @@ const char** OSD::get_tracked_conf_keys() const
{
static const char* KEYS[] = {
"osd_max_backfills",
+ "osd_op_complaint_time", "osd_op_log_threshold",
+ "osd_op_history_size", "osd_op_history_duration",
NULL
};
return KEYS;
@@ -7255,13 +7266,23 @@ void OSD::handle_conf_change(const struct md_config_t *conf,
service.local_reserver.set_max(cct->_conf->osd_max_backfills);
service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
}
+ if (changed.count("osd_op_complaint_time") ||
+ changed.count("osd_op_log_threshold")) {
+ op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time,
+ cct->_conf->osd_op_log_threshold);
+ }
+ if (changed.count("osd_op_history_size") ||
+ changed.count("osd_op_history_duration")) {
+ op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size,
+ cct->_conf->osd_op_history_duration);
+ }
}
// --------------------------------
int OSD::init_op_flags(OpRequestRef op)
{
- MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
vector<OSDOp>::iterator iter;
// client flags have no bearing on whether an op is a read, write, etc.
diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc
index 1ffe3073051..2ed7a23086f 100644
--- a/src/osd/OpRequest.cc
+++ b/src/osd/OpRequest.cc
@@ -11,229 +11,21 @@
#include "messages/MOSDSubOp.h"
#include "include/assert.h"
-#define dout_subsys ceph_subsys_optracker
-#undef dout_prefix
-#define dout_prefix _prefix(_dout)
-static ostream& _prefix(std::ostream* _dout)
-{
- return *_dout << "--OSD::tracker-- ";
-}
OpRequest::OpRequest(Message *req, OpTracker *tracker) :
- request(req), xitem(this),
+ TrackedOp(req, tracker),
rmw_flags(0),
- warn_interval_multiplier(1),
- lock("OpRequest::lock"),
- tracker(tracker),
- hit_flag_points(0), latest_flag_point(0),
- seq(0) {
- received_time = request->get_recv_stamp();
- tracker->register_inflight_op(&xitem);
+ hit_flag_points(0), latest_flag_point(0) {
if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
// don't warn as quickly for low priority ops
warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple;
}
}
-void OpHistory::on_shutdown()
-{
- arrived.clear();
- duration.clear();
- shutdown = true;
-}
-
-void OpHistory::insert(utime_t now, OpRequestRef op)
-{
- if (shutdown)
- return;
- duration.insert(make_pair(op->get_duration(), op));
- arrived.insert(make_pair(op->get_arrived(), op));
- cleanup(now);
-}
-
-void OpHistory::cleanup(utime_t now)
-{
- while (arrived.size() &&
- (now - arrived.begin()->first >
- (double)(tracker->cct->_conf->osd_op_history_duration))) {
- duration.erase(make_pair(
- arrived.begin()->second->get_duration(),
- arrived.begin()->second));
- arrived.erase(arrived.begin());
- }
-
- while (duration.size() > tracker->cct->_conf->osd_op_history_size) {
- arrived.erase(make_pair(
- duration.begin()->second->get_arrived(),
- duration.begin()->second));
- duration.erase(duration.begin());
- }
-}
-
-void OpHistory::dump_ops(utime_t now, Formatter *f)
-{
- cleanup(now);
- f->open_object_section("OpHistory");
- f->dump_int("num to keep", tracker->cct->_conf->osd_op_history_size);
- f->dump_int("duration to keep", tracker->cct->_conf->osd_op_history_duration);
- {
- f->open_array_section("Ops");
- for (set<pair<utime_t, OpRequestRef> >::const_iterator i =
- arrived.begin();
- i != arrived.end();
- ++i) {
- f->open_object_section("Op");
- i->second->dump(now, f);
- f->close_section();
- }
- f->close_section();
- }
- f->close_section();
-}
-
-void OpTracker::dump_historic_ops(Formatter *f)
-{
- Mutex::Locker locker(ops_in_flight_lock);
- utime_t now = ceph_clock_now(cct);
- history.dump_ops(now, f);
-}
-
-void OpTracker::dump_ops_in_flight(Formatter *f)
-{
- Mutex::Locker locker(ops_in_flight_lock);
- f->open_object_section("ops_in_flight"); // overall dump
- f->dump_int("num_ops", ops_in_flight.size());
- f->open_array_section("ops"); // list of OpRequests
- utime_t now = ceph_clock_now(cct);
- for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
- f->open_object_section("op");
- (*p)->dump(now, f);
- f->close_section(); // this OpRequest
- }
- f->close_section(); // list of OpRequests
- f->close_section(); // overall dump
-}
-
-void OpTracker::register_inflight_op(xlist<OpRequest*>::item *i)
-{
- Mutex::Locker locker(ops_in_flight_lock);
- ops_in_flight.push_back(i);
- ops_in_flight.back()->seq = seq++;
-}
-
-void OpTracker::unregister_inflight_op(OpRequest *i)
-{
- Mutex::Locker locker(ops_in_flight_lock);
- assert(i->xitem.get_list() == &ops_in_flight);
- utime_t now = ceph_clock_now(cct);
- i->xitem.remove_myself();
- i->request->clear_data();
- history.insert(now, OpRequestRef(i));
-}
-
-bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
-{
- Mutex::Locker locker(ops_in_flight_lock);
- if (!ops_in_flight.size())
- return false;
-
- utime_t now = ceph_clock_now(cct);
- utime_t too_old = now;
- too_old -= cct->_conf->osd_op_complaint_time;
-
- utime_t oldest_secs = now - ops_in_flight.front()->received_time;
-
- dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
- << "; oldest is " << oldest_secs
- << " seconds old" << dendl;
-
- if (oldest_secs < cct->_conf->osd_op_complaint_time)
- return false;
-
- xlist<OpRequest*>::iterator i = ops_in_flight.begin();
- warning_vector.reserve(cct->_conf->osd_op_log_threshold + 1);
-
- int slow = 0; // total slow
- int warned = 0; // total logged
- while (!i.end() && (*i)->received_time < too_old) {
- slow++;
-
- // exponential backoff of warning intervals
- if (((*i)->received_time +
- (cct->_conf->osd_op_complaint_time *
- (*i)->warn_interval_multiplier)) < now) {
- // will warn
- if (warning_vector.empty())
- warning_vector.push_back("");
- warned++;
- if (warned > cct->_conf->osd_op_log_threshold)
- break;
-
- utime_t age = now - (*i)->received_time;
- stringstream ss;
- ss << "slow request " << age << " seconds old, received at " << (*i)->received_time
- << ": " << *((*i)->request) << " currently "
- << ((*i)->current.size() ? (*i)->current : (*i)->state_string());
- warning_vector.push_back(ss.str());
-
- // only those that have been shown will backoff
- (*i)->warn_interval_multiplier *= 2;
- }
- ++i;
- }
-
- // only summarize if we warn about any. if everything has backed
- // off, we will stay silent.
- if (warned > 0) {
- stringstream ss;
- ss << slow << " slow requests, " << warned << " included below; oldest blocked for > "
- << oldest_secs << " secs";
- warning_vector[0] = ss.str();
- }
-
- return warning_vector.size();
-}
-
-void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
-{
- Mutex::Locker locker(ops_in_flight_lock);
-
- h->clear();
-
- utime_t now = ceph_clock_now(NULL);
- unsigned bin = 30;
- uint32_t lb = 1 << (bin-1); // lower bound for this bin
- int count = 0;
- for (xlist<OpRequest*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) {
- utime_t age = now - (*i)->received_time;
- uint32_t ms = (long)(age * 1000.0);
- if (ms >= lb) {
- count++;
- continue;
- }
- if (count)
- h->set(bin, count);
- while (lb > ms) {
- bin--;
- lb >>= 1;
- }
- count = 1;
- }
- if (count)
- h->set(bin, count);
-}
-
-void OpRequest::dump(utime_t now, Formatter *f) const
+void OpRequest::_dump(utime_t now, Formatter *f) const
{
Message *m = request;
- stringstream name;
- m->print(name);
- f->dump_string("description", name.str().c_str()); // this OpRequest
- f->dump_unsigned("rmw_flags", rmw_flags);
- f->dump_stream("received_at") << received_time;
- f->dump_float("age", now - received_time);
- f->dump_float("duration", get_duration());
f->dump_string("flag_point", state_string());
if (m->get_orig_source().is_client()) {
f->open_object_section("client_info");
@@ -257,50 +49,11 @@ void OpRequest::dump(utime_t now, Formatter *f) const
}
}
-void OpTracker::mark_event(OpRequest *op, const string &dest)
-{
- utime_t now = ceph_clock_now(cct);
- return _mark_event(op, dest, now);
-}
-
-void OpTracker::_mark_event(OpRequest *op, const string &evt,
- utime_t time)
-{
- Mutex::Locker locker(ops_in_flight_lock);
- dout(5) << "reqid: " << op->get_reqid() << ", seq: " << op->seq
- << ", time: " << time << ", event: " << evt
- << ", request: " << *op->request << dendl;
-}
-
-void OpTracker::RemoveOnDelete::operator()(OpRequest *op) {
- op->mark_event("done");
- tracker->unregister_inflight_op(op);
- // Do not delete op, unregister_inflight_op took control
-}
-
-OpRequestRef OpTracker::create_request(Message *ref)
-{
- OpRequestRef retval(new OpRequest(ref, this),
- RemoveOnDelete(this));
-
- if (ref->get_type() == CEPH_MSG_OSD_OP) {
- retval->reqid = static_cast<MOSDOp*>(ref)->get_reqid();
- } else if (ref->get_type() == MSG_OSD_SUBOP) {
- retval->reqid = static_cast<MOSDSubOp*>(ref)->reqid;
- }
- _mark_event(retval.get(), "header_read", ref->get_recv_stamp());
- _mark_event(retval.get(), "throttled", ref->get_throttle_stamp());
- _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp());
- _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp());
- return retval;
-}
-
-void OpRequest::mark_event(const string &event)
+void OpRequest::init_from_message()
{
- utime_t now = ceph_clock_now(tracker->cct);
- {
- Mutex::Locker l(lock);
- events.push_back(make_pair(now, event));
+ if (request->get_type() == CEPH_MSG_OSD_OP) {
+ reqid = static_cast<MOSDOp*>(request)->get_reqid();
+ } else if (request->get_type() == MSG_OSD_SUBOP) {
+ reqid = static_cast<MOSDSubOp*>(request)->reqid;
}
- tracker->mark_event(this, event);
}
diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h
index 9634be87846..87571f58787 100644
--- a/src/osd/OpRequest.h
+++ b/src/osd/OpRequest.h
@@ -25,87 +25,12 @@
#include "common/TrackedOp.h"
#include "osd/osd_types.h"
-struct OpRequest;
-class OpTracker;
-typedef std::tr1::shared_ptr<OpRequest> OpRequestRef;
-class OpHistory {
- set<pair<utime_t, OpRequestRef> > arrived;
- set<pair<double, OpRequestRef> > duration;
- void cleanup(utime_t now);
- bool shutdown;
- OpTracker *tracker;
-
-public:
- OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_) {}
- ~OpHistory() {
- assert(arrived.empty());
- assert(duration.empty());
- }
- void insert(utime_t now, OpRequestRef op);
- void dump_ops(utime_t now, Formatter *f);
- void on_shutdown();
-};
-
-class OpTracker {
- class RemoveOnDelete {
- OpTracker *tracker;
- public:
- RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {}
- void operator()(OpRequest *op);
- };
- friend class RemoveOnDelete;
- friend class OpRequest;
- friend class OpHistory;
- uint64_t seq;
- Mutex ops_in_flight_lock;
- xlist<OpRequest *> ops_in_flight;
- OpHistory history;
-
-protected:
- CephContext *cct;
-
-public:
- OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), history(this), cct(cct_) {}
- void dump_ops_in_flight(Formatter *f);
- void dump_historic_ops(Formatter *f);
- void register_inflight_op(xlist<OpRequest*>::item *i);
- void unregister_inflight_op(OpRequest *i);
-
- void get_age_ms_histogram(pow2_hist_t *h);
-
- /**
- * Look for Ops which are too old, and insert warning
- * strings for each Op that is too old.
- *
- * @param warning_strings A vector<string> reference which is filled
- * with a warning string for each old Op.
- * @return True if there are any Ops to warn on, false otherwise.
- */
- bool check_ops_in_flight(std::vector<string> &warning_strings);
- void mark_event(OpRequest *op, const string &evt);
- void _mark_event(OpRequest *op, const string &evt, utime_t now);
- OpRequestRef create_request(Message *req);
- void on_shutdown() {
- Mutex::Locker l(ops_in_flight_lock);
- history.on_shutdown();
- }
- ~OpTracker() {
- assert(ops_in_flight.empty());
- }
-};
-
/**
* The OpRequest takes in a Message* and takes over a single reference
* to it, which it puts() when destroyed.
- * OpRequest is itself ref-counted. The expectation is that you get a Message
- * you want to track, create an OpRequest with it, and then pass around that OpRequest
- * the way you used to pass around the Message.
*/
struct OpRequest : public TrackedOp {
friend class OpTracker;
- friend class OpHistory;
- Message *request;
- xlist<OpRequest*>::item xitem;
// rmw flags
int rmw_flags;
@@ -134,28 +59,12 @@ struct OpRequest : public TrackedOp {
void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; }
void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; }
- utime_t received_time;
- uint32_t warn_interval_multiplier;
- utime_t get_arrived() const {
- return received_time;
- }
- double get_duration() const {
- return events.size() ?
- (events.rbegin()->first - received_time) :
- 0.0;
- }
-
- void dump(utime_t now, Formatter *f) const;
+ void _dump(utime_t now, Formatter *f) const;
private:
- list<pair<utime_t, string> > events;
- string current;
- Mutex lock;
- OpTracker *tracker;
osd_reqid_t reqid;
uint8_t hit_flag_points;
uint8_t latest_flag_point;
- uint64_t seq;
static const uint8_t flag_queued_for_pg=1 << 0;
static const uint8_t flag_reached_pg = 1 << 1;
static const uint8_t flag_delayed = 1 << 2;
@@ -164,12 +73,8 @@ private:
static const uint8_t flag_commit_sent = 1 << 5;
OpRequest(Message *req, OpTracker *tracker);
-public:
- ~OpRequest() {
- assert(request);
- request->put();
- }
+public:
bool been_queued_for_pg() { return hit_flag_points & flag_queued_for_pg; }
bool been_reached_pg() { return hit_flag_points & flag_reached_pg; }
bool been_delayed() { return hit_flag_points & flag_delayed; }
@@ -233,10 +138,15 @@ public:
latest_flag_point = flag_commit_sent;
}
- void mark_event(const string &event);
osd_reqid_t get_reqid() const {
return reqid;
}
+
+ void init_from_message();
+
+ typedef std::tr1::shared_ptr<OpRequest> Ref;
};
+typedef OpRequest::Ref OpRequestRef;
+
#endif /* OPREQUEST_H_ */
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 1d9ed5f6a31..8f7d3ccb684 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1332,10 +1332,10 @@ void PG::do_pending_flush()
bool PG::op_has_sufficient_caps(OpRequestRef op)
{
// only check MOSDOp
- if (op->request->get_type() != CEPH_MSG_OSD_OP)
+ if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
return true;
- MOSDOp *req = static_cast<MOSDOp*>(op->request);
+ MOSDOp *req = static_cast<MOSDOp*>(op->get_req());
OSD::Session *session = (OSD::Session *)req->get_connection()->get_priv();
if (!session) {
@@ -1417,7 +1417,7 @@ void PG::replay_queued_ops()
c = p->first;
}
dout(10) << "activate replay " << p->first << " "
- << *p->second->request << dendl;
+ << *p->second->get_req() << dendl;
replay.push_back(p->second);
}
replay_queue.clear();
@@ -2618,7 +2618,7 @@ void PG::unreg_next_scrub()
void PG::sub_op_scrub_map(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
assert(m->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_map" << dendl;
@@ -2804,7 +2804,7 @@ void PG::_request_scrub_map(int replica, eversion_t version,
void PG::sub_op_scrub_reserve(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
assert(m->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_reserve" << dendl;
@@ -2824,7 +2824,7 @@ void PG::sub_op_scrub_reserve(OpRequestRef op)
void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
{
- MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request);
+ MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
dout(7) << "sub_op_scrub_reserve_reply" << dendl;
@@ -2857,7 +2857,7 @@ void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
void PG::sub_op_scrub_unreserve(OpRequestRef op)
{
- assert(op->request->get_header().type == MSG_OSD_SUBOP);
+ assert(op->get_req()->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_unreserve" << dendl;
op->mark_started();
@@ -2869,7 +2869,7 @@ void PG::sub_op_scrub_stop(OpRequestRef op)
{
op->mark_started();
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
assert(m->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_stop" << dendl;
@@ -4732,7 +4732,7 @@ ostream& operator<<(ostream& out, const PG& pg)
bool PG::can_discard_op(OpRequestRef op)
{
- MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
if (OSD::op_is_discardable(m)) {
dout(20) << " discard " << *m << dendl;
return true;
@@ -4760,7 +4760,7 @@ bool PG::can_discard_op(OpRequestRef op)
template<typename T, int MSGTYPE>
bool PG::can_discard_replica_op(OpRequestRef op)
{
- T *m = static_cast<T *>(op->request);
+ T *m = static_cast<T *>(op->get_req());
assert(m->get_header().type == MSGTYPE);
// same pg?
@@ -4776,7 +4776,7 @@ bool PG::can_discard_replica_op(OpRequestRef op)
bool PG::can_discard_scan(OpRequestRef op)
{
- MOSDPGScan *m = static_cast<MOSDPGScan *>(op->request);
+ MOSDPGScan *m = static_cast<MOSDPGScan *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_SCAN);
if (old_peering_msg(m->map_epoch, m->query_epoch)) {
@@ -4788,7 +4788,7 @@ bool PG::can_discard_scan(OpRequestRef op)
bool PG::can_discard_backfill(OpRequestRef op)
{
- MOSDPGBackfill *m = static_cast<MOSDPGBackfill *>(op->request);
+ MOSDPGBackfill *m = static_cast<MOSDPGBackfill *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
if (old_peering_msg(m->map_epoch, m->query_epoch)) {
@@ -4802,7 +4802,7 @@ bool PG::can_discard_backfill(OpRequestRef op)
bool PG::can_discard_request(OpRequestRef op)
{
- switch (op->request->get_type()) {
+ switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
return can_discard_op(op);
case MSG_OSD_SUBOP:
@@ -4827,55 +4827,55 @@ bool PG::can_discard_request(OpRequestRef op)
bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits)
{
unsigned mask = ~((~0)<<bits);
- switch (op->request->get_type()) {
+ switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
- return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match;
+ return (static_cast<MOSDOp*>(op->get_req())->get_pg().m_seed & mask) == match;
}
return false;
}
bool PG::op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op)
{
- switch (op->request->get_type()) {
+ switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
return !have_same_or_newer_map(
curmap,
- static_cast<MOSDOp*>(op->request)->get_map_epoch());
+ static_cast<MOSDOp*>(op->get_req())->get_map_epoch());
case MSG_OSD_SUBOP:
return !have_same_or_newer_map(
curmap,
- static_cast<MOSDSubOp*>(op->request)->map_epoch);
+ static_cast<MOSDSubOp*>(op->get_req())->map_epoch);
case MSG_OSD_SUBOPREPLY:
return !have_same_or_newer_map(
curmap,
- static_cast<MOSDSubOpReply*>(op->request)->map_epoch);
+ static_cast<MOSDSubOpReply*>(op->get_req())->map_epoch);
case MSG_OSD_PG_SCAN:
return !have_same_or_newer_map(
curmap,
- static_cast<MOSDPGScan*>(op->request)->map_epoch);
+ static_cast<MOSDPGScan*>(op->get_req())->map_epoch);
case MSG_OSD_PG_BACKFILL:
return !have_same_or_newer_map(
curmap,
- static_cast<MOSDPGBackfill*>(op->request)->map_epoch);
+ static_cast<MOSDPGBackfill*>(op->get_req())->map_epoch);
case MSG_OSD_PG_PUSH:
return !have_same_or_newer_map(
curmap,
- static_cast<MOSDPGPush*>(op->request)->map_epoch);
+ static_cast<MOSDPGPush*>(op->get_req())->map_epoch);
case MSG_OSD_PG_PULL:
return !have_same_or_newer_map(
curmap,
- static_cast<MOSDPGPull*>(op->request)->map_epoch);
+ static_cast<MOSDPGPull*>(op->get_req())->map_epoch);
case MSG_OSD_PG_PUSH_REPLY:
return !have_same_or_newer_map(
curmap,
- static_cast<MOSDPGPushReply*>(op->request)->map_epoch);
+ static_cast<MOSDPGPushReply*>(op->get_req())->map_epoch);
}
assert(0);
return false;
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index ddc39d70372..9529e15ae77 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -96,7 +96,7 @@ bool ReplicatedBackend::handle_message(
)
{
dout(10) << __func__ << ": " << op << dendl;
- switch (op->request->get_type()) {
+ switch (op->get_req()->get_type()) {
case MSG_OSD_PG_PUSH:
// TODOXXX: needs to be active possibly
do_push(op);
@@ -111,7 +111,7 @@ bool ReplicatedBackend::handle_message(
return true;
case MSG_OSD_SUBOP: {
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
if (m->ops.size() >= 1) {
OSDOp *first = &m->ops[0];
switch (first->op.op) {
@@ -130,7 +130,7 @@ bool ReplicatedBackend::handle_message(
}
case MSG_OSD_SUBOPREPLY: {
- MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
+ MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
if (r->ops.size() >= 1) {
OSDOp &first = r->ops[0];
switch (first.op.op) {
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 6c8b092ca01..f466eb8ccdc 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -86,9 +86,9 @@ static void log_subop_stats(
{
utime_t now = ceph_clock_now(g_ceph_context);
utime_t latency = now;
- latency -= op->request->get_recv_stamp();
+ latency -= op->get_req()->get_recv_stamp();
- uint64_t inb = op->request->get_data().length();
+ uint64_t inb = op->get_req()->get_data().length();
osd->logger->inc(l_osd_sop);
@@ -583,7 +583,7 @@ bool ReplicatedPG::pg_op_must_wait(MOSDOp *op)
void ReplicatedPG::do_pg_op(OpRequestRef op)
{
- MOSDOp *m = static_cast<MOSDOp *>(op->request);
+ MOSDOp *m = static_cast<MOSDOp *>(op->get_req());
assert(m->get_header().type == CEPH_MSG_OSD_OP);
dout(10) << "do_pg_op " << *m << dendl;
@@ -828,7 +828,7 @@ void ReplicatedPG::do_request(
if (pgbackend->handle_message(op))
return;
- switch (op->request->get_type()) {
+ switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
if (is_replay() || !is_active()) {
dout(20) << " replay, waiting for active on " << op << dendl;
@@ -866,7 +866,7 @@ void ReplicatedPG::do_request(
*/
void ReplicatedPG::do_op(OpRequestRef op)
{
- MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
assert(m->get_header().type == CEPH_MSG_OSD_OP);
if (op->includes_pg_op()) {
if (pg_op_must_wait(m)) {
@@ -1172,7 +1172,7 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc,
void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc)
{
- MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT,
get_osdmap()->get_epoch(), flags);
@@ -1188,7 +1188,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
{
dout(10) << __func__ << " " << ctx << dendl;
OpRequestRef op = ctx->op;
- MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
ObjectContextRef obc = ctx->obc;
const hobject_t& soid = obc->obs.oi.soid;
map<hobject_t,ObjectContextRef>& src_obc = ctx->src_obc;
@@ -1412,16 +1412,16 @@ void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
void ReplicatedPG::log_op_stats(OpContext *ctx)
{
OpRequestRef op = ctx->op;
- MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
utime_t now = ceph_clock_now(cct);
utime_t latency = now;
- latency -= ctx->op->request->get_recv_stamp();
+ latency -= ctx->op->get_req()->get_recv_stamp();
utime_t rlatency;
if (ctx->readable_stamp != utime_t()) {
rlatency = ctx->readable_stamp;
- rlatency -= ctx->op->request->get_recv_stamp();
+ rlatency -= ctx->op->get_req()->get_recv_stamp();
}
uint64_t inb = ctx->bytes_written;
@@ -1460,10 +1460,10 @@ void ReplicatedPG::log_op_stats(OpContext *ctx)
void ReplicatedPG::do_sub_op(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
assert(have_same_or_newer_map(m->map_epoch));
assert(m->get_header().type == MSG_OSD_SUBOP);
- dout(15) << "do_sub_op " << *op->request << dendl;
+ dout(15) << "do_sub_op " << *op->get_req() << dendl;
OSDOp *first = NULL;
if (m->ops.size() >= 1) {
@@ -1501,7 +1501,7 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
{
- MOSDSubOpReply *r = static_cast<MOSDSubOpReply *>(op->request);
+ MOSDSubOpReply *r = static_cast<MOSDSubOpReply *>(op->get_req());
assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
if (r->ops.size() >= 1) {
OSDOp& first = r->ops[0];
@@ -1519,7 +1519,7 @@ void ReplicatedPG::do_scan(
OpRequestRef op,
ThreadPool::TPHandle &handle)
{
- MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request);
+ MOSDPGScan *m = static_cast<MOSDPGScan*>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_SCAN);
dout(10) << "do_scan " << *m << dendl;
@@ -1594,7 +1594,7 @@ void ReplicatedPG::do_scan(
void ReplicatedBackend::_do_push(OpRequestRef op)
{
- MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
+ MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_PUSH);
int from = m->get_source().num();
@@ -1646,7 +1646,7 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
- MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
+ MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_PUSH);
int from = m->get_source().num();
@@ -1691,7 +1691,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
void ReplicatedBackend::do_pull(OpRequestRef op)
{
- MOSDPGPull *m = static_cast<MOSDPGPull *>(op->request);
+ MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_PULL);
int from = m->get_source().num();
@@ -1707,7 +1707,7 @@ void ReplicatedBackend::do_pull(OpRequestRef op)
void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
- MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->request);
+ MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY);
int from = m->get_source().num();
@@ -1728,7 +1728,7 @@ void ReplicatedBackend::do_push_reply(OpRequestRef op)
void ReplicatedPG::do_backfill(OpRequestRef op)
{
- MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->request);
+ MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
dout(10) << "do_backfill " << *m << dendl;
@@ -2392,7 +2392,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ObjectContextRef src_obc;
if (ceph_osd_op_type_multi(op.op)) {
- MOSDOp *m = static_cast<MOSDOp *>(ctx->op->request);
+ MOSDOp *m = static_cast<MOSDOp *>(ctx->op->get_req());
object_locator_t src_oloc;
get_src_oloc(soid.oid, m->get_object_locator(), src_oloc);
hobject_t src_oid(osd_op.soid, src_oloc.key, soid.hash,
@@ -3190,10 +3190,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
<< " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl;
dout(10) << "watch: oi.user_version=" << oi.user_version<< dendl;
dout(10) << "watch: peer_addr="
- << ctx->op->request->get_connection()->get_peer_addr() << dendl;
+ << ctx->op->get_req()->get_connection()->get_peer_addr() << dendl;
watch_info_t w(cookie, cct->_conf->osd_client_watch_timeout,
- ctx->op->request->get_connection()->get_peer_addr());
+ ctx->op->get_req()->get_connection()->get_peer_addr());
if (do_watch) {
if (oi.watchers.count(make_pair(cookie, entity))) {
dout(10) << " found existing watch " << w << " by " << entity << dendl;
@@ -4038,7 +4038,7 @@ void ReplicatedPG::add_interval_usage(interval_set<uint64_t>& s, object_stat_sum
void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
{
- ConnectionRef conn(ctx->op->request->get_connection());
+ ConnectionRef conn(ctx->op->get_req()->get_connection());
boost::intrusive_ptr<OSD::Session> session(
(OSD::Session *)conn->get_priv());
session->put(); // get_priv() takes a ref, and so does the intrusive_ptr
@@ -4697,7 +4697,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
{
MOSDOp *m = NULL;
if (repop->ctx->op)
- m = static_cast<MOSDOp *>(repop->ctx->op->request);
+ m = static_cast<MOSDOp *>(repop->ctx->op->get_req());
if (m)
dout(10) << "eval_repop " << *repop
@@ -4773,7 +4773,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
i != waiting_for_ack[repop->v].end();
++i) {
- MOSDOp *m = (MOSDOp*)(*i)->request;
+ MOSDOp *m = (MOSDOp*)(*i)->get_req();
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->set_reply_versions(repop->ctx->at_version,
repop->ctx->user_at_version);
@@ -4869,7 +4869,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
get_osdmap()->get_epoch(),
repop->rep_tid, repop->ctx->at_version);
if (ctx->op &&
- ((static_cast<MOSDOp *>(ctx->op->request))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) {
+ ((static_cast<MOSDOp *>(ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) {
// replicate original op for parallel execution on replica
assert(0 == "broken implementation, do not use");
}
@@ -4910,7 +4910,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe
tid_t rep_tid)
{
if (ctx->op)
- dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->request << dendl;
+ dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->get_req() << dendl;
else
dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl;
@@ -4941,7 +4941,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
MOSDOp *m = NULL;
if (repop->ctx->op)
- m = static_cast<MOSDOp *>(repop->ctx->op->request);
+ m = static_cast<MOSDOp *>(repop->ctx->op->get_req());
if (m)
dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m
@@ -5487,7 +5487,7 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
void ReplicatedPG::sub_op_modify(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
assert(m->get_header().type == MSG_OSD_SUBOP);
const hobject_t& soid = m->poid;
@@ -5606,8 +5606,8 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
rm->applied = true;
if (!pg_has_reset_since(rm->epoch_started)) {
- dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
- MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->request);
+ dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << dendl;
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->get_req());
assert(m->get_header().type == MSG_OSD_SUBOP);
if (!rm->committed) {
@@ -5629,7 +5629,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
}
}
} else {
- dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request
+ dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req()
<< " from epoch " << rm->epoch_started << " < last_peering_reset "
<< last_peering_reset << dendl;
}
@@ -5651,19 +5651,19 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
if (!pg_has_reset_since(rm->epoch_started)) {
// send commit.
- dout(10) << "sub_op_modify_commit on op " << *rm->op->request
+ dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
<< ", sending commit to osd." << rm->ackerosd
<< dendl;
if (get_osdmap()->is_up(rm->ackerosd)) {
last_complete_ondisk = rm->last_complete;
- MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->request), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
commit->set_last_complete_ondisk(rm->last_complete);
commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
}
} else {
- dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->request
+ dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->get_req()
<< " from epoch " << rm->epoch_started << " < last_peering_reset "
<< last_peering_reset << dendl;
}
@@ -5680,7 +5680,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
{
- MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
+ MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
op->mark_started();
@@ -6630,7 +6630,7 @@ void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
{
- MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request);
+ MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
const hobject_t& soid = reply->get_poid();
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
@@ -6643,7 +6643,7 @@ void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
PushOp pop;
bool more = handle_push_reply(peer, rop, &pop);
if (more)
- send_push_op_legacy(op->request->get_priority(), peer, pop);
+ send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
}
bool ReplicatedBackend::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
@@ -6724,7 +6724,7 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
*/
void ReplicatedBackend::sub_op_pull(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
assert(m->get_header().type == MSG_OSD_SUBOP);
op->mark_started();
@@ -6917,7 +6917,7 @@ void ReplicatedBackend::trim_pushed_data(
void ReplicatedBackend::sub_op_push(OpRequestRef op)
{
op->mark_started();
- MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
PushOp pop;
pop.soid = m->recovery_info.soid;
@@ -6949,14 +6949,14 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
C_ReplicatedBackend_OnPullComplete *c =
new C_ReplicatedBackend_OnPullComplete(
this,
- op->request->get_priority());
+ op->get_req()->get_priority());
c->to_continue.swap(to_continue);
t->register_on_complete(
new C_QueueInWQ(
&osd->push_wq,
get_parent()->bless_gencontext(c)));
}
- run_recovery_op(h, op->request->get_priority());
+ run_recovery_op(h, op->get_req()->get_priority());
} else {
PushReplyOp resp;
MOSDSubOpReply *reply = new MOSDSubOpReply(
@@ -7001,7 +7001,7 @@ void ReplicatedBackend::_failed_push(int from, const hobject_t &soid)
void ReplicatedPG::sub_op_remove(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
assert(m->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_remove " << m->poid << dendl;
@@ -7224,7 +7224,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
if (requeue) {
if (repop->ctx->op) {
- dout(10) << " requeuing " << *repop->ctx->op->request << dendl;
+ dout(10) << " requeuing " << *repop->ctx->op->get_req() << dendl;
rq.push_back(repop->ctx->op);
repop->ctx->op = OpRequestRef();
}
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index c277c0d3f86..27c9d1bb605 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -993,7 +993,7 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
//<< " wfnvram=" << repop.waitfor_nvram
<< " wfdisk=" << repop.waitfor_disk;
if (repop.ctx->op)
- out << " op=" << *(repop.ctx->op->request);
+ out << " op=" << *(repop.ctx->op->get_req());
out << ")";
return out;
}
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 59b71cc6f67..a54fc65f375 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -23,6 +23,7 @@
#include "include/types.h"
#include "include/utime.h"
#include "include/CompatSet.h"
+#include "include/histogram.h"
#include "include/interval_set.h"
#include "common/snap_types.h"
#include "common/Formatter.h"
@@ -555,67 +556,6 @@ inline ostream& operator<<(ostream& out, const eversion_t e) {
return out << e.epoch << "'" << e.version;
}
-
-/**
- * power of 2 histogram
- */
-struct pow2_hist_t {
- /**
- * histogram
- *
- * bin size is 2^index
- * value is count of elements that are <= the current bin but > the previous bin.
- */
- vector<int32_t> h;
-
-private:
- /// expand to at least another's size
- void _expand_to(unsigned s) {
- if (s > h.size())
- h.resize(s, 0);
- }
- /// drop useless trailing 0's
- void _contract() {
- unsigned p = h.size();
- while (p > 0 && h[p-1] == 0)
- --p;
- h.resize(p);
- }
-
-public:
- void clear() {
- h.clear();
- }
- void set(int bin, int32_t v) {
- _expand_to(bin + 1);
- h[bin] = v;
- _contract();
- }
-
- void add(const pow2_hist_t& o) {
- _expand_to(o.h.size());
- for (unsigned p = 0; p < o.h.size(); ++p)
- h[p] += o.h[p];
- _contract();
- }
- void sub(const pow2_hist_t& o) {
- _expand_to(o.h.size());
- for (unsigned p = 0; p < o.h.size(); ++p)
- h[p] -= o.h[p];
- _contract();
- }
-
- int32_t upper_bound() const {
- return 1 << h.size();
- }
-
- void dump(Formatter *f) const;
- void encode(bufferlist &bl) const;
- void decode(bufferlist::iterator &bl);
- static void generate_test_instances(std::list<pow2_hist_t*>& o);
-};
-WRITE_CLASS_ENCODER(pow2_hist_t)
-
/**
* filestore_perf_stat_t
*
diff --git a/src/test/common/test_bloom_filter.cc b/src/test/common/test_bloom_filter.cc
index 8e3661b2cc1..cfd41305caa 100644
--- a/src/test/common/test_bloom_filter.cc
+++ b/src/test/common/test_bloom_filter.cc
@@ -23,7 +23,17 @@ TEST(BloomFilter, Basic) {
ASSERT_TRUE(bf.contains("bar"));
}
+TEST(BloomFilter, Empty) {
+ bloom_filter bf;
+ for (int i=0; i<100; ++i) {
+ ASSERT_FALSE(bf.contains(i));
+ ASSERT_FALSE(bf.contains(stringify(i)));
+ }
+}
+
TEST(BloomFilter, Sweep) {
+ std::cout.setf(std::ios_base::fixed, std::ios_base::floatfield);
+ std::cout.precision(5);
std::cout << "# max\tfpp\tactual\tsize\tB/insert" << std::endl;
for (int ex = 3; ex < 12; ex += 2) {
for (float fpp = .001; fpp < .5; fpp *= 4.0) {
@@ -62,7 +72,9 @@ TEST(BloomFilter, Sweep) {
}
TEST(BloomFilter, SweepInt) {
- std::cout << "# max\tfpp\tactual\tsize\tB/insert" << std::endl;
+ std::cout.setf(std::ios_base::fixed, std::ios_base::floatfield);
+ std::cout.precision(5);
+ std::cout << "# max\tfpp\tactual\tsize\tB/insert\tdensity\tapprox_element_count" << std::endl;
for (int ex = 3; ex < 12; ex += 2) {
for (float fpp = .001; fpp < .5; fpp *= 4.0) {
int max = 2 << ex;
@@ -92,15 +104,70 @@ TEST(BloomFilter, SweepInt) {
double byte_per_insert = (double)bl.length() / (double)max;
- std::cout << max << "\t" << fpp << "\t" << actual << "\t" << bl.length() << "\t" << byte_per_insert << std::endl;
+ std::cout << max << "\t" << fpp << "\t" << actual << "\t" << bl.length() << "\t" << byte_per_insert
+ << "\t" << bf.density() << "\t" << bf.approx_unique_element_count() << std::endl;
ASSERT_TRUE(actual < fpp * 10);
ASSERT_TRUE(actual > fpp / 10);
+ ASSERT_TRUE(bf.density() > 0.40);
+ ASSERT_TRUE(bf.density() < 0.60);
}
}
}
+TEST(BloomFilter, CompressibleSweep) {
+ std::cout.setf(std::ios_base::fixed, std::ios_base::floatfield);
+ std::cout.precision(5);
+ std::cout << "# max\tins\test ins\tafter\ttgtfpp\tactual\tsize\tb/elem\n";
+ float fpp = .01;
+ int max = 1024;
+ for (int div = 1; div < 10; div++) {
+ compressible_bloom_filter bf(max, fpp, 1);
+ int t = max/div;
+ for (int n = 0; n < t; n++)
+ bf.insert(n);
+
+ unsigned est = bf.approx_unique_element_count();
+ if (div > 1)
+ bf.compress(1.0 / div);
+
+ for (int n = 0; n < t; n++)
+ ASSERT_TRUE(bf.contains(n));
+
+ int test = max * 100;
+ int hit = 0;
+ for (int n = 0; n < test; n++)
+ if (bf.contains(100000 + n))
+ hit++;
+
+ double actual = (double)hit / (double)test;
+
+ bufferlist bl;
+ ::encode(bf, bl);
+
+ double byte_per_insert = (double)bl.length() / (double)max;
+ unsigned est_after = bf.approx_unique_element_count();
+ std::cout << max
+ << "\t" << t
+ << "\t" << est
+ << "\t" << est_after
+ << "\t" << fpp
+ << "\t" << actual
+ << "\t" << bl.length() << "\t" << byte_per_insert
+ << std::endl;
+
+ ASSERT_TRUE(actual < fpp * 2.0);
+ ASSERT_TRUE(actual > fpp / 2.0);
+ ASSERT_TRUE(est_after < est * 2);
+ ASSERT_TRUE(est_after > est / 2);
+ }
+}
+
+
+
TEST(BloomFilter, BinSweep) {
+ std::cout.setf(std::ios_base::fixed, std::ios_base::floatfield);
+ std::cout.precision(5);
int total_max = 16384;
float total_fpp = .01;
std::cout << "total_inserts " << total_max << " target-fpp " << total_fpp << std::endl;
diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h
index 59e55a11b23..18ed795c3ef 100644
--- a/src/test/encoding/types.h
+++ b/src/test/encoding/types.h
@@ -6,6 +6,7 @@ TYPE(filepath)
#include "common/bloom_filter.hpp"
TYPE(bloom_filter)
+TYPE(compressible_bloom_filter)
#include "common/snap_types.h"
TYPE(SnapContext)
@@ -35,13 +36,15 @@ TYPEWITHSTRAYDATA(OSDMap::Incremental)
#include "crush/CrushWrapper.h"
TYPE(CrushWrapper)
+#include "include/histogram.h"
+TYPE(pow2_hist_t)
+
#include "osd/osd_types.h"
TYPE(osd_reqid_t)
TYPE(object_locator_t)
TYPE(request_redirect_t)
TYPE(pg_t)
TYPE(coll_t)
-TYPE(pow2_hist_t)
TYPE(filestore_perf_stat_t)
TYPE(osd_stat_t)
TYPE(OSDSuperblock)