// path: root/src/osd/OpRequest.h  (blob ca419f34ff88c8170c7b5f5e4660502e15d1f640)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#ifndef OPREQUEST_H_
#define OPREQUEST_H_
#include <sstream>
#include <stdint.h>
#include <vector>

#include <include/utime.h>
#include "common/Mutex.h"
#include "include/xlist.h"
#include "msg/Message.h"
#include <tr1/memory>
#include "common/TrackedOp.h"
#include "osd/osd_types.h"

// Forward declaration: OpRequest is defined further down in this header, but
// the handle type below is needed by the classes declared before it.
class OpRequest;
// Reference-counted handle to an OpRequest; OpHistory and OpTracker take and
// return OpRequests through this type.
typedef std::tr1::shared_ptr<OpRequest> OpRequestRef;
/**
 * Record of OpRequests kept in two ordered sets: one of
 * (utime_t, OpRequestRef) pairs and one of (double, OpRequestRef) pairs.
 *
 * NOTE(review): judging by the member names and by OpRequest::get_arrived()
 * / OpRequest::get_duration(), the sets are presumably keyed by arrival time
 * and by duration respectively -- confirm against the implementation file.
 *
 * Both sets must be empty by the time the object is destroyed; the
 * destructor asserts this.
 */
class OpHistory {
public:
  /// Starts with empty sets and the shutdown flag cleared.
  OpHistory() : shutdown(false) {}

  /// Asserts that no entries remain in either set.
  ~OpHistory() {
    assert(arrived.empty());
    assert(duration.empty());
  }

  /// Add @p op to the history; @p now is the caller-supplied current time.
  void insert(utime_t now, OpRequestRef op);

  /// Write the recorded ops to the formatter @p f.
  void dump_ops(utime_t now, Formatter *f);

  /// Shutdown hook; defined out of line.
  /// NOTE(review): presumably empties both sets so the destructor's
  /// assertions hold -- confirm.
  void on_shutdown();

private:
  /// Internal maintenance helper; defined out of line.
  void cleanup(utime_t now);

  // Declaration order of the data members is significant (it fixes their
  // construction/destruction order); keep it as is.
  set<pair<utime_t, OpRequestRef> > arrived;
  set<pair<double, OpRequestRef> > duration;
  bool shutdown;
};

/**
 * Keeps the list of OpRequests that are currently in flight, plus an
 * OpHistory member, and hands out new OpRequests via create_request().
 *
 * The in-flight list must be empty when the tracker is destroyed; the
 * destructor asserts this.
 */
class OpTracker {
  /**
   * Function object taking an OpRequest*; its call operator is defined out
   * of line.
   * NOTE(review): presumably installed as the shared_ptr deleter of every
   * OpRequestRef produced by create_request() -- confirm in the .cc file.
   */
  class RemoveOnDelete {
    OpTracker *tracker;
  public:
    // explicit: an OpTracker* must never silently convert into a deleter.
    explicit RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {}
    void operator()(OpRequest *op);
  };
  friend class RemoveOnDelete;
  uint64_t seq;                     ///< sequence counter, starts at 0
  Mutex ops_in_flight_lock;
  xlist<OpRequest *> ops_in_flight; ///< intrusive list of tracked ops
  OpHistory history;

public:
  OpTracker() : seq(0), ops_in_flight_lock("OpTracker mutex") {}

  /// Write a dump of the in-flight ops to @p ss.
  void dump_ops_in_flight(std::ostream& ss);
  /// Write a dump of the historic ops to @p ss.
  void dump_historic_ops(std::ostream& ss);
  /// Register the list item @p i of a newly created OpRequest.
  void register_inflight_op(xlist<OpRequest*>::item *i);
  /// Remove @p i from the in-flight bookkeeping.
  void unregister_inflight_op(OpRequest *i);

  /**
   * Look for Ops which are too old, and insert warning
   * strings for each Op that is too old.
   *
   * @param warning_strings A vector<string> reference which is filled
   * with a warning string for each old Op.
   * @return True if there are any Ops to warn on, false otherwise.
   */
  bool check_ops_in_flight(std::vector<string> &warning_strings);

  /// Record event @p evt on @p op.
  void mark_event(OpRequest *op, const string &evt);
  /// Variant of mark_event() taking an explicit timestamp @p now.
  /// NOTE(review): the leading underscore suggests the caller must already
  /// hold a lock -- confirm which one.
  void _mark_event(OpRequest *op, const string &evt, utime_t now);

  /// Wrap @p req in a new tracked OpRequest and return a handle to it.
  OpRequestRef create_request(Message *req);

  /// Forward shutdown to the history while holding ops_in_flight_lock.
  void on_shutdown() {
    Mutex::Locker l(ops_in_flight_lock);
    history.on_shutdown();
  }

  /// Every tracked op must have been unregistered before destruction.
  ~OpTracker() {
    assert(ops_in_flight.empty());
  }
};

/**
 * The OpRequest takes in a Message* and takes over a single reference
 * to it, which it put()s when destroyed.
 * OpRequest is itself ref-counted (see OpRequestRef). The expectation is that
 * you get a Message you want to track, create an OpRequest with it, and then
 * pass around that OpRequest the way you used to pass around the Message.
 *
 * The constructor is private; OpTracker and OpHistory are friends.
 * On construction the op registers its list item with the given tracker.
 */
struct OpRequest : public TrackedOp {
  friend class OpTracker;
  friend class OpHistory;
  Message *request;               ///< tracked message; put() in the destructor
  xlist<OpRequest*>::item xitem;  ///< list item handed to the tracker

  // rmw flags: bitmask of CEPH_OSD_RMW_FLAG_* values, initially 0
  int rmw_flags;

  /// @return true if any bit of @p flag is set in rmw_flags.
  bool check_rmw(int flag) const {
    return (rmw_flags & flag) != 0;
  }
  bool may_read() const { return need_read_cap() || need_class_read_cap(); }
  bool may_write() const { return need_write_cap() || need_class_write_cap(); }
  bool includes_pg_op() const { return check_rmw(CEPH_OSD_RMW_FLAG_PGOP); }
  bool need_read_cap() const {
    return check_rmw(CEPH_OSD_RMW_FLAG_READ);
  }
  bool need_write_cap() const {
    return check_rmw(CEPH_OSD_RMW_FLAG_WRITE);
  }
  bool need_class_read_cap() const {
    return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ);
  }
  bool need_class_write_cap() const {
    return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_WRITE);
  }
  void set_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_READ; }
  void set_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_WRITE; }
  void set_class_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_READ; }
  void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; }
  void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; }

  utime_t received_time;             ///< request->get_recv_stamp() at construction
  uint8_t warn_interval_multiplier;  ///< initialised to 1; used outside this header

  /// @return the receive timestamp captured at construction.
  utime_t get_arrived() const {
    return received_time;
  }

  /**
   * @return the span between received_time and the timestamp of the last
   * entry in `events`, or 0.0 when no event has been recorded.
   */
  double get_duration() const {
    // empty()/back() rather than size()/rbegin(): std::list::size() may be
    // linear in pre-C++11 standard libraries.
    return events.empty() ?
      0.0 :
      (events.back().first - received_time);
  }

  /// Write this op to the formatter @p f; defined out of line.
  void dump(utime_t now, Formatter *f) const;

private:
  list<pair<utime_t, string> > events;  ///< (timestamp, description) entries
  string current;                       ///< text set by the latest mark_*() call
  Mutex lock;
  OpTracker *tracker;
  osd_reqid_t reqid;
  uint8_t hit_flag_points;    ///< OR of every flag point reached so far
  uint8_t latest_flag_point;  ///< the single flag point reached most recently
  uint64_t seq;
  static const uint8_t flag_queued_for_pg=1 << 0;
  static const uint8_t flag_reached_pg =  1 << 1;
  static const uint8_t flag_delayed =     1 << 2;
  static const uint8_t flag_started =     1 << 3;
  static const uint8_t flag_sub_op_sent = 1 << 4;
  static const uint8_t flag_commit_sent = 1 << 5;

  /**
   * Takes over one reference to @p req, records its receive stamp and
   * registers this op's list item with @p tracker.
   * @param req must not be null (it is dereferenced here).
   */
  OpRequest(Message *req, OpTracker *tracker) :
    request(req), xitem(this),
    rmw_flags(0),
    warn_interval_multiplier(1),
    lock("OpRequest::lock"),
    tracker(tracker),
    hit_flag_points(0), latest_flag_point(0),
    seq(0) {
    received_time = request->get_recv_stamp();
    tracker->register_inflight_op(&xitem);
  }
public:
  /// Drops the message reference taken over at construction.
  ~OpRequest() {
    assert(request);
    request->put();
  }

  // been_*(): has this flag point ever been reached?
  bool been_queued_for_pg() const { return hit_flag_points & flag_queued_for_pg; }
  bool been_reached_pg() const { return hit_flag_points & flag_reached_pg; }
  bool been_delayed() const { return hit_flag_points & flag_delayed; }
  bool been_started() const { return hit_flag_points & flag_started; }
  bool been_sub_op_sent() const { return hit_flag_points & flag_sub_op_sent; }
  bool been_commit_sent() const { return hit_flag_points & flag_commit_sent; }
  // currently_*(): is this the most recently reached flag point?
  bool currently_queued_for_pg() const { return latest_flag_point & flag_queued_for_pg; }
  bool currently_reached_pg() const { return latest_flag_point & flag_reached_pg; }
  bool currently_delayed() const { return latest_flag_point & flag_delayed; }
  bool currently_started() const { return latest_flag_point & flag_started; }
  bool currently_sub_op_sent() const { return latest_flag_point & flag_sub_op_sent; }
  bool currently_commit_sent() const { return latest_flag_point & flag_commit_sent; }

  /// @return a fixed description of the most recently reached flag point.
  const char *state_string() const {
    switch(latest_flag_point) {
    case flag_queued_for_pg: return "queued for pg";
    case flag_reached_pg: return "reached pg";
    case flag_delayed: return "delayed";
    case flag_started: return "started";
    case flag_sub_op_sent: return "waiting for sub ops";
    case flag_commit_sent: return "commit sent; apply or cleanup";
    default: break;
    }
    return "no flag points reached";
  }

  // Each mark_*() below records an event, updates `current`, adds its flag
  // to hit_flag_points and makes it the latest flag point.
  void mark_queued_for_pg() {
    mark_event("queued_for_pg");
    current = "queued for pg";
    hit_flag_points |= flag_queued_for_pg;
    latest_flag_point = flag_queued_for_pg;
  }
  void mark_reached_pg() {
    mark_event("reached_pg");
    current = "reached pg";
    hit_flag_points |= flag_reached_pg;
    latest_flag_point = flag_reached_pg;
  }
  /// @param s caller-supplied text used both as the event and as `current`.
  void mark_delayed(const string &s) {
    mark_event(s);
    current = s;
    hit_flag_points |= flag_delayed;
    latest_flag_point = flag_delayed;
  }
  void mark_started() {
    mark_event("started");
    current = "started";
    hit_flag_points |= flag_started;
    latest_flag_point = flag_started;
  }
  /// @param s caller-supplied text used both as the event and as `current`.
  void mark_sub_op_sent(const string &s) {
    mark_event(s);
    current = s;
    hit_flag_points |= flag_sub_op_sent;
    latest_flag_point = flag_sub_op_sent;
  }
  void mark_commit_sent() {
    mark_event("commit_sent");
    current = "commit sent";
    hit_flag_points |= flag_commit_sent;
    latest_flag_point = flag_commit_sent;
  }

  /// Record a free-form event on this op; defined out of line.
  void mark_event(const string &event);
  osd_reqid_t get_reqid() const {
    return reqid;
  }
};

#endif /* OPREQUEST_H_ */