diff options
-rw-r--r-- | src/common/TrackedOp.h | 34 | ||||
-rw-r--r-- | src/osd/OSD.cc | 2 | ||||
-rw-r--r-- | src/osd/OpRequest.cc | 55 | ||||
-rw-r--r-- | src/osd/OpRequest.h | 64 |
4 files changed, 87 insertions, 68 deletions
diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h index 753331df7f3..835cf86e9d3 100644 --- a/src/common/TrackedOp.h +++ b/src/common/TrackedOp.h @@ -22,9 +22,41 @@ #include <tr1/memory> class TrackedOp { +protected: + list<pair<utime_t, string> > events; /// list of events and their times + Mutex lock; /// to protect the events list public: - virtual void mark_event(const string &event) = 0; + // move these to private once friended OpTracker + Message *request; + xlist<TrackedOp*>::item xitem; + utime_t received_time; + // figure out how to get rid of this one? + uint8_t warn_interval_multiplier; + string current; + uint64_t seq; + + TrackedOp(Message *req) : + lock("TrackedOp::lock"), + request(req), + xitem(this), + warn_interval_multiplier(1), + seq(0) {} + virtual ~TrackedOp() {} + + utime_t get_arrived() const { + return received_time; + } + // This function maybe needs some work; assumes last event is completion time + double get_duration() const { + return events.size() ? + (events.rbegin()->first - received_time) : + 0.0; + } + + virtual void mark_event(const string &event) = 0; + virtual const char *state_string() const = 0; + virtual void dump(utime_t now, Formatter *f) const = 0; }; typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index ff1276969d8..d51e9dc4434 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4724,7 +4724,7 @@ void OSD::_dispatch(Message *m) default: { - OpRequestRef op = op_tracker.create_request(m); + OpRequestRef op = op_tracker.create_request<OpRequest, OpRequestRef>(m); op->mark_event("waiting_for_osdmap"); // no map? starting up? if (!osdmap) { diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index 1ffe3073051..388faf36763 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -21,13 +21,10 @@ static ostream& _prefix(std::ostream* _dout) } OpRequest::OpRequest(Message *req, OpTracker *tracker) : - request(req), xitem(this), + TrackedOp(req), rmw_flags(0), - warn_interval_multiplier(1), - lock("OpRequest::lock"), tracker(tracker), - hit_flag_points(0), latest_flag_point(0), - seq(0) { + hit_flag_points(0), latest_flag_point(0) { received_time = request->get_recv_stamp(); tracker->register_inflight_op(&xitem); if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) { @@ -43,7 +40,7 @@ void OpHistory::on_shutdown() shutdown = true; } -void OpHistory::insert(utime_t now, OpRequestRef op) +void OpHistory::insert(utime_t now, TrackedOpRef op) { if (shutdown) return; @@ -79,7 +76,7 @@ void OpHistory::dump_ops(utime_t now, Formatter *f) f->dump_int("duration to keep", tracker->cct->_conf->osd_op_history_duration); { f->open_array_section("Ops"); - for (set<pair<utime_t, OpRequestRef> >::const_iterator i = + for (set<pair<utime_t, TrackedOpRef> >::const_iterator i = arrived.begin(); i != arrived.end(); ++i) { @@ -104,32 +101,32 @@ void OpTracker::dump_ops_in_flight(Formatter *f) Mutex::Locker locker(ops_in_flight_lock); f->open_object_section("ops_in_flight"); // overall dump f->dump_int("num_ops", ops_in_flight.size()); - f->open_array_section("ops"); // list of OpRequests + f->open_array_section("ops"); // list of TrackedOps utime_t now = ceph_clock_now(cct); - for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) { + for (xlist<TrackedOp*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) { f->open_object_section("op"); (*p)->dump(now, f); - f->close_section(); // this OpRequest + f->close_section(); // this TrackedOp } - f->close_section(); // list of OpRequests + f->close_section(); // list of TrackedOps f->close_section(); // overall dump } -void OpTracker::register_inflight_op(xlist<OpRequest*>::item *i) +void OpTracker::register_inflight_op(xlist<TrackedOp*>::item *i) { Mutex::Locker locker(ops_in_flight_lock); ops_in_flight.push_back(i); ops_in_flight.back()->seq = seq++; } -void OpTracker::unregister_inflight_op(OpRequest *i) +void OpTracker::unregister_inflight_op(TrackedOp *i) { Mutex::Locker locker(ops_in_flight_lock); assert(i->xitem.get_list() == &ops_in_flight); utime_t now = ceph_clock_now(cct); i->xitem.remove_myself(); i->request->clear_data(); - history.insert(now, OpRequestRef(i)); + history.insert(now, TrackedOpRef(i)); } bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector) @@ -151,7 +148,7 @@ bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector) if (oldest_secs < cct->_conf->osd_op_complaint_time) return false; - xlist<OpRequest*>::iterator i = ops_in_flight.begin(); + xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); warning_vector.reserve(cct->_conf->osd_op_log_threshold + 1); int slow = 0; // total slow @@ -205,7 +202,7 @@ void OpTracker::get_age_ms_histogram(pow2_hist_t *h) unsigned bin = 30; uint32_t lb = 1 << (bin-1); // lower bound for this bin int count = 0; - for (xlist<OpRequest*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) { + for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) { utime_t age = now - (*i)->received_time; uint32_t ms = (long)(age * 1000.0); if (ms >= lb) { @@ -257,44 +254,28 @@ void OpRequest::dump(utime_t now, Formatter *f) const } } -void OpTracker::mark_event(OpRequest *op, const string &dest) +void OpTracker::mark_event(TrackedOp *op, const string &dest) { utime_t now = ceph_clock_now(cct); return _mark_event(op, dest, now); } -void OpTracker::_mark_event(OpRequest *op, const string &evt, +void OpTracker::_mark_event(TrackedOp *op, const string &evt, utime_t time) { Mutex::Locker locker(ops_in_flight_lock); - dout(5) << "reqid: " << op->get_reqid() << ", seq: " << op->seq + dout(5) << //"reqid: " << op->get_reqid() << + ", seq: " << op->seq << ", time: " << time << ", event: " << evt << ", request: " << *op->request << dendl; } -void OpTracker::RemoveOnDelete::operator()(OpRequest *op) { +void OpTracker::RemoveOnDelete::operator()(TrackedOp *op) { op->mark_event("done"); tracker->unregister_inflight_op(op); // Do not delete op, unregister_inflight_op took control } -OpRequestRef OpTracker::create_request(Message *ref) -{ - OpRequestRef retval(new OpRequest(ref, this), - RemoveOnDelete(this)); - - if (ref->get_type() == CEPH_MSG_OSD_OP) { - retval->reqid = static_cast<MOSDOp*>(ref)->get_reqid(); - } else if (ref->get_type() == MSG_OSD_SUBOP) { - retval->reqid = static_cast<MOSDSubOp*>(ref)->reqid; - } - _mark_event(retval.get(), "header_read", ref->get_recv_stamp()); - _mark_event(retval.get(), "throttled", ref->get_throttle_stamp()); - _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp()); - _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp()); - return retval; -} - void OpRequest::mark_event(const string &event) { utime_t now = ceph_clock_now(tracker->cct); diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index d429e6e717f..f501c649699 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -25,13 +25,14 @@ #include <tr1/memory> #include "common/TrackedOp.h" #include "osd/osd_types.h" +// FIXME: augh, get these outta here! +#include "messages/MOSDOp.h" +#include "messages/MOSDSubOp.h" -struct OpRequest; class OpTracker; -typedef std::tr1::shared_ptr<OpRequest> OpRequestRef; class OpHistory { - set<pair<utime_t, OpRequestRef> > arrived; - set<pair<double, OpRequestRef> > duration; + set<pair<utime_t, TrackedOpRef> > arrived; + set<pair<double, TrackedOpRef> > duration; void cleanup(utime_t now); bool shutdown; OpTracker *tracker; @@ -42,7 +43,7 @@ public: assert(arrived.empty()); assert(duration.empty()); } - void insert(utime_t now, OpRequestRef op); + void insert(utime_t now, TrackedOpRef op); void dump_ops(utime_t now, Formatter *f); void on_shutdown(); }; @@ -52,14 +53,14 @@ class OpTracker { OpTracker *tracker; public: RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {} - void operator()(OpRequest *op); + void operator()(TrackedOp *op); }; friend class RemoveOnDelete; friend class OpRequest; friend class OpHistory; uint64_t seq; Mutex ops_in_flight_lock; - xlist<OpRequest *> ops_in_flight; + xlist<TrackedOp *> ops_in_flight; OpHistory history; protected: @@ -69,8 +70,8 @@ public: OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), history(this), cct(cct_) {} void dump_ops_in_flight(Formatter *f); void dump_historic_ops(Formatter *f); - void register_inflight_op(xlist<OpRequest*>::item *i); - void unregister_inflight_op(OpRequest *i); + void register_inflight_op(xlist<TrackedOp*>::item *i); + void unregister_inflight_op(TrackedOp *i); void get_age_ms_histogram(pow2_hist_t *h); @@ -83,9 +84,9 @@ public: * @return True if there are any Ops to warn on, false otherwise. */ bool check_ops_in_flight(std::vector<string> &warning_strings); - void mark_event(OpRequest *op, const string &evt); - void _mark_event(OpRequest *op, const string &evt, utime_t now); - OpRequestRef create_request(Message *req); + void mark_event(TrackedOp *op, const string &evt); + void _mark_event(TrackedOp *op, const string &evt, utime_t now); + void on_shutdown() { Mutex::Locker l(ops_in_flight_lock); history.on_shutdown(); @@ -93,6 +94,24 @@ public: ~OpTracker() { assert(ops_in_flight.empty()); } + + template <typename T, typename TRef> + TRef create_request(Message *ref) + { + TRef retval(new T(ref, this), + RemoveOnDelete(this)); + + if (ref->get_type() == CEPH_MSG_OSD_OP) { + retval->reqid = static_cast<MOSDOp*>(ref)->get_reqid(); + } else if (ref->get_type() == MSG_OSD_SUBOP) { + retval->reqid = static_cast<MOSDSubOp*>(ref)->reqid; + } + _mark_event(retval.get(), "header_read", ref->get_recv_stamp()); + _mark_event(retval.get(), "throttled", ref->get_throttle_stamp()); + _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp()); + _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp()); + return retval; + } }; /** @@ -102,8 +121,6 @@ public: struct OpRequest : public TrackedOp { friend class OpTracker; friend class OpHistory; - Message *request; - xlist<OpRequest*>::item xitem; // rmw flags int rmw_flags; @@ -132,28 +149,13 @@ struct OpRequest : public TrackedOp { void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; } void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; } - utime_t received_time; - uint8_t warn_interval_multiplier; - utime_t get_arrived() const { - return received_time; - } - double get_duration() const { - return events.size() ? - (events.rbegin()->first - received_time) : - 0.0; - } - void dump(utime_t now, Formatter *f) const; private: - list<pair<utime_t, string> > events; - string current; - Mutex lock; OpTracker *tracker; osd_reqid_t reqid; uint8_t hit_flag_points; uint8_t latest_flag_point; - uint64_t seq; static const uint8_t flag_queued_for_pg=1 << 0; static const uint8_t flag_reached_pg = 1 << 1; static const uint8_t flag_delayed = 1 << 2; @@ -162,6 +164,7 @@ private: static const uint8_t flag_commit_sent = 1 << 5; OpRequest(Message *req, OpTracker *tracker); + public: ~OpRequest() { assert(request); @@ -237,4 +240,7 @@ public: } }; +typedef std::tr1::shared_ptr<OpRequest> OpRequestRef; + + #endif /* OPREQUEST_H_ */ |