summaryrefslogtreecommitdiff
path: root/src/librbd/AioCompletion.h
blob: 361d425531f24015a5aea06a89c8189feaa99f2a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_LIBRBD_AIOCOMPLETION_H
#define CEPH_LIBRBD_AIOCOMPLETION_H

#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/ceph_context.h"
#include "common/perf_counters.h"
#include "include/Context.h"
#include "include/utime.h"
#include "include/rbd/librbd.hpp"

#include "librbd/ImageCtx.h"
#include "librbd/internal.h"

#include "osdc/Striper.h"

namespace librbd {

  class AioRead;

  /// Category of I/O an AioCompletion is tracking; complete() uses this
  /// to choose which latency perf counter to update.
  typedef enum {
    AIO_TYPE_READ = 0,
    AIO_TYPE_WRITE,
    AIO_TYPE_DISCARD,
    AIO_TYPE_NONE,    ///< default until init_time() assigns a real type
  } aio_type_t;

  /**
   * AioCompletion is the overall completion for a single
   * rbd I/O request. It may be composed of many AioRequests,
   * which each go to a single object.
   *
   * The retrying of individual requests is handled at a lower level,
   * so all AioCompletion cares about is the count of outstanding
   * requests. Note that this starts at 1 to prevent the reference
   * count from reaching 0 while more requests are being added. When
   * all requests have been added, finish_adding_requests() releases
   * this initial reference.
   */
  struct AioCompletion {
    Mutex lock;              ///< protects all mutable state below
    Cond cond;               ///< signalled by complete() once 'done' is set
    bool done;               ///< true after complete() has run
    ssize_t rval;            ///< aggregate return value for the whole I/O
    callback_t complete_cb;  ///< optional user callback invoked by complete()
    void *complete_arg;      ///< opaque argument handed to complete_cb
    rbd_completion_t rbd_comp;  ///< handle passed back to complete_cb
    int pending_count;   ///< number of requests
    bool building;       ///< true if we are still building this completion
    int ref;             ///< refcount; object is deleted when it drops to 0
    bool released;       ///< true once release() consumed the initial ref
    ImageCtx *ictx;      ///< image this I/O targets (set by init_time())
    utime_t start_time;  ///< I/O start time, for the latency perf counters
    aio_type_t aio_type; ///< read/write/discard; selects the perf counter

    Striper::StripedReadResult destriper;  ///< collects per-object read results (see Striper)
    bufferlist *read_bl;   ///< destination bufferlist for reads (may be NULL)
    char *read_buf;        ///< raw destination buffer for reads (may be NULL)
    size_t read_buf_len;   ///< capacity of read_buf

    /// Starts with ref == 1 (the "building" reference; see comment above).
    AioCompletion() : lock("AioCompletion::lock", true),
		      done(false), rval(0), complete_cb(NULL),
		      complete_arg(NULL), rbd_comp(NULL),
		      pending_count(0), building(true),
		      ref(1), released(false), ictx(NULL),
		      aio_type(AIO_TYPE_NONE),
		      read_bl(NULL), read_buf(NULL), read_buf_len(0) {
    }
    ~AioCompletion() {
    }

    /// Block the calling thread until complete() has run. Always returns 0.
    int wait_for_complete() {
      lock.Lock();
      while (!done)
	cond.Wait(lock);
      lock.Unlock();
      return 0;
    }

    /// Account for one more in-flight request: bumps pending_count and
    /// takes a reference that complete_request() will eventually drop.
    void add_request() {
      lock.Lock();
      pending_count++;
      lock.Unlock();
      get();
    }

    void finalize(CephContext *cct, ssize_t rval);

    /// Drop the initial "building" reference once all requests are added.
    void finish_adding_requests(CephContext *cct);

    /// Record the owning image, the I/O type and the start timestamp;
    /// must be called before complete(), which reads all three.
    void init_time(ImageCtx *i, aio_type_t t) {
      ictx = i;
      aio_type = t;
      start_time = ceph_clock_now(ictx->cct);
    }

    /// Fire the user callback (if any), record the elapsed latency under
    /// the perf counter matching aio_type, then mark the completion done
    /// and wake any wait_for_complete() callers. Caller must hold 'lock'.
    void complete() {
      utime_t elapsed;
      assert(lock.is_locked());
      elapsed = ceph_clock_now(ictx->cct) - start_time;
      if (complete_cb) {
	complete_cb(rbd_comp, complete_arg);
      }
      switch (aio_type) {
      case AIO_TYPE_READ: 
	ictx->perfcounter->tinc(l_librbd_aio_rd_latency, elapsed); break;
      case AIO_TYPE_WRITE:
	ictx->perfcounter->tinc(l_librbd_aio_wr_latency, elapsed); break;
      case AIO_TYPE_DISCARD:
	ictx->perfcounter->tinc(l_librbd_aio_discard_latency, elapsed); break;
      default:
	lderr(ictx->cct) << "completed invalid aio_type: " << aio_type << dendl;
	break;
      }
      done = true;
      cond.Signal();
    }

    /// Register the user callback fired by complete().
    /// NOTE(review): not taken under 'lock'; presumably callers set this
    /// before any request can complete — confirm against call sites.
    void set_complete_cb(void *cb_arg, callback_t cb) {
      complete_cb = cb;
      complete_arg = cb_arg;
    }

    void complete_request(CephContext *cct, ssize_t r);

    /// Return the aggregate result (read under the lock).
    ssize_t get_return_value() {
      lock.Lock();
      ssize_t r = rval;
      lock.Unlock();
      return r;
    }

    /// Take a reference. Must only be called while a reference is held.
    void get() {
      lock.Lock();
      assert(ref > 0);
      ref++;
      lock.Unlock();
    }
    /// Drop the user's (initial) reference; may only be called once.
    void release() {
      lock.Lock();
      assert(!released);
      released = true;
      put_unlock();
    }
    /// Drop one reference.
    void put() {
      lock.Lock();
      put_unlock();
    }
    /// Drop one reference with 'lock' already held; unlocks before a
    /// possible 'delete this' so the Mutex is not destroyed while locked.
    void put_unlock() {
      assert(ref > 0);
      int n = --ref;
      lock.Unlock();
      if (!n)
	delete this;
    }
  };

  /**
   * Completion context for a single read request. finish() is defined in
   * the .cc; the AioRead request must be attached via set_req() before
   * this context fires.
   */
  class C_AioRead : public Context {
  public:
    C_AioRead(CephContext *cct, AioCompletion *completion)
      : m_cct(cct), m_completion(completion), m_req(NULL)
    { }
    virtual ~C_AioRead() {}
    virtual void finish(int r);
    /// Attach the read request this context reports on.
    void set_req(AioRead *req) {
      m_req = req;
    }
  private:
    CephContext *m_cct;
    AioCompletion *m_completion;
    AioRead *m_req;           ///< not owned; set after construction
  };

  class C_AioWrite : public Context {
  public:
    C_AioWrite(CephContext *cct, AioCompletion *completion)
      : m_cct(cct), m_completion(completion) {}
    virtual ~C_AioWrite() {}
    virtual void finish(int r) {
      m_completion->complete_request(m_cct, r);
    }
  private:
    CephContext *m_cct;
    AioCompletion *m_completion;
  };

  /**
   * Context fired when a cached read finishes; finish() is defined in
   * the .cc. Bridges the object cache back to the originating AioRead.
   */
  class C_CacheRead : public Context {
  public:
    C_CacheRead(Context *completion, AioRead *req)
      : m_completion(completion), m_req(req) {}
    virtual ~C_CacheRead() {}
    virtual void finish(int r);
  private:
    Context *m_completion;  ///< not owned
    AioRead *m_req;         ///< not owned
  };
}

#endif