diff options
author | Greg Farnum <greg@inktank.com> | 2013-09-19 17:29:40 -0700 |
---|---|---|
committer | Greg Farnum <greg@inktank.com> | 2013-09-19 18:14:59 -0700 |
commit | 3cb6abec4b9384e0a78ab9f8310f7dc6cda7fc90 (patch) | |
tree | aa6ab54436dc3a7352ae7702253225dbc2facaf7 | |
parent | 5fdaccd2d7fdceef402ec2536eff3992f6b28833 (diff) | |
download | ceph-3cb6abec4b9384e0a78ab9f8310f7dc6cda7fc90.tar.gz |
OpTracker: move the OpTracker and OpHistory into common/TrackedOp.[h|cc]
Signed-off-by: Greg Farnum <greg@inktank.com>
-rw-r--r-- | src/common/TrackedOp.cc | 249 | ||||
-rw-r--r-- | src/common/TrackedOp.h | 90 | ||||
-rw-r--r-- | src/osd/Makefile.am | 1 | ||||
-rw-r--r-- | src/osd/OpRequest.cc | 227 | ||||
-rw-r--r-- | src/osd/OpRequest.h | 86 |
5 files changed, 338 insertions, 315 deletions
diff --git a/src/common/TrackedOp.cc b/src/common/TrackedOp.cc new file mode 100644 index 00000000000..3056db7eeb4 --- /dev/null +++ b/src/common/TrackedOp.cc @@ -0,0 +1,249 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * Copyright 2013 Inktank + */ + +#include "TrackedOp.h" +#include "common/Formatter.h" +#include <iostream> +#include <vector> +#include "common/debug.h" +#include "common/config.h" +#include "msg/Message.h" +#include "include/assert.h" + +#define dout_subsys ceph_subsys_optracker +#undef dout_prefix +#define dout_prefix _prefix(_dout) + +static ostream& _prefix(std::ostream* _dout) +{ + return *_dout << "-- op tracker -- "; +} + +void OpHistory::on_shutdown() +{ + arrived.clear(); + duration.clear(); + shutdown = true; +} + +void OpHistory::insert(utime_t now, TrackedOpRef op) +{ + if (shutdown) + return; + duration.insert(make_pair(op->get_duration(), op)); + arrived.insert(make_pair(op->get_arrived(), op)); + cleanup(now); +} + +void OpHistory::cleanup(utime_t now) +{ + while (arrived.size() && + (now - arrived.begin()->first > + (double)(tracker->cct->_conf->osd_op_history_duration))) { + duration.erase(make_pair( + arrived.begin()->second->get_duration(), + arrived.begin()->second)); + arrived.erase(arrived.begin()); + } + + while (duration.size() > tracker->cct->_conf->osd_op_history_size) { + arrived.erase(make_pair( + duration.begin()->second->get_arrived(), + duration.begin()->second)); + duration.erase(duration.begin()); + } +} + +void OpHistory::dump_ops(utime_t now, Formatter *f) +{ + cleanup(now); + f->open_object_section("OpHistory"); + f->dump_int("num to keep", tracker->cct->_conf->osd_op_history_size); + f->dump_int("duration to keep", tracker->cct->_conf->osd_op_history_duration); + { + f->open_array_section("Ops"); + for (set<pair<utime_t, TrackedOpRef> >::const_iterator i = + arrived.begin(); + i != arrived.end(); + ++i) { + f->open_object_section("Op"); + i->second->dump(now, f); + f->close_section(); + } + f->close_section(); + } + f->close_section(); +} + +void OpTracker::dump_historic_ops(Formatter *f) +{ + Mutex::Locker locker(ops_in_flight_lock); + utime_t now = ceph_clock_now(cct); + history.dump_ops(now, f); +} + +void OpTracker::dump_ops_in_flight(Formatter *f) +{ + Mutex::Locker locker(ops_in_flight_lock); + f->open_object_section("ops_in_flight"); // overall dump + f->dump_int("num_ops", ops_in_flight.size()); + f->open_array_section("ops"); // list of TrackedOps + utime_t now = ceph_clock_now(cct); + for (xlist<TrackedOp*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) { + f->open_object_section("op"); + (*p)->dump(now, f); + f->close_section(); // this TrackedOp + } + f->close_section(); // list of TrackedOps + f->close_section(); // overall dump +} + +void OpTracker::register_inflight_op(xlist<TrackedOp*>::item *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + ops_in_flight.push_back(i); + ops_in_flight.back()->seq = seq++; +} + +void OpTracker::unregister_inflight_op(TrackedOp *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + assert(i->xitem.get_list() == &ops_in_flight); + utime_t now = ceph_clock_now(cct); + i->xitem.remove_myself(); + i->request->clear_data(); + history.insert(now, TrackedOpRef(i)); +} + +bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector) +{ + Mutex::Locker locker(ops_in_flight_lock); + if (!ops_in_flight.size()) + return false; + + utime_t now = ceph_clock_now(cct); + utime_t too_old = now; + too_old -= cct->_conf->osd_op_complaint_time; + + utime_t oldest_secs = now - ops_in_flight.front()->received_time; + + dout(10) << "ops_in_flight.size: " << ops_in_flight.size() + << "; oldest is " << oldest_secs + << " seconds old" << dendl; + + if (oldest_secs < cct->_conf->osd_op_complaint_time) + return false; + + xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); + warning_vector.reserve(cct->_conf->osd_op_log_threshold + 1); + + int slow = 0; // total slow + int warned = 0; // total logged + while (!i.end() && (*i)->received_time < too_old) { + slow++; + + // exponential backoff of warning intervals + if (((*i)->received_time + + (cct->_conf->osd_op_complaint_time * + (*i)->warn_interval_multiplier)) < now) { + // will warn + if (warning_vector.empty()) + warning_vector.push_back(""); + warned++; + if (warned > cct->_conf->osd_op_log_threshold) + break; + + utime_t age = now - (*i)->received_time; + stringstream ss; + ss << "slow request " << age << " seconds old, received at " << (*i)->received_time + << ": " << *((*i)->request) << " currently " + << ((*i)->current.size() ? (*i)->current : (*i)->state_string()); + warning_vector.push_back(ss.str()); + + // only those that have been shown will backoff + (*i)->warn_interval_multiplier *= 2; + } + ++i; + } + + // only summarize if we warn about any. if everything has backed + // off, we will stay silent. + if (warned > 0) { + stringstream ss; + ss << slow << " slow requests, " << warned << " included below; oldest blocked for > " + << oldest_secs << " secs"; + warning_vector[0] = ss.str(); + } + + return warning_vector.size(); +} + +void OpTracker::get_age_ms_histogram(pow2_hist_t *h) +{ + Mutex::Locker locker(ops_in_flight_lock); + + h->clear(); + + utime_t now = ceph_clock_now(NULL); + unsigned bin = 30; + uint32_t lb = 1 << (bin-1); // lower bound for this bin + int count = 0; + for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) { + utime_t age = now - (*i)->received_time; + uint32_t ms = (long)(age * 1000.0); + if (ms >= lb) { + count++; + continue; + } + if (count) + h->set(bin, count); + while (lb > ms) { + bin--; + lb >>= 1; + } + count = 1; + } + if (count) + h->set(bin, count); +} + +void OpTracker::mark_event(TrackedOp *op, const string &dest) +{ + utime_t now = ceph_clock_now(cct); + return _mark_event(op, dest, now); +} + +void OpTracker::_mark_event(TrackedOp *op, const string &evt, + utime_t time) +{ + Mutex::Locker locker(ops_in_flight_lock); + dout(5) << //"reqid: " << op->get_reqid() << + ", seq: " << op->seq + << ", time: " << time << ", event: " << evt + << ", request: " << *op->request << dendl; +} + +void OpTracker::RemoveOnDelete::operator()(TrackedOp *op) { + op->mark_event("done"); + tracker->unregister_inflight_op(op); + // Do not delete op, unregister_inflight_op took control +} + +void TrackedOp::mark_event(const string &event) +{ + utime_t now = ceph_clock_now(g_ceph_context); + { + Mutex::Locker l(lock); + events.push_back(make_pair(now, event)); + } + tracker->mark_event(this, event); +} diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h index 7a7b66396f6..644fc0a6182 100644 --- a/src/common/TrackedOp.h +++ b/src/common/TrackedOp.h @@ -17,10 +17,97 @@ #include <stdint.h> #include <include/utime.h> #include "common/Mutex.h" +#include "include/histogram.h" #include "include/xlist.h" #include "msg/Message.h" #include <tr1/memory> +class TrackedOp; +typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef; + +class OpTracker; +class OpHistory { + set<pair<utime_t, TrackedOpRef> > arrived; + set<pair<double, TrackedOpRef> > duration; + void cleanup(utime_t now); + bool shutdown; + OpTracker *tracker; + +public: + OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_) {} + ~OpHistory() { + assert(arrived.empty()); + assert(duration.empty()); + } + void insert(utime_t now, TrackedOpRef op); + void dump_ops(utime_t now, Formatter *f); + void on_shutdown(); +}; + +class OpTracker { + class RemoveOnDelete { + OpTracker *tracker; + public: + RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {} + void operator()(TrackedOp *op); + }; + friend class RemoveOnDelete; + friend class OpRequest; + friend class OpHistory; + uint64_t seq; + Mutex ops_in_flight_lock; + xlist<TrackedOp *> ops_in_flight; + OpHistory history; + +protected: + CephContext *cct; + +public: + OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), history(this), cct(cct_) {} + void dump_ops_in_flight(Formatter *f); + void dump_historic_ops(Formatter *f); + void register_inflight_op(xlist<TrackedOp*>::item *i); + void unregister_inflight_op(TrackedOp *i); + + void get_age_ms_histogram(pow2_hist_t *h); + + /** + * Look for Ops which are too old, and insert warning + * strings for each Op that is too old. + * + * @param warning_strings A vector<string> reference which is filled + * with a warning string for each old Op. + * @return True if there are any Ops to warn on, false otherwise. + */ + bool check_ops_in_flight(std::vector<string> &warning_strings); + void mark_event(TrackedOp *op, const string &evt); + void _mark_event(TrackedOp *op, const string &evt, utime_t now); + + void on_shutdown() { + Mutex::Locker l(ops_in_flight_lock); + history.on_shutdown(); + } + ~OpTracker() { + assert(ops_in_flight.empty()); + } + + template <typename T, typename TRef> + TRef create_request(Message *ref) + { + TRef retval(new T(ref, this), + RemoveOnDelete(this)); + + _mark_event(retval.get(), "header_read", ref->get_recv_stamp()); + _mark_event(retval.get(), "throttled", ref->get_throttle_stamp()); + _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp()); + _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp()); + + retval->init_from_message(); + + return retval; + } +}; + class TrackedOp { protected: list<pair<utime_t, string> > events; /// list of events and their times @@ -56,10 +143,9 @@ public: 0.0; } - virtual void mark_event(const string &event) = 0; + virtual void mark_event(const string &event); virtual const char *state_string() const = 0; virtual void dump(utime_t now, Formatter *f) const = 0; }; -typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef; #endif diff --git a/src/osd/Makefile.am b/src/osd/Makefile.am index ea7c036f858..d6d0e363dbb 100644 --- a/src/osd/Makefile.am +++ b/src/osd/Makefile.am @@ -15,6 +15,7 @@ libosd_la_SOURCES = \ osd/Watch.cc \ osd/ClassHandler.cc \ osd/OpRequest.cc \ + common/TrackedOp.cc \ osd/SnapMapper.cc \ osd/osd_types.cc \ objclass/class_api.cc diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index 5a9abd63cf6..81980ed2a57 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -11,14 +11,7 @@ #include "messages/MOSDSubOp.h" #include "include/assert.h" -#define dout_subsys ceph_subsys_optracker -#undef dout_prefix -#define dout_prefix _prefix(_dout) -static ostream& _prefix(std::ostream* _dout) -{ - return *_dout << "--OSD::tracker-- "; -} OpRequest::OpRequest(Message *req, OpTracker *tracker) : TrackedOp(req), @@ -33,194 +26,6 @@ OpRequest::OpRequest(Message *req, OpTracker *tracker) : } } -void OpHistory::on_shutdown() -{ - arrived.clear(); - duration.clear(); - shutdown = true; -} - -void OpHistory::insert(utime_t now, TrackedOpRef op) -{ - if (shutdown) - return; - duration.insert(make_pair(op->get_duration(), op)); - arrived.insert(make_pair(op->get_arrived(), op)); - cleanup(now); -} - -void OpHistory::cleanup(utime_t now) -{ - while (arrived.size() && - (now - arrived.begin()->first > - (double)(tracker->cct->_conf->osd_op_history_duration))) { - duration.erase(make_pair( - arrived.begin()->second->get_duration(), - arrived.begin()->second)); - arrived.erase(arrived.begin()); - } - - while (duration.size() > tracker->cct->_conf->osd_op_history_size) { - arrived.erase(make_pair( - duration.begin()->second->get_arrived(), - duration.begin()->second)); - duration.erase(duration.begin()); - } -} - -void OpHistory::dump_ops(utime_t now, Formatter *f) -{ - cleanup(now); - f->open_object_section("OpHistory"); - f->dump_int("num to keep", tracker->cct->_conf->osd_op_history_size); - f->dump_int("duration to keep", tracker->cct->_conf->osd_op_history_duration); - { - f->open_array_section("Ops"); - for (set<pair<utime_t, TrackedOpRef> >::const_iterator i = - arrived.begin(); - i != arrived.end(); - ++i) { - f->open_object_section("Op"); - i->second->dump(now, f); - f->close_section(); - } - f->close_section(); - } - f->close_section(); -} - -void OpTracker::dump_historic_ops(Formatter *f) -{ - Mutex::Locker locker(ops_in_flight_lock); - utime_t now = ceph_clock_now(cct); - history.dump_ops(now, f); -} - -void OpTracker::dump_ops_in_flight(Formatter *f) -{ - Mutex::Locker locker(ops_in_flight_lock); - f->open_object_section("ops_in_flight"); // overall dump - f->dump_int("num_ops", ops_in_flight.size()); - f->open_array_section("ops"); // list of TrackedOps - utime_t now = ceph_clock_now(cct); - for (xlist<TrackedOp*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) { - f->open_object_section("op"); - (*p)->dump(now, f); - f->close_section(); // this TrackedOp - } - f->close_section(); // list of TrackedOps - f->close_section(); // overall dump -} - -void OpTracker::register_inflight_op(xlist<TrackedOp*>::item *i) -{ - Mutex::Locker locker(ops_in_flight_lock); - ops_in_flight.push_back(i); - ops_in_flight.back()->seq = seq++; -} - -void OpTracker::unregister_inflight_op(TrackedOp *i) -{ - Mutex::Locker locker(ops_in_flight_lock); - assert(i->xitem.get_list() == &ops_in_flight); - utime_t now = ceph_clock_now(cct); - i->xitem.remove_myself(); - i->request->clear_data(); - history.insert(now, TrackedOpRef(i)); -} - -bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector) -{ - Mutex::Locker locker(ops_in_flight_lock); - if (!ops_in_flight.size()) - return false; - - utime_t now = ceph_clock_now(cct); - utime_t too_old = now; - too_old -= cct->_conf->osd_op_complaint_time; - - utime_t oldest_secs = now - ops_in_flight.front()->received_time; - - dout(10) << "ops_in_flight.size: " << ops_in_flight.size() - << "; oldest is " << oldest_secs - << " seconds old" << dendl; - - if (oldest_secs < cct->_conf->osd_op_complaint_time) - return false; - - xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); - warning_vector.reserve(cct->_conf->osd_op_log_threshold + 1); - - int slow = 0; // total slow - int warned = 0; // total logged - while (!i.end() && (*i)->received_time < too_old) { - slow++; - - // exponential backoff of warning intervals - if (((*i)->received_time + - (cct->_conf->osd_op_complaint_time * - (*i)->warn_interval_multiplier)) < now) { - // will warn - if (warning_vector.empty()) - warning_vector.push_back(""); - warned++; - if (warned > cct->_conf->osd_op_log_threshold) - break; - - utime_t age = now - (*i)->received_time; - stringstream ss; - ss << "slow request " << age << " seconds old, received at " << (*i)->received_time - << ": " << *((*i)->request) << " currently " - << ((*i)->current.size() ? (*i)->current : (*i)->state_string()); - warning_vector.push_back(ss.str()); - - // only those that have been shown will backoff - (*i)->warn_interval_multiplier *= 2; - } - ++i; - } - - // only summarize if we warn about any. if everything has backed - // off, we will stay silent. - if (warned > 0) { - stringstream ss; - ss << slow << " slow requests, " << warned << " included below; oldest blocked for > " - << oldest_secs << " secs"; - warning_vector[0] = ss.str(); - } - - return warning_vector.size(); -} - -void OpTracker::get_age_ms_histogram(pow2_hist_t *h) -{ - Mutex::Locker locker(ops_in_flight_lock); - - h->clear(); - - utime_t now = ceph_clock_now(NULL); - unsigned bin = 30; - uint32_t lb = 1 << (bin-1); // lower bound for this bin - int count = 0; - for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) { - utime_t age = now - (*i)->received_time; - uint32_t ms = (long)(age * 1000.0); - if (ms >= lb) { - count++; - continue; - } - if (count) - h->set(bin, count); - while (lb > ms) { - bin--; - lb >>= 1; - } - count = 1; - } - if (count) - h->set(bin, count); -} - void OpRequest::dump(utime_t now, Formatter *f) const { Message *m = request; @@ -254,38 +59,6 @@ void OpRequest::dump(utime_t now, Formatter *f) const } } -void OpTracker::mark_event(TrackedOp *op, const string &dest) -{ - utime_t now = ceph_clock_now(cct); - return _mark_event(op, dest, now); -} - -void OpTracker::_mark_event(TrackedOp *op, const string &evt, - utime_t time) -{ - Mutex::Locker locker(ops_in_flight_lock); - dout(5) << //"reqid: " << op->get_reqid() << - ", seq: " << op->seq - << ", time: " << time << ", event: " << evt - << ", request: " << *op->request << dendl; -} - -void OpTracker::RemoveOnDelete::operator()(TrackedOp *op) { - op->mark_event("done"); - tracker->unregister_inflight_op(op); - // Do not delete op, unregister_inflight_op took control -} - -void OpRequest::mark_event(const string &event) -{ - utime_t now = ceph_clock_now(tracker->cct); - { - Mutex::Locker l(lock); - events.push_back(make_pair(now, event)); - } - tracker->mark_event(this, event); -} - void OpRequest::init_from_message() { if (request->get_type() == CEPH_MSG_OSD_OP) { diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index 7dc8e78afa5..80dd08cb98f 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -19,103 +19,18 @@ #include <include/utime.h> #include "common/Mutex.h" -#include "include/histogram.h" #include "include/xlist.h" #include "msg/Message.h" #include <tr1/memory> #include "common/TrackedOp.h" #include "osd/osd_types.h" -class OpTracker; -class OpHistory { - set<pair<utime_t, TrackedOpRef> > arrived; - set<pair<double, TrackedOpRef> > duration; - void cleanup(utime_t now); - bool shutdown; - OpTracker *tracker; - -public: - OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_) {} - ~OpHistory() { - assert(arrived.empty()); - assert(duration.empty()); - } - void insert(utime_t now, TrackedOpRef op); - void dump_ops(utime_t now, Formatter *f); - void on_shutdown(); -}; - -class OpTracker { - class RemoveOnDelete { - OpTracker *tracker; - public: - RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {} - void operator()(TrackedOp *op); - }; - friend class RemoveOnDelete; - friend class OpRequest; - friend class OpHistory; - uint64_t seq; - Mutex ops_in_flight_lock; - xlist<TrackedOp *> ops_in_flight; - OpHistory history; - -protected: - CephContext *cct; - -public: - OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), history(this), cct(cct_) {} - void dump_ops_in_flight(Formatter *f); - void dump_historic_ops(Formatter *f); - void register_inflight_op(xlist<TrackedOp*>::item *i); - void unregister_inflight_op(TrackedOp *i); - - void get_age_ms_histogram(pow2_hist_t *h); - - /** - * Look for Ops which are too old, and insert warning - * strings for each Op that is too old. - * - * @param warning_strings A vector<string> reference which is filled - * with a warning string for each old Op. - * @return True if there are any Ops to warn on, false otherwise. - */ - bool check_ops_in_flight(std::vector<string> &warning_strings); - void mark_event(TrackedOp *op, const string &evt); - void _mark_event(TrackedOp *op, const string &evt, utime_t now); - - void on_shutdown() { - Mutex::Locker l(ops_in_flight_lock); - history.on_shutdown(); - } - ~OpTracker() { - assert(ops_in_flight.empty()); - } - - template <typename T, typename TRef> - TRef create_request(Message *ref) - { - TRef retval(new T(ref, this), - RemoveOnDelete(this)); - - _mark_event(retval.get(), "header_read", ref->get_recv_stamp()); - _mark_event(retval.get(), "throttled", ref->get_throttle_stamp()); - _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp()); - _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp()); - - retval->init_from_message(); - - return retval; - } -}; - /** * The OpRequest takes in a Message* and takes over a single reference * to it, which it puts() when destroyed. */ struct OpRequest : public TrackedOp { friend class OpTracker; - friend class OpHistory; // rmw flags int rmw_flags; @@ -229,7 +144,6 @@ public: latest_flag_point = flag_commit_sent; } - void mark_event(const string &event); osd_reqid_t get_reqid() const { return reqid; } |