diff options
author | David Zafman <david.zafman@inktank.com> | 2013-02-22 12:49:47 -0800 |
---|---|---|
committer | David Zafman <david.zafman@inktank.com> | 2013-02-22 12:50:19 -0800 |
commit | d612a9abacdde1844725262fbbeba8578b767333 (patch) | |
tree | a6b9c5bfdfca4767fb452d79dd7172319f0cfbce | |
parent | dc181224abf6fb8fc583730ae3d90acdf0b80f39 (diff) | |
parent | 5648117626cb60452aea8bdf51f88bd2180d8e62 (diff) | |
download | ceph-d612a9abacdde1844725262fbbeba8578b767333.tar.gz |
Merge branch 'wip-3403-4-rebase'
Feature: #3403
Signed-off-by: David Zafman <david.zafman@inktank.com>
Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/include/ceph_strings.cc | 1 | ||||
-rw-r--r-- | src/include/rados.h | 2 | ||||
-rw-r--r-- | src/include/rados/librados.h | 1 | ||||
-rw-r--r-- | src/include/rados/librados.hpp | 10 | ||||
-rw-r--r-- | src/include/rados/rados_types.h | 16 | ||||
-rw-r--r-- | src/librados/librados.cc | 22 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 25 | ||||
-rw-r--r-- | src/osd/osd_types.h | 64 | ||||
-rw-r--r-- | src/osdc/Objecter.h | 45 | ||||
-rw-r--r-- | src/rados.cc | 21 | ||||
-rw-r--r-- | src/test/librados/watch_notify.cc | 4 |
12 files changed, 212 insertions, 1 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 60c3f2e5203..9c453df1f53 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1182,6 +1182,7 @@ librbd_include_DATA = \ rados_includedir = $(includedir)/rados rados_include_DATA = \ $(srcdir)/include/rados/librados.h \ + $(srcdir)/include/rados/rados_types.h \ $(srcdir)/include/rados/librados.hpp \ $(srcdir)/include/buffer.h \ $(srcdir)/include/page.h \ @@ -1645,6 +1646,7 @@ noinst_HEADERS = \ include/uuid.h\ include/xlist.h\ include/rados/librados.h\ + include/rados/rados_types.h\ include/rados/librados.hpp\ include/rados/librgw.h\ include/rados/page.h\ diff --git a/src/include/ceph_strings.cc b/src/include/ceph_strings.cc index 026ca4e2f99..b3c095cb223 100644 --- a/src/include/ceph_strings.cc +++ b/src/include/ceph_strings.cc @@ -25,6 +25,7 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_NOTIFY: return "notify"; case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack"; case CEPH_OSD_OP_ASSERT_VER: return "assert-version"; + case CEPH_OSD_OP_LIST_WATCHERS: return "list-watchers"; case CEPH_OSD_OP_MASKTRUNC: return "masktrunc"; diff --git a/src/include/rados.h b/src/include/rados.h index 4f7d7174c47..093a04baf86 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -177,6 +177,8 @@ enum { /* versioning */ CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8, + CEPH_OSD_OP_LIST_WATCHERS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 9, + /* write */ CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1, CEPH_OSD_OP_WRITEFULL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2, diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index d40d9b588c2..fec4bd6778c 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -13,6 +13,7 @@ extern "C" { #include "include/inttypes.h" #endif #include <string.h> +#include "include/rados/rados_types.h" #ifndef CEPH_OSD_TMAP_SET /* These are also defined in rados.h and objclass.h. Keep them in sync! */ diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index fa60237a353..fb585d09811 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -326,8 +326,15 @@ namespace librados std::map<std::string, bufferlist> *map, int *prval); - }; + /** + * list_watchers: Get list watchers of object + * + * @param out_watchers [out] place returned values in out_watchers on completion + * @param prval [out] place error code in prval upon completion + */ + void list_watchers(std::list<obj_watch_t> *out_watchers, int *prval); + }; /* IoCtx : This is a context in which we can perform I/O. * It includes a Pool, @@ -498,6 +505,7 @@ namespace librados librados::WatchCtx *ctx); int unwatch(const std::string& o, uint64_t handle); int notify(const std::string& o, uint64_t ver, bufferlist& bl); + int list_watchers(const std::string& o, std::list<obj_watch_t> *out_watchers); void set_notify_timeout(uint32_t timeout); // assert version for next sync operations diff --git a/src/include/rados/rados_types.h b/src/include/rados/rados_types.h new file mode 100644 index 00000000000..a5579536b37 --- /dev/null +++ b/src/include/rados/rados_types.h @@ -0,0 +1,16 @@ +#ifndef CEPH_RADOS_TYPES_H +#define CEPH_RADOS_TYPES_H + +#include "include/inttypes.h" + +/** + * @struct obj_watch_t + * One item from list_watchers + */ +struct obj_watch_t { + int64_t watcher_id; + uint64_t cookie; + uint32_t timeout_seconds; +}; + +#endif diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 5a81a267f2b..0b7eaf97850 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -211,6 +211,14 @@ void librados::ObjectOperation::omap_cmp( o->omap_cmp(assertions, prval); } +void librados::ObjectReadOperation::list_watchers( + list<obj_watch_t> *out_watchers, + int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->list_watchers(out_watchers, prval); +} + int librados::IoCtx::omap_get_vals(const std::string& oid, const std::string& start_after, const std::string& filter_prefix, @@ -1018,6 +1026,20 @@ int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl) return io_ctx_impl->notify(obj, ver, bl); } +int librados::IoCtx::list_watchers(const std::string& oid, + std::list<obj_watch_t> *out_watchers) +{ + ObjectReadOperation op; + int r; + op.list_watchers(out_watchers, &r); + bufferlist bl; + int ret = operate(oid, &op, &bl); + if (ret < 0) + return ret; + + return r; +} + void librados::IoCtx::set_notify_timeout(uint32_t timeout) { io_ctx_impl->set_notify_timeout(timeout); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 37f34fcb79e..cfa6bbabe8a 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -2203,6 +2203,31 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) break; } + case CEPH_OSD_OP_LIST_WATCHERS: + { + obj_list_watch_response_t resp; + + map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator oi_iter; + for (oi_iter = oi.watchers.begin(); oi_iter != oi.watchers.end(); + oi_iter++) { + dout(20) << "key cookie=" << oi_iter->first.first + << " entity=" << oi_iter->first.second << " " + << oi_iter->second << dendl; + assert(oi_iter->first.first == oi_iter->second.cookie); + assert(oi_iter->first.second.is_client()); + + watch_item_t wi(oi_iter->first.second, oi_iter->second.cookie, + oi_iter->second.timeout_seconds); + resp.entries.push_back(wi); + } + + resp.encode(osd_op.outdata); + result = 0; + + ctx->delta_stats.num_rd++; + break; + } + case CEPH_OSD_OP_ASSERT_SRC_VERSION: { uint64_t ver = op.watch.ver; diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 008fe8e9ded..a0755825e1e 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2029,4 +2029,68 @@ struct OSDOp { ostream& operator<<(ostream& out, const OSDOp& op); +struct watch_item_t { + entity_name_t name; + uint64_t cookie; + uint32_t timeout_seconds; + + watch_item_t() : cookie(0), timeout_seconds(0) { } + watch_item_t(entity_name_t name, uint64_t cookie, uint32_t timeout) + : name(name), cookie(cookie), timeout_seconds(timeout) { } + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + ::encode(name, bl); + ::encode(cookie, bl); + ::encode(timeout_seconds, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &bl) { + DECODE_START(1, bl); + ::decode(name, bl); + ::decode(cookie, bl); + ::decode(timeout_seconds, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(watch_item_t) + +/** + * obj list watch response format + * + */ +struct obj_list_watch_response_t { + list<watch_item_t> entries; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entries, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entries, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const { + f->open_array_section("entries"); + for (list<watch_item_t>::const_iterator p = entries.begin(); p != entries.end(); ++p) { + f->open_object_section("watch"); + f->dump_stream("watcher") << p->name; + f->dump_int("cookie", p->cookie); + f->dump_int("timeout", p->timeout_seconds); + f->close_section(); + } + f->close_section(); + } + static void generate_test_instances(list<obj_list_watch_response_t*>& o) { + o.push_back(new obj_list_watch_response_t); + o.push_back(new obj_list_watch_response_t); + o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 1), 10, 30)); + o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 2), 20, 60)); + } +}; + +WRITE_CLASS_ENCODER(obj_list_watch_response_t) + #endif diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 9ff02f6ab93..2b604c85684 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -24,6 +24,7 @@ #include "common/admin_socket.h" #include "common/Timer.h" +#include "include/rados/rados_types.h" #include <list> #include <map> @@ -312,6 +313,37 @@ struct ObjectOperation { } } }; + struct C_ObjectOperation_decodewatchers : public Context { + bufferlist bl; + list<obj_watch_t> *pwatchers; + int *prval; + C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr) + : pwatchers(pw), prval(pr) {} + void finish(int r) { + if (r >= 0) { + bufferlist::iterator p = bl.begin(); + try { + obj_list_watch_response_t resp; + ::decode(resp, p); + if (pwatchers) { + for (list<watch_item_t>::iterator i = resp.entries.begin() ; + i != resp.entries.end() ; ++i) { + obj_watch_t ow; + ow.watcher_id = i->name.num(); + ow.cookie = i->cookie; + ow.timeout_seconds = i->timeout_seconds; + pwatchers->push_back(ow); + } + } + *prval = 0; + } + catch (buffer::error& e) { + if (prval) + *prval = -EIO; + } + } + } + }; void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) { add_op(CEPH_OSD_OP_GETXATTRS); if (pattrs || prval) { @@ -494,6 +526,19 @@ struct ObjectOperation { add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl); } + void list_watchers(list<obj_watch_t> *out, + int *prval) { + (void)add_op(CEPH_OSD_OP_LIST_WATCHERS); + if (prval || out) { + unsigned p = ops.size() - 1; + C_ObjectOperation_decodewatchers *h = + new C_ObjectOperation_decodewatchers(out, prval); + out_handler[p] = h; + out_bl[p] = &h->bl; + out_rval[p] = prval; + } + } + void assert_version(uint64_t ver) { bufferlist bl; add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0, bl); diff --git a/src/rados.cc b/src/rados.cc index d3de74a810b..bbd3badc082 100644 --- a/src/rados.cc +++ b/src/rados.cc @@ -15,6 +15,7 @@ #include "include/types.h" #include "include/rados/librados.hpp" +#include "include/rados/rados_types.h" #include "rados_sync.h" using namespace librados; @@ -95,6 +96,7 @@ void usage(ostream& out) " rmomapkey <obj-name> <key>\n" " getomapheader <obj-name>\n" " setomapheader <obj-name> <val>\n" +" listwatchers <obj-name> list the watchers of this object\n" "\n" "IMPORT AND EXPORT\n" " import [options] <local-directory> <rados-pool>\n" @@ -2001,6 +2003,25 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts, formatter = new JSONFormatter(pretty_format); } ret = do_lock_cmd(nargs, opts, &io_ctx, formatter); + } else if (strcmp(nargs[0], "listwatchers") == 0) { + if (!pool_name || nargs.size() < 2) + usage_exit(); + + string oid(nargs[1]); + std::list<obj_watch_t> lw; + + ret = io_ctx.list_watchers(oid, &lw); + if (ret < 0) { + cerr << "error listing watchers " << pool_name << "/" << oid << ": " << strerror_r(-ret, buf, sizeof(buf)) << std::endl; + return 1; + } + else + ret = 0; + + std::list<obj_watch_t>::iterator i; + for (i = lw.begin(); i != lw.end(); i++) { + cout << "watcher=client." << i->watcher_id << " cookie=" << i->cookie << std::endl; + } } else { cerr << "unrecognized command " << nargs[0] << std::endl; usage_exit(); diff --git a/src/test/librados/watch_notify.cc b/src/test/librados/watch_notify.cc index b2de96d1332..d6cb8a0cee3 100644 --- a/src/test/librados/watch_notify.cc +++ b/src/test/librados/watch_notify.cc @@ -1,5 +1,6 @@ #include "include/rados/librados.h" #include "include/rados/librados.hpp" +#include "include/rados/rados_types.h" #include "test/librados/test.h" #include <errno.h> @@ -61,6 +62,9 @@ TEST(LibRadosWatchNotify, WatchNotifyTestPP) { uint64_t handle; WatchNotifyTestCtx ctx; ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx)); + std::list<obj_watch_t> watches; + ASSERT_EQ(0, ioctx.list_watchers("foo", &watches)); + ASSERT_EQ(watches.size(), 1); bufferlist bl2; ASSERT_EQ(0, ioctx.notify("foo", 0, bl2)); TestAlarm alarm; |