diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-17 20:10:14 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-17 20:10:14 -0700 |
commit | 7669662b6200b8837ca695099fae395dcce86517 (patch) | |
tree | 7c1cc493f1efa9efe8fb98901b7803a21dc5b39a | |
parent | f5f8314841a717bc087bf266d05921cb6435d90e (diff) | |
download | ceph-7669662b6200b8837ca695099fae395dcce86517.tar.gz |
cls_statelog: introducing new objclass to handle state tracking
Somewhat similar to the log objclass, but uses different data for
indexing. Also keeps a dual index. In general an entry has 3
identifiers:
- object: the object id on which the operation is made
- client_id: client's unique identifier
- op_id: operation's unique identifier
An entry is indexed both by client_id+op_id and by
object+op_id, making it possible to list operations
either by client_id or by object id.
We also keep a state for each entry, and the new check_state request
can be used as a guard that succeeds only if the stored state matches.
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/Makefile.am | 9 | ||||
-rw-r--r-- | src/cls/statelog/cls_statelog.cc | 294 | ||||
-rw-r--r-- | src/cls/statelog/cls_statelog_ops.h | 138 | ||||
-rw-r--r-- | src/cls/statelog/cls_statelog_types.h | 51 |
4 files changed, 492 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 0e36f8fcc5e..8f9b850a0ea 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -571,6 +571,13 @@ libcls_log_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex
 
 radoslib_LTLIBRARIES += libcls_log.la
 
+libcls_statelog_la_SOURCES = cls/statelog/cls_statelog.cc
+libcls_statelog_la_CFLAGS = ${AM_CFLAGS}
+libcls_statelog_la_CXXFLAGS= ${AM_CXXFLAGS}
+libcls_statelog_la_LIBADD = $(PTHREAD_LIBS) $(EXTRALIBS)
+libcls_statelog_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex '.*__cls_.*'
+
+radoslib_LTLIBRARIES += libcls_statelog.la
 
 if WITH_RADOSGW
 # rgw: rados gateway
@@ -1645,6 +1652,8 @@ noinst_HEADERS = \
 	cls/log/cls_log_types.h\
 	cls/log/cls_log_ops.h\
 	cls/log/cls_log_client.h\
+	cls/statelog/cls_statelog_types.h\
+	cls/statelog/cls_statelog_ops.h\
 	cls/rgw/cls_rgw_client.h\
 	cls/rgw/cls_rgw_ops.h\
 	cls/rgw/cls_rgw_types.h\
diff --git a/src/cls/statelog/cls_statelog.cc b/src/cls/statelog/cls_statelog.cc
new file mode 100644
index 00000000000..c17527ceea6
--- /dev/null
+++ b/src/cls/statelog/cls_statelog.cc
@@ -0,0 +1,294 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <iostream>
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+
+#include "include/types.h"
+#include "include/utime.h"
+#include "objclass/objclass.h"
+
+#include "cls_statelog_types.h"
+#include "cls_statelog_ops.h"
+
+#include "global/global_context.h"
+
+CLS_VER(1,0)
+CLS_NAME(statelog)
+
+cls_handle_t h_class;
+cls_method_handle_t h_statelog_add;
+cls_method_handle_t h_statelog_list;
+cls_method_handle_t h_statelog_remove;
+cls_method_handle_t h_statelog_check_state;
+
+static string statelog_index_by_client_prefix = "1_";
+static string statelog_index_by_object_prefix = "2_";
+
+/* Encode entry and store it via cls_cxx_map_set_val under key index; returns 0 or the negative error. */
+static int write_statelog_entry(cls_method_context_t hctx, const string& index, const cls_statelog_entry& entry)
+{
+  bufferlist bl;
+  ::encode(entry, bl);
+
+  int ret = cls_cxx_map_set_val(hctx, index, &bl);
+  if (ret < 0)
+    return ret;
+
+  return 0;
+}
+/* By-client key, written to index: client prefix, "_", client_id, "_", op_id. */
+static void get_index_by_client(const string& client_id, const string& op_id, string& index)
+{
+  index = statelog_index_by_client_prefix + "_"; /* the key must land in the caller's out-param */
+  index.append(client_id + "_" + op_id);
+}
+/* Convenience overload: takes client_id and op_id from entry. */
+static void get_index_by_client(cls_statelog_entry& entry, string& index)
+{
+  get_index_by_client(entry.client_id, entry.op_id, index);
+}
+/* By-object key, written to index: object prefix, "_", object length, "__", object, "_", op_id. */
+static void get_index_by_object(const string& object, const string& op_id, string& index)
+{
+  char buf[16];
+  snprintf(buf, sizeof(buf), "%d_", (int)object.size());
+
+  index = statelog_index_by_object_prefix + "_" + buf + "_"; /* append object length to ensure uniqueness */
+  index.append(object + "_" + op_id);
+}
+/* Convenience overload: takes object and op_id from entry. */
+static void get_index_by_object(cls_statelog_entry& entry, string& index)
+{
+  get_index_by_object(entry.object, entry.op_id, index);
+}
+/* "add" method: decode a cls_statelog_add_op and store every entry under both index keys. */
+static int cls_statelog_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  bufferlist::iterator in_iter = in->begin();
+
+  cls_statelog_add_op op;
+  try {
+    ::decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_statelog_add_op(): failed to decode op");
+    return -EINVAL;
+  }
+
+  for (list<cls_statelog_entry>::iterator iter = op.entries.begin();
+       iter != op.entries.end(); ++iter) {
+    cls_statelog_entry& entry = *iter;
+
+    string index_by_client;
+
+    get_index_by_client(entry, index_by_client);
+
+    CLS_LOG(0, "storing entry by client/op at %s", index_by_client.c_str());
+
+    int ret = write_statelog_entry(hctx, index_by_client, entry);
+    if (ret < 0)
+      return ret;
+
+    string index_by_obj;
+
+    get_index_by_object(entry, index_by_obj);
+
+    CLS_LOG(0, "storing entry by object at %s", index_by_obj.c_str());
+    ret = write_statelog_entry(hctx, index_by_obj, entry);
+    if (ret < 0)
+      return ret;
+
+  }
+
+  return 0;
+}
+/* "list" method: return up to max_entries (capped at MAX_ENTRIES) entries under a client or object key prefix. */
+static int cls_statelog_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  bufferlist::iterator in_iter = in->begin();
+
+  cls_statelog_list_op op;
+  try {
+    ::decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_statelog_list_op(): failed to decode op");
+    return -EINVAL;
+  }
+
+  map<string, bufferlist> keys;
+
+  string from_index;
+  string match_prefix;
+
+  if (op.object.empty()) {
+    get_index_by_client(op.client_id, op.op_id, match_prefix);
+  } else {
+    get_index_by_object(op.object, op.op_id, match_prefix);
+  }
+
+  if (op.marker.empty()) {
+    from_index = match_prefix;
+  } else {
+    from_index = op.marker;
+  }
+
+#define MAX_ENTRIES 1000
+  size_t max_entries = op.max_entries;
+  if (!max_entries || max_entries > MAX_ENTRIES)
+    max_entries = MAX_ENTRIES;
+
+  int rc = cls_cxx_map_get_vals(hctx, from_index, match_prefix, max_entries + 1, &keys); /* one extra key to detect truncation */
+  if (rc < 0)
+    return rc;
+
+  cls_statelog_list_ret ret;
+
+  list<cls_statelog_entry>& entries = ret.entries;
+  map<string, bufferlist>::iterator iter = keys.begin();
+
+  bool done = false;
+  string marker;
+
+  size_t i;
+  for (i = 0; i < max_entries && iter != keys.end(); ++i, ++iter) {
+    const string& index = iter->first;
+    marker = index;
+
+    bufferlist& bl = iter->second;
+    bufferlist::iterator biter = bl.begin();
+    try {
+      cls_statelog_entry e;
+      ::decode(e, biter);
+      entries.push_back(e);
+    } catch (buffer::error& err) {
+      CLS_LOG(0, "ERROR: cls_statelog_list: could not decode entry, index=%s", index.c_str()); /* logged and skipped */
+    }
+  }
+
+  if (iter == keys.end())
+    done = true;
+  else
+    ret.marker = marker;
+
+  ret.truncated = !done;
+
+  ::encode(ret, *out);
+
+  return 0;
+}
+/* "remove" method: look an entry up by object + op_id and remove both of its index keys. */
+static int cls_statelog_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  bufferlist::iterator in_iter = in->begin();
+
+  cls_statelog_remove_op op;
+  try {
+    ::decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_statelog_remove_op(): failed to decode op");
+    return -EINVAL;
+  }
+
+  if (op.object.empty() || op.op_id.empty()) {
+    CLS_LOG(0, "object name or op id not specified");
+    return -EINVAL;
+  }
+
+  string obj_index;
+  get_index_by_object(op.object, op.op_id, obj_index);
+
+  bufferlist bl;
+  int rc = cls_cxx_map_get_val(hctx, obj_index, &bl);
+  if (rc < 0) {
+    CLS_LOG(0, "could not find entry %s", obj_index.c_str());
+    return rc;
+  }
+
+  cls_statelog_entry entry;
+
+  try {
+    bufferlist::iterator iter = bl.begin();
+    ::decode(entry, iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(0, "ERROR: failed to decode entry %s", obj_index.c_str());
+    return -EIO;
+  }
+
+  string client_index;
+  get_index_by_client(entry.client_id, entry.op_id, client_index); /* client_id comes from the stored entry, not the request */
+
+  rc = cls_cxx_map_remove_key(hctx, obj_index);
+  if (rc < 0) {
+    CLS_LOG(0, "ERROR: failed to remove key");
+    return rc;
+  }
+  rc = cls_cxx_map_remove_key(hctx, client_index);
+  if (rc < 0) {
+    CLS_LOG(0, "ERROR: failed to remove key");
+    return rc;
+  }
+
+  return 0;
+}
+/* "check_state" method: 0 if the stored entry's state equals op.state, -ECANCELED if it differs. */
+static int cls_statelog_check_state(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  bufferlist::iterator in_iter = in->begin();
+
+  cls_statelog_check_state_op op;
+  try {
+    ::decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_statelog_check_state_op(): failed to decode op");
+    return -EINVAL;
+  }
+
+  if (op.object.empty() || op.op_id.empty()) {
+    CLS_LOG(0, "object name or op id not specified");
+    return -EINVAL;
+  }
+
+  string obj_index;
+  get_index_by_object(op.object, op.op_id, obj_index);
+
+  bufferlist bl;
+  int rc = cls_cxx_map_get_val(hctx, obj_index, &bl);
+  if (rc < 0) {
+    CLS_LOG(0, "could not find entry %s", obj_index.c_str());
+    return rc;
+  }
+
+  cls_statelog_entry entry;
+
+  try {
+    bufferlist::iterator iter = bl.begin();
+    ::decode(entry, iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(0, "ERROR: failed to decode entry %s", obj_index.c_str());
+    return -EIO;
+  }
+
+  if (entry.state != op.state)
+    return -ECANCELED;
+
+  return 0;
+}
+/* Class entry point: registers the class and its four methods. */
+void __cls_init()
+{
+  CLS_LOG(1, "Loaded statelog class!");
+
+  cls_register("statelog", &h_class); /* same name as CLS_NAME(statelog) above */
+
+  /* statelog */
+  cls_register_cxx_method(h_class, "add", CLS_METHOD_RD | CLS_METHOD_WR, cls_statelog_add, &h_statelog_add);
+  cls_register_cxx_method(h_class, "list", CLS_METHOD_RD, cls_statelog_list, &h_statelog_list);
+  cls_register_cxx_method(h_class, "remove", CLS_METHOD_RD | CLS_METHOD_WR, cls_statelog_remove, &h_statelog_remove);
+  cls_register_cxx_method(h_class, "check_state", CLS_METHOD_RD, cls_statelog_check_state, &h_statelog_check_state);
+
+  return;
+}
+
diff --git a/src/cls/statelog/cls_statelog_ops.h b/src/cls/statelog/cls_statelog_ops.h
new file mode 100644
index 00000000000..aa5f7885bb6
--- /dev/null
+++ b/src/cls/statelog/cls_statelog_ops.h
@@ -0,0 +1,138 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CLS_STATELOG_OPS_H
+#define CEPH_CLS_STATELOG_OPS_H
+
+#include "include/types.h"
+#include "cls_statelog_types.h"
+/* request for the "add" method: the entries to store */
+struct cls_statelog_add_op {
+  list<cls_statelog_entry> entries;
+
+  cls_statelog_add_op() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(entries, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(entries, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_statelog_add_op)
+/* request for the "list" method: lists by object if object is set, by client_id otherwise */
+struct cls_statelog_list_op {
+  string object;
+  string client_id;
+  string op_id;
+  string marker; /* if not empty, used as the listing start key instead of the prefix */
+  int max_entries; /* upperbound to returned num of entries
+                      might return less than that and still be truncated */
+
+  cls_statelog_list_op() : max_entries(0) {} /* 0 makes the list method apply its own cap */
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(object, bl);
+    ::encode(client_id, bl);
+    ::encode(op_id, bl);
+    ::encode(marker, bl);
+    ::encode(max_entries, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(object, bl);
+    ::decode(client_id, bl);
+    ::decode(op_id, bl);
+    ::decode(marker, bl);
+    ::decode(max_entries, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_statelog_list_op)
+/* reply of the "list" method; marker is only set when truncated is true */
+struct cls_statelog_list_ret {
+  list<cls_statelog_entry> entries;
+  string marker;
+  bool truncated;
+
+  cls_statelog_list_ret() : truncated(false) {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(entries, bl);
+    ::encode(marker, bl);
+    ::encode(truncated, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(entries, bl);
+    ::decode(marker, bl);
+    ::decode(truncated, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_statelog_list_ret)
+
+
+/*
+ * removes the entry identified by object + op_id from both indexes. Both
+ * fields must be set; the remove method returns -EINVAL otherwise.
+ */
+struct cls_statelog_remove_op {
+  string object;
+  string op_id;
+
+  cls_statelog_remove_op() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(object, bl);
+    ::encode(op_id, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(object, bl);
+    ::decode(op_id, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_statelog_remove_op)
+/* request for the "check_state" method: state is compared with the stored entry's state */
+struct cls_statelog_check_state_op {
+  string object;
+  string op_id;
+  uint32_t state;
+
+  cls_statelog_check_state_op() : state(0) {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(object, bl);
+    ::encode(op_id, bl);
+    ::encode(state, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(object, bl);
+    ::decode(op_id, bl);
+    ::decode(state, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_statelog_check_state_op)
+
+#endif
diff --git a/src/cls/statelog/cls_statelog_types.h b/src/cls/statelog/cls_statelog_types.h
new file mode 100644
index 00000000000..f812c7b321c
--- /dev/null
+++ b/src/cls/statelog/cls_statelog_types.h
@@ -0,0 +1,51 @@
+#ifndef CEPH_CLS_STATELOG_TYPES_H
+#define CEPH_CLS_STATELOG_TYPES_H
+
+#include "include/encoding.h"
+#include "include/types.h"
+
+#include "include/utime.h"
+
+class JSONObj;
+/* one state log record, identified by client_id, op_id and object */
+struct cls_statelog_entry {
+  string client_id;
+  string op_id;
+  string object;
+  utime_t timestamp;
+  bufferlist data;
+  uint32_t state; /* user defined state */
+
+  cls_statelog_entry() : state(0) {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(client_id, bl);
+    ::encode(op_id, bl);
+    ::encode(object, bl);
+    ::encode(timestamp, bl);
+    ::encode(data, bl);
+    ::encode(state, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(client_id, bl);
+    ::decode(op_id, bl);
+    ::decode(object, bl);
+    ::decode(timestamp, bl);
+    ::decode(data, bl);
+    ::decode(state, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);
+};
+WRITE_CLASS_ENCODER(cls_statelog_entry)
+
+
+#endif
+
+