summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-06-17 20:10:14 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-06-17 20:10:14 -0700
commit7669662b6200b8837ca695099fae395dcce86517 (patch)
tree7c1cc493f1efa9efe8fb98901b7803a21dc5b39a
parentf5f8314841a717bc087bf266d05921cb6435d90e (diff)
downloadceph-7669662b6200b8837ca695099fae395dcce86517.tar.gz
cls_statelog: introducing new objclass to handle state tracking
Somewhat similar to the log objclass, but uses different data for indexing. Also keeps a dual index. In general an entry has 3 identifiers: - object: the object id on which the operation is made - client_id: client's unique identifier - op_id: operation's unique identifier An entry is indexed by both client_id+op_id, and by object+op_id, make it possible to list operations either by client_id, or by object id. We also keep state per each entry and the new check_state request can be used as a guard. Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/Makefile.am9
-rw-r--r--src/cls/statelog/cls_statelog.cc294
-rw-r--r--src/cls/statelog/cls_statelog_ops.h138
-rw-r--r--src/cls/statelog/cls_statelog_types.h51
4 files changed, 492 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 0e36f8fcc5e..8f9b850a0ea 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -571,6 +571,13 @@ libcls_log_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex
radoslib_LTLIBRARIES += libcls_log.la
+libcls_statelog_la_SOURCES = cls/statelog/cls_statelog.cc
+libcls_statelog_la_CFLAGS = ${AM_CFLAGS}
+libcls_statelog_la_CXXFLAGS= ${AM_CXXFLAGS}
+libcls_statelog_la_LIBADD = $(PTHREAD_LIBS) $(EXTRALIBS)
+libcls_statelog_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex '.*__cls_.*'
+
+radoslib_LTLIBRARIES += libcls_statelog.la
if WITH_RADOSGW
# rgw: rados gateway
@@ -1645,6 +1652,8 @@ noinst_HEADERS = \
cls/log/cls_log_types.h\
cls/log/cls_log_ops.h\
cls/log/cls_log_client.h\
+ cls/log/cls_statelog_types.h\
+ cls/log/cls_statelog_ops.h\
cls/rgw/cls_rgw_client.h\
cls/rgw/cls_rgw_ops.h\
cls/rgw/cls_rgw_types.h\
diff --git a/src/cls/statelog/cls_statelog.cc b/src/cls/statelog/cls_statelog.cc
new file mode 100644
index 00000000000..c17527ceea6
--- /dev/null
+++ b/src/cls/statelog/cls_statelog.cc
@@ -0,0 +1,294 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <iostream>
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+
+#include "include/types.h"
+#include "include/utime.h"
+#include "objclass/objclass.h"
+
+#include "cls_statelog_types.h"
+#include "cls_statelog_ops.h"
+
+#include "global/global_context.h"
+
+CLS_VER(1,0)
+CLS_NAME(statelog)
+
+cls_handle_t h_class;
+cls_method_handle_t h_statelog_add;
+cls_method_handle_t h_statelog_list;
+cls_method_handle_t h_statelog_remove;
+cls_method_handle_t h_statelog_check_state;
+
+static string statelog_index_by_client_prefix = "1_";
+static string statelog_index_by_object_prefix = "2_";
+
+
+static int write_statelog_entry(cls_method_context_t hctx, const string& index, const cls_statelog_entry& entry)
+{
+ bufferlist bl;
+ ::encode(entry, bl);
+
+ int ret = cls_cxx_map_set_val(hctx, index, &bl);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
+static void get_index_by_client(const string& client_id, const string& op_id, string& index)
+{
+ string s = statelog_index_by_client_prefix + "_";
+ s.append(client_id + "_" + op_id);
+}
+
+static void get_index_by_client(cls_statelog_entry& entry, string& index)
+{
+ get_index_by_client(entry.client_id, entry.op_id, index);
+}
+
+static void get_index_by_object(const string& object, const string& op_id, string& index)
+{
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d_", (int)object.size());
+
+ index = statelog_index_by_object_prefix + "_" + buf + "_"; /* append object length to ensure uniqueness */
+ index.append(object + "_" + op_id);
+}
+
+static void get_index_by_object(cls_statelog_entry& entry, string& index)
+{
+ get_index_by_object(entry.object, entry.op_id, index);
+}
+
+static int cls_statelog_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ bufferlist::iterator in_iter = in->begin();
+
+ cls_statelog_add_op op;
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_statelog_add_op(): failed to decode op");
+ return -EINVAL;
+ }
+
+ for (list<cls_statelog_entry>::iterator iter = op.entries.begin();
+ iter != op.entries.end(); ++iter) {
+ cls_statelog_entry& entry = *iter;
+
+ string index_by_client;
+
+ get_index_by_client(entry, index_by_client);
+
+ CLS_LOG(0, "storing entry by client/op at %s", index_by_client.c_str());
+
+ int ret = write_statelog_entry(hctx, index_by_client, entry);
+ if (ret < 0)
+ return ret;
+
+ string index_by_obj;
+
+ get_index_by_object(entry, index_by_obj);
+
+ CLS_LOG(0, "storing entry by object at %s", index_by_obj.c_str());
+ ret = write_statelog_entry(hctx, index_by_obj, entry);
+ if (ret < 0)
+ return ret;
+
+ }
+
+ return 0;
+}
+
+static int cls_statelog_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ bufferlist::iterator in_iter = in->begin();
+
+ cls_statelog_list_op op;
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_statelog_list_op(): failed to decode op");
+ return -EINVAL;
+ }
+
+ map<string, bufferlist> keys;
+
+ string from_index;
+ string match_prefix;
+
+ if (op.object.empty()) {
+ get_index_by_client(op.client_id, op.op_id, match_prefix);
+ } else {
+ get_index_by_object(op.object, op.op_id, match_prefix);
+ }
+
+ if (op.marker.empty()) {
+ from_index = match_prefix;
+ } else {
+ from_index = op.marker;
+ }
+
+#define MAX_ENTRIES 1000
+ size_t max_entries = op.max_entries;
+ if (!max_entries || max_entries > MAX_ENTRIES)
+ max_entries = MAX_ENTRIES;
+
+ int rc = cls_cxx_map_get_vals(hctx, from_index, match_prefix, max_entries + 1, &keys);
+ if (rc < 0)
+ return rc;
+
+ cls_statelog_list_ret ret;
+
+ list<cls_statelog_entry>& entries = ret.entries;
+ map<string, bufferlist>::iterator iter = keys.begin();
+
+ bool done = false;
+ string marker;
+
+ size_t i;
+ for (i = 0; i < max_entries && iter != keys.end(); ++i, ++iter) {
+ const string& index = iter->first;
+ marker = index;
+
+ bufferlist& bl = iter->second;
+ bufferlist::iterator biter = bl.begin();
+ try {
+ cls_statelog_entry e;
+ ::decode(e, biter);
+ entries.push_back(e);
+ } catch (buffer::error& err) {
+ CLS_LOG(0, "ERROR: cls_statelog_list: could not decode entry, index=%s", index.c_str());
+ }
+ }
+
+ if (iter == keys.end())
+ done = true;
+ else
+ ret.marker = marker;
+
+ ret.truncated = !done;
+
+ ::encode(ret, *out);
+
+ return 0;
+}
+
+static int cls_statelog_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ bufferlist::iterator in_iter = in->begin();
+
+ cls_statelog_remove_op op;
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_statelog_remove_op(): failed to decode op");
+ return -EINVAL;
+ }
+
+ if (op.object.empty() || op.op_id.empty()) {
+ CLS_LOG(0, "object name or op id not specified");
+ return -EINVAL;
+ }
+
+ string obj_index;
+ get_index_by_object(op.object, op.op_id, obj_index);
+
+ bufferlist bl;
+ int rc = cls_cxx_map_get_val(hctx, obj_index, &bl);
+ if (rc < 0) {
+ CLS_LOG(0, "could not find entry %s", obj_index.c_str());
+ return rc;
+ }
+
+ cls_statelog_entry entry;
+
+ try {
+ bufferlist::iterator iter = bl.begin();
+ ::decode(entry, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(0, "ERROR: failed to decode entry %s", obj_index.c_str());
+ return -EIO;
+ }
+
+ string client_index;
+ get_index_by_client(entry.client_id, entry.op_id, client_index);
+
+ rc = cls_cxx_map_remove_key(hctx, obj_index);
+ if (rc < 0) {
+ CLS_LOG(0, "ERROR: failed to remove key");
+ return rc;
+ }
+ rc = cls_cxx_map_remove_key(hctx, client_index);
+ if (rc < 0) {
+ CLS_LOG(0, "ERROR: failed to remove key");
+ return rc;
+ }
+
+ return 0;
+}
+
+static int cls_statelog_check_state(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ bufferlist::iterator in_iter = in->begin();
+
+ cls_statelog_check_state_op op;
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_statelog_check_state_op(): failed to decode op");
+ return -EINVAL;
+ }
+
+ if (op.object.empty() || op.op_id.empty()) {
+ CLS_LOG(0, "object name or op id not specified");
+ return -EINVAL;
+ }
+
+ string obj_index;
+ get_index_by_object(op.object, op.op_id, obj_index);
+
+ bufferlist bl;
+ int rc = cls_cxx_map_get_val(hctx, obj_index, &bl);
+ if (rc < 0) {
+ CLS_LOG(0, "could not find entry %s", obj_index.c_str());
+ return rc;
+ }
+
+ cls_statelog_entry entry;
+
+ try {
+ bufferlist::iterator iter = bl.begin();
+ ::decode(entry, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(0, "ERROR: failed to decode entry %s", obj_index.c_str());
+ return -EIO;
+ }
+
+ if (entry.state != op.state)
+ return -ECANCELED;
+
+ return 0;
+}
+
+void __cls_init()
+{
+ CLS_LOG(1, "Loaded log class!");
+
+ cls_register("log", &h_class);
+
+ /* log */
+ cls_register_cxx_method(h_class, "add", CLS_METHOD_RD | CLS_METHOD_WR, cls_statelog_add, &h_statelog_add);
+ cls_register_cxx_method(h_class, "list", CLS_METHOD_RD, cls_statelog_list, &h_statelog_list);
+ cls_register_cxx_method(h_class, "remove", CLS_METHOD_RD | CLS_METHOD_WR, cls_statelog_remove, &h_statelog_remove);
+ cls_register_cxx_method(h_class, "check_state", CLS_METHOD_RD, cls_statelog_check_state, &h_statelog_check_state);
+
+ return;
+}
+
diff --git a/src/cls/statelog/cls_statelog_ops.h b/src/cls/statelog/cls_statelog_ops.h
new file mode 100644
index 00000000000..aa5f7885bb6
--- /dev/null
+++ b/src/cls/statelog/cls_statelog_ops.h
@@ -0,0 +1,138 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CLS_STATELOG_OPS_H
+#define CEPH_CLS_STATELOG_OPS_H
+
+#include "include/types.h"
+#include "cls_statelog_types.h"
+
+struct cls_statelog_add_op {
+ list<cls_statelog_entry> entries;
+
+ cls_statelog_add_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_statelog_add_op)
+
+struct cls_statelog_list_op {
+ string object;
+ string client_id;
+ string op_id;
+ string marker; /* if not empty, overrides from_time */
+ int max_entries; /* upperbound to returned num of entries
+ might return less than that and still be truncated */
+
+ cls_statelog_list_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(object, bl);
+ ::encode(client_id, bl);
+ ::encode(op_id, bl);
+ ::encode(marker, bl);
+ ::encode(max_entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(object, bl);
+ ::decode(client_id, bl);
+ ::decode(op_id, bl);
+ ::decode(marker, bl);
+ ::decode(max_entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_statelog_list_op)
+
+struct cls_statelog_list_ret {
+ list<cls_statelog_entry> entries;
+ string marker;
+ bool truncated;
+
+ cls_statelog_list_ret() : truncated(false) {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(entries, bl);
+ ::encode(marker, bl);
+ ::encode(truncated, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(entries, bl);
+ ::decode(marker, bl);
+ ::decode(truncated, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_statelog_list_ret)
+
+
+/*
+ * operation will return 0 when successfully removed but not done. Will return
+ * -ENODATA when done, so caller needs to repeat sending request until that.
+ */
+struct cls_statelog_remove_op {
+ string object;
+ string op_id;
+
+ cls_statelog_remove_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(object, bl);
+ ::encode(op_id, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(object, bl);
+ ::decode(op_id, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_statelog_remove_op)
+
+struct cls_statelog_check_state_op {
+ string object;
+ string op_id;
+ uint32_t state;
+
+ cls_statelog_check_state_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(object, bl);
+ ::encode(op_id, bl);
+ ::encode(state, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(object, bl);
+ ::decode(op_id, bl);
+ ::decode(state, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_statelog_check_state_op)
+
+#endif
diff --git a/src/cls/statelog/cls_statelog_types.h b/src/cls/statelog/cls_statelog_types.h
new file mode 100644
index 00000000000..f812c7b321c
--- /dev/null
+++ b/src/cls/statelog/cls_statelog_types.h
@@ -0,0 +1,51 @@
+#ifndef CEPH_CLS_STATELOG_TYPES_H
+#define CEPH_CLS_STATELOG_TYPES_H
+
+#include "include/encoding.h"
+#include "include/types.h"
+
+#include "include/utime.h"
+
+class JSONObj;
+
+struct cls_statelog_entry {
+ string client_id;
+ string op_id;
+ string object;
+ utime_t timestamp;
+ bufferlist data;
+ uint32_t state; /* user defined state */
+
+ cls_statelog_entry() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(client_id, bl);
+ ::encode(op_id, bl);
+ ::encode(object, bl);
+ ::encode(timestamp, bl);
+ ::encode(data, bl);
+ ::encode(state, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(client_id, bl);
+ ::decode(op_id, bl);
+ ::decode(object, bl);
+ ::decode(timestamp, bl);
+ ::decode(data, bl);
+ ::decode(state, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
+};
+WRITE_CLASS_ENCODER(cls_statelog_entry)
+
+
+#endif
+
+