diff options
author | David Zafman <david.zafman@inktank.com> | 2013-02-21 16:11:01 -0800 |
---|---|---|
committer | David Zafman <david.zafman@inktank.com> | 2013-02-21 21:50:02 -0800 |
commit | cfe923920ceee3476e3ec8e7b74b3a515f8e3fb2 (patch) | |
tree | 59c0d0686ee07ae9450e946b40a35920c90511f8 | |
parent | bf5cf3318dd4c1af6409cdb01fe921f6ca215cf8 (diff) | |
download | ceph-cfe923920ceee3476e3ec8e7b74b3a515f8e3fb2.tar.gz |
librados: expose a list of watchers on an object
Add new op CEPH_OSD_OP_LIST_WATCHERS
Add Objecter handling
Signed-off-by: David Zafman <david.zafman@inktank.com>
-rw-r--r-- | src/include/ceph_strings.cc | 1 | ||||
-rw-r--r-- | src/include/rados.h | 2 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 25 | ||||
-rw-r--r-- | src/osd/osd_types.h | 64 | ||||
-rw-r--r-- | src/osdc/Objecter.h | 44 |
5 files changed, 136 insertions, 0 deletions
diff --git a/src/include/ceph_strings.cc b/src/include/ceph_strings.cc index 026ca4e2f99..b3c095cb223 100644 --- a/src/include/ceph_strings.cc +++ b/src/include/ceph_strings.cc @@ -25,6 +25,7 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_NOTIFY: return "notify"; case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack"; case CEPH_OSD_OP_ASSERT_VER: return "assert-version"; + case CEPH_OSD_OP_LIST_WATCHERS: return "list-watchers"; case CEPH_OSD_OP_MASKTRUNC: return "masktrunc"; diff --git a/src/include/rados.h b/src/include/rados.h index 4f7d7174c47..093a04baf86 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -177,6 +177,8 @@ enum { /* versioning */ CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8, + CEPH_OSD_OP_LIST_WATCHERS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 9, + /* write */ CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1, CEPH_OSD_OP_WRITEFULL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2, diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 37f34fcb79e..cfa6bbabe8a 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -2203,6 +2203,31 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) break; } + case CEPH_OSD_OP_LIST_WATCHERS: + { + obj_list_watch_response_t resp; + + map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator oi_iter; + for (oi_iter = oi.watchers.begin(); oi_iter != oi.watchers.end(); + oi_iter++) { + dout(20) << "key cookie=" << oi_iter->first.first + << " entity=" << oi_iter->first.second << " " + << oi_iter->second << dendl; + assert(oi_iter->first.first == oi_iter->second.cookie); + assert(oi_iter->first.second.is_client()); + + watch_item_t wi(oi_iter->first.second, oi_iter->second.cookie, + oi_iter->second.timeout_seconds); + resp.entries.push_back(wi); + } + + resp.encode(osd_op.outdata); + result = 0; + + ctx->delta_stats.num_rd++; + break; + } + case CEPH_OSD_OP_ASSERT_SRC_VERSION: { uint64_t ver = op.watch.ver; diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 008fe8e9ded..a0755825e1e 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2029,4 +2029,68 @@ struct OSDOp { ostream& operator<<(ostream& out, const OSDOp& op); +struct watch_item_t { + entity_name_t name; + uint64_t cookie; + uint32_t timeout_seconds; + + watch_item_t() : cookie(0), timeout_seconds(0) { } + watch_item_t(entity_name_t name, uint64_t cookie, uint32_t timeout) + : name(name), cookie(cookie), timeout_seconds(timeout) { } + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + ::encode(name, bl); + ::encode(cookie, bl); + ::encode(timeout_seconds, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &bl) { + DECODE_START(1, bl); + ::decode(name, bl); + ::decode(cookie, bl); + ::decode(timeout_seconds, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(watch_item_t) + +/** + * obj list watch response format + * + */ +struct obj_list_watch_response_t { + list<watch_item_t> entries; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entries, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entries, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const { + f->open_array_section("entries"); + for (list<watch_item_t>::const_iterator p = entries.begin(); p != entries.end(); ++p) { + f->open_object_section("watch"); + f->dump_stream("watcher") << p->name; + f->dump_int("cookie", p->cookie); + f->dump_int("timeout", p->timeout_seconds); + f->close_section(); + } + f->close_section(); + } + static void generate_test_instances(list<obj_list_watch_response_t*>& o) { + o.push_back(new obj_list_watch_response_t); + o.push_back(new obj_list_watch_response_t); + o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 1), 10, 30)); + o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 2), 20, 60)); + } +}; + +WRITE_CLASS_ENCODER(obj_list_watch_response_t) + #endif diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 692deba8c32..2b604c85684 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -313,6 +313,37 @@ struct ObjectOperation { } } }; + struct C_ObjectOperation_decodewatchers : public Context { + bufferlist bl; + list<obj_watch_t> *pwatchers; + int *prval; + C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr) + : pwatchers(pw), prval(pr) {} + void finish(int r) { + if (r >= 0) { + bufferlist::iterator p = bl.begin(); + try { + obj_list_watch_response_t resp; + ::decode(resp, p); + if (pwatchers) { + for (list<watch_item_t>::iterator i = resp.entries.begin() ; + i != resp.entries.end() ; ++i) { + obj_watch_t ow; + ow.watcher_id = i->name.num(); + ow.cookie = i->cookie; + ow.timeout_seconds = i->timeout_seconds; + pwatchers->push_back(ow); + } + } + *prval = 0; + } + catch (buffer::error& e) { + if (prval) + *prval = -EIO; + } + } + } + }; void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) { add_op(CEPH_OSD_OP_GETXATTRS); if (pattrs || prval) { @@ -495,6 +526,19 @@ struct ObjectOperation { add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl); } + void list_watchers(list<obj_watch_t> *out, + int *prval) { + (void)add_op(CEPH_OSD_OP_LIST_WATCHERS); + if (prval || out) { + unsigned p = ops.size() - 1; + C_ObjectOperation_decodewatchers *h = + new C_ObjectOperation_decodewatchers(out, prval); + out_handler[p] = h; + out_bl[p] = &h->bl; + out_rval[p] = prval; + } + } + void assert_version(uint64_t ver) { bufferlist bl; add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0, bl); |