summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Zafman <david.zafman@inktank.com>2013-02-21 16:11:01 -0800
committerDavid Zafman <david.zafman@inktank.com>2013-02-21 21:50:02 -0800
commitcfe923920ceee3476e3ec8e7b74b3a515f8e3fb2 (patch)
tree59c0d0686ee07ae9450e946b40a35920c90511f8
parentbf5cf3318dd4c1af6409cdb01fe921f6ca215cf8 (diff)
downloadceph-cfe923920ceee3476e3ec8e7b74b3a515f8e3fb2.tar.gz
librados: expose a list of watchers on an object
Add new op CEPH_OSD_OP_LIST_WATCHERS Add Objecter handling Signed-off-by: David Zafman <david.zafman@inktank.com>
-rw-r--r--src/include/ceph_strings.cc1
-rw-r--r--src/include/rados.h2
-rw-r--r--src/osd/ReplicatedPG.cc25
-rw-r--r--src/osd/osd_types.h64
-rw-r--r--src/osdc/Objecter.h44
5 files changed, 136 insertions, 0 deletions
diff --git a/src/include/ceph_strings.cc b/src/include/ceph_strings.cc
index 026ca4e2f99..b3c095cb223 100644
--- a/src/include/ceph_strings.cc
+++ b/src/include/ceph_strings.cc
@@ -25,6 +25,7 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_NOTIFY: return "notify";
case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
+ case CEPH_OSD_OP_LIST_WATCHERS: return "list-watchers";
case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
diff --git a/src/include/rados.h b/src/include/rados.h
index 4f7d7174c47..093a04baf86 100644
--- a/src/include/rados.h
+++ b/src/include/rados.h
@@ -177,6 +177,8 @@ enum {
/* versioning */
CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8,
+ CEPH_OSD_OP_LIST_WATCHERS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 9,
+
/* write */
CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
CEPH_OSD_OP_WRITEFULL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2,
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 37f34fcb79e..cfa6bbabe8a 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -2203,6 +2203,31 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
break;
}
+ case CEPH_OSD_OP_LIST_WATCHERS:
+ {
+ obj_list_watch_response_t resp;
+
+ map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator oi_iter;
+ for (oi_iter = oi.watchers.begin(); oi_iter != oi.watchers.end();
+ oi_iter++) {
+ dout(20) << "key cookie=" << oi_iter->first.first
+ << " entity=" << oi_iter->first.second << " "
+ << oi_iter->second << dendl;
+ assert(oi_iter->first.first == oi_iter->second.cookie);
+ assert(oi_iter->first.second.is_client());
+
+ watch_item_t wi(oi_iter->first.second, oi_iter->second.cookie,
+ oi_iter->second.timeout_seconds);
+ resp.entries.push_back(wi);
+ }
+
+ resp.encode(osd_op.outdata);
+ result = 0;
+
+ ctx->delta_stats.num_rd++;
+ break;
+ }
+
case CEPH_OSD_OP_ASSERT_SRC_VERSION:
{
uint64_t ver = op.watch.ver;
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 008fe8e9ded..a0755825e1e 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -2029,4 +2029,68 @@ struct OSDOp {
ostream& operator<<(ostream& out, const OSDOp& op);
+struct watch_item_t {
+ entity_name_t name;
+ uint64_t cookie;
+ uint32_t timeout_seconds;
+
+ watch_item_t() : cookie(0), timeout_seconds(0) { }
+ watch_item_t(entity_name_t name, uint64_t cookie, uint32_t timeout)
+ : name(name), cookie(cookie), timeout_seconds(timeout) { }
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(name, bl);
+ ::encode(cookie, bl);
+ ::encode(timeout_seconds, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator &bl) {
+ DECODE_START(1, bl);
+ ::decode(name, bl);
+ ::decode(cookie, bl);
+ ::decode(timeout_seconds, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(watch_item_t)
+
+/**
+ * obj list watch response format
+ *
+ */
+struct obj_list_watch_response_t {
+ list<watch_item_t> entries;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(entries, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(entries, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const {
+ f->open_array_section("entries");
+ for (list<watch_item_t>::const_iterator p = entries.begin(); p != entries.end(); ++p) {
+ f->open_object_section("watch");
+ f->dump_stream("watcher") << p->name;
+ f->dump_int("cookie", p->cookie);
+ f->dump_int("timeout", p->timeout_seconds);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ static void generate_test_instances(list<obj_list_watch_response_t*>& o) {
+ o.push_back(new obj_list_watch_response_t);
+ o.push_back(new obj_list_watch_response_t);
+ o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 1), 10, 30));
+ o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 2), 20, 60));
+ }
+};
+
+WRITE_CLASS_ENCODER(obj_list_watch_response_t)
+
#endif
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index 692deba8c32..2b604c85684 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -313,6 +313,37 @@ struct ObjectOperation {
}
}
};
+ struct C_ObjectOperation_decodewatchers : public Context {
+ bufferlist bl;
+ list<obj_watch_t> *pwatchers;
+ int *prval;
+ C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr)
+ : pwatchers(pw), prval(pr) {}
+ void finish(int r) {
+ if (r >= 0) {
+ bufferlist::iterator p = bl.begin();
+ try {
+ obj_list_watch_response_t resp;
+ ::decode(resp, p);
+ if (pwatchers) {
+ for (list<watch_item_t>::iterator i = resp.entries.begin() ;
+ i != resp.entries.end() ; ++i) {
+ obj_watch_t ow;
+ ow.watcher_id = i->name.num();
+ ow.cookie = i->cookie;
+ ow.timeout_seconds = i->timeout_seconds;
+ pwatchers->push_back(ow);
+ }
+ }
+ *prval = 0;
+ }
+ catch (buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ }
+ }
+ }
+ };
void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {
add_op(CEPH_OSD_OP_GETXATTRS);
if (pattrs || prval) {
@@ -495,6 +526,19 @@ struct ObjectOperation {
add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
}
+ void list_watchers(list<obj_watch_t> *out,
+ int *prval) {
+ (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
+ if (prval || out) {
+ unsigned p = ops.size() - 1;
+ C_ObjectOperation_decodewatchers *h =
+ new C_ObjectOperation_decodewatchers(out, prval);
+ out_handler[p] = h;
+ out_bl[p] = &h->bl;
+ out_rval[p] = prval;
+ }
+ }
+
void assert_version(uint64_t ver) {
bufferlist bl;
add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0, bl);