diff options
Diffstat (limited to 'src/common/TrackedOp.cc')
-rw-r--r-- | src/common/TrackedOp.cc | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/src/common/TrackedOp.cc b/src/common/TrackedOp.cc new file mode 100644 index 00000000000..d1dbc1e7135 --- /dev/null +++ b/src/common/TrackedOp.cc @@ -0,0 +1,265 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * Copyright 2013 Inktank + */ + +#include "TrackedOp.h" +#include "common/Formatter.h" +#include <iostream> +#include <vector> +#include "common/debug.h" +#include "common/config.h" +#include "msg/Message.h" +#include "include/assert.h" + +#define dout_subsys ceph_subsys_optracker +#undef dout_prefix +#define dout_prefix _prefix(_dout) + +static ostream& _prefix(std::ostream* _dout) +{ + return *_dout << "-- op tracker -- "; +} + +void OpHistory::on_shutdown() +{ + arrived.clear(); + duration.clear(); + shutdown = true; +} + +void OpHistory::insert(utime_t now, TrackedOpRef op) +{ + if (shutdown) + return; + duration.insert(make_pair(op->get_duration(), op)); + arrived.insert(make_pair(op->get_arrived(), op)); + cleanup(now); +} + +void OpHistory::cleanup(utime_t now) +{ + while (arrived.size() && + (now - arrived.begin()->first > + (double)(history_duration))) { + duration.erase(make_pair( + arrived.begin()->second->get_duration(), + arrived.begin()->second)); + arrived.erase(arrived.begin()); + } + + while (duration.size() > history_size) { + arrived.erase(make_pair( + duration.begin()->second->get_arrived(), + duration.begin()->second)); + duration.erase(duration.begin()); + } +} + +void OpHistory::dump_ops(utime_t now, Formatter *f) +{ + cleanup(now); + f->open_object_section("OpHistory"); + f->dump_int("num to keep", history_size); + f->dump_int("duration to keep", history_duration); + { + f->open_array_section("Ops"); + for (set<pair<utime_t, TrackedOpRef> >::const_iterator i = + arrived.begin(); + i != arrived.end(); + ++i) { + f->open_object_section("Op"); + i->second->dump(now, f); + f->close_section(); + } + f->close_section(); + } + f->close_section(); +} + +void OpTracker::dump_historic_ops(Formatter *f) +{ + Mutex::Locker locker(ops_in_flight_lock); + utime_t now = ceph_clock_now(cct); + history.dump_ops(now, f); +} + +void OpTracker::dump_ops_in_flight(Formatter *f) +{ + Mutex::Locker locker(ops_in_flight_lock); + f->open_object_section("ops_in_flight"); // overall dump + f->dump_int("num_ops", ops_in_flight.size()); + f->open_array_section("ops"); // list of TrackedOps + utime_t now = ceph_clock_now(cct); + for (xlist<TrackedOp*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) { + f->open_object_section("op"); + (*p)->dump(now, f); + f->close_section(); // this TrackedOp + } + f->close_section(); // list of TrackedOps + f->close_section(); // overall dump +} + +void OpTracker::register_inflight_op(xlist<TrackedOp*>::item *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + ops_in_flight.push_back(i); + ops_in_flight.back()->seq = seq++; +} + +void OpTracker::unregister_inflight_op(TrackedOp *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + assert(i->xitem.get_list() == &ops_in_flight); + utime_t now = ceph_clock_now(cct); + i->xitem.remove_myself(); + i->request->clear_data(); + history.insert(now, TrackedOpRef(i)); +} + +bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector) +{ + Mutex::Locker locker(ops_in_flight_lock); + if (!ops_in_flight.size()) + return false; + + utime_t now = ceph_clock_now(cct); + utime_t too_old = now; + too_old -= complaint_time; + + utime_t oldest_secs = now - ops_in_flight.front()->get_arrived(); + + dout(10) << "ops_in_flight.size: " << ops_in_flight.size() + << "; oldest is " << oldest_secs + << " seconds old" << dendl; + + if (oldest_secs < complaint_time) + return false; + + xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); + warning_vector.reserve(log_threshold + 1); + + int slow = 0; // total slow + int warned = 0; // total logged + while (!i.end() && (*i)->get_arrived() < too_old) { + slow++; + + // exponential backoff of warning intervals + if (((*i)->get_arrived() + + (complaint_time * (*i)->warn_interval_multiplier)) < now) { + // will warn + if (warning_vector.empty()) + warning_vector.push_back(""); + warned++; + if (warned > log_threshold) + break; + + utime_t age = now - (*i)->get_arrived(); + stringstream ss; + ss << "slow request " << age << " seconds old, received at " << (*i)->get_arrived() + << ": " << *((*i)->request) << " currently " + << ((*i)->current.size() ? (*i)->current : (*i)->state_string()); + warning_vector.push_back(ss.str()); + + // only those that have been shown will backoff + (*i)->warn_interval_multiplier *= 2; + } + ++i; + } + + // only summarize if we warn about any. if everything has backed + // off, we will stay silent. + if (warned > 0) { + stringstream ss; + ss << slow << " slow requests, " << warned << " included below; oldest blocked for > " + << oldest_secs << " secs"; + warning_vector[0] = ss.str(); + } + + return warning_vector.size(); +} + +void OpTracker::get_age_ms_histogram(pow2_hist_t *h) +{ + Mutex::Locker locker(ops_in_flight_lock); + + h->clear(); + + utime_t now = ceph_clock_now(NULL); + unsigned bin = 30; + uint32_t lb = 1 << (bin-1); // lower bound for this bin + int count = 0; + for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) { + utime_t age = now - (*i)->get_arrived(); + uint32_t ms = (long)(age * 1000.0); + if (ms >= lb) { + count++; + continue; + } + if (count) + h->set(bin, count); + while (lb > ms) { + bin--; + lb >>= 1; + } + count = 1; + } + if (count) + h->set(bin, count); +} + +void OpTracker::mark_event(TrackedOp *op, const string &dest) +{ + utime_t now = ceph_clock_now(cct); + return _mark_event(op, dest, now); +} + +void OpTracker::_mark_event(TrackedOp *op, const string &evt, + utime_t time) +{ + Mutex::Locker locker(ops_in_flight_lock); + dout(5) << //"reqid: " << op->get_reqid() << + ", seq: " << op->seq + << ", time: " << time << ", event: " << evt + << ", request: " << *op->request << dendl; +} + +void OpTracker::RemoveOnDelete::operator()(TrackedOp *op) { + op->mark_event("done"); + tracker->unregister_inflight_op(op); + // Do not delete op, unregister_inflight_op took control +} + +void TrackedOp::mark_event(const string &event) +{ + utime_t now = ceph_clock_now(g_ceph_context); + { + Mutex::Locker l(lock); + events.push_back(make_pair(now, event)); + } + tracker->mark_event(this, event); + _event_marked(); +} + +void TrackedOp::dump(utime_t now, Formatter *f) const +{ + Message *m = request; + stringstream name; + m->print(name); + f->dump_string("description", name.str().c_str()); // this TrackedOp + f->dump_stream("received_at") << get_arrived(); + f->dump_float("age", now - get_arrived()); + f->dump_float("duration", get_duration()); + { + f->open_array_section("type_data"); + _dump(now, f); + f->close_section(); + } +} |