summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/cls/rgw/cls_rgw.cc131
-rw-r--r--src/cls/rgw/cls_rgw_client.cc8
-rw-r--r--src/cls/rgw/cls_rgw_client.h6
-rw-r--r--src/cls/rgw/cls_rgw_ops.cc7
-rw-r--r--src/cls/rgw/cls_rgw_ops.h35
-rw-r--r--src/cls/rgw/cls_rgw_types.cc12
-rw-r--r--src/cls/rgw/cls_rgw_types.h136
-rw-r--r--src/rgw/rgw_rados.cc51
-rw-r--r--src/rgw/rgw_rados.h14
-rw-r--r--src/test/cls_rgw/test_cls_rgw.cc9
10 files changed, 334 insertions, 75 deletions
diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc
index 15498ef0aa6..0b3646224ba 100644
--- a/src/cls/rgw/cls_rgw.cc
+++ b/src/cls/rgw/cls_rgw.cc
@@ -38,11 +38,80 @@ cls_method_handle_t h_rgw_gc_remove;
#define ROUND_BLOCK_SIZE 4096
+
+#define BI_PREFIX_CHAR 0x80
+
+#define BI_BUCKET_OBJS_INDEX 0
+#define BI_BUCKET_LOG_INDEX 1
+
+static string bucket_index_prefixes[] = { "", /* special handling for the objs index */
+ "0_" };
+
static uint64_t get_rounded_size(uint64_t size)
{
return (size + ROUND_BLOCK_SIZE - 1) & ~(ROUND_BLOCK_SIZE - 1);
}
+static bool bi_is_objs_index(const string& s) {
+ return ((unsigned char)s[0] != BI_PREFIX_CHAR);
+}
+
+static int bi_entry_type(const string& s)
+{
+ if (bi_is_objs_index(s)) {
+ return BI_BUCKET_OBJS_INDEX;
+ }
+
+ for (size_t i = 1;
+ i < sizeof(bucket_index_prefixes) / sizeof(bucket_index_prefixes[0]);
+ ++i) {
+ const string& t = bucket_index_prefixes[i];
+
+ if (s.compare(0, t.size(), t) == 0) {
+ return i;
+ }
+ }
+
+ return -EINVAL;
+}
+
+static void get_time_key(utime_t& ut, string *key)
+{
+ char buf[32];
+ snprintf(buf, 32, "%011llu.%09u", (unsigned long long)ut.sec(), ut.nsec());
+ *key = buf;
+}
+
+static void bi_log_index_key(string& key, utime_t& t, string& obj)
+{
+ key = BI_PREFIX_CHAR;
+ key.append(bucket_index_prefixes[BI_BUCKET_LOG_INDEX]);
+
+ string tk;
+ get_time_key(t, &tk);
+ key.append(tk);
+}
+
+static int log_index_operation(cls_method_context_t hctx, string& obj, RGWModifyOp op, rgw_bucket_entry_ver& ver, RGWPendingState state)
+{
+ bufferlist bl;
+
+ struct rgw_bi_log_entry entry;
+
+ entry.object = obj;
+ entry.timestamp = ceph_clock_now(g_ceph_context);
+ entry.op = op;
+ entry.ver = ver;
+ entry.state = state;
+ ::encode(entry, bl);
+
+ string key;
+
+ bi_log_index_key(key, entry.timestamp, obj);
+
+ return cls_cxx_map_set_val(hctx, key, &bl);
+}
+
int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
bufferlist::iterator iter = in->begin();
@@ -80,8 +149,16 @@ int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
std::map<string, bufferlist>::iterator kiter = keys.begin();
uint32_t i;
+ bool done = false;
+
for (i = 0; i < op.num_entries && kiter != keys.end(); ++i, ++kiter) {
struct rgw_bucket_dir_entry entry;
+
+ if (!bi_is_objs_index(kiter->first)) {
+ done = true;
+ break;
+ }
+
bufferlist& entrybl = kiter->second;
bufferlist::iterator eiter = entrybl.begin();
try {
@@ -94,7 +171,7 @@ int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
m[kiter->first] = entry;
}
- ret.is_truncated = (kiter != keys.end());
+ ret.is_truncated = (kiter != keys.end() && !done);
::encode(ret, *out);
return 0;
@@ -123,6 +200,8 @@ static int check_index(cls_method_context_t hctx, struct rgw_bucket_dir_header *
string filter_prefix;
#define CHECK_CHUNK_SIZE 1000
+ bool done = false;
+
do {
rc = cls_cxx_map_get_vals(hctx, start_obj, filter_prefix, CHECK_CHUNK_SIZE, &keys);
if (rc < 0)
@@ -130,6 +209,11 @@ static int check_index(cls_method_context_t hctx, struct rgw_bucket_dir_header *
std::map<string, bufferlist>::iterator kiter = keys.begin();
for (; kiter != keys.end(); ++kiter) {
+ if (!bi_is_objs_index(kiter->first)) {
+ done = true;
+ break;
+ }
+
struct rgw_bucket_dir_entry entry;
bufferlist::iterator eiter = kiter->second.begin();
try {
@@ -145,7 +229,7 @@ static int check_index(cls_method_context_t hctx, struct rgw_bucket_dir_header *
start_obj = kiter->first;
}
- } while (keys.size() == CHECK_CHUNK_SIZE);
+ } while (keys.size() == CHECK_CHUNK_SIZE && !done);
return 0;
}
@@ -284,7 +368,7 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
if (noent) { // no entry, initialize fields
entry.name = op.name;
- entry.epoch = 0;
+ entry.ver = rgw_bucket_entry_ver();
entry.exists = false;
entry.locator = op.locator;
}
@@ -295,6 +379,10 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
info.state = CLS_RGW_STATE_PENDING_MODIFY;
info.op = op.op;
+ rc = log_index_operation(hctx, op.name, op.op, entry.ver, info.state);
+ if (rc < 0)
+ return rc;
+
// write out new key to disk
bufferlist info_bl;
::encode(entry, info_bl);
@@ -326,7 +414,9 @@ static int read_index_entry(cls_method_context_t hctx, string& name, struct rgw_
return -EIO;
}
- CLS_LOG(1, "read_index_entry(): existing entry: epoch=%llu name=%s locator=%s\n", (unsigned long long)entry->epoch, entry->name.c_str(), entry->locator.c_str());
+ CLS_LOG(1, "read_index_entry(): existing entry: ver=%ld:%llu name=%s locator=%s\n",
+ (long)entry->ver.pool, (unsigned long long)entry->ver.epoch,
+ entry->name.c_str(), entry->locator.c_str());
return 0;
}
@@ -341,7 +431,10 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to decode request\n");
return -EINVAL;
}
- CLS_LOG(1, "rgw_bucket_complete_op(): request: op=%d name=%s epoch=%llu tag=%s\n", op.op, op.name.c_str(), (unsigned long long)op.epoch, op.tag.c_str());
+ CLS_LOG(1, "rgw_bucket_complete_op(): request: op=%d name=%s ver=%lu:%llu tag=%s\n",
+ op.op, op.name.c_str(),
+ (unsigned long)op.ver.pool, (unsigned long long)op.ver.epoch,
+ op.tag.c_str());
bufferlist header_bl;
struct rgw_bucket_dir_header header;
@@ -362,7 +455,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
rc = read_index_entry(hctx, op.name, &entry);
if (rc == -ENOENT) {
entry.name = op.name;
- entry.epoch = op.epoch;
+ entry.ver = op.ver;
entry.meta = op.meta;
entry.locator = op.locator;
ondisk = false;
@@ -385,13 +478,18 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
if (op.tag.size() && op.op == CLS_RGW_OP_CANCEL) {
CLS_LOG(1, "rgw_bucket_complete_op(): cancel requested\n");
cancel = true;
- } else if (op.epoch && op.epoch <= entry.epoch) {
+ } else if (op.ver.pool == entry.ver.pool &&
+ op.ver.epoch && op.ver.epoch <= entry.ver.epoch) {
CLS_LOG(1, "rgw_bucket_complete_op(): skipping request, old epoch\n");
cancel = true;
}
bufferlist op_bl;
if (cancel) {
+ rc = log_index_operation(hctx, op.name, op.op, entry.ver, CLS_RGW_STATE_COMPLETE);
+ if (rc < 0)
+ return rc;
+
if (op.tag.size()) {
bufferlist new_key_bl;
::encode(entry, new_key_bl);
@@ -405,7 +503,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
unaccount_entry(header, entry);
}
- switch (op.op) {
+ switch ((int)op.op) {
case CLS_RGW_OP_DEL:
if (ondisk) {
if (!entry.pending_map.size()) {
@@ -430,7 +528,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
struct rgw_bucket_category_stats& stats = header.stats[meta.category];
entry.meta = meta;
entry.name = op.name;
- entry.epoch = op.epoch;
+ entry.ver = op.ver;
entry.exists = true;
stats.num_entries++;
stats.total_size += meta.size;
@@ -444,6 +542,10 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
break;
}
+ rc = log_index_operation(hctx, op.name, op.op, entry.ver, CLS_RGW_STATE_COMPLETE);
+ if (rc < 0)
+ return rc;
+
list<string>::iterator remove_iter;
CLS_LOG(0, "rgw_bucket_complete_op(): remove_objs.size()=%d\n", (int)op.remove_objs.size());
for (remove_iter = op.remove_objs.begin(); remove_iter != op.remove_objs.end(); ++remove_iter) {
@@ -458,6 +560,10 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
CLS_LOG(0, "rgw_bucket_complete_op(): entry.name=%s entry.meta.category=%d\n", remove_entry.name.c_str(), remove_entry.meta.category);
unaccount_entry(header, remove_entry);
+ rc = log_index_operation(hctx, op.name, CLS_RGW_OP_DEL, remove_entry.ver, CLS_RGW_STATE_COMPLETE);
+ if (rc < 0)
+ continue;
+
ret = cls_cxx_map_remove_key(hctx, remove_oid_name);
if (ret < 0) {
CLS_LOG(1, "rgw_bucket_complete_op(): cls_cxx_map_remove_key, failed to remove entry, name=%s read_index_entry ret=%d\n", remove_oid_name.c_str(), rc);
@@ -899,13 +1005,6 @@ static int gc_omap_remove(cls_method_context_t hctx, int type, const string& key
return 0;
}
-static void get_time_key(utime_t& ut, string *key)
-{
- char buf[32];
- snprintf(buf, 32, "%011llu.%09u", (unsigned long long)ut.sec(), ut.nsec());
- *key = buf;
-}
-
static bool key_in_index(const string& key, int index_type)
{
const string& prefix = gc_index_prefixes[index_type];
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc
index aa140bd49b8..3113f9ed136 100644
--- a/src/cls/rgw/cls_rgw_client.cc
+++ b/src/cls/rgw/cls_rgw_client.cc
@@ -21,7 +21,7 @@ void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeou
o.exec("rgw", "bucket_set_tag_timeout", in);
}
-void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, uint8_t op, string& tag,
+void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
string& name, string& locator)
{
struct rgw_cls_obj_prepare_op call;
@@ -34,8 +34,8 @@ void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, uint8_t op, string& tag,
o.exec("rgw", "bucket_prepare_op", in);
}
-void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, uint8_t op, string& tag,
- uint64_t epoch, string& name, rgw_bucket_dir_entry_meta& dir_meta,
+void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
+ rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta,
list<string> *remove_objs)
{
@@ -44,7 +44,7 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, uint8_t op, string& tag
call.op = op;
call.tag = tag;
call.name = name;
- call.epoch = epoch;
+ call.ver = ver;
call.meta = dir_meta;
if (remove_objs)
call.remove_objs = *remove_objs;
diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h
index 4ab8e902d4f..d955fab6a3a 100644
--- a/src/cls/rgw/cls_rgw_client.h
+++ b/src/cls/rgw/cls_rgw_client.h
@@ -10,11 +10,11 @@ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
-void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, uint8_t op, string& tag,
+void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
string& name, string& locator);
-void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, uint8_t op, string& tag,
- uint64_t epoch, string& name, rgw_bucket_dir_entry_meta& dir_meta,
+void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
+ rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta,
list<string> *remove_objs);
int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj,
diff --git a/src/cls/rgw/cls_rgw_ops.cc b/src/cls/rgw/cls_rgw_ops.cc
index e6041592cd3..3058083b274 100644
--- a/src/cls/rgw/cls_rgw_ops.cc
+++ b/src/cls/rgw/cls_rgw_ops.cc
@@ -29,7 +29,8 @@ void rgw_cls_obj_complete_op::generate_test_instances(list<rgw_cls_obj_complete_
op->op = CLS_RGW_OP_DEL;
op->name = "name";
op->locator = "locator";
- op->epoch = 100;
+ op->ver.pool = 2;
+ op->ver.epoch = 100;
op->tag = "tag";
list<rgw_bucket_dir_entry_meta *> l;
@@ -47,7 +48,9 @@ void rgw_cls_obj_complete_op::dump(Formatter *f) const
f->dump_int("op", (int)op);
f->dump_string("name", name);
f->dump_string("locator", locator);
- f->dump_unsigned("epoch", epoch);
+ f->open_object_section("ver");
+ ver.dump(f);
+ f->close_section();
f->open_object_section("meta");
meta.dump(f);
f->close_section();
diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h
index 2ba72299fac..572c609f1d0 100644
--- a/src/cls/rgw/cls_rgw_ops.h
+++ b/src/cls/rgw/cls_rgw_ops.h
@@ -27,16 +27,17 @@ WRITE_CLASS_ENCODER(rgw_cls_tag_timeout_op)
struct rgw_cls_obj_prepare_op
{
- uint8_t op;
+ RGWModifyOp op;
string name;
string tag;
string locator;
- rgw_cls_obj_prepare_op() : op(0) {}
+ rgw_cls_obj_prepare_op() : op(CLS_RGW_OP_UNKNOWN) {}
void encode(bufferlist &bl) const {
ENCODE_START(3, 3, bl);
- ::encode(op, bl);
+ uint8_t c = (uint8_t)op;
+ ::encode(c, bl);
::encode(name, bl);
::encode(tag, bl);
::encode(locator, bl);
@@ -44,7 +45,9 @@ struct rgw_cls_obj_prepare_op
}
void decode(bufferlist::iterator &bl) {
DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
- ::decode(op, bl);
+ uint8_t c;
+ ::decode(c, bl);
+ op = (RGWModifyOp)c;
::decode(name, bl);
::decode(tag, bl);
if (struct_v >= 2) {
@@ -59,20 +62,21 @@ WRITE_CLASS_ENCODER(rgw_cls_obj_prepare_op)
struct rgw_cls_obj_complete_op
{
- uint8_t op;
+ RGWModifyOp op;
string name;
string locator;
- uint64_t epoch;
+ rgw_bucket_entry_ver ver;
struct rgw_bucket_dir_entry_meta meta;
string tag;
list<string> remove_objs;
void encode(bufferlist &bl) const {
- ENCODE_START(4, 3, bl);
- ::encode(op, bl);
+ ENCODE_START(5, 3, bl);
+ uint8_t c = (uint8_t)op;
+ ::encode(c, bl);
::encode(name, bl);
- ::encode(epoch, bl);
+ ::encode(ver, bl);
::encode(meta, bl);
::encode(tag, bl);
::encode(locator, bl);
@@ -80,10 +84,17 @@ struct rgw_cls_obj_complete_op
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
- DECODE_START_LEGACY_COMPAT_LEN(4, 3, 3, bl);
- ::decode(op, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(5, 3, 3, bl);
+ uint8_t c;
+ ::decode(c, bl);
+ op = (RGWModifyOp)c;
::decode(name, bl);
- ::decode(epoch, bl);
+ if (struct_v >= 5) {
+ ::decode(ver, bl);
+ } else {
+ ver.pool = 0;
+ ::decode(ver.epoch, bl);
+ }
::decode(meta, bl);
::decode(tag, bl);
if (struct_v >= 2) {
diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc
index 1c40e02922e..5a9dd9f548f 100644
--- a/src/cls/rgw/cls_rgw_types.cc
+++ b/src/cls/rgw/cls_rgw_types.cc
@@ -55,7 +55,8 @@ void rgw_bucket_dir_entry::generate_test_instances(list<rgw_bucket_dir_entry*>&
rgw_bucket_dir_entry_meta *m = *iter;
rgw_bucket_dir_entry *e = new rgw_bucket_dir_entry;
e->name = "name";
- e->epoch = 1234;
+ e->ver.pool = 1;
+ e->ver.epoch = 1234;
e->locator = "locator";
e->exists = true;
e->meta = *m;
@@ -66,11 +67,18 @@ void rgw_bucket_dir_entry::generate_test_instances(list<rgw_bucket_dir_entry*>&
}
o.push_back(new rgw_bucket_dir_entry);
}
+void rgw_bucket_entry_ver::dump(Formatter *f) const
+{
+ f->dump_unsigned("pool", pool);
+ f->dump_unsigned("epoch", epoch);
+}
void rgw_bucket_dir_entry::dump(Formatter *f) const
{
f->dump_string("name", name);
- f->dump_unsigned("epoch", epoch);
+ f->open_object_section("ver");
+ ver.dump(f);
+ f->close_section();
f->dump_string("locator", locator);
f->dump_int("exists", (int)exists);
f->open_object_section("meta");
diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h
index 42fcef89e50..7f7c9ccc88c 100644
--- a/src/cls/rgw/cls_rgw_types.h
+++ b/src/cls/rgw/cls_rgw_types.h
@@ -20,9 +20,10 @@ enum RGWPendingState {
};
enum RGWModifyOp {
- CLS_RGW_OP_ADD = 0,
- CLS_RGW_OP_DEL = 1,
- CLS_RGW_OP_CANCEL = 2,
+ CLS_RGW_OP_ADD = 0,
+ CLS_RGW_OP_DEL = 1,
+ CLS_RGW_OP_CANCEL = 2,
+ CLS_RGW_OP_UNKNOWN = 3,
};
struct rgw_bucket_pending_info {
@@ -95,21 +96,96 @@ struct rgw_bucket_dir_entry_meta {
};
WRITE_CLASS_ENCODER(rgw_bucket_dir_entry_meta)
+template<class T>
+void encode_packed_val(T val, bufferlist& bl)
+{
+ unsigned char c = 0x80 | (unsigned char)sizeof(T);
+ ::encode(c, bl);
+ ::encode(val, bl);
+}
+
+template<class T>
+void decode_packed_val(T val, bufferlist::iterator& bl)
+{
+ unsigned char c;
+ ::decode(c, bl);
+ if (c < 0x80) {
+ val = c;
+ return;
+ }
+
+ c ^= 0x80;
+
+ switch (c) {
+ case 1:
+ {
+ uint8_t v;
+ ::decode(v, bl);
+ val = v;
+ }
+ case 2:
+ {
+ uint16_t v;
+ ::decode(v, bl);
+ val = v;
+ }
+ case 4:
+ {
+ uint32_t v;
+ ::decode(v, bl);
+ val = v;
+ }
+ case 8:
+ {
+ uint64_t v;
+ ::decode(v, bl);
+ val = v;
+ }
+ break;
+ default:
+ throw buffer::error();
+ }
+}
+
+struct rgw_bucket_entry_ver {
+ int64_t pool;
+ uint64_t epoch;
+
+ rgw_bucket_entry_ver() : pool(-1), epoch(0) {}
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode_packed_val(pool, bl);
+ ::encode_packed_val(epoch, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator &bl) {
+ DECODE_START(1, bl);
+ ::decode_packed_val(pool, bl);
+ ::decode_packed_val(epoch, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+ static void generate_test_instances(list<rgw_bucket_entry_ver*>& o);
+};
+WRITE_CLASS_ENCODER(rgw_bucket_entry_ver)
+
+
struct rgw_bucket_dir_entry {
std::string name;
- uint64_t epoch;
+ rgw_bucket_entry_ver ver;
std::string locator;
bool exists;
struct rgw_bucket_dir_entry_meta meta;
map<string, struct rgw_bucket_pending_info> pending_map;
rgw_bucket_dir_entry() :
- epoch(0), exists(false) {}
+ exists(false) {}
void encode(bufferlist &bl) const {
- ENCODE_START(3, 3, bl);
+ ENCODE_START(4, 3, bl);
::encode(name, bl);
- ::encode(epoch, bl);
+ ::encode(ver, bl);
::encode(exists, bl);
::encode(meta, bl);
::encode(pending_map, bl);
@@ -117,9 +193,14 @@ struct rgw_bucket_dir_entry {
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
- DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(4, 3, 3, bl);
::decode(name, bl);
- ::decode(epoch, bl);
+ if (struct_v >= 4) {
+ ::decode(ver, bl);
+ } else {
+ ver.pool = 0;
+ ::decode(ver.epoch, bl);
+ }
::decode(exists, bl);
::decode(meta, bl);
::decode(pending_map, bl);
@@ -133,6 +214,43 @@ struct rgw_bucket_dir_entry {
};
WRITE_CLASS_ENCODER(rgw_bucket_dir_entry)
+struct rgw_bi_log_entry {
+ string object;
+ utime_t timestamp;
+ rgw_bucket_entry_ver ver;
+ RGWModifyOp op;
+ RGWPendingState state;
+
+ rgw_bi_log_entry() : op(CLS_RGW_OP_UNKNOWN) {}
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(object, bl);
+ ::encode(timestamp, bl);
+ ::encode(ver, bl);
+ uint8_t c = (uint8_t)op;
+ ::encode(c, bl);
+ c = (uint8_t)state;
+ ::encode(state, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator &bl) {
+ DECODE_START(1, bl);
+ ::decode(object, bl);
+ ::decode(timestamp, bl);
+ ::decode(ver, bl);
+ uint8_t c;
+ ::decode(c, bl);
+ op = (RGWModifyOp)c;
+ ::decode(c, bl);
+ state = (RGWPendingState)c;
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+ static void generate_test_instances(list<rgw_bi_log_entry*>& o);
+};
+WRITE_CLASS_ENCODER(rgw_bi_log_entry)
+
struct rgw_bucket_category_stats {
uint64_t total_size;
uint64_t total_size_rounded;
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index c1ad786a063..c62a34a449a 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -1595,6 +1595,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size,
string index_tag;
uint64_t epoch;
+ int64_t poolid;
utime_t ut;
r = prepare_update_index(NULL, bucket, obj, index_tag);
if (r < 0)
@@ -1609,6 +1610,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size,
}
epoch = io_ctx.get_last_version();
+ poolid = io_ctx.get_id();
r = complete_atomic_overwrite(rctx, state, obj);
if (r < 0) {
@@ -1616,7 +1618,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size,
}
ut = ceph_clock_now(cct);
- r = complete_update_index(bucket, obj.object, index_tag, epoch, size,
+ r = complete_update_index(bucket, obj.object, index_tag, poolid, epoch, size,
ut, etag, content_type, &acl_bl, category, remove_objs);
if (r < 0)
goto done_cancel;
@@ -2230,9 +2232,10 @@ int RGWRados::delete_obj_impl(void *ctx, rgw_obj& obj)
r = io_ctx.operate(oid, &op);
bool removed = (r >= 0);
+ int64_t poolid = io_ctx.get_id();
if ((r >= 0 || r == -ENOENT) && bucket.marker.size()) {
uint64_t epoch = io_ctx.get_last_version();
- r = complete_update_index_del(bucket, obj.object, tag, epoch);
+ r = complete_update_index_del(bucket, obj.object, tag, poolid, epoch);
} else {
int ret = complete_update_index_cancel(bucket, obj.object, tag);
if (ret < 0) {
@@ -2276,7 +2279,7 @@ int RGWRados::delete_obj_index(rgw_obj& obj)
get_obj_bucket_and_oid_key(obj, bucket, oid, key);
string tag;
- int r = complete_update_index_del(bucket, obj.object, tag, 0);
+ int r = complete_update_index_del(bucket, obj.object, tag, -1 /* pool */, 0);
return r;
}
@@ -2884,7 +2887,7 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
return ret;
}
-int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch, uint64_t size,
+int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
list<string> *remove_objs)
{
@@ -2907,7 +2910,7 @@ int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag
ent.owner_display_name = owner.get_display_name();
ent.content_type = content_type;
- int ret = cls_obj_complete_add(bucket, tag, epoch, ent, category, remove_objs);
+ int ret = cls_obj_complete_add(bucket, tag, poolid, epoch, ent, category, remove_objs);
return ret;
}
@@ -3008,6 +3011,7 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
string tag;
uint64_t epoch = 0;
+ int64_t poolid = io_ctx.get_id();
int ret;
if (update_index) {
@@ -3025,7 +3029,7 @@ done:
if (update_index) {
if (ret >= 0) {
- ret = complete_update_index(bucket, dst_obj.object, tag, epoch, size,
+ ret = complete_update_index(bucket, dst_obj.object, tag, poolid, epoch, size,
ut, etag, content_type, &acl_bl, category, NULL);
} else {
int r = complete_update_index_cancel(bucket, dst_obj.object, tag);
@@ -3989,7 +3993,7 @@ int RGWRados::cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteO
return r;
}
-int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag,
+int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
string& name, string& locator)
{
librados::IoCtx io_ctx;
@@ -4005,7 +4009,9 @@ int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag,
return r;
}
-int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category,
+int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+ int64_t pool, uint64_t epoch,
+ RGWObjEnt& ent, RGWObjCategory category,
list<string> *remove_objs)
{
librados::IoCtx io_ctx;
@@ -4024,7 +4030,11 @@ int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, u
dir_meta.owner_display_name = ent.owner_display_name;
dir_meta.content_type = ent.content_type;
dir_meta.category = category;
- cls_rgw_bucket_complete_op(o, op, tag, epoch, ent.name, dir_meta, remove_objs);
+
+ rgw_bucket_entry_ver ver;
+ ver.pool = pool;
+ ver.epoch = epoch;
+ cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs);
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
r = io_ctx.aio_operate(oid, c, &o);
@@ -4032,23 +4042,28 @@ int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, u
return r;
}
-int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs)
+int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag,
+ int64_t pool, uint64_t epoch,
+ RGWObjEnt& ent, RGWObjCategory category,
+ list<string> *remove_objs)
{
- return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, epoch, ent, category, remove_objs);
+ return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs);
}
-int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag, uint64_t epoch, string& name)
+int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag,
+ int64_t pool, uint64_t epoch,
+ string& name)
{
RGWObjEnt ent;
ent.name = name;
- return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+ return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
}
int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name)
{
RGWObjEnt ent;
ent.name = name;
- return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+ return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
}
int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
@@ -4219,7 +4234,7 @@ int RGWRados::remove_objs_from_index(rgw_bucket& bucket, list<string>& oid_list)
string& oid = *iter;
dout(2) << "RGWRados::remove_objs_from_index bucket=" << bucket << " oid=" << oid << dendl;
rgw_bucket_dir_entry entry;
- entry.epoch = (uint64_t)-1; // ULLONG_MAX, needed to that objclass doesn't skip out request
+ entry.ver.epoch = (uint64_t)-1; // ULLONG_MAX, needed to that objclass doesn't skip out request
entry.name = oid;
updates.append(CEPH_RGW_REMOVE);
::encode(entry, updates);
@@ -4265,7 +4280,8 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx,
* to handle!) */
}
// encode a suggested removal of that key
- list_state.epoch = io_ctx.get_last_version();
+ list_state.ver.epoch = io_ctx.get_last_version();
+ list_state.ver.pool = io_ctx.get_id();
cls_rgw_encode_suggestion(CEPH_RGW_REMOVE, list_state, suggested_updates);
return -ENOENT;
}
@@ -4317,7 +4333,8 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx,
object.owner_display_name = owner.get_display_name();
// encode suggested updates
- list_state.epoch = astate->epoch;
+ list_state.ver.pool = io_ctx.get_id();
+ list_state.ver.epoch = astate->epoch;
list_state.meta.size = object.size;
list_state.meta.mtime.set_from_double(double(object.mtime));
list_state.meta.category = main_category;
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index b3719aa88f1..3ef8a9bced2 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -939,12 +939,12 @@ public:
virtual int put_bucket_info(string& bucket_name, RGWBucketInfo& info, bool exclusive, map<string, bufferlist> *pattrs);
int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
- int cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag,
+ int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
string& name, string& locator);
- int cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, uint64_t epoch,
+ int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
- int cls_obj_complete_add(rgw_bucket& bucket, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
- int cls_obj_complete_del(rgw_bucket& bucket, string& tag, uint64_t epoch, string& name);
+ int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
+ int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name);
int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name);
int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout);
int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num,
@@ -953,11 +953,11 @@ public:
int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
rgw_obj& oid, string& tag);
- int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch, uint64_t size,
+ int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
list<string> *remove_objs);
- int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch) {
- return cls_obj_complete_del(bucket, tag, epoch, oid);
+ int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, int64_t pool, uint64_t epoch) {
+ return cls_obj_complete_del(bucket, tag, pool, epoch, oid);
}
int complete_update_index_cancel(rgw_bucket& bucket, string& oid, string& tag) {
return cls_obj_complete_cancel(bucket, tag, oid);
diff --git a/src/test/cls_rgw/test_cls_rgw.cc b/src/test/cls_rgw/test_cls_rgw.cc
index 8e0d23f3d35..59a2b14e972 100644
--- a/src/test/cls_rgw/test_cls_rgw.cc
+++ b/src/test/cls_rgw/test_cls_rgw.cc
@@ -74,17 +74,20 @@ void test_stats(librados::IoCtx& ioctx, string& oid, int category, uint64_t num_
ASSERT_EQ(num_entries, stats.num_entries);
}
-void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, int index_op, string& tag, string& obj, string& loc)
+void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, string& obj, string& loc)
{
ObjectWriteOperation *op = mgr.write_op();
cls_rgw_bucket_prepare_op(*op, index_op, tag, obj, loc);
ASSERT_EQ(0, ioctx.operate(oid, op));
}
-void index_complete(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, int index_op, string& tag, int epoch, string& obj, rgw_bucket_dir_entry_meta& meta)
+void index_complete(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, int epoch, string& obj, rgw_bucket_dir_entry_meta& meta)
{
ObjectWriteOperation *op = mgr.write_op();
- cls_rgw_bucket_complete_op(*op, index_op, tag, epoch, obj, meta, NULL);
+ rgw_bucket_entry_ver ver;
+ ver.pool = ioctx.get_id();
+ ver.epoch = epoch;
+ cls_rgw_bucket_complete_op(*op, index_op, tag, ver, obj, meta, NULL);
ASSERT_EQ(0, ioctx.operate(oid, op));
}