summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Zafman <david.zafman@inktank.com>2013-02-22 12:49:47 -0800
committerDavid Zafman <david.zafman@inktank.com>2013-02-22 12:50:19 -0800
commitd612a9abacdde1844725262fbbeba8578b767333 (patch)
treea6b9c5bfdfca4767fb452d79dd7172319f0cfbce
parentdc181224abf6fb8fc583730ae3d90acdf0b80f39 (diff)
parent5648117626cb60452aea8bdf51f88bd2180d8e62 (diff)
downloadceph-d612a9abacdde1844725262fbbeba8578b767333.tar.gz
Merge branch 'wip-3403-4-rebase'
Feature: #3403 Signed-off-by: David Zafman <david.zafman@inktank.com> Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/Makefile.am2
-rw-r--r--src/include/ceph_strings.cc1
-rw-r--r--src/include/rados.h2
-rw-r--r--src/include/rados/librados.h1
-rw-r--r--src/include/rados/librados.hpp10
-rw-r--r--src/include/rados/rados_types.h16
-rw-r--r--src/librados/librados.cc22
-rw-r--r--src/osd/ReplicatedPG.cc25
-rw-r--r--src/osd/osd_types.h64
-rw-r--r--src/osdc/Objecter.h45
-rw-r--r--src/rados.cc21
-rw-r--r--src/test/librados/watch_notify.cc4
12 files changed, 212 insertions, 1 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 60c3f2e5203..9c453df1f53 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1182,6 +1182,7 @@ librbd_include_DATA = \
rados_includedir = $(includedir)/rados
rados_include_DATA = \
$(srcdir)/include/rados/librados.h \
+ $(srcdir)/include/rados/rados_types.h \
$(srcdir)/include/rados/librados.hpp \
$(srcdir)/include/buffer.h \
$(srcdir)/include/page.h \
@@ -1645,6 +1646,7 @@ noinst_HEADERS = \
include/uuid.h\
include/xlist.h\
include/rados/librados.h\
+ include/rados/rados_types.h\
include/rados/librados.hpp\
include/rados/librgw.h\
include/rados/page.h\
diff --git a/src/include/ceph_strings.cc b/src/include/ceph_strings.cc
index 026ca4e2f99..b3c095cb223 100644
--- a/src/include/ceph_strings.cc
+++ b/src/include/ceph_strings.cc
@@ -25,6 +25,7 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_NOTIFY: return "notify";
case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
+ case CEPH_OSD_OP_LIST_WATCHERS: return "list-watchers";
case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
diff --git a/src/include/rados.h b/src/include/rados.h
index 4f7d7174c47..093a04baf86 100644
--- a/src/include/rados.h
+++ b/src/include/rados.h
@@ -177,6 +177,8 @@ enum {
/* versioning */
CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8,
+ CEPH_OSD_OP_LIST_WATCHERS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 9,
+
/* write */
CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
CEPH_OSD_OP_WRITEFULL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2,
diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h
index d40d9b588c2..fec4bd6778c 100644
--- a/src/include/rados/librados.h
+++ b/src/include/rados/librados.h
@@ -13,6 +13,7 @@ extern "C" {
#include "include/inttypes.h"
#endif
#include <string.h>
+#include "include/rados/rados_types.h"
#ifndef CEPH_OSD_TMAP_SET
/* These are also defined in rados.h and objclass.h. Keep them in sync! */
diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp
index fa60237a353..fb585d09811 100644
--- a/src/include/rados/librados.hpp
+++ b/src/include/rados/librados.hpp
@@ -326,8 +326,15 @@ namespace librados
std::map<std::string, bufferlist> *map,
int *prval);
- };
+ /**
+ * list_watchers: Get list watchers of object
+ *
+ * @param out_watchers [out] place returned values in out_watchers on completion
+ * @param prval [out] place error code in prval upon completion
+ */
+ void list_watchers(std::list<obj_watch_t> *out_watchers, int *prval);
+ };
/* IoCtx : This is a context in which we can perform I/O.
* It includes a Pool,
@@ -498,6 +505,7 @@ namespace librados
librados::WatchCtx *ctx);
int unwatch(const std::string& o, uint64_t handle);
int notify(const std::string& o, uint64_t ver, bufferlist& bl);
+ int list_watchers(const std::string& o, std::list<obj_watch_t> *out_watchers);
void set_notify_timeout(uint32_t timeout);
// assert version for next sync operations
diff --git a/src/include/rados/rados_types.h b/src/include/rados/rados_types.h
new file mode 100644
index 00000000000..a5579536b37
--- /dev/null
+++ b/src/include/rados/rados_types.h
@@ -0,0 +1,16 @@
+#ifndef CEPH_RADOS_TYPES_H
+#define CEPH_RADOS_TYPES_H
+
+#include "include/inttypes.h"
+
+/**
+ * @struct obj_watch_t
+ * One item from list_watchers
+ */
+struct obj_watch_t {
+ int64_t watcher_id;
+ uint64_t cookie;
+ uint32_t timeout_seconds;
+};
+
+#endif
diff --git a/src/librados/librados.cc b/src/librados/librados.cc
index 5a81a267f2b..0b7eaf97850 100644
--- a/src/librados/librados.cc
+++ b/src/librados/librados.cc
@@ -211,6 +211,14 @@ void librados::ObjectOperation::omap_cmp(
o->omap_cmp(assertions, prval);
}
+void librados::ObjectReadOperation::list_watchers(
+ list<obj_watch_t> *out_watchers,
+ int *prval)
+{
+ ::ObjectOperation *o = (::ObjectOperation *)impl;
+ o->list_watchers(out_watchers, prval);
+}
+
int librados::IoCtx::omap_get_vals(const std::string& oid,
const std::string& start_after,
const std::string& filter_prefix,
@@ -1018,6 +1026,20 @@ int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl)
return io_ctx_impl->notify(obj, ver, bl);
}
+int librados::IoCtx::list_watchers(const std::string& oid,
+ std::list<obj_watch_t> *out_watchers)
+{
+ ObjectReadOperation op;
+ int r;
+ op.list_watchers(out_watchers, &r);
+ bufferlist bl;
+ int ret = operate(oid, &op, &bl);
+ if (ret < 0)
+ return ret;
+
+ return r;
+}
+
void librados::IoCtx::set_notify_timeout(uint32_t timeout)
{
io_ctx_impl->set_notify_timeout(timeout);
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 37f34fcb79e..cfa6bbabe8a 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -2203,6 +2203,31 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
break;
}
+ case CEPH_OSD_OP_LIST_WATCHERS:
+ {
+ obj_list_watch_response_t resp;
+
+ map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator oi_iter;
+ for (oi_iter = oi.watchers.begin(); oi_iter != oi.watchers.end();
+ oi_iter++) {
+ dout(20) << "key cookie=" << oi_iter->first.first
+ << " entity=" << oi_iter->first.second << " "
+ << oi_iter->second << dendl;
+ assert(oi_iter->first.first == oi_iter->second.cookie);
+ assert(oi_iter->first.second.is_client());
+
+ watch_item_t wi(oi_iter->first.second, oi_iter->second.cookie,
+ oi_iter->second.timeout_seconds);
+ resp.entries.push_back(wi);
+ }
+
+ resp.encode(osd_op.outdata);
+ result = 0;
+
+ ctx->delta_stats.num_rd++;
+ break;
+ }
+
case CEPH_OSD_OP_ASSERT_SRC_VERSION:
{
uint64_t ver = op.watch.ver;
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 008fe8e9ded..a0755825e1e 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -2029,4 +2029,68 @@ struct OSDOp {
ostream& operator<<(ostream& out, const OSDOp& op);
+struct watch_item_t {
+ entity_name_t name;
+ uint64_t cookie;
+ uint32_t timeout_seconds;
+
+ watch_item_t() : cookie(0), timeout_seconds(0) { }
+ watch_item_t(entity_name_t name, uint64_t cookie, uint32_t timeout)
+ : name(name), cookie(cookie), timeout_seconds(timeout) { }
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(name, bl);
+ ::encode(cookie, bl);
+ ::encode(timeout_seconds, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator &bl) {
+ DECODE_START(1, bl);
+ ::decode(name, bl);
+ ::decode(cookie, bl);
+ ::decode(timeout_seconds, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(watch_item_t)
+
+/**
+ * obj list watch response format
+ *
+ */
+struct obj_list_watch_response_t {
+ list<watch_item_t> entries;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(entries, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(entries, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const {
+ f->open_array_section("entries");
+ for (list<watch_item_t>::const_iterator p = entries.begin(); p != entries.end(); ++p) {
+ f->open_object_section("watch");
+ f->dump_stream("watcher") << p->name;
+ f->dump_int("cookie", p->cookie);
+ f->dump_int("timeout", p->timeout_seconds);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ static void generate_test_instances(list<obj_list_watch_response_t*>& o) {
+ o.push_back(new obj_list_watch_response_t);
+ o.push_back(new obj_list_watch_response_t);
+ o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 1), 10, 30));
+ o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 2), 20, 60));
+ }
+};
+
+WRITE_CLASS_ENCODER(obj_list_watch_response_t)
+
#endif
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index 9ff02f6ab93..2b604c85684 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -24,6 +24,7 @@
#include "common/admin_socket.h"
#include "common/Timer.h"
+#include "include/rados/rados_types.h"
#include <list>
#include <map>
@@ -312,6 +313,37 @@ struct ObjectOperation {
}
}
};
+ struct C_ObjectOperation_decodewatchers : public Context {
+ bufferlist bl;
+ list<obj_watch_t> *pwatchers;
+ int *prval;
+ C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr)
+ : pwatchers(pw), prval(pr) {}
+ void finish(int r) {
+ if (r >= 0) {
+ bufferlist::iterator p = bl.begin();
+ try {
+ obj_list_watch_response_t resp;
+ ::decode(resp, p);
+ if (pwatchers) {
+ for (list<watch_item_t>::iterator i = resp.entries.begin() ;
+ i != resp.entries.end() ; ++i) {
+ obj_watch_t ow;
+ ow.watcher_id = i->name.num();
+ ow.cookie = i->cookie;
+ ow.timeout_seconds = i->timeout_seconds;
+ pwatchers->push_back(ow);
+ }
+ }
+ *prval = 0;
+ }
+ catch (buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ }
+ }
+ }
+ };
void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {
add_op(CEPH_OSD_OP_GETXATTRS);
if (pattrs || prval) {
@@ -494,6 +526,19 @@ struct ObjectOperation {
add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
}
+ void list_watchers(list<obj_watch_t> *out,
+ int *prval) {
+ (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
+ if (prval || out) {
+ unsigned p = ops.size() - 1;
+ C_ObjectOperation_decodewatchers *h =
+ new C_ObjectOperation_decodewatchers(out, prval);
+ out_handler[p] = h;
+ out_bl[p] = &h->bl;
+ out_rval[p] = prval;
+ }
+ }
+
void assert_version(uint64_t ver) {
bufferlist bl;
add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0, bl);
diff --git a/src/rados.cc b/src/rados.cc
index d3de74a810b..bbd3badc082 100644
--- a/src/rados.cc
+++ b/src/rados.cc
@@ -15,6 +15,7 @@
#include "include/types.h"
#include "include/rados/librados.hpp"
+#include "include/rados/rados_types.h"
#include "rados_sync.h"
using namespace librados;
@@ -95,6 +96,7 @@ void usage(ostream& out)
" rmomapkey <obj-name> <key>\n"
" getomapheader <obj-name>\n"
" setomapheader <obj-name> <val>\n"
+" listwatchers <obj-name> list the watchers of this object\n"
"\n"
"IMPORT AND EXPORT\n"
" import [options] <local-directory> <rados-pool>\n"
@@ -2001,6 +2003,25 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
formatter = new JSONFormatter(pretty_format);
}
ret = do_lock_cmd(nargs, opts, &io_ctx, formatter);
+ } else if (strcmp(nargs[0], "listwatchers") == 0) {
+ if (!pool_name || nargs.size() < 2)
+ usage_exit();
+
+ string oid(nargs[1]);
+ std::list<obj_watch_t> lw;
+
+ ret = io_ctx.list_watchers(oid, &lw);
+ if (ret < 0) {
+ cerr << "error listing watchers " << pool_name << "/" << oid << ": " << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
+ return 1;
+ }
+ else
+ ret = 0;
+
+ std::list<obj_watch_t>::iterator i;
+ for (i = lw.begin(); i != lw.end(); i++) {
+ cout << "watcher=client." << i->watcher_id << " cookie=" << i->cookie << std::endl;
+ }
} else {
cerr << "unrecognized command " << nargs[0] << std::endl;
usage_exit();
diff --git a/src/test/librados/watch_notify.cc b/src/test/librados/watch_notify.cc
index b2de96d1332..d6cb8a0cee3 100644
--- a/src/test/librados/watch_notify.cc
+++ b/src/test/librados/watch_notify.cc
@@ -1,5 +1,6 @@
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
+#include "include/rados/rados_types.h"
#include "test/librados/test.h"
#include <errno.h>
@@ -61,6 +62,9 @@ TEST(LibRadosWatchNotify, WatchNotifyTestPP) {
uint64_t handle;
WatchNotifyTestCtx ctx;
ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
+ std::list<obj_watch_t> watches;
+ ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
+ ASSERT_EQ(watches.size(), 1);
bufferlist bl2;
ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
TestAlarm alarm;