diff options
49 files changed, 4550 insertions, 954 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 21ab497f2cf..dff7489de43 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -158,7 +158,9 @@ rgw_dencoder_src = rgw/rgw_dencoder.cc \ ceph_dencoder_SOURCES = test/encoding/ceph_dencoder.cc ${rgw_dencoder_src} perfglue/disabled_heap_profiler.cc ceph_dencoder_CXXFLAGS = ${AM_CXXFLAGS} -ceph_dencoder_LDADD = $(LIBGLOBAL_LDA) libcls_lock_client.a libcls_rgw_client.a libosd.a libmds.a libosdc.la $(LIBOS_LDA) libmon.a +ceph_dencoder_LDADD = $(LIBGLOBAL_LDA) libcls_lock_client.a \ + libcls_rgw_client.a libcls_replica_log_client.a \ + libosd.a libmds.a libosdc.la $(LIBOS_LDA) libmon.a bin_PROGRAMS += ceph-dencoder mount_ceph_SOURCES = mount/mount.ceph.c common/armor.c common/safe_io.c common/secret.c include/addr_parsing.c @@ -399,14 +401,16 @@ librgw_a_SOURCES = \ rgw/rgw_cors.cc \ rgw/rgw_cors_s3.cc \ rgw/rgw_auth_s3.cc \ - rgw/rgw_metadata.cc + rgw/rgw_metadata.cc \ + rgw/rgw_replica_log.cc librgw_a_CFLAGS = ${CRYPTO_CFLAGS} ${AM_CFLAGS} librgw_a_CXXFLAGS = -Woverloaded-virtual ${AM_CXXFLAGS} noinst_LIBRARIES += librgw.a my_radosgw_ldadd = \ libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a \ - libcls_lock_client.a libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \ + libcls_statelog_client.a libcls_replica_log_client.a libcls_lock_client.a \ + libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \ $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS) radosgw_SOURCES = \ @@ -418,6 +422,7 @@ radosgw_SOURCES = \ rgw/rgw_rest_user.cc \ rgw/rgw_rest_bucket.cc \ rgw/rgw_rest_metadata.cc \ + rgw/rgw_replica_log.cc \ rgw/rgw_rest_log.cc \ rgw/rgw_http_client.cc \ rgw/rgw_swift.cc \ @@ -571,6 +576,22 @@ libcls_log_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex radoslib_LTLIBRARIES += libcls_log.la +libcls_statelog_la_SOURCES = cls/statelog/cls_statelog.cc +libcls_statelog_la_CFLAGS = ${AM_CFLAGS} +libcls_statelog_la_CXXFLAGS= ${AM_CXXFLAGS} +libcls_statelog_la_LIBADD = $(PTHREAD_LIBS) $(EXTRALIBS) +libcls_statelog_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex '.*__cls_.*' + +radoslib_LTLIBRARIES += libcls_statelog.la + +# replica log class +libcls_replica_log_la_SOURCES = cls/replica_log/cls_replica_log.cc +libcls_replica_log_la_CFLAGS = ${AM_CFLAGS} +libcls_replica_log_la_CXXFLAGS= ${AM_CXXFLAGS} +libcls_replica_log_la_LIBADD = $(PTHREAD_LIBS) $(EXTRALIBS) +libcls_replica_log_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex '.*__cls_.*' + +radoslib_LTLIBRARIES += libcls_replica_log.la if WITH_RADOSGW # rgw: rados gateway @@ -602,6 +623,16 @@ libcls_log_client_a_SOURCES = \ cls/log/cls_log_client.cc noinst_LIBRARIES += libcls_log_client.a +libcls_statelog_client_a_SOURCES = \ + cls/statelog/cls_statelog_client.cc +noinst_LIBRARIES += libcls_statelog_client.a + +libcls_replica_log_client_a_SOURCES = \ + cls/replica_log/cls_replica_log_types.cc \ + cls/replica_log/cls_replica_log_ops.cc \ + cls/replica_log/cls_replica_log_client.cc +noinst_LIBRARIES += libcls_replica_log_client.a + libcls_rgw_client_a_SOURCES = \ cls/rgw/cls_rgw_client.cc \ cls/rgw/cls_rgw_types.cc \ @@ -945,14 +976,14 @@ bin_DEBUGPROGRAMS += ceph_test_cors unittest_rgw_meta_SOURCES = test/test_rgw_admin_meta.cc unittest_rgw_meta_LDFLAGS = libglobal.la unittest_rgw_meta_LDADD = librgw.a ${UNITTEST_LDADD} ${UNITTEST_STATIC_LDADD} -lcryptopp -lcurl -luuid -lexpat librados.la libcls_version_client.a \ - libcls_log_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a + libcls_log_client.a libcls_statelog_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a unittest_rgw_meta_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} bin_DEBUGPROGRAMS += unittest_rgw_meta unittest_rgw_log_SOURCES = test/test_rgw_admin_log.cc unittest_rgw_log_LDFLAGS = libglobal.la unittest_rgw_log_LDADD = librgw.a ${UNITTEST_LDADD} ${UNITTEST_STATIC_LDADD} -lcryptopp -lcurl -luuid -lexpat librados.la libcls_version_client.a \ - libcls_log_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a + libcls_log_client.a libcls_statelog_client.a libcls_refcount_client.a libcls_rgw_client.a libcls_lock_client.a unittest_rgw_log_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} bin_DEBUGPROGRAMS += unittest_rgw_log endif @@ -995,6 +1026,19 @@ ceph_test_cls_log_LDADD = libglobal.la librados.la libcls_log_client.a ${UNITTES ceph_test_cls_log_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} bin_DEBUGPROGRAMS += ceph_test_cls_log +ceph_test_cls_statelog_SOURCES = test/cls_statelog/test_cls_statelog.cc \ + test/librados/test.cc +ceph_test_cls_statelog_LDADD = libglobal.la librados.la libcls_statelog_client.a ${UNITTEST_STATIC_LDADD} +ceph_test_cls_statelog_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} +bin_DEBUGPROGRAMS += ceph_test_cls_statelog +ceph_test_cls_replica_log_SOURCES = \ + test/cls_replica_log/test_cls_replica_log.cc \ + test/librados/test.cc +ceph_test_cls_replica_log_LDADD = libglobal.la librados.la \ + libcls_replica_log_client.a ${UNITTEST_STATIC_LDADD} +ceph_test_cls_replica_log_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} +bin_DEBUGPROGRAMS += ceph_test_cls_replica_log + ceph_test_cls_lock_SOURCES = test/cls_lock/test_cls_lock.cc test/librados/test.cc ceph_test_cls_lock_LDFLAGS = ${AM_LDFLAGS} ceph_test_cls_lock_LDADD = libcls_lock_client.a librados.la ${UNITTEST_STATIC_LDADD} @@ -1645,6 +1689,8 @@ noinst_HEADERS = \ cls/log/cls_log_types.h\ cls/log/cls_log_ops.h\ cls/log/cls_log_client.h\ + cls/statelog/cls_statelog_types.h\ + cls/statelog/cls_statelog_ops.h\ cls/rgw/cls_rgw_client.h\ cls/rgw/cls_rgw_ops.h\ cls/rgw/cls_rgw_types.h\ @@ -2082,6 +2128,7 @@ noinst_HEADERS = \ rgw/rgw_swift.h\ rgw/rgw_swift_auth.h\ rgw/rgw_rados.h\ + rgw/rgw_replica_log.h \ rgw/rgw_resolve.h\ rgw/rgw_rest.h\ rgw/rgw_rest_swift.h\ diff --git a/src/cls/replica_log/cls_replica_log.cc b/src/cls/replica_log/cls_replica_log.cc new file mode 100644 index 00000000000..1083b8adc38 --- /dev/null +++ b/src/cls/replica_log/cls_replica_log.cc @@ -0,0 +1,151 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + * Copyright Inktank 2013 + */ + +#include "objclass/objclass.h" +#include "global/global_context.h" + +#include "cls_replica_log_types.h" +#include "cls_replica_log_ops.h" + +CLS_VER(1, 0) +CLS_NAME(replica_log) + +cls_handle_t h_class; +cls_method_handle_t h_replica_log_set; +cls_method_handle_t h_replica_log_delete; +cls_method_handle_t h_replica_log_get; + +static const string replica_log_prefix = "rl_"; +static const string replica_log_bounds = replica_log_prefix + "bounds"; + +static int get_bounds(cls_method_context_t hctx, cls_replica_log_bound& bound) +{ + bufferlist bounds_bl; + int rc = cls_cxx_map_get_val(hctx, replica_log_bounds, &bounds_bl); + if (rc < 0) { + return rc; + } + + try { + bufferlist::iterator bounds_bl_i = bounds_bl.begin(); + ::decode(bound, bounds_bl_i); + } catch (buffer::error& err) { + bound = cls_replica_log_bound(); + CLS_LOG(0, "ERROR: get_bounds(): failed to decode on-disk bounds object"); + return -EIO; + } + + return 0; +} + +static int write_bounds(cls_method_context_t hctx, + const cls_replica_log_bound& bound) +{ + bufferlist bounds_bl; + ::encode(bound, bounds_bl); + return cls_cxx_map_set_val(hctx, replica_log_bounds, &bounds_bl); +} + +static int cls_replica_log_set(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_replica_log_set_marker_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: cls_replica_log_set(): failed to decode op"); + return -EINVAL; + } + + cls_replica_log_bound bound; + int rc = get_bounds(hctx, bound); + if (rc < 0 && rc != -ENOENT) { + return rc; + } + + rc = bound.update_marker(op.marker); + if (rc < 0) { + return rc; + } + + return write_bounds(hctx, bound); +} + +static int cls_replica_log_delete(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_replica_log_delete_marker_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: cls_replica_log_delete(): failed to decode op"); + return -EINVAL; + } + + cls_replica_log_bound bound; + int rc = get_bounds(hctx, bound); + if (rc < 0 && rc != -ENOENT) { + return rc; + } + + rc = bound.delete_marker(op.entity_id); + if (rc < 0) { + return rc; + } + + return write_bounds(hctx, bound); +} + +static int cls_replica_log_get(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_replica_log_get_bounds_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: cls_replica_log_get(): failed to decode op"); + return -EINVAL; + } + + cls_replica_log_bound bound; + int rc = get_bounds(hctx, bound); + if (rc < 0) { + return rc; + } + + cls_replica_log_get_bounds_ret ret; + ret.oldest_time = bound.get_oldest_time(); + ret.position_marker = bound.get_lowest_marker_bound(); + bound.get_markers(ret.markers); + + ::encode(ret, *out); + return 0; +} + +void __cls_init() +{ + CLS_LOG(1, "Loaded replica log class!"); + + cls_register("replica_log", &h_class); + + cls_register_cxx_method(h_class, "set", CLS_METHOD_RD | CLS_METHOD_WR, + cls_replica_log_set, &h_replica_log_set); + cls_register_cxx_method(h_class, "get", CLS_METHOD_RD, + cls_replica_log_get, &h_replica_log_get); + cls_register_cxx_method(h_class, "delete", CLS_METHOD_RD | CLS_METHOD_WR, + cls_replica_log_delete, &h_replica_log_delete); +} diff --git a/src/cls/replica_log/cls_replica_log_client.cc b/src/cls/replica_log/cls_replica_log_client.cc new file mode 100644 index 00000000000..5591389c78a --- /dev/null +++ b/src/cls/replica_log/cls_replica_log_client.cc @@ -0,0 +1,92 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include <errno.h> + +#include "cls/replica_log/cls_replica_log_ops.h" +#include "include/rados/librados.hpp" + +using namespace librados; + +void cls_replica_log_prepare_marker(cls_replica_log_progress_marker& progress, + const string& entity, const string& marker, + const utime_t& time, + const list<pair<string, utime_t> > *entries) +{ + progress.entity_id = entity; + progress.position_marker = marker; + progress.position_time = time; + if (entries) { + list<pair<string, utime_t> >::const_iterator i; + for (i = entries->begin(); i != entries->end(); ++i) { + cls_replica_log_item_marker item(i->first, i->second); + progress.items.push_back(item); + } + } +} + +void cls_replica_log_extract_marker(const cls_replica_log_progress_marker& progress, + string& entity, string& marker, + utime_t& time, + list<pair<string, utime_t> >& entries) +{ + entity = progress.entity_id; + marker = progress.position_marker; + time = progress.position_time; + list<cls_replica_log_item_marker>::const_iterator i; + for (i = progress.items.begin(); i != progress.items.end(); ++i) { + entries.push_back(make_pair(i->item_name, i->item_timestamp)); + } +} + +void cls_replica_log_update_bound(librados::ObjectWriteOperation& o, + const cls_replica_log_progress_marker& progress) +{ + cls_replica_log_set_marker_op op(progress); + bufferlist in; + ::encode(op, in); + o.exec("replica_log", "set", in); +} + +void cls_replica_log_delete_bound(librados::ObjectWriteOperation& o, + const string& entity) +{ + cls_replica_log_delete_marker_op op(entity); + bufferlist in; + ::encode(op, in); + o.exec("replica_log", "delete", in); +} + +int cls_replica_log_get_bounds(librados::IoCtx& io_ctx, const string& oid, + string& position_marker, + utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers) +{ + bufferlist in; + bufferlist out; + cls_replica_log_get_bounds_op op; + ::encode(op, in); + int r = io_ctx.exec(oid, "replica_log", "get", in, out); + if (r < 0) + return r; + + cls_replica_log_get_bounds_ret ret; + try { + bufferlist::iterator i = out.begin(); + ::decode(ret, i); + } catch (buffer::error& err) { + return -EIO; + } + + position_marker = ret.position_marker; + oldest_time = ret.oldest_time; + markers = ret.markers; + + return 0; +} diff --git a/src/cls/replica_log/cls_replica_log_client.h b/src/cls/replica_log/cls_replica_log_client.h new file mode 100644 index 00000000000..d1a83e08ff6 --- /dev/null +++ b/src/cls/replica_log/cls_replica_log_client.h @@ -0,0 +1,84 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + * Copyright 2013 Inktank + */ + +#ifndef CLS_REPLICA_LOG_CLIENT_H_ +#define CLS_REPLICA_LOG_CLIENT_H_ + +#include "include/rados/librados.hpp" +#include "cls_replica_log_types.h" + +/** + * Prepare a progress marker object to send out. + * + * @param progress The marker object to prepare + * @param entity The ID of the entity setting the progress + * @param marker The marker key the entity has gotten to + * @param time The timestamp associated with the marker + * param entries A list of in-progress entries prior to the marker + */ +void cls_replica_log_prepare_marker(cls_replica_log_progress_marker& progress, + const string& entity, const string& marker, + const utime_t& time, + const list<pair<string, utime_t> > *entries); + +/** + * Extract a progress marker object into its components. + * + * @param progress The marker object to extract data from + * @param entity [out] The ID of the entity the progress is associated with + * @param marker [out] The marker key the entity has gotten to + * @param time [out] The timestamp associated with the marker + * @param entries [out] List of in-progress entries prior to the marker + */ +void cls_replica_log_extract_marker(const cls_replica_log_progress_marker& progress, + string& entity, string& marker, + utime_t& time, + list<pair<string, utime_t> >& entries); + +/** + * Add a progress marker update to a write op. The op will return 0 on + * success, -EEXIST if the marker conflicts with an existing one, or + * -EINVAL if the marker is in conflict (ie, before) the daemon's existing + * marker. + * + * @param op The op to add the update to + * @param progress The progress marker to send + */ +void cls_replica_log_update_bound(librados::ObjectWriteOperation& op, + const cls_replica_log_progress_marker& progress); + +/** + * Remove an entity's progress marker from the replica log. The op will return + * 0 on success, -ENOENT if the entity does not exist on the replica log, or + * -ENOTEMPTY if the items list on the marker is not empty. + * + * @param op The op to add the delete to + * @param entity The entity whose progress should be removed + */ +void cls_replica_log_delete_bound(librados::ObjectWriteOperation& op, + const string& entity); + +/** + * Read the bounds on a replica log. + * + * @param io_ctx The IoCtx to use for the read + * @param oid The oid to direct the read to + * @param position_marker [out] The lowest marker key that has been reached + * @param oldest_time [out] Timestamp corresponding to the position marker or + * oldest in-progress item. + * @param markers [out] List of progress markers for individual daemons + */ +int cls_replica_log_get_bounds(librados::IoCtx& io_ctx, const string& oid, + string& position_marker, + utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers); + +#endif /* CLS_REPLICA_LOG_CLIENT_H_ */ diff --git a/src/cls/replica_log/cls_replica_log_ops.cc b/src/cls/replica_log/cls_replica_log_ops.cc new file mode 100644 index 00000000000..7d653b64613 --- /dev/null +++ b/src/cls/replica_log/cls_replica_log_ops.cc @@ -0,0 +1,81 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "cls_replica_log_ops.h" +#include "common/Formatter.h" +#include "common/ceph_json.h" + +void cls_replica_log_delete_marker_op::dump(Formatter *f) const +{ + f->dump_string("entity_id", entity_id); +} + +void cls_replica_log_delete_marker_op:: +generate_test_instances(std::list<cls_replica_log_delete_marker_op*>& ls) +{ + ls.push_back(new cls_replica_log_delete_marker_op); + ls.push_back(new cls_replica_log_delete_marker_op); + ls.back()->entity_id = "test_entity_1"; +} + +void cls_replica_log_set_marker_op::dump(Formatter *f) const +{ + encode_json("marker", marker, f); +} + +void cls_replica_log_set_marker_op:: +generate_test_instances(std::list<cls_replica_log_set_marker_op*>& ls) +{ + std::list<cls_replica_log_progress_marker*> samples; + cls_replica_log_progress_marker::generate_test_instances(samples); + std::list<cls_replica_log_progress_marker*>::iterator i; + for (i = samples.begin(); i != samples.end(); ++i) { + ls.push_back(new cls_replica_log_set_marker_op(*(*i))); + } +} + +void cls_replica_log_get_bounds_op::dump(Formatter *f) const +{ + f->dump_string("contents", "empty"); +} + +void cls_replica_log_get_bounds_op:: +generate_test_instances(std::list<cls_replica_log_get_bounds_op*>& ls) +{ + ls.push_back(new cls_replica_log_get_bounds_op); +} + +void cls_replica_log_get_bounds_ret::dump(Formatter *f) const +{ + f->dump_string("position_marker", position_marker); + oldest_time.gmtime(f->dump_stream("oldest_time")); + encode_json("entity_markers", markers, f); +} + +void cls_replica_log_get_bounds_ret:: +generate_test_instances(std::list<cls_replica_log_get_bounds_ret*>& ls) +{ + std::list<cls_replica_log_progress_marker*> samples; + cls_replica_log_progress_marker::generate_test_instances(samples); + std::list<cls_replica_log_progress_marker> samples_whole; + std::list<cls_replica_log_progress_marker*>::iterator i; + int count = 0; + for (i = samples.begin(); i != samples.end(); ++i) { + ls.push_back(new cls_replica_log_get_bounds_ret()); + ls.back()->markers.push_back(*(*i)); + ls.back()->oldest_time.set_from_double(1000*count); + ls.back()->position_marker = ls.back()->markers.front().position_marker; + samples_whole.push_back(*(*i)); + } + ls.push_back(new cls_replica_log_get_bounds_ret()); + ls.back()->markers = samples_whole; + ls.back()->oldest_time = samples_whole.back().position_time; + ls.back()->position_marker = samples_whole.back().position_marker; +} diff --git a/src/cls/replica_log/cls_replica_log_ops.h b/src/cls/replica_log/cls_replica_log_ops.h new file mode 100644 index 00000000000..0905bb94e84 --- /dev/null +++ b/src/cls/replica_log/cls_replica_log_ops.h @@ -0,0 +1,112 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef CLS_REPLICA_LOG_OPS_H_ +#define CLS_REPLICA_LOG_OPS_H_ + +#include "include/types.h" +#include "cls_replica_log_types.h" + +struct cls_replica_log_delete_marker_op { + string entity_id; + cls_replica_log_delete_marker_op() {} + cls_replica_log_delete_marker_op(const string& id) : entity_id(id) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entity_id, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entity_id, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + static void generate_test_instances(std::list<cls_replica_log_delete_marker_op*>& ls); + +}; +WRITE_CLASS_ENCODER(cls_replica_log_delete_marker_op) + +struct cls_replica_log_set_marker_op { + cls_replica_log_progress_marker marker; + cls_replica_log_set_marker_op() {} + cls_replica_log_set_marker_op(const cls_replica_log_progress_marker& m) : + marker(m) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(marker, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(marker, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + static void generate_test_instances(std::list<cls_replica_log_set_marker_op*>& ls); +}; +WRITE_CLASS_ENCODER(cls_replica_log_set_marker_op) + +struct cls_replica_log_get_bounds_op { + cls_replica_log_get_bounds_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + static void generate_test_instances(std::list<cls_replica_log_get_bounds_op*>& ls); +}; +WRITE_CLASS_ENCODER(cls_replica_log_get_bounds_op) + +struct cls_replica_log_get_bounds_ret { + string position_marker; // oldest log listing position on the master + utime_t oldest_time; // oldest timestamp associated with position or an item + std::list<cls_replica_log_progress_marker> markers; + + cls_replica_log_get_bounds_ret() {} + cls_replica_log_get_bounds_ret(const string& pos_marker, + const utime_t& time, + const std::list<cls_replica_log_progress_marker>& m) : + position_marker(pos_marker), oldest_time(time), markers(m) + {} + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(position_marker, bl); + ::encode(oldest_time, bl); + ::encode(markers, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(position_marker, bl); + ::decode(oldest_time, bl); + ::decode(markers, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + static void generate_test_instances(std::list<cls_replica_log_get_bounds_ret*>& ls); +}; +WRITE_CLASS_ENCODER(cls_replica_log_get_bounds_ret) + +#endif /* CLS_REPLICA_LOG_OPS_H_ */ diff --git a/src/cls/replica_log/cls_replica_log_types.cc b/src/cls/replica_log/cls_replica_log_types.cc new file mode 100644 index 00000000000..ba89fd42ae3 --- /dev/null +++ b/src/cls/replica_log/cls_replica_log_types.cc @@ -0,0 +1,80 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "cls_replica_log_types.h" + +#include "common/Formatter.h" +#include "common/ceph_json.h" + +void cls_replica_log_item_marker::dump(Formatter *f) const +{ + f->dump_string("item name", item_name); + f->dump_stream("item timestamp") << item_timestamp; +} + +void cls_replica_log_item_marker:: +generate_test_instances(std::list<cls_replica_log_item_marker*>& ls) +{ + ls.push_back(new cls_replica_log_item_marker); + ls.back()->item_name = "test_item_1"; + ls.back()->item_timestamp.set_from_double(0); + ls.push_back(new cls_replica_log_item_marker); + ls.back()->item_name = "test_item_2"; + ls.back()->item_timestamp.set_from_double(20); +} + +void cls_replica_log_progress_marker::dump(Formatter *f) const +{ + f->dump_string("entity", entity_id); + f->dump_string("position_marker", position_marker); + position_time.gmtime(f->dump_stream("position_time")); + encode_json("items_in_progress", items, f); +} + +void cls_replica_log_progress_marker:: +generate_test_instances(std::list<cls_replica_log_progress_marker*>& ls) +{ + ls.push_back(new cls_replica_log_progress_marker); + ls.push_back(new cls_replica_log_progress_marker); + ls.back()->entity_id = "entity1"; + ls.back()->position_marker = "pos1"; + ls.back()->position_time.set_from_double(20); + + std::list<cls_replica_log_item_marker*> test_items; + cls_replica_log_item_marker::generate_test_instances(test_items); + std::list<cls_replica_log_item_marker*>::iterator i = test_items.begin(); + for ( ; i != test_items.end(); ++i) { + ls.back()->items.push_back(*(*i)); + } +} + +void cls_replica_log_bound::dump(Formatter *f) const +{ + f->dump_string("position_marker", position_marker); + position_time.gmtime(f->dump_stream("position_time")); + f->dump_string("marker_exists", marker_exists ? "yes" : "no"); + if (marker_exists) { + encode_json("marker", marker, f); //progress marker + } +} + +void cls_replica_log_bound:: +generate_test_instances(std::list<cls_replica_log_bound*>& ls) +{ + ls.push_back(new cls_replica_log_bound); + std::list<cls_replica_log_progress_marker*> marker_objects; + cls_replica_log_progress_marker::generate_test_instances(marker_objects); + std::list<cls_replica_log_progress_marker*>::iterator i = + marker_objects.begin(); + ls.back()->update_marker(*(*i)); + ls.push_back(new cls_replica_log_bound); + ++i; + ls.back()->update_marker(*(*i)); +} diff --git a/src/cls/replica_log/cls_replica_log_types.h b/src/cls/replica_log/cls_replica_log_types.h new file mode 100644 index 00000000000..39dc22e4456 --- /dev/null +++ b/src/cls/replica_log/cls_replica_log_types.h @@ -0,0 +1,189 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + * Copyright 2013 Inktank + */ + +#ifndef CLS_REPLICA_LOG_TYPES_H_ +#define CLS_REPLICA_LOG_TYPES_H_ + +#include "include/utime.h" +#include "include/encoding.h" +#include "include/types.h" +#include <errno.h> + +struct cls_replica_log_item_marker { + string item_name; // the name of the item we're marking + utime_t item_timestamp; // the time stamp at which the item was outdated + + cls_replica_log_item_marker() {} + cls_replica_log_item_marker(const string& name, const utime_t& time) : + item_name(name), item_timestamp(time) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(item_name, bl); + ::encode(item_timestamp, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(item_name, bl); + ::decode(item_timestamp, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + static void generate_test_instances(std::list<cls_replica_log_item_marker*>& ls); +}; +WRITE_CLASS_ENCODER(cls_replica_log_item_marker) + +struct cls_replica_log_progress_marker { + string entity_id; // the name of the entity setting the progress marker + string position_marker; // represents a log listing position on the master + utime_t position_time; // the timestamp associated with the position marker + std::list<cls_replica_log_item_marker> items; /* any items not caught up + to the position marker*/ + + cls_replica_log_progress_marker() {} + cls_replica_log_progress_marker(const string& entity, const string& marker, + const utime_t& time ) : + entity_id(entity), position_marker(marker), + position_time(time) {} + cls_replica_log_progress_marker(const string& entity, const string& marker, + const utime_t& time, + const std::list<cls_replica_log_item_marker> b) : + entity_id(entity), position_marker(marker), + position_time(time), + items(b) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entity_id, bl); + ::encode(position_marker, bl); + ::encode(position_time, bl); + ::encode(items, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entity_id, bl); + ::decode(position_marker, bl); + ::decode(position_time, bl); + ::decode(items, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + static void generate_test_instances(std::list<cls_replica_log_progress_marker*>& ls); +}; +WRITE_CLASS_ENCODER(cls_replica_log_progress_marker) + +class cls_replica_log_bound { + /** + * Right now, we are lazy and only support a single marker at a time. In the + * future, we might support more than one, so the interface is designed to + * let that work. + */ + string position_marker; // represents a log listing position on the master + utime_t position_time; // the timestamp associated with the position marker + bool marker_exists; // has the marker been set? + cls_replica_log_progress_marker marker; // the status of the current locker + +public: + cls_replica_log_bound() : marker_exists(false) {} + + int update_marker(const cls_replica_log_progress_marker& new_mark) { + // only one marker at a time right now + if (marker_exists && (marker.entity_id != new_mark.entity_id)) { + return -EEXIST; + } + // can't go backwards with our one marker! + if (marker_exists && (marker.position_time > new_mark.position_time)) { + return -EINVAL; + } + + marker = new_mark; + position_marker = new_mark.position_marker; + position_time = new_mark.position_time; + marker_exists = true; + // hey look, updating is idempotent; did you notice that? + return 0; + } + + int delete_marker(const string& entity_id) { + if (marker_exists) { + // ENOENT if our marker doesn't match the passed ID + if (marker.entity_id != entity_id) { + return -ENOENT; + } + // you can't delete it if there are unclean entries + if (!marker.items.empty()) { + return -ENOTEMPTY; + } + } + + marker_exists = false; + marker = cls_replica_log_progress_marker(); + // hey look, deletion is idempotent! Hurray. + return 0; + } + + std::string get_lowest_marker_bound() { + return position_marker; + } + + utime_t get_lowest_time_bound() { + return position_time; + } + + utime_t get_oldest_time() { + utime_t oldest = position_time; + list<cls_replica_log_item_marker>::const_iterator i; + for ( i = marker.items.begin(); i != marker.items.end(); ++i) { + if (i->item_timestamp < oldest) + oldest = i->item_timestamp; + } + return oldest; + } + + void get_markers(list<cls_replica_log_progress_marker>& ls) { + if (marker_exists) { + ls.push_back(marker); + } + } + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(position_marker, bl); + ::encode(position_time, bl); + ::encode(marker_exists, bl); + if (marker_exists) { + ::encode(marker, bl); + } + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(position_marker, bl); + ::decode(position_time, bl); + ::decode(marker_exists, bl); + if (marker_exists) { + ::decode(marker, bl); + } + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + static void generate_test_instances(std::list<cls_replica_log_bound*>& ls); +}; +WRITE_CLASS_ENCODER(cls_replica_log_bound); + +#endif /* CLS_REPLICA_LOG_TYPES_H_ */ diff --git a/src/cls/statelog/cls_statelog.cc b/src/cls/statelog/cls_statelog.cc new file mode 100644 index 00000000000..f2cbbf79672 --- /dev/null +++ b/src/cls/statelog/cls_statelog.cc @@ -0,0 +1,318 @@ +// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <iostream> + +#include <string.h> +#include <stdlib.h> +#include <errno.h> + +#include "include/types.h" +#include "include/utime.h" +#include "objclass/objclass.h" + +#include "cls_statelog_types.h" +#include "cls_statelog_ops.h" + +#include "global/global_context.h" + +CLS_VER(1,0) +CLS_NAME(statelog) + +cls_handle_t h_class; +cls_method_handle_t h_statelog_add; +cls_method_handle_t h_statelog_list; +cls_method_handle_t h_statelog_remove; +cls_method_handle_t h_statelog_check_state; + +static string statelog_index_by_client_prefix = "1_"; +static string statelog_index_by_object_prefix = "2_"; + + +static int write_statelog_entry(cls_method_context_t hctx, const string& index, const cls_statelog_entry& entry) +{ + bufferlist bl; + ::encode(entry, bl); + + int ret = cls_cxx_map_set_val(hctx, index, &bl); + if (ret < 0) + return ret; + + return 0; +} + +static void get_index_by_client(const string& client_id, const string& op_id, string& index) +{ + index = statelog_index_by_client_prefix; + index.append(client_id + "_" + op_id); +} + +static void get_index_by_client(cls_statelog_entry& entry, string& index) +{ + get_index_by_client(entry.client_id, entry.op_id, index); +} + +static void get_index_by_object(const string& object, const string& op_id, string& index) +{ + char buf[16]; + snprintf(buf, sizeof(buf), "%d_", (int)object.size()); + + index = statelog_index_by_object_prefix + buf; /* append object length to ensure uniqueness */ + index.append(object + "_" + op_id); +} + +static void get_index_by_object(cls_statelog_entry& entry, string& index) +{ + get_index_by_object(entry.object, entry.op_id, index); +} + +static int get_existing_entry(cls_method_context_t hctx, const string& client_id, + const string& op_id, const string& object, + cls_statelog_entry& entry) +{ + if ((object.empty() && client_id.empty()) || op_id.empty()) { + return -EINVAL; + } + + string obj_index; + if (!object.empty()) { + get_index_by_object(object, op_id, obj_index); + } else { + get_index_by_client(client_id, op_id, obj_index); + } + + bufferlist bl; + int rc = cls_cxx_map_get_val(hctx, obj_index, &bl); + if (rc < 0) { + CLS_LOG(0, "could not find entry %s", obj_index.c_str()); + return rc; + } + try { + bufferlist::iterator iter = bl.begin(); + ::decode(entry, iter); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: failed to decode entry %s", obj_index.c_str()); + return -EIO; + } + + if ((!object.empty() && entry.object != object) || + (!client_id.empty() && entry.client_id != client_id)){ + /* ouch, we were passed inconsistent client_id / object */ + CLS_LOG(0, "data mismatch: object=%s client_id=%s entry: object=%s client_id=%s", + object.c_str(), client_id.c_str(), entry.object.c_str(), entry.client_id.c_str()); + return -EINVAL; + } + + return 0; +} + +static int cls_statelog_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_statelog_add_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_statelog_add_op(): failed to decode op"); + return -EINVAL; + } + + for (list<cls_statelog_entry>::iterator iter = op.entries.begin(); + iter != op.entries.end(); ++iter) { + cls_statelog_entry& entry = *iter; + + string index_by_client; + + get_index_by_client(entry, index_by_client); + + CLS_LOG(0, "storing entry by client/op at %s", index_by_client.c_str()); + + int ret = write_statelog_entry(hctx, index_by_client, entry); + if (ret < 0) + return ret; + + string index_by_obj; + + get_index_by_object(entry, index_by_obj); + + CLS_LOG(0, "storing entry by object at %s", index_by_obj.c_str()); + ret = write_statelog_entry(hctx, index_by_obj, entry); + if (ret < 0) + return ret; + + } + + return 0; +} + +static int cls_statelog_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_statelog_list_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_statelog_list_op(): failed to decode op"); + return -EINVAL; + } + + map<string, bufferlist> keys; + + string from_index; + string match_prefix; + + if (!op.client_id.empty()) { + get_index_by_client(op.client_id, op.op_id, match_prefix); + } else if (!op.object.empty()) { + get_index_by_object(op.object, op.op_id, match_prefix); + } else { + match_prefix = statelog_index_by_object_prefix; + } + + if (op.marker.empty()) { + from_index = match_prefix; + } else { + from_index = op.marker; + } + +#define MAX_ENTRIES 1000 + size_t max_entries = op.max_entries; + if (!max_entries || max_entries > MAX_ENTRIES) + max_entries = MAX_ENTRIES; + + int rc = cls_cxx_map_get_vals(hctx, from_index, match_prefix, max_entries + 1, &keys); + if (rc < 0) + return rc; + + CLS_LOG(20, "from_index=%s match_prefix=%s", from_index.c_str(), match_prefix.c_str()); + cls_statelog_list_ret ret; + + list<cls_statelog_entry>& entries = ret.entries; + map<string, bufferlist>::iterator iter = keys.begin(); + + bool done = false; + string marker; + + size_t i; + for (i = 0; i < max_entries && iter != keys.end(); ++i, ++iter) { + const string& index = iter->first; + marker = index; + + bufferlist& bl = iter->second; + bufferlist::iterator biter = bl.begin(); + try { + cls_statelog_entry e; + ::decode(e, biter); + entries.push_back(e); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: cls_statelog_list: could not decode entry, index=%s", index.c_str()); + } + } + + if (iter == keys.end()) + done = true; + else + ret.marker = marker; + + ret.truncated = !done; + + ::encode(ret, *out); + + return 0; +} + +static int cls_statelog_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_statelog_remove_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_statelog_remove_op(): failed to decode op"); + return -EINVAL; + } + + cls_statelog_entry entry; + + int rc = get_existing_entry(hctx, op.client_id, op.op_id, op.object, entry); + if (rc < 0) + return rc; + + string obj_index; + get_index_by_object(entry.object, entry.op_id, obj_index); + + rc = cls_cxx_map_remove_key(hctx, obj_index); + if (rc < 0) { + CLS_LOG(0, "ERROR: failed to remove key"); + return rc; + } + + string client_index; + get_index_by_client(entry.client_id, entry.op_id, client_index); + + rc = cls_cxx_map_remove_key(hctx, client_index); + if (rc < 0) { + CLS_LOG(0, "ERROR: failed to remove key"); + return rc; + } + + return 0; +} + +static int cls_statelog_check_state(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_statelog_check_state_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_statelog_check_state_op(): failed to decode op"); + return -EINVAL; + } + + if (op.object.empty() || op.op_id.empty()) { + CLS_LOG(0, "object name or op id not specified"); + return -EINVAL; + } + + string obj_index; + get_index_by_object(op.object, op.op_id, obj_index); + + bufferlist bl; + int rc = cls_cxx_map_get_val(hctx, obj_index, &bl); + if (rc < 0) { + CLS_LOG(0, "could not find entry %s", obj_index.c_str()); + return rc; + } + + cls_statelog_entry entry; + + rc = get_existing_entry(hctx, op.client_id, op.op_id, op.object, entry); + if (rc < 0) + return rc; + + if (entry.state != op.state) + return -ECANCELED; + + return 0; +} + +void __cls_init() +{ + CLS_LOG(1, "Loaded log class!"); + + cls_register("statelog", &h_class); + + /* log */ + cls_register_cxx_method(h_class, "add", CLS_METHOD_RD | CLS_METHOD_WR, cls_statelog_add, &h_statelog_add); + cls_register_cxx_method(h_class, "list", CLS_METHOD_RD, cls_statelog_list, &h_statelog_list); + cls_register_cxx_method(h_class, "remove", CLS_METHOD_RD | CLS_METHOD_WR, cls_statelog_remove, &h_statelog_remove); + cls_register_cxx_method(h_class, "check_state", CLS_METHOD_RD, cls_statelog_check_state, &h_statelog_check_state); + + return; +} + diff --git a/src/cls/statelog/cls_statelog_client.cc b/src/cls/statelog/cls_statelog_client.cc new file mode 100644 index 00000000000..2ba38686a08 --- /dev/null +++ b/src/cls/statelog/cls_statelog_client.cc @@ -0,0 +1,128 @@ +#include <errno.h> + +#include "include/types.h" +#include "cls/statelog/cls_statelog_ops.h" +#include "include/rados/librados.hpp" + + +using namespace librados; + + +void cls_statelog_add(librados::ObjectWriteOperation& op, list<cls_statelog_entry>& entries) +{ + bufferlist in; + cls_statelog_add_op call; + call.entries = entries; + ::encode(call, in); + op.exec("statelog", "add", in); +} + +void cls_statelog_add(librados::ObjectWriteOperation& op, cls_statelog_entry& entry) +{ + bufferlist in; + cls_statelog_add_op call; + call.entries.push_back(entry); + ::encode(call, in); + op.exec("statelog", "add", in); +} + +void cls_statelog_add_prepare_entry(cls_statelog_entry& entry, const string& client_id, const string& op_id, + const string& object, const utime_t& timestamp, uint32_t state, bufferlist& bl) +{ + entry.client_id = client_id; + entry.op_id = op_id; + entry.object = object; + entry.timestamp = timestamp; + entry.state = state; + entry.data = bl; +} + +void cls_statelog_add(librados::ObjectWriteOperation& op, const string& client_id, const string& op_id, + const string& object, const utime_t& timestamp, uint32_t state, bufferlist& bl) + +{ + cls_statelog_entry entry; + + cls_statelog_add_prepare_entry(entry, client_id, op_id, object, timestamp, state, bl); + cls_statelog_add(op, entry); +} + +void cls_statelog_remove_by_client(librados::ObjectWriteOperation& op, const string& client_id, const string& op_id) +{ + bufferlist in; + cls_statelog_remove_op call; + call.client_id = client_id; + call.op_id = op_id; + ::encode(call, in); + op.exec("statelog", "remove", in); +} + +void cls_statelog_remove_by_object(librados::ObjectWriteOperation& op, const string& object, const string& op_id) +{ + bufferlist in; + cls_statelog_remove_op call; + call.object = object; + call.op_id = op_id; + ::encode(call, in); + op.exec("statelog", "remove", in); +} + +class StateLogListCtx : public ObjectOperationCompletion { + list<cls_statelog_entry> *entries; + string *marker; + bool *truncated; +public: + StateLogListCtx(list<cls_statelog_entry> *_entries, string *_marker, bool *_truncated) : + entries(_entries), marker(_marker), truncated(_truncated) {} + void handle_completion(int r, bufferlist& outbl) { + if (r >= 0) { + cls_statelog_list_ret ret; + try { + bufferlist::iterator iter = outbl.begin(); + ::decode(ret, iter); + if (entries) + *entries = ret.entries; + if (truncated) + *truncated = ret.truncated; + if (marker) + *marker = ret.marker; + } catch (buffer::error& err) { + // nothing we can do about it atm + } + } + } +}; + +void cls_statelog_list(librados::ObjectReadOperation& op, + const string& client_id, const string& op_id, const string& object, /* op_id may be empty, also one of client_id, object*/ + const string& in_marker, int max_entries, list<cls_statelog_entry>& entries, + string *out_marker, bool *truncated) +{ + bufferlist inbl; + cls_statelog_list_op call; + call.client_id = client_id; + call.op_id = op_id; + call.object = object; + call.marker = in_marker; + call.max_entries = max_entries; + + ::encode(call, inbl); + + op.exec("statelog", "list", inbl, new StateLogListCtx(&entries, out_marker, truncated)); +} + +void cls_statelog_check_state(librados::ObjectOperation& op, const string& client_id, const string& op_id, const string& object, uint32_t state) +{ + bufferlist inbl; + bufferlist outbl; + cls_statelog_check_state_op call; + call.client_id = client_id; + call.op_id = op_id; + call.object = object; + call.state = state; + + ::encode(call, inbl); + + op.exec("statelog", "check_state", inbl, NULL); +} + diff --git a/src/cls/statelog/cls_statelog_client.h b/src/cls/statelog/cls_statelog_client.h new file mode 100644 index 00000000000..7faf361e1dc --- /dev/null +++ b/src/cls/statelog/cls_statelog_client.h @@ -0,0 +1,29 @@ +#ifndef CEPH_CLS_STATELOG_CLIENT_H +#define CEPH_CLS_STATELOG_CLIENT_H + +#include "include/types.h" +#include "include/rados/librados.hpp" +#include "cls_statelog_types.h" + +/* + * log objclass + */ + +void cls_statelog_add_prepare_entry(cls_statelog_entry& entry, const string& client_id, const string& op_id, + const string& object, const utime_t& timestamp, uint32_t state, bufferlist& bl); + +void cls_statelog_add(librados::ObjectWriteOperation& op, list<cls_statelog_entry>& entry); +void cls_statelog_add(librados::ObjectWriteOperation& op, cls_statelog_entry& entry); +void cls_statelog_add(librados::ObjectWriteOperation& op, const string& client_id, const string& op_id, + const string& object, const utime_t& timestamp, uint32_t state, bufferlist& bl); + +void cls_statelog_list(librados::ObjectReadOperation& op, + const string& client_id, const string& op_id, const string& object, /* op_id may be empty, also one of client_id, object*/ + const string& in_marker, int max_entries, list<cls_statelog_entry>& entries, + string *out_marker, bool *truncated); + +void cls_statelog_remove_by_client(librados::ObjectWriteOperation& op, const string& client_id, const string& op_id); +void cls_statelog_remove_by_object(librados::ObjectWriteOperation& op, const string& object, const string& op_id); + +void cls_statelog_check_state(librados::ObjectOperation& op, const string& client_id, const string& op_id, const string& object, uint32_t state); +#endif diff --git a/src/cls/statelog/cls_statelog_ops.h b/src/cls/statelog/cls_statelog_ops.h new file mode 100644 index 00000000000..725fa863df7 --- /dev/null +++ b/src/cls/statelog/cls_statelog_ops.h @@ -0,0 +1,144 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_STATELOG_OPS_H +#define CEPH_CLS_STATELOG_OPS_H + +#include "include/types.h" +#include "cls_statelog_types.h" + +struct cls_statelog_add_op { + list<cls_statelog_entry> entries; + + cls_statelog_add_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_statelog_add_op) + +struct cls_statelog_list_op { + string object; + string client_id; + string op_id; + string marker; /* if not empty, overrides from_time */ + int max_entries; /* upperbound to returned num of entries + might return less than that and still be truncated */ + + cls_statelog_list_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(object, bl); + ::encode(client_id, bl); + ::encode(op_id, bl); + ::encode(marker, bl); + ::encode(max_entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(object, bl); + ::decode(client_id, bl); + ::decode(op_id, bl); + ::decode(marker, bl); + ::decode(max_entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_statelog_list_op) + +struct cls_statelog_list_ret { + list<cls_statelog_entry> entries; + string marker; + bool truncated; + + cls_statelog_list_ret() : truncated(false) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entries, bl); + ::encode(marker, bl); + ::encode(truncated, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entries, bl); + ::decode(marker, bl); + ::decode(truncated, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_statelog_list_ret) + + +/* + * operation will return 0 when successfully removed but not done. Will return + * -ENODATA when done, so caller needs to repeat sending request until that. + */ +struct cls_statelog_remove_op { + string client_id; + string op_id; + string object; + + cls_statelog_remove_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(client_id, bl); + ::encode(op_id, bl); + ::encode(object, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(client_id, bl); + ::decode(op_id, bl); + ::decode(object, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_statelog_remove_op) + +struct cls_statelog_check_state_op { + string client_id; + string op_id; + string object; + uint32_t state; + + cls_statelog_check_state_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(client_id, bl); + ::encode(op_id, bl); + ::encode(object, bl); + ::encode(state, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(client_id, bl); + ::decode(op_id, bl); + ::decode(object, bl); + ::decode(state, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_statelog_check_state_op) + +#endif diff --git a/src/cls/statelog/cls_statelog_types.h b/src/cls/statelog/cls_statelog_types.h new file mode 100644 index 00000000000..f812c7b321c --- /dev/null +++ b/src/cls/statelog/cls_statelog_types.h @@ -0,0 +1,51 @@ +#ifndef CEPH_CLS_STATELOG_TYPES_H +#define CEPH_CLS_STATELOG_TYPES_H + +#include "include/encoding.h" +#include "include/types.h" + +#include "include/utime.h" + +class JSONObj; + +struct cls_statelog_entry { + string client_id; + string op_id; + string object; + utime_t timestamp; + bufferlist data; + uint32_t state; /* user defined state */ + + cls_statelog_entry() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(client_id, bl); + ::encode(op_id, bl); + ::encode(object, bl); + ::encode(timestamp, bl); + ::encode(data, bl); + ::encode(state, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(client_id, bl); + ::decode(op_id, bl); + ::decode(object, bl); + ::decode(timestamp, bl); + ::decode(data, bl); + ::decode(state, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; +WRITE_CLASS_ENCODER(cls_statelog_entry) + + +#endif + + diff --git a/src/common/config_opts.h b/src/common/config_opts.h index ed52297bd78..66582c0407e 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -602,6 +602,8 @@ OPTION(rgw_get_obj_max_req_size, OPT_INT, 4 << 20) // max length of a single get OPTION(rgw_relaxed_s3_bucket_names, OPT_BOOL, false) // enable relaxed bucket name rules for US region buckets OPTION(rgw_list_buckets_max_chunk, OPT_INT, 1000) // max buckets to retrieve in a single op when listing user buckets OPTION(rgw_md_log_max_shards, OPT_INT, 64) // max shards for metadata log +OPTION(rgw_num_zone_opstate_shards, OPT_INT, 128) // max shards for keeping inter-region copy progress info +OPTION(rgw_opstate_ratelimit_sec, OPT_INT, 30) // min time between opstate updates on a single upload (0 for disabling ratelimit) OPTION(rgw_data_log_window, OPT_INT, 30) // data log entries window (in seconds) OPTION(rgw_data_log_changes_size, OPT_INT, 1000) // number of in-memory entries to hold for data changes log diff --git a/src/rgw/rgw_acl_s3.cc b/src/rgw/rgw_acl_s3.cc index 4f26dda7d20..4c04e8c69e9 100644 --- a/src/rgw/rgw_acl_s3.cc +++ b/src/rgw/rgw_acl_s3.cc @@ -280,16 +280,18 @@ static const char *get_acl_header(RGWEnv *env, static int parse_grantee_str(RGWRados *store, string& grantee_str, const struct s3_acl_header *perm, ACLGrant& grant) { - string id_type, id_val; + string id_type, id_val_quoted; int rgw_perm = perm->rgw_perm; int ret; RGWUserInfo info; - ret = parse_key_value(grantee_str, id_type, id_val); + ret = parse_key_value(grantee_str, id_type, id_val_quoted); if (ret < 0) return ret; + string id_val = rgw_trim_quotes(id_val_quoted); + if (strcasecmp(id_type.c_str(), "emailAddress") == 0) { ret = rgw_get_user_info_by_email(store, id_val, info); if (ret < 0) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index d8382ca1990..435013b1e8d 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -26,6 +26,7 @@ using namespace std; #include "rgw_log.h" #include "rgw_formats.h" #include "rgw_usage.h" +#include "rgw_replica_log.h" #include "auth/Crypto.h" #define dout_subsys ceph_subsys_rgw @@ -94,6 +95,17 @@ void _usage() cerr << " bilog trim trim bucket index log (use start-marker, end-marker)\n"; cerr << " datalog list list data log\n"; cerr << " datalog trim trim data log\n"; + cerr << " opstate list list stateful operations entries (use client_id,\n"; + cerr << " op_id, object)\n"; + cerr << " opstate set set state on an entry (use client_id, op_id, object)\n"; + cerr << " opstate renewstate renew state on an entry (use client_id, op_id, object)\n"; + cerr << " opstate rmstate remove entry (use client_id, op_id, object)\n"; + cerr << " replicamdlog get get the replica metadata log\n"; + cerr << " replicamdlog delete delete the replica metadata log\n"; + cerr << " replicadatalog get get the replica data log\n"; + cerr << " replicadatalog delete delete the replica data log\n"; + cerr << " replicabucketlog get get the replica bucket log\n"; + cerr << " replicabucketlog delete delete the replica bucket log\n"; cerr << "options:\n"; cerr << " --uid=<id> user id\n"; cerr << " --subuser=<name> subuser name\n"; @@ -206,6 +218,18 @@ enum { OPT_BILOG_TRIM, OPT_DATALOG_LIST, OPT_DATALOG_TRIM, + OPT_OPSTATE_LIST, + OPT_OPSTATE_SET, + OPT_OPSTATE_RENEW, + OPT_OPSTATE_RM, + OPT_REPLICAMDLOG_GET, + OPT_REPLICAMDLOG_CREATE, + OPT_REPLICAMDLOG_DELETE, + OPT_REPLICADATALOG_GET, + OPT_REPLICADATALOG_CREATE, + OPT_REPLICADATALOG_DELETE, + OPT_REPLICABUCKETLOG_GET, + OPT_REPLICABUCKETLOG_DELETE }; static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more) @@ -234,7 +258,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more) strcmp(cmd, "metadata") == 0 || strcmp(cmd, "mdlog") == 0 || strcmp(cmd, "bilog") == 0 || - strcmp(cmd, "datalog") == 0) { + strcmp(cmd, "datalog") == 0 || + strcmp(cmd, "opstate") == 0 || + strcmp(cmd, "replicamdlog") == 0 || + strcmp(cmd, "replicadatalog") == 0 || + strcmp(cmd, "replicabucketlog") == 0) { *need_more = true; return 0; } @@ -384,6 +412,34 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more) return OPT_DATALOG_LIST; if (strcmp(cmd, "trim") == 0) return OPT_DATALOG_TRIM; + } else if (strcmp(prev_cmd, "opstate") == 0) { + if (strcmp(cmd, "list") == 0) + return OPT_OPSTATE_LIST; + if (strcmp(cmd, "set") == 0) + return OPT_OPSTATE_SET; + if (strcmp(cmd, "renew") == 0) + return OPT_OPSTATE_RENEW; + if (strcmp(cmd, "rm") == 0) + return OPT_OPSTATE_RM; + } else if (strcmp(prev_cmd, "replicamdlog") == 0) { + if (strcmp(cmd, "get") == 0) + return OPT_REPLICAMDLOG_GET; + if (strcmp(cmd, "create") == 0) + return OPT_REPLICAMDLOG_CREATE; + if (strcmp(cmd, "delete") == 0) + return OPT_REPLICAMDLOG_DELETE; + } else if (strcmp(prev_cmd, "replicadatalog") == 0) { + if (strcmp(cmd, "get") == 0) + return OPT_REPLICADATALOG_GET; + if (strcmp(cmd, "create") == 0) + return OPT_REPLICADATALOG_CREATE; + if (strcmp(cmd, "delete") == 0) + return OPT_REPLICADATALOG_DELETE; + } else if (strcmp(prev_cmd, "replicabucketlog") == 0) { + if (strcmp(cmd, "get") == 0) + return OPT_REPLICABUCKETLOG_GET; + if (strcmp(cmd, "delete") == 0) + return OPT_REPLICABUCKETLOG_DELETE; } return -EINVAL; @@ -438,7 +494,8 @@ static void dump_bucket_usage(map<RGWObjCategory, RGWBucketStats>& stats, Format int bucket_stats(rgw_bucket& bucket, Formatter *formatter) { RGWBucketInfo bucket_info; - int r = store->get_bucket_info(NULL, bucket.name, bucket_info, NULL); + time_t mtime; + int r = store->get_bucket_info(NULL, bucket.name, bucket_info, NULL, &mtime); if (r < 0) return r; @@ -457,6 +514,7 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter) formatter->dump_string("id", bucket.bucket_id); formatter->dump_string("marker", bucket.marker); formatter->dump_string("owner", bucket_info.owner); + formatter->dump_int("mtime", mtime); formatter->dump_int("ver", bucket_ver); formatter->dump_int("master_ver", master_ver); dump_bucket_usage(stats, formatter); @@ -478,7 +536,7 @@ static int init_bucket(string& bucket_name, rgw_bucket& bucket) { if (!bucket_name.empty()) { RGWBucketInfo bucket_info; - int r = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL); + int r = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL, NULL); if (r < 0) { cerr << "could not get bucket info for bucket=" << bucket_name << std::endl; return r; @@ -646,6 +704,11 @@ int main(int argc, char **argv) bool system_specified = false; int shard_id = -1; bool specified_shard_id = false; + string daemon_id; + bool specified_daemon_id = false; + string client_id; + string op_id; + string state_str; std::string val; std::ostringstream errs; @@ -674,6 +737,12 @@ int main(int argc, char **argv) pool_name = val; } else if (ceph_argparse_witharg(args, i, &val, "-o", "--object", (char*)NULL)) { object = val; + } else if (ceph_argparse_witharg(args, i, &val, "--client-id", (char*)NULL)) { + client_id = val; + } else if (ceph_argparse_witharg(args, i, &val, "--op-id", (char*)NULL)) { + op_id = val; + } else if (ceph_argparse_witharg(args, i, &val, "--state", (char*)NULL)) { + state_str = val; } else if (ceph_argparse_witharg(args, i, &val, "--key-type", (char*)NULL)) { key_type_str = val; if (key_type_str.compare("swift") == 0) { @@ -716,6 +785,9 @@ int main(int argc, char **argv) } else if (ceph_argparse_witharg(args, i, &val, "--shard-id", (char*)NULL)) { shard_id = atoi(val.c_str()); specified_shard_id = true; + } else if (ceph_argparse_witharg(args, i, &val, "--daemon-id", (char*)NULL)) { + daemon_id = val; + specified_daemon_id = true; } else if (ceph_argparse_witharg(args, i, &val, "--access", (char*)NULL)) { access = val; perm_mask = rgw_str_to_perm(access.c_str()); @@ -1768,7 +1840,6 @@ next: if (ret < 0) return -ret; - int i = (specified_shard_id ? shard_id : 0); formatter->open_array_section("entries"); @@ -1778,7 +1849,7 @@ next: list<cls_log_entry> entries; - meta_log->init_list_entries(i, start_time, end_time, &handle); + meta_log->init_list_entries(i, start_time, end_time, marker, &handle); bool truncated; do { @@ -1946,5 +2017,179 @@ next: return -ret; } } + + if (opt_cmd == OPT_OPSTATE_LIST) { + RGWOpState oc(store); + + int max = 1000; + + void *handle; + oc.init_list_entries(client_id, op_id, object, &handle); + list<cls_statelog_entry> entries; + bool done; + formatter->open_array_section("entries"); + do { + int ret = oc.list_entries(handle, max, entries, &done); + if (ret < 0) { + cerr << "oc.list_entries returned " << cpp_strerror(-ret) << std::endl; + oc.finish_list_entries(handle); + return -ret; + } + + for (list<cls_statelog_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) { + oc.dump_entry(*iter, formatter); + } + + formatter->flush(cout); + } while (!done); + formatter->close_section(); + formatter->flush(cout); + oc.finish_list_entries(handle); + } + + if (opt_cmd == OPT_OPSTATE_SET || opt_cmd == OPT_OPSTATE_RENEW) { + RGWOpState oc(store); + + RGWOpState::OpState state; + if (object.empty() || client_id.empty() || op_id.empty()) { + cerr << "ERROR: need to specify client_id, op_id, and object" << std::endl; + return EINVAL; + } + if (state_str.empty()) { + cerr << "ERROR: state was not specified" << std::endl; + return EINVAL; + } + int ret = oc.state_from_str(state_str, &state); + if (ret < 0) { + cerr << "ERROR: invalid state: " << state_str << std::endl; + return -ret; + } + + if (opt_cmd == OPT_OPSTATE_SET) { + ret = oc.set_state(client_id, op_id, object, state); + if (ret < 0) { + cerr << "ERROR: failed to set state: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } else { + ret = oc.renew_state(client_id, op_id, object, state); + if (ret < 0) { + cerr << "ERROR: failed to renew state: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + } + if (opt_cmd == OPT_OPSTATE_RM) { + RGWOpState oc(store); + + if (object.empty() || client_id.empty() || op_id.empty()) { + cerr << "ERROR: need to specify client_id, op_id, and object" << std::endl; + return EINVAL; + } + ret = oc.remove_entry(client_id, op_id, object); + if (ret < 0) { + cerr << "ERROR: failed to set state: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + + if (opt_cmd == OPT_REPLICAMDLOG_GET) { + if (!specified_shard_id) { + cerr << "ERROR: shard-id must be specified for get operation" << std::endl; + return EINVAL; + } + + RGWReplicaObjectLogger logger(store, pool_name, META_REPLICA_LOG_OBJ_PREFIX); + string pos_marker; + utime_t time_marker; + list<cls_replica_log_progress_marker> markers; + int ret = logger.get_bounds(shard_id, pos_marker, time_marker, markers); + if (ret < 0) + return -ret; + encode_json("markers", markers, formatter); + } + + if (opt_cmd == OPT_REPLICAMDLOG_DELETE) { + if (!specified_shard_id) { + cerr << "ERROR: shard-id must be specified for delete operation" << std::endl; + return EINVAL; + } + if (!specified_daemon_id) { + cerr << "ERROR: daemon-id must be specified for delete operation" << std::endl; + return EINVAL; + } + RGWReplicaObjectLogger logger(store, pool_name, META_REPLICA_LOG_OBJ_PREFIX); + int ret = logger.delete_bound(shard_id, daemon_id); + if (ret < 0) + return -ret; + } + + if (opt_cmd == OPT_REPLICADATALOG_GET) { + if (!specified_shard_id) { + cerr << "ERROR: shard-id must be specified for get operation" << std::endl; + return EINVAL; + } + RGWReplicaObjectLogger logger(store, pool_name, DATA_REPLICA_LOG_OBJ_PREFIX); + string pos_marker; + utime_t time_marker; + list<cls_replica_log_progress_marker> markers; + int ret = logger.get_bounds(shard_id, pos_marker, time_marker, markers); + if (ret < 0) + return -ret; + encode_json("markers", markers, formatter); + } + + if (opt_cmd == OPT_REPLICADATALOG_DELETE) { + if (!specified_shard_id) { + cerr << "ERROR: shard-id must be specified for delete operation" << std::endl; + return EINVAL; + } + if (!specified_daemon_id) { + cerr << "ERROR: daemon-id must be specified for delete operation" << std::endl; + return EINVAL; + } + RGWReplicaObjectLogger logger(store, pool_name, DATA_REPLICA_LOG_OBJ_PREFIX); + int ret = logger.delete_bound(shard_id, daemon_id); + if (ret < 0) + return -ret; + } + + if (opt_cmd == OPT_REPLICABUCKETLOG_GET) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket not specified" << std::endl; + return -EINVAL; + } + int ret = init_bucket(bucket_name, bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + RGWReplicaBucketLogger logger(store); + string pos_marker; + utime_t time_marker; + list<cls_replica_log_progress_marker> markers; + ret = logger.get_bounds(bucket, pos_marker, time_marker, markers); + if (ret < 0) + return -ret; + encode_json("markers", markers, formatter); + } + + if (opt_cmd == OPT_REPLICABUCKETLOG_DELETE) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket not specified" << std::endl; + return -EINVAL; + } + int ret = init_bucket(bucket_name, bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + RGWReplicaBucketLogger logger(store); + ret = logger.delete_bound(bucket, daemon_id); + if (ret < 0) + return -ret; + } return 0; } diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 52c7b6ebdc2..7b758b08fd1 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -72,7 +72,7 @@ int rgw_read_user_buckets(RGWRados *store, string user_id, RGWUserBuckets& bucke return 0; } -int rgw_add_bucket(RGWRados *store, string user_id, rgw_bucket& bucket) +int rgw_add_bucket(RGWRados *store, string user_id, rgw_bucket& bucket, time_t creation_time) { int ret; string& bucket_name = bucket.name; @@ -82,7 +82,10 @@ int rgw_add_bucket(RGWRados *store, string user_id, rgw_bucket& bucket) RGWBucketEnt new_bucket; new_bucket.bucket = bucket; new_bucket.size = 0; - time(&new_bucket.mtime); + if (!creation_time) + time(&new_bucket.creation_time); + else + new_bucket.creation_time = creation_time; ::encode(new_bucket, bl); string buckets_obj_id; @@ -118,8 +121,9 @@ int rgw_remove_user_bucket_info(RGWRados *store, string user_id, rgw_bucket& buc } int rgw_bucket_store_info(RGWRados *store, string& bucket_name, bufferlist& bl, bool exclusive, - map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker) { - return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, pattrs); + map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker, + time_t mtime) { + return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, mtime, pattrs); } @@ -139,14 +143,18 @@ int RGWBucket::create_bucket(string bucket_str, string& user_id, string& region_ policy.encode(aclbl); RGWObjVersionTracker objv_tracker; + time_t mtime; - ret = store->get_bucket_info(NULL, bucket_str, bucket_info, &objv_tracker); + ret = store->get_bucket_info(NULL, bucket_str, bucket_info, &objv_tracker, &mtime); if (ret < 0) return ret; rgw_bucket& bucket = bucket_info.bucket; - ret = store->create_bucket(user_id, bucket, region_name, attrs, objv_tracker, NULL); + RGWBucketInfo new_info; + + ret = store->create_bucket(user_id, bucket, region_name, attrs, objv_tracker, + NULL, bucket_info.creation_time, NULL, &new_info); if (ret && ret != -EEXIST) goto done; @@ -158,7 +166,7 @@ int RGWBucket::create_bucket(string bucket_str, string& user_id, string& region_ goto done; } - ret = rgw_add_bucket(store, user_id, bucket); + ret = rgw_add_bucket(store, user_id, bucket, new_info.creation_time); if (ret == -EEXIST) ret = 0; @@ -218,7 +226,8 @@ void check_bad_user_bucket_mapping(RGWRados *store, const string& user_id, bool RGWBucketInfo bucket_info; RGWObjVersionTracker objv_tracker; - int r = store->get_bucket_info(NULL, bucket.name, bucket_info, &objv_tracker); + time_t mtime; + int r = store->get_bucket_info(NULL, bucket.name, bucket_info, &objv_tracker, &mtime); if (r < 0) { ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl; continue; @@ -234,7 +243,7 @@ void check_bad_user_bucket_mapping(RGWRados *store, const string& user_id, bool cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl; if (fix) { cout << "fixing" << std::endl; - r = rgw_add_bucket(store, user_id, actual_bucket); + r = rgw_add_bucket(store, user_id, actual_bucket, bucket_info.creation_time); if (r < 0) { cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl; } @@ -283,7 +292,7 @@ int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children) obj.bucket = bucket; int max = 1000; - ret = rgw_get_system_obj(store, NULL, store->zone.domain_root, bucket.name, bl, NULL); + ret = rgw_get_system_obj(store, NULL, store->zone.domain_root, bucket.name, bl, NULL, NULL); bufferlist::iterator iter = bl.begin(); try { @@ -363,7 +372,7 @@ int RGWBucket::init(RGWRados *storage, RGWBucketAdminOpState& op_state) if (!bucket_name.empty()) { RGWObjVersionTracker objv_tracker; - int r = store->get_bucket_info(NULL, bucket_name, bucket_info, &objv_tracker); + int r = store->get_bucket_info(NULL, bucket_name, bucket_info, &objv_tracker, NULL); if (r < 0) { ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket_name << dendl; return r; @@ -441,7 +450,7 @@ int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg) if (r < 0) return r; - r = rgw_add_bucket(store, user_id, bucket); + r = rgw_add_bucket(store, user_id, bucket, 0); if (r < 0) return r; } else { @@ -849,7 +858,8 @@ static int bucket_stats(RGWRados *store, std::string& bucket_name, Formatter *f map<RGWObjCategory, RGWBucketStats> stats; RGWObjVersionTracker objv_tracker; - int r = store->get_bucket_info(NULL, bucket_name, bucket_info, &objv_tracker); + time_t mtime; + int r = store->get_bucket_info(NULL, bucket_name, bucket_info, &objv_tracker, &mtime); if (r < 0) return r; @@ -871,6 +881,7 @@ static int bucket_stats(RGWRados *store, std::string& bucket_name, Formatter *f formatter->dump_string("owner", bucket_info.owner); formatter->dump_int("ver", bucket_ver); formatter->dump_int("master_ver", master_ver); + formatter->dump_int("mtime", mtime); dump_bucket_usage(stats, formatter); formatter->close_section(); @@ -1148,7 +1159,7 @@ int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end list<cls_log_entry> log_entries; int ret = store->time_log_list(oids[shard], start_time, end_time, - max_entries, log_entries, marker, truncated); + max_entries, log_entries, marker, truncated); if (ret < 0) return ret; @@ -1171,7 +1182,6 @@ int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end int RGWDataChangesLog::list_entries(utime_t& start_time, utime_t& end_time, int max_entries, list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated) { bool truncated; - entries.clear(); for (; marker.shard < num_shards && (int)entries.size() < max_entries; @@ -1277,8 +1287,9 @@ struct RGWBucketCompleteInfo { class RGWBucketMetadataObject : public RGWMetadataObject { RGWBucketCompleteInfo info; public: - RGWBucketMetadataObject(RGWBucketCompleteInfo& i, obj_version& v) : info(i) { + RGWBucketMetadataObject(RGWBucketCompleteInfo& i, obj_version& v, time_t m) : info(i) { objv = v; + mtime = m; } void dump(Formatter *f) const { @@ -1290,7 +1301,7 @@ class RGWBucketMetadataHandler : public RGWMetadataHandler { int init_bucket(RGWRados *store, string& bucket_name, rgw_bucket& bucket, RGWObjVersionTracker *objv_tracker) { RGWBucketInfo bucket_info; - int r = store->get_bucket_info(NULL, bucket_name, bucket_info, objv_tracker); + int r = store->get_bucket_info(NULL, bucket_name, bucket_info, objv_tracker, NULL); if (r < 0) { cerr << "could not get bucket info for bucket=" << bucket_name << std::endl; return r; @@ -1307,32 +1318,54 @@ public: RGWBucketCompleteInfo bci; RGWObjVersionTracker objv_tracker; + time_t mtime; - int ret = store->get_bucket_info(NULL, entry, bci.info, &objv_tracker, &bci.attrs); + int ret = store->get_bucket_info(NULL, entry, bci.info, &objv_tracker, &mtime, &bci.attrs); if (ret < 0) return ret; - RGWBucketMetadataObject *mdo = new RGWBucketMetadataObject(bci, objv_tracker.read_version); + RGWBucketMetadataObject *mdo = new RGWBucketMetadataObject(bci, objv_tracker.read_version, mtime); *obj = mdo; return 0; } - int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, JSONObj *obj) { + int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, time_t mtime, JSONObj *obj) { RGWBucketCompleteInfo bci, old_bci; decode_json_obj(bci, obj); + time_t orig_mtime; - int ret = store->get_bucket_info(NULL, entry, old_bci.info, &objv_tracker, &old_bci.attrs); + int ret = store->get_bucket_info(NULL, entry, old_bci.info, &objv_tracker, &orig_mtime, &old_bci.attrs); if (ret < 0 && ret != -ENOENT) return ret; - ret = store->put_bucket_info(entry, bci.info, false, &objv_tracker, &bci.attrs); + if (ret == -ENOENT || old_bci.info.bucket.bucket_id != bci.info.bucket.bucket_id) { + /* a new bucket, we need to select a new bucket placement for it */ + rgw_bucket bucket; + ret = store->select_bucket_placement(entry, bucket); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: select_bucket_placement() returned " << ret << dendl; + return ret; + } + bci.info.bucket.data_pool = bucket.data_pool; + bci.info.bucket.index_pool = bucket.index_pool; + } else { + /* existing bucket, keep its placement pools */ + bci.info.bucket.data_pool = old_bci.info.bucket.data_pool; + bci.info.bucket.index_pool = old_bci.info.bucket.index_pool; + } + + ret = store->put_bucket_info(entry, bci.info, false, &objv_tracker, mtime, &bci.attrs); + if (ret < 0) + return ret; + + ret = store->init_bucket_index(bci.info.bucket); if (ret < 0) return ret; - ret = rgw_add_bucket(store, bci.info.owner, bci.info.bucket); + ret = rgw_add_bucket(store, bci.info.owner, bci.info.bucket, bci.info.creation_time); if (ret < 0) return ret; diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index cec6192b75e..f4ff4ec15cc 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -23,7 +23,8 @@ using namespace std; extern void rgw_get_buckets_obj(string& user_id, string& buckets_obj_id); extern int rgw_bucket_store_info(RGWRados *store, string& bucket_name, bufferlist& bl, bool exclusive, - map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker); + map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker, + time_t mtime); extern int rgw_bucket_delete_bucket_obj(RGWRados *store, string& bucket_name, RGWObjVersionTracker& objv_tracker); @@ -93,7 +94,7 @@ extern void rgw_bucket_init(RGWMetadataManager *mm); extern int rgw_read_user_buckets(RGWRados *store, string user_id, RGWUserBuckets& buckets, const string& marker, uint64_t max, bool need_stats); -extern int rgw_add_bucket(RGWRados *store, string user_id, rgw_bucket& bucket); +extern int rgw_add_bucket(RGWRados *store, string user_id, rgw_bucket& bucket, time_t creation_time); extern int rgw_remove_user_bucket_info(RGWRados *store, string user_id, rgw_bucket& bucket); extern int rgw_remove_object(RGWRados *store, rgw_bucket& bucket, std::string& object); @@ -343,11 +344,11 @@ public: list<rgw_data_change>& entries, string& marker, bool *truncated); int trim_entries(int shard_id, utime_t& start_time, utime_t& end_time); int trim_entries(utime_t& start_time, utime_t& end_time); - int lock_exclusive(int shard_id, utime_t& duration, string& owner_id) { - return store->lock_exclusive(store->zone.log_pool, oids[shard_id], duration, owner_id); + int lock_exclusive(int shard_id, utime_t& duration, string& zone_id, string& owner_id) { + return store->lock_exclusive(store->zone.log_pool, oids[shard_id], duration, zone_id, owner_id); } - int unlock(int shard_id, string& owner_id) { - return store->unlock(store->zone.log_pool, oids[shard_id], owner_id); + int unlock(int shard_id, string& zone_id, string& owner_id) { + return store->unlock(store->zone.log_pool, oids[shard_id], zone_id, owner_id); } struct LogMarker { int shard; diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index 6514a6171de..1a36e1a78d2 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -199,7 +199,7 @@ public: map<std::string, bufferlist>& attrs, RGWObjCategory category, int flags, map<std::string, bufferlist>* rmattrs, const bufferlist *data, RGWObjManifest *manifest, const string *ptag, list<string> *remove_objs, - bool modify_version, RGWObjVersionTracker *objv_tracker); + bool modify_version, RGWObjVersionTracker *objv_tracker, time_t set_mtime); int put_obj_data(void *ctx, rgw_obj& obj, const char *data, off_t ofs, size_t len, bool exclusive); @@ -379,7 +379,7 @@ int RGWCache<T>::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, time_ map<std::string, bufferlist>& attrs, RGWObjCategory category, int flags, map<std::string, bufferlist>* rmattrs, const bufferlist *data, RGWObjManifest *manifest, const string *ptag, list<string> *remove_objs, - bool modify_version, RGWObjVersionTracker *objv_tracker) + bool modify_version, RGWObjVersionTracker *objv_tracker, time_t set_mtime) { rgw_bucket bucket; string oid; @@ -401,7 +401,7 @@ int RGWCache<T>::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, time_ } } int ret = T::put_obj_meta_impl(ctx, obj, size, mtime, attrs, category, flags, rmattrs, data, manifest, ptag, remove_objs, - modify_version, objv_tracker); + modify_version, objv_tracker, set_mtime); if (cacheable) { string name = normal_name(bucket, oid); if (ret >= 0) { diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index efcf4305f00..1f31981a760 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -148,6 +148,7 @@ req_state::req_state(CephContext *_cct, struct RGWEnv *e) : cct(_cct), cio(NULL) length = NULL; copy_source = NULL; http_auth = NULL; + local_source = false; obj_ctx = NULL; } @@ -161,6 +162,74 @@ req_state::~req_state() { free((void *)bucket_name); } +struct str_len { + const char *str; + int len; +}; + +#define STR_LEN_ENTRY(s) { s, sizeof(s) - 1 } + +struct str_len meta_prefixes[] = { STR_LEN_ENTRY("HTTP_X_AMZ"), + STR_LEN_ENTRY("HTTP_X_GOOG"), + STR_LEN_ENTRY("HTTP_X_DHO"), + STR_LEN_ENTRY("HTTP_X_RGW"), + STR_LEN_ENTRY("HTTP_X_OBJECT"), + STR_LEN_ENTRY("HTTP_X_CONTAINER"), + {NULL, 0} }; + + +void req_info::init_meta_info(bool *found_bad_meta) +{ + x_meta_map.clear(); + + map<string, string>& m = env->get_map(); + map<string, string>::iterator iter; + for (iter = m.begin(); iter != m.end(); ++iter) { + const char *prefix; + const string& header_name = iter->first; + const string& val = iter->second; + for (int prefix_num = 0; (prefix = meta_prefixes[prefix_num].str) != NULL; prefix_num++) { + int len = meta_prefixes[prefix_num].len; + const char *p = header_name.c_str(); + if (strncmp(p, prefix, len) == 0) { + dout(10) << "meta>> " << p << dendl; + const char *name = p+len; /* skip the prefix */ + int name_len = header_name.size() - len; + + if (found_bad_meta && strncmp(name, "_META_", name_len) == 0) + *found_bad_meta = true; + + char name_low[meta_prefixes[0].len + name_len + 1]; + snprintf(name_low, meta_prefixes[0].len - 5 + name_len + 1, "%s%s", meta_prefixes[0].str + 5 /* skip HTTP_ */, name); // normalize meta prefix + int j; + for (j = 0; name_low[j]; j++) { + if (name_low[j] != '_') + name_low[j] = tolower(name_low[j]); + else + name_low[j] = '-'; + } + name_low[j] = 0; + + map<string, string>::iterator iter; + iter = x_meta_map.find(name_low); + if (iter != x_meta_map.end()) { + string old = iter->second; + int pos = old.find_last_not_of(" \t"); /* get rid of any whitespaces after the value */ + old = old.substr(0, pos + 1); + old.append(","); + old.append(val); + x_meta_map[name_low] = old; + } else { + x_meta_map[name_low] = val; + } + } + } + } + for (iter = x_meta_map.begin(); iter != x_meta_map.end(); ++iter) { + dout(10) << "x>> " << iter->first << ":" << iter->second << dendl; + } +} + std::ostream& operator<<(std::ostream& oss, const rgw_err &err) { oss << "rgw_err(http_ret=" << err.http_ret << ", s3='" << err.s3_code << "') "; @@ -667,6 +736,55 @@ bool url_decode(string& src_str, string& dest_str) return true; } +string rgw_trim_whitespace(const string& src) +{ + if (src.empty()) { + return string(); + } + + int start = 0; + for (; start != (int)src.size(); start++) { + if (!isspace(src[start])) + break; + } + + int end = src.size() - 1; + if (end <= start) { + return string(); + } + + for (; end > start; end--) { + if (!isspace(src[end])) + break; + } + + return src.substr(start, end - start + 1); +} + +string rgw_trim_quotes(const string& val) +{ + string s = rgw_trim_whitespace(val); + if (s.size() < 2) + return s; + + int start = 0; + int end = s.size() - 1; + int quotes_count = 0; + + if (s[start] == '"') { + start++; + quotes_count++; + } + if (s[end] == '"') { + end--; + quotes_count++; + } + if (quotes_count == 2) { + return s.substr(start, end - start + 1); + } + return s; +} + static struct { const char *type_name; uint32_t perm; diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 219fb504f2c..3a4b76a09b0 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -43,6 +43,9 @@ using ceph::crypto::MD5; #define RGW_ATTR_PREFIX "user.rgw." +#define RGW_HTTP_RGWX_ATTR_PREFIX "RGWX_ATTR_" +#define RGW_HTTP_RGWX_ATTR_PREFIX_OUT "Rgwx-Attr-" + #define RGW_AMZ_META_PREFIX "x-amz-meta-" #define RGW_SYS_PARAM_PREFIX "rgwx-" @@ -594,17 +597,19 @@ struct RGWBucketInfo string owner; uint32_t flags; string region; + time_t creation_time; void encode(bufferlist& bl) const { - ENCODE_START(5, 4, bl); + ENCODE_START(6, 4, bl); ::encode(bucket, bl); ::encode(owner, bl); ::encode(flags, bl); ::encode(region, bl); + ::encode(creation_time, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { - DECODE_START_LEGACY_COMPAT_LEN_32(5, 4, 4, bl); + DECODE_START_LEGACY_COMPAT_LEN_32(6, 4, 4, bl); ::decode(bucket, bl); if (struct_v >= 2) ::decode(owner, bl); @@ -612,6 +617,8 @@ struct RGWBucketInfo ::decode(flags, bl); if (struct_v >= 5) ::decode(region, bl); + if (struct_v >= 6) + ::decode(creation_time, bl); DECODE_FINISH(bl); } void dump(Formatter *f) const; @@ -619,7 +626,7 @@ struct RGWBucketInfo void decode_json(JSONObj *obj); - RGWBucketInfo() : flags(0) {} + RGWBucketInfo() : flags(0), creation_time(0) {} }; WRITE_CLASS_ENCODER(RGWBucketInfo) @@ -683,6 +690,7 @@ struct req_info { req_info(CephContext *cct, RGWEnv *_env); void rebuild_from(req_info& src); + void init_meta_info(bool *found_nad_meta); }; /** Store all the state necessary to complete and respond to an HTTP request*/ @@ -731,6 +739,7 @@ struct req_state { bool has_acl_header; const char *copy_source; const char *http_auth; + bool local_source; /* source is local */ int prot_flags; @@ -774,15 +783,15 @@ struct RGWBucketEnt { rgw_bucket bucket; size_t size; size_t size_rounded; - time_t mtime; + time_t creation_time; uint64_t count; - RGWBucketEnt() : size(0), size_rounded(0), mtime(0), count(0) {} + RGWBucketEnt() : size(0), size_rounded(0), creation_time(0), count(0) {} void encode(bufferlist& bl) const { ENCODE_START(5, 5, bl); uint64_t s = size; - __u32 mt = mtime; + __u32 mt = creation_time; string empty_str; // originally had the bucket name here, but we encode bucket later ::encode(empty_str, bl); ::encode(s, bl); @@ -802,7 +811,7 @@ struct RGWBucketEnt { ::decode(s, bl); ::decode(mt, bl); size = s; - mtime = mt; + creation_time = mt; if (struct_v >= 2) ::decode(count, bl); if (struct_v >= 3) @@ -814,13 +823,6 @@ struct RGWBucketEnt { } void dump(Formatter *f) const; static void generate_test_instances(list<RGWBucketEnt*>& o); - void clear() { - bucket.clear(); - size = 0; - size_rounded = 0; - mtime = 0; - count = 0; - } }; WRITE_CLASS_ENCODER(RGWBucketEnt) @@ -1126,6 +1128,9 @@ extern int parse_time(const char *time_str, time_t *time); extern bool parse_rfc2616(const char *s, struct tm *t); extern bool parse_iso8601(const char *s, struct tm *t); extern int parse_date(const string& date, uint64_t *epoch, string *out_date = NULL, string *out_time = NULL); +extern string rgw_trim_whitespace(const string& src); +extern string rgw_trim_quotes(const string& val); + /** Check if the req_state's user has the necessary permissions * to do the requested action */ diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 7b807a3e3bb..afdf1d6d8b3 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -435,6 +435,7 @@ void rgw_bucket::decode_json(JSONObj *obj) { void RGWBucketInfo::dump(Formatter *f) const { encode_json("bucket", bucket, f); + encode_json("creation_time", creation_time, f); encode_json("owner", owner, f); encode_json("flags", flags, f); encode_json("region", region, f); @@ -442,6 +443,7 @@ void RGWBucketInfo::dump(Formatter *f) const void RGWBucketInfo::decode_json(JSONObj *obj) { JSONDecoder::decode_json("bucket", bucket, obj); + JSONDecoder::decode_json("creation_time", creation_time, obj); JSONDecoder::decode_json("owner", owner, obj); JSONDecoder::decode_json("flags", flags, obj); JSONDecoder::decode_json("region", region, obj); @@ -465,7 +467,7 @@ void RGWBucketEnt::dump(Formatter *f) const encode_json("bucket", bucket, f); encode_json("size", size, f); encode_json("size_rounded", size_rounded, f); - encode_json("mtime", mtime, f); + encode_json("mtime", creation_time, f); /* mtime / creation time discrepency needed for backward compatibility */ encode_json("count", count, f); } diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index f19c5d7c8a4..ba894334444 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -339,12 +339,16 @@ void RGWProcess::handle_request(RGWRequest *req) req->log(s, "reading the cors attr"); handler->read_cors_config(); - + req->log(s, "verifying op permissions"); ret = op->verify_permission(); if (ret < 0) { - abort_early(s, ret); - goto done; + if (s->system_request) { + dout(2) << "overriding permissions due to system operation" << dendl; + } else { + abort_early(s, ret); + goto done; + } } req->log(s, "verifying op params"); diff --git a/src/rgw/rgw_metadata.cc b/src/rgw/rgw_metadata.cc index 2bf0f2e885f..6242806ef4d 100644 --- a/src/rgw/rgw_metadata.cc +++ b/src/rgw/rgw_metadata.cc @@ -79,13 +79,15 @@ int RGWMetadataLog::add_entry(RGWRados *store, string& section, string& key, buf return store->time_log_add(oid, now, section, key, bl); } -void RGWMetadataLog::init_list_entries(int shard_id, utime_t& from_time, utime_t& end_time, void **handle) +void RGWMetadataLog::init_list_entries(int shard_id, utime_t& from_time, utime_t& end_time, + string& marker, void **handle) { LogListCtx *ctx = new LogListCtx(); ctx->cur_shard = shard_id; ctx->from_time = from_time; - ctx->end_time = end_time; + ctx->end_time = end_time; + ctx->marker = marker; get_shard_oid(ctx->cur_shard, ctx->cur_oid); @@ -99,7 +101,7 @@ void RGWMetadataLog::complete_list_entries(void *handle) { int RGWMetadataLog::list_entries(void *handle, int max_entries, - list<cls_log_entry>& entries, + list<cls_log_entry>& entries, bool *truncated) { LogListCtx *ctx = (LogListCtx *)handle; @@ -108,8 +110,6 @@ int RGWMetadataLog::list_entries(void *handle, return 0; } - entries.clear(); - int ret = store->time_log_list(ctx->cur_oid, ctx->from_time, ctx->end_time, max_entries, entries, ctx->marker, truncated); if ((ret < 0) && (ret != -ENOENT)) @@ -136,18 +136,18 @@ int RGWMetadataLog::trim(int shard_id, utime_t& from_time, utime_t& end_time) return ret; } -int RGWMetadataLog::lock_exclusive(int shard_id, utime_t& duration, string& owner_id) { +int RGWMetadataLog::lock_exclusive(int shard_id, utime_t& duration, string& zone_id, string& owner_id) { string oid; get_shard_oid(shard_id, oid); - return store->lock_exclusive(store->zone.log_pool, oid, duration, owner_id); + return store->lock_exclusive(store->zone.log_pool, oid, duration, zone_id, owner_id); } -int RGWMetadataLog::unlock(int shard_id, string& owner_id) { +int RGWMetadataLog::unlock(int shard_id, string& zone_id, string& owner_id) { string oid; get_shard_oid(shard_id, oid); - return store->unlock(store->zone.log_pool, oid, owner_id); + return store->unlock(store->zone.log_pool, oid, zone_id, owner_id); } obj_version& RGWMetadataObject::get_version() @@ -167,7 +167,7 @@ public: virtual string get_type() { return string(); } virtual int get(RGWRados *store, string& entry, RGWMetadataObject **obj) { return -ENOTSUP; } - virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, JSONObj *obj) { return -ENOTSUP; } + virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, time_t mtime, JSONObj *obj) { return -ENOTSUP; } virtual void get_pool_and_oid(RGWRados *store, string& key, rgw_bucket& bucket, string& oid) {} @@ -289,6 +289,10 @@ int RGWMetadataManager::get(string& metadata_key, Formatter *f) f->open_object_section("metadata_info"); encode_json("key", metadata_key, f); encode_json("ver", obj->get_version(), f); + time_t mtime = obj->get_mtime(); + if (mtime > 0) { + encode_json("mtime", mtime, f); + } encode_json("data", *obj, f); f->close_section(); @@ -315,16 +319,18 @@ int RGWMetadataManager::put(string& metadata_key, bufferlist& bl) obj_version *objv = &objv_tracker.write_version; + time_t mtime = 0; JSONDecoder::decode_json("key", metadata_key, &parser); JSONDecoder::decode_json("ver", *objv, &parser); + JSONDecoder::decode_json("mtime", mtime, &parser); JSONObj *jo = parser.find_obj("data"); if (!jo) { return -EINVAL; } - return handler->put(store, entry, objv_tracker, jo); + return handler->put(store, entry, objv_tracker, mtime, jo); } int RGWMetadataManager::remove(string& metadata_key) @@ -353,6 +359,7 @@ int RGWMetadataManager::remove(string& metadata_key) int RGWMetadataManager::lock_exclusive(string& metadata_key, utime_t duration, string& owner_id) { RGWMetadataHandler *handler; string entry; + string zone_id; int ret = find_handler(metadata_key, &handler, entry); if (ret < 0) @@ -363,13 +370,14 @@ int RGWMetadataManager::lock_exclusive(string& metadata_key, utime_t duration, s handler->get_pool_and_oid(store, entry, pool, oid); - return store->lock_exclusive(pool, oid, duration, owner_id); + return store->lock_exclusive(pool, oid, duration, zone_id, owner_id); } int RGWMetadataManager::unlock(string& metadata_key, string& owner_id) { librados::IoCtx io_ctx; RGWMetadataHandler *handler; string entry; + string zone_id; int ret = find_handler(metadata_key, &handler, entry); if (ret < 0) @@ -380,7 +388,7 @@ int RGWMetadataManager::unlock(string& metadata_key, string& owner_id) { handler->get_pool_and_oid(store, entry, pool, oid); - return store->unlock(pool, oid, owner_id); + return store->unlock(pool, oid, zone_id, owner_id); } struct list_keys_handle { @@ -506,7 +514,7 @@ int RGWMetadataManager::post_modify(string& section, string& key, RGWMetadataLog } int RGWMetadataManager::put_entry(RGWMetadataHandler *handler, string& key, bufferlist& bl, bool exclusive, - RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs) + RGWObjVersionTracker *objv_tracker, time_t mtime, map<string, bufferlist> *pattrs) { string section; RGWMetadataLogData log_data; @@ -521,7 +529,7 @@ int RGWMetadataManager::put_entry(RGWMetadataHandler *handler, string& key, buff ret = rgw_put_system_obj(store, bucket, oid, bl.c_str(), bl.length(), exclusive, - objv_tracker, pattrs); + objv_tracker, mtime, pattrs); if (ret < 0) return ret; diff --git a/src/rgw/rgw_metadata.h b/src/rgw/rgw_metadata.h index 80c9f41ae6a..ba12decdd41 100644 --- a/src/rgw/rgw_metadata.h +++ b/src/rgw/rgw_metadata.h @@ -27,10 +27,13 @@ enum RGWMDLogStatus { class RGWMetadataObject { protected: obj_version objv; + time_t mtime; public: + RGWMetadataObject() : mtime(0) {} virtual ~RGWMetadataObject() {} obj_version& get_version(); + time_t get_mtime() { return mtime; } virtual void dump(Formatter *f) const = 0; }; @@ -47,7 +50,7 @@ public: virtual string get_type() = 0; virtual int get(RGWRados *store, string& entry, RGWMetadataObject **obj) = 0; - virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, JSONObj *obj) = 0; + virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, time_t mtime, JSONObj *obj) = 0; virtual int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) = 0; virtual int list_keys_init(RGWRados *store, void **phandle) = 0; @@ -88,16 +91,15 @@ public: LogListCtx() : done(false) {} }; - void init_list_entries(int shard_id, utime_t& from_time, utime_t& end_time, void **handle); + void init_list_entries(int shard_id, utime_t& from_time, utime_t& end_time, string& marker, void **handle); void complete_list_entries(void *handle); int list_entries(void *handle, int max_entries, - list<cls_log_entry>& entries, - bool *truncated); + list<cls_log_entry>& entries, bool *truncated); int trim(int shard_id, utime_t& from_time, utime_t& end_time); - int lock_exclusive(int shard_id, utime_t& duration, string& owner_id); - int unlock(int shard_id, string& owner_id); + int lock_exclusive(int shard_id, utime_t& duration, string&zone_id, string& owner_id); + int unlock(int shard_id, string& zone_id, string& owner_id); }; class RGWMetadataLogData; @@ -126,7 +128,7 @@ public: RGWMetadataHandler *get_handler(const char *type); int put_entry(RGWMetadataHandler *handler, string& key, bufferlist& bl, bool exclusive, - RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs = NULL); + RGWObjVersionTracker *objv_tracker, time_t mtime, map<string, bufferlist> *pattrs = NULL); int remove_entry(RGWMetadataHandler *handler, string& key, RGWObjVersionTracker *objv_tracker); int set_attr(RGWMetadataHandler *handler, string& key, rgw_obj& obj, string& attr, bufferlist& bl, RGWObjVersionTracker *objv_tracker); diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index f571de0f731..1f2cb9f5452 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -158,29 +158,6 @@ static void rgw_get_request_metadata(CephContext *cct, struct req_info& info, ma } } -static int policy_from_attrset(CephContext *cct, map<string, bufferlist>& attrset, RGWAccessControlPolicy *policy) -{ - map<string, bufferlist>::iterator aiter = attrset.find(RGW_ATTR_ACL); - if (aiter == attrset.end()) - return -EIO; - - bufferlist& bl = aiter->second; - bufferlist::iterator iter = bl.begin(); - try { - policy->decode(iter); - } catch (buffer::error& err) { - ldout(cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl; - return -EIO; - } - if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) { - RGWAccessControlPolicy_S3 *s3policy = static_cast<RGWAccessControlPolicy_S3 *>(policy); - ldout(cct, 15) << "Read AccessControlPolicy"; - s3policy->to_xml(*_dout); - *_dout << dendl; - } - return 0; -} - /** * Get the AccessControlPolicy for an object off of disk. * policy: must point to a valid RGWACL, and will be filled upon return. @@ -215,7 +192,7 @@ static int get_policy_from_attr(CephContext *cct, RGWRados *store, void *ctx, RG /* object exists, but policy is broken */ RGWBucketInfo info; RGWUserInfo uinfo; - int r = store->get_bucket_info(ctx, obj.bucket.name, info, objv_tracker); + int r = store->get_bucket_info(ctx, obj.bucket.name, info, objv_tracker, NULL); if (r < 0) goto done; r = rgw_get_user_info_by_uid(store, info.owner, uinfo); @@ -298,8 +275,6 @@ static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bu RGWBucketInfo bucket_info; - bool source_in_domain = false; - if (s->copy_source) { /* check if copy source is within the current domain */ const char *src = s->copy_source; if (*src == '/') @@ -312,17 +287,17 @@ static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bu RGWBucketInfo source_info; - ret = store->get_bucket_info(s->obj_ctx, copy_source_str, source_info, NULL); + ret = store->get_bucket_info(s->obj_ctx, copy_source_str, source_info, NULL, NULL); if (ret == 0) { string& region = source_info.region; - source_in_domain = (region.empty() && store->region.is_master) || + s->local_source = (region.empty() && store->region.is_master) || (region == store->region.name); } } if (s->bucket_name_str.size()) { bool exists = true; - ret = store->get_bucket_info(s->obj_ctx, s->bucket_name_str, bucket_info, &s->objv_tracker); + ret = store->get_bucket_info(s->obj_ctx, s->bucket_name_str, bucket_info, &s->objv_tracker, NULL); if (ret < 0) { if (ret != -ENOENT) { ldout(s->cct, 0) << "NOTICE: couldn't get bucket from bucket_name (name=" << s->bucket_name_str << ")" << dendl; @@ -345,7 +320,7 @@ static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bu /* we now need to make sure that the operation actually requires copy source, that is * it's a copy operation */ - if (!source_in_domain || + if (!s->local_source || (s->op != OP_PUT && s->op != OP_COPY) || s->object_str.empty()) { return -ERR_PERMANENT_REDIRECT; @@ -416,7 +391,7 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket, RGWObjEnt& ent, RGWAc goto done_err; } - ret = policy_from_attrset(s->cct, attrs, &obj_policy); + ret = rgw_policy_from_attrset(s->cct, attrs, &obj_policy); if (ret < 0) goto done_err; @@ -540,7 +515,7 @@ int RGWGetObj::handle_user_manifest(const char *prefix) if (bucket_name.compare(s->bucket.name) != 0) { RGWBucketInfo bucket_info; - int r = store->get_bucket_info(NULL, bucket_name, bucket_info, &s->objv_tracker); + int r = store->get_bucket_info(NULL, bucket_name, bucket_info, &s->objv_tracker, NULL); if (r < 0) { ldout(s->cct, 0) << "could not get bucket info for bucket=" << bucket_name << dendl; return r; @@ -880,28 +855,25 @@ int RGWCreateBucket::verify_permission() return 0; } -template<class T> -static int forward_request(struct req_state *s, RGWRados *store, bufferlist& in_data, const char *name, T& obj) +static int forward_request_to_master(struct req_state *s, RGWRados *store, bufferlist& in_data, JSONParser *jp) { - if (!store->rest_conn) { + if (!store->rest_master_conn) { ldout(s->cct, 0) << "rest connection is invalid" << dendl; return -EINVAL; } ldout(s->cct, 0) << "sending create_bucket request to master region" << dendl; bufferlist response; #define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response - int ret = store->rest_conn->forward(s->user.user_id, s->info, MAX_REST_RESPONSE, &in_data, &response); + int ret = store->rest_master_conn->forward(s->user.user_id, s->info, MAX_REST_RESPONSE, &in_data, &response); if (ret < 0) return ret; ldout(s->cct, 20) << "response: " << response.c_str() << dendl; - JSONParser jp; - ret = jp.parse(response.c_str(), response.length()); + ret = jp->parse(response.c_str(), response.length()); if (ret < 0) { ldout(s->cct, 0) << "failed parsing response from master region" << dendl; return ret; } - JSONDecoder::decode_json(name, obj, &jp); return 0; } @@ -937,12 +909,26 @@ void RGWCreateBucket::execute() } } + RGWBucketInfo master_info; + rgw_bucket *pmaster_bucket; + time_t creation_time; + if (!store->region.is_master) { - ret = forward_request(s, store, in_data, "object_ver", objv); + JSONParser jp; + ret = forward_request_to_master(s, store, in_data, &jp); if (ret < 0) return; + JSONDecoder::decode_json("object_ver", objv, &jp); + JSONDecoder::decode_json("bucket_info", master_info, &jp); ldout(s->cct, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver=" << objv.ver << dendl; + ldout(s->cct, 20) << "got creation time: << " << master_info.creation_time << dendl; + pmaster_bucket= &master_info.bucket; + creation_time = master_info.creation_time; + pobjv = &objv; + } else { + pmaster_bucket = NULL; + creation_time = 0; } string region_name; @@ -961,7 +947,8 @@ void RGWCreateBucket::execute() attrs[RGW_ATTR_ACL] = aclbl; s->bucket.name = s->bucket_name_str; - ret = store->create_bucket(s->user.user_id, s->bucket, region_name, attrs, objv_tracker, pobjv, true); + ret = store->create_bucket(s->user.user_id, s->bucket, region_name, attrs, objv_tracker, pobjv, + creation_time, pmaster_bucket, &info, true); /* continue if EEXIST and create_bucket will fail below. this way we can recover * from a partial create by retrying it. */ ldout(s->cct, 20) << "rgw_create_bucket returned ret=" << ret << " bucket=" << s->bucket << dendl; @@ -977,14 +964,6 @@ void RGWCreateBucket::execute() * info, verify that the reported bucket owner is the current user. * If all is ok then update the user's list of buckets */ - RGWBucketInfo info; - map<string, bufferlist> attrs; - int r = store->get_bucket_info(NULL, s->bucket.name, info, &s->objv_tracker, &attrs); - if (r < 0) { - ldout(s->cct, 0) << "ERROR: get_bucket_info on bucket=" << s->bucket.name << " returned err=" << r << " after create_bucket returned -EEXIST" << dendl; - ret = r; - return; - } if (info.owner.compare(s->user.user_id) != 0) { ret = -ERR_BUCKET_EXISTS; return; @@ -992,7 +971,7 @@ void RGWCreateBucket::execute() s->bucket = info.bucket; } - ret = rgw_add_bucket(store, s->user.user_id, s->bucket); + ret = rgw_add_bucket(store, s->user.user_id, s->bucket, info.creation_time); if (ret && !existed && ret != -EEXIST) /* if it exists (or previously existed), don't remove it! */ rgw_remove_user_bucket_info(store, s->user.user_id, s->bucket); @@ -1017,10 +996,12 @@ void RGWDeleteBucket::execute() if (!store->region.is_master) { bufferlist in_data; - ret = forward_request(s, store, in_data, "object_ver", objv_tracker.read_version); - if (ret < 0) { + JSONParser jp; + ret = forward_request_to_master(s, store, in_data, &jp); + if (ret < 0) return; - } + + JSONDecoder::decode_json("object_ver", objv_tracker.read_version, &jp); } ret = store->delete_bucket(s->bucket, objv_tracker); @@ -1033,10 +1014,6 @@ void RGWDeleteBucket::execute() } } -struct put_obj_aio_info { - void *handle; -}; - int RGWPutObj::verify_permission() { if (!verify_bucket_permission(s, RGW_PERM_WRITE)) @@ -1045,312 +1022,26 @@ int RGWPutObj::verify_permission() return 0; } -int RGWPutObjProcessor::complete(string& etag, map<string, bufferlist>& attrs) -{ - int r = do_complete(etag, attrs); - if (r < 0) - return r; - - is_complete = true; - return 0; -} - -RGWPutObjProcessor::~RGWPutObjProcessor() -{ - if (is_complete) - return; - - if (!s) - return; - - list<rgw_obj>::iterator iter; - for (iter = objs.begin(); iter != objs.end(); ++iter) { - rgw_obj& obj = *iter; - int r = store->delete_obj(s->obj_ctx, obj); - if (r < 0 && r != -ENOENT) { - ldout(s->cct, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl; - } - } -} - -class RGWPutObjProcessor_Plain : public RGWPutObjProcessor -{ - bufferlist data; - rgw_obj obj; - off_t ofs; - -protected: - int prepare(RGWRados *store, struct req_state *s); - int handle_data(bufferlist& bl, off_t ofs, void **phandle); - int throttle_data(void *handle) { return 0; } - int do_complete(string& etag, map<string, bufferlist>& attrs); - -public: - RGWPutObjProcessor_Plain() : ofs(0) {} -}; - -int RGWPutObjProcessor_Plain::prepare(RGWRados *store,struct req_state *s) -{ - RGWPutObjProcessor::prepare(store, s); - - obj.init(s->bucket, s->object_str); - - return 0; -}; - -int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle) -{ - if (ofs != _ofs) - return -EINVAL; - - data.append(bl); - ofs += bl.length(); - - return 0; -} - -int RGWPutObjProcessor_Plain::do_complete(string& etag, map<string, bufferlist>& attrs) -{ - int r = store->put_obj_meta(s->obj_ctx, obj, data.length(), attrs, - RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, - &data); - return r; -} - - -class RGWPutObjProcessor_Aio : public RGWPutObjProcessor -{ - list<struct put_obj_aio_info> pending; - size_t max_chunks; - - struct put_obj_aio_info pop_pending(); - int wait_pending_front(); - bool pending_has_completed(); - int drain_pending(); - -protected: - uint64_t obj_len; - - int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle); - int throttle_data(void *handle); - - RGWPutObjProcessor_Aio() : max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {} - virtual ~RGWPutObjProcessor_Aio() { - drain_pending(); - } -}; - -int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle) -{ - if ((uint64_t)abs_ofs + bl.length() > obj_len) - obj_len = abs_ofs + bl.length(); - - // For the first call pass -1 as the offset to - // do a write_full. - int r = store->aio_put_obj_data(NULL, obj, - bl, - ((ofs != 0) ? ofs : -1), - false, phandle); - - return r; -} - -struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending() -{ - struct put_obj_aio_info info; - info = pending.front(); - pending.pop_front(); - return info; -} - -int RGWPutObjProcessor_Aio::wait_pending_front() -{ - struct put_obj_aio_info info = pop_pending(); - int ret = store->aio_wait(info.handle); - return ret; -} - -bool RGWPutObjProcessor_Aio::pending_has_completed() -{ - if (pending.empty()) - return false; - - struct put_obj_aio_info& info = pending.front(); - return store->aio_completed(info.handle); -} - -int RGWPutObjProcessor_Aio::drain_pending() -{ - int ret = 0; - while (!pending.empty()) { - int r = wait_pending_front(); - if (r < 0) - ret = r; - } - return ret; -} - -int RGWPutObjProcessor_Aio::throttle_data(void *handle) -{ - if (handle) { - struct put_obj_aio_info info; - info.handle = handle; - pending.push_back(info); - } - size_t orig_size = pending.size(); - while (pending_has_completed()) { - int r = wait_pending_front(); - if (r < 0) - return r; - } - - /* resize window in case messages are draining too fast */ - if (orig_size - pending.size() >= max_chunks) { - max_chunks++; - } - - if (pending.size() > max_chunks) { - int r = wait_pending_front(); - if (r < 0) - return r; - } - return 0; -} - -class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio -{ - bufferlist first_chunk; - uint64_t part_size; - off_t cur_part_ofs; - off_t next_part_ofs; - int cur_part_id; -protected: - string oid_prefix; - rgw_obj head_obj; - rgw_obj cur_obj; - RGWObjManifest manifest; - - virtual bool immutable_head() { return false; } - - int prepare(RGWRados *store, struct req_state *s); - virtual int do_complete(string& etag, map<string, bufferlist>& attrs); - - void prepare_next_part(off_t ofs); - void complete_parts(); - -public: - ~RGWPutObjProcessor_Atomic() {} - RGWPutObjProcessor_Atomic(uint64_t _p) : part_size(_p), - cur_part_ofs(0), - next_part_ofs(_p), - cur_part_id(0) {} - int handle_data(bufferlist& bl, off_t ofs, void **phandle) { - if (!ofs && !immutable_head()) { - first_chunk.claim(bl); - *phandle = NULL; - obj_len = (uint64_t)first_chunk.length(); - prepare_next_part(first_chunk.length()); - return 0; - } - if (ofs >= next_part_ofs) - prepare_next_part(ofs); - int r = RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle); - - return r; - } -}; - -int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, struct req_state *s) -{ - RGWPutObjProcessor::prepare(store, s); - - head_obj.init(s->bucket, s->object_str); - - char buf[33]; - gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1); - oid_prefix.append("_"); - oid_prefix.append(buf); - oid_prefix.append("_"); - - return 0; -} - -void RGWPutObjProcessor_Atomic::prepare_next_part(off_t ofs) { - int num_parts = manifest.objs.size(); - RGWObjManifestPart *part; - - /* first update manifest for written data */ - if (!num_parts) { - part = &manifest.objs[cur_part_ofs]; - part->loc = head_obj; - } else { - part = &manifest.objs[cur_part_ofs]; - part->loc = cur_obj; - } - part->loc_ofs = 0; - part->size = ofs - cur_part_ofs; - - if ((uint64_t)ofs > manifest.obj_size) - manifest.obj_size = ofs; - - /* now update params for next part */ - - cur_part_ofs = ofs; - next_part_ofs = cur_part_ofs + part_size; - char buf[16]; - - cur_part_id++; - snprintf(buf, sizeof(buf), "%d", cur_part_id); - string cur_oid = oid_prefix; - cur_oid.append(buf); - cur_obj.init_ns(s->bucket, cur_oid, shadow_ns); - - add_obj(cur_obj); -}; - -void RGWPutObjProcessor_Atomic::complete_parts() -{ - if (obj_len > (uint64_t)cur_part_ofs) - prepare_next_part(obj_len); -} - -int RGWPutObjProcessor_Atomic::do_complete(string& etag, map<string, bufferlist>& attrs) -{ - complete_parts(); - - store->set_atomic(s->obj_ctx, head_obj); - - RGWRados::PutObjMetaExtraParams extra_params; - - extra_params.data = &first_chunk; - extra_params.manifest = &manifest; - extra_params.ptag = &s->req_id; /* use req_id as operation tag */ - - int r = store->put_obj_meta(s->obj_ctx, head_obj, obj_len, attrs, - RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, - extra_params); - return r; -} - class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic { string part_num; RGWMPObj mp; + req_state *s; protected: bool immutable_head() { return true; } - int prepare(RGWRados *store, struct req_state *s); - int do_complete(string& etag, map<string, bufferlist>& attrs); + int prepare(RGWRados *store, void *obj_ctx); + int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs); public: - RGWPutObjProcessor_Multipart(uint64_t _p) : RGWPutObjProcessor_Atomic(_p) {} + RGWPutObjProcessor_Multipart(uint64_t _p, req_state *_s) : RGWPutObjProcessor_Atomic(s->bucket, s->object_str, _p, s->req_id), s(_s) {} }; -int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, struct req_state *s) +int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx) { - RGWPutObjProcessor::prepare(store, s); + RGWPutObjProcessor::prepare(store, obj_ctx); - string oid = s->object_str; + string oid = obj_str; string upload_id; upload_id = s->info.args.get("uploadId"); mp.init(oid, upload_id); @@ -1362,7 +1053,7 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, struct req_state *s) oid = mp.get_part(part_num); - head_obj.init_ns(s->bucket, oid, mp_ns); + head_obj.init_ns(bucket, oid, mp_ns); oid_prefix = oid; oid_prefix.append("_"); cur_obj = head_obj; @@ -1370,11 +1061,15 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, struct req_state *s) return 0; } -int RGWPutObjProcessor_Multipart::do_complete(string& etag, map<string, bufferlist>& attrs) +int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs) { complete_parts(); - int r = store->put_obj_meta(s->obj_ctx, head_obj, s->obj_size, attrs, RGW_OBJ_CATEGORY_MAIN, 0); + RGWRados::PutObjMetaExtraParams params; + params.set_mtime = set_mtime; + params.mtime = mtime; + + int r = store->put_obj_meta(obj_ctx, head_obj, s->obj_size, attrs, RGW_OBJ_CATEGORY_MAIN, 0, params); if (r < 0) return r; @@ -1385,14 +1080,14 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, map<string, bufferli info.num = atoi(part_num.c_str()); info.etag = etag; info.size = s->obj_size; - info.modified = ceph_clock_now(s->cct); + info.modified = ceph_clock_now(store->ctx()); info.manifest = manifest; ::encode(info, bl); string multipart_meta_obj = mp.get_meta(); rgw_obj meta_obj; - meta_obj.init_ns(s->bucket, multipart_meta_obj, mp_ns); + meta_obj.init_ns(bucket, multipart_meta_obj, mp_ns); r = store->omap_set(meta_obj, p, bl); @@ -1409,9 +1104,9 @@ RGWPutObjProcessor *RGWPutObj::select_processor() uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size; if (!multipart) { - processor = new RGWPutObjProcessor_Atomic(part_size); + processor = new RGWPutObjProcessor_Atomic(s->bucket, s->object_str, part_size, s->req_id); } else { - processor = new RGWPutObjProcessor_Multipart(part_size); + processor = new RGWPutObjProcessor_Multipart(part_size, s); } return processor; @@ -1467,7 +1162,7 @@ void RGWPutObj::execute() processor = select_processor(); - ret = processor->prepare(store, s); + ret = processor->prepare(store, s->obj_ctx); if (ret < 0) goto done; @@ -1537,7 +1232,7 @@ void RGWPutObj::execute() rgw_get_request_metadata(s->cct, s->info, attrs); - ret = processor->complete(etag, attrs); + ret = processor->complete(etag, &mtime, 0, attrs); done: dispose_processor(processor); perfcounter->tinc(l_rgw_put_lat, @@ -1556,9 +1251,9 @@ RGWPutObjProcessor *RGWPostObj::select_processor() uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size; if (s->content_length <= RGW_MAX_CHUNK_SIZE) - processor = new RGWPutObjProcessor_Plain(); + processor = new RGWPutObjProcessor_Plain(s->bucket, s->object_str); else - processor = new RGWPutObjProcessor_Atomic(part_size); + processor = new RGWPutObjProcessor_Atomic(s->bucket, s->object_str, part_size, s->req_id); return processor; } @@ -1653,7 +1348,7 @@ void RGWPostObj::execute() attrs[RGW_ATTR_CONTENT_TYPE] = ct_bl; } - ret = processor->complete(etag, attrs); + ret = processor->complete(etag, NULL, 0, attrs); done: dispose_processor(processor); @@ -1772,47 +1467,50 @@ int RGWCopyObj::verify_permission() if (ret < 0) return ret; - /* get buckets info (source and dest) */ - - ret = store->get_bucket_info(s->obj_ctx, src_bucket_name, src_bucket_info, NULL); + ret = store->get_bucket_info(s->obj_ctx, src_bucket_name, src_bucket_info, NULL, NULL); if (ret < 0) return ret; src_bucket = src_bucket_info.bucket; - if (src_bucket_name.compare(dest_bucket_name) == 0) { + /* get buckets info (source and dest) */ + if (s->local_source && source_zone.empty()) { + rgw_obj src_obj(src_bucket, src_object); + store->set_atomic(s->obj_ctx, src_obj); + store->set_prefetch_data(s->obj_ctx, src_obj); + + /* check source object permissions */ + ret = read_policy(store, s, src_bucket_info, &src_policy, src_bucket, src_object); + if (ret < 0) + return ret; + + if (!s->system_request && /* system request overrides permission checks */ + !src_policy.verify_permission(s->user.user_id, s->perm_mask, RGW_PERM_READ)) + return -EACCES; + } + + RGWAccessControlPolicy dest_bucket_policy(s->cct); + + if (src_bucket_name.compare(dest_bucket_name) == 0) { /* will only happen if s->local_source */ dest_bucket_info = src_bucket_info; } else { - ret = store->get_bucket_info(s->obj_ctx, dest_bucket_name, dest_bucket_info, NULL); + ret = store->get_bucket_info(s->obj_ctx, dest_bucket_name, dest_bucket_info, NULL, NULL); if (ret < 0) return ret; } dest_bucket = dest_bucket_info.bucket; - rgw_obj src_obj(src_bucket, src_object); - store->set_atomic(s->obj_ctx, src_obj); - store->set_prefetch_data(s->obj_ctx, src_obj); - rgw_obj dest_obj(dest_bucket, dest_object); store->set_atomic(s->obj_ctx, dest_obj); - /* check source object permissions */ - ret = read_policy(store, s, src_bucket_info, &src_policy, src_bucket, src_object); - if (ret < 0) - return ret; - - if (!src_policy.verify_permission(s->user.user_id, s->perm_mask, RGW_PERM_READ)) - return -EACCES; - - RGWAccessControlPolicy dest_bucket_policy(s->cct); - /* check dest bucket permissions */ ret = read_policy(store, s, dest_bucket_info, &dest_bucket_policy, dest_bucket, empty_str); if (ret < 0) return ret; - if (!dest_bucket_policy.verify_permission(s->user.user_id, s->perm_mask, RGW_PERM_WRITE)) + if (!s->system_request && /* system request overrides permission checks */ + !dest_bucket_policy.verify_permission(s->user.user_id, s->perm_mask, RGW_PERM_WRITE)) return -EACCES; ret = init_dest_policy(); @@ -1866,28 +1564,19 @@ void RGWCopyObj::execute() src_obj.init(src_bucket, src_object); dst_obj.init(dest_bucket, dest_object); store->set_atomic(s->obj_ctx, src_obj); -#if 0 - - if ((dest_bucket_info.region.empty() && !store->region.is_master) || - (dest_bucket_info.region != store->region.name)) { - map<string, bufferlist> src_attrs; - - int ret = get_obj_attrs(store, s, src_obj, src_attrs - uint64_t *obj_size, RGWObjVersionTracker *objv_tracker) - - int ret = store->rest_conn->put_obj(s->user.user_id, dst_obj, - if (ret < 0) - return ret; - } -#endif store->set_atomic(s->obj_ctx, dst_obj); ret = store->copy_obj(s->obj_ctx, s->user.user_id, + client_id, + op_id, + &s->info, + source_zone, dst_obj, src_obj, dest_bucket_info, + src_bucket_info, &mtime, mod_ptr, unmod_ptr, @@ -2256,7 +1945,7 @@ void RGWInitMultipart::execute() obj.init_ns(s->bucket, tmp_obj_name, mp_ns); // the meta object will be indexed with 0 size, we c - ret = store->put_obj_meta(s->obj_ctx, obj, 0, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL); + ret = store->put_obj_meta(s->obj_ctx, obj, 0, NULL, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL); } while (ret == -EEXIST); } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 55011b4102f..1632d35fcc8 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -234,6 +234,7 @@ protected: RGWAccessControlPolicy policy; string location_constraint; RGWObjVersionTracker objv_tracker; + RGWBucketInfo info; bufferlist in_data; @@ -267,33 +268,6 @@ public: virtual const char *name() { return "delete_bucket"; } }; -class RGWPutObjProcessor -{ -protected: - RGWRados *store; - struct req_state *s; - bool is_complete; - - virtual int do_complete(string& etag, map<string, bufferlist>& attrs) = 0; - - list<rgw_obj> objs; - - void add_obj(rgw_obj& obj) { - objs.push_back(obj); - } -public: - RGWPutObjProcessor() : store(NULL), s(NULL), is_complete(false) {} - virtual ~RGWPutObjProcessor(); - virtual int prepare(RGWRados *_store, struct req_state *_s) { - store = _store; - s = _s; - return 0; - }; - virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0; - virtual int throttle_data(void *handle) = 0; - virtual int complete(string& etag, map<string, bufferlist>& attrs); -}; - class RGWPutObj : public RGWOp { friend class RGWPutObjProcessor; @@ -307,6 +281,7 @@ protected: bool chunked_upload; RGWAccessControlPolicy policy; const char *obj_manifest; + time_t mtime; public: RGWPutObj() { @@ -316,6 +291,7 @@ public: supplied_etag = NULL; chunked_upload = false; obj_manifest = NULL; + mtime = 0; } virtual void init(RGWRados *store, struct req_state *s, RGWHandler *h) { @@ -443,6 +419,9 @@ protected: bool replace_attrs; RGWBucketInfo src_bucket_info; RGWBucketInfo dest_bucket_info; + string source_zone; + string client_id; + string op_id; int init_common(); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 1c6595489eb..3e82053d6b1 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -11,6 +11,7 @@ #include "rgw_rados.h" #include "rgw_cache.h" #include "rgw_acl.h" +#include "rgw_acl_s3.h" /* for dumping s3policy in debug log */ #include "rgw_metadata.h" #include "rgw_bucket.h" @@ -19,6 +20,7 @@ #include "cls/refcount/cls_refcount_client.h" #include "cls/version/cls_version_client.h" #include "cls/log/cls_log_client.h" +#include "cls/statelog/cls_statelog_client.h" #include "cls/lock/cls_lock_client.h" #include "rgw_tools.h" @@ -58,6 +60,7 @@ static string region_info_oid_prefix = "region_info."; static string default_region_info_oid = "default.region"; static string region_map_oid = "region_map"; +static string log_lock_name = "rgw_log_lock"; static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN; @@ -66,6 +69,8 @@ static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN; #define RGW_DEFAULT_ZONE_ROOT_POOL ".rgw.root" #define RGW_DEFAULT_REGION_ROOT_POOL ".rgw.root" +#define RGW_STATELOG_OBJ_PREFIX "statelog." + #define dout_subsys ceph_subsys_rgw @@ -97,7 +102,7 @@ int RGWRegion::read_default(RGWDefaultRegionInfo& default_info) rgw_bucket pool(pool_name.c_str()); bufferlist bl; - int ret = rgw_get_system_obj(store, NULL, pool, oid, bl, NULL); + int ret = rgw_get_system_obj(store, NULL, pool, oid, bl, NULL, NULL); if (ret < 0) return ret; @@ -131,7 +136,7 @@ int RGWRegion::set_as_default() ::encode(default_info, bl); - int ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), false, NULL, NULL); + int ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), false, NULL, 0, NULL); if (ret < 0) return ret; @@ -188,7 +193,7 @@ int RGWRegion::read_info(const string& region_name) string oid = region_info_oid_prefix + name; - int ret = rgw_get_system_obj(store, NULL, pool, oid, bl, NULL); + int ret = rgw_get_system_obj(store, NULL, pool, oid, bl, NULL, NULL); if (ret < 0) { lderr(cct) << "failed reading region info from " << pool << ":" << oid << ": " << cpp_strerror(-ret) << dendl; return ret; @@ -243,7 +248,7 @@ int RGWRegion::store_info(bool exclusive) bufferlist bl; ::encode(*this, bl); - int ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), exclusive, NULL, NULL); + int ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), exclusive, NULL, 0, NULL); return ret; } @@ -296,7 +301,7 @@ int RGWZoneParams::init(CephContext *cct, RGWRados *store, RGWRegion& region) bufferlist bl; string oid = zone_info_oid_prefix + name; - int ret = rgw_get_system_obj(store, NULL, pool, oid, bl, NULL); + int ret = rgw_get_system_obj(store, NULL, pool, oid, bl, NULL, NULL); if (ret < 0) return ret; @@ -322,7 +327,7 @@ int RGWZoneParams::store_info(CephContext *cct, RGWRados *store, RGWRegion& regi bufferlist bl; ::encode(*this, bl); - int ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), false, NULL, NULL); + int ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), false, NULL, 0, NULL); return ret; } @@ -369,7 +374,7 @@ int RGWRegionMap::read(CephContext *cct, RGWRados *store) rgw_bucket pool(pool_name.c_str()); bufferlist bl; - int ret = rgw_get_system_obj(store, NULL, pool, oid, bl, NULL); + int ret = rgw_get_system_obj(store, NULL, pool, oid, bl, NULL, NULL); if (ret < 0) return ret; @@ -398,7 +403,7 @@ int RGWRegionMap::store(CephContext *cct, RGWRados *store) bufferlist bl; ::encode(*this, bl); - int ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), false, NULL, NULL); + int ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), false, NULL, 0, NULL); return ret; } @@ -479,6 +484,273 @@ void RGWObjVersionTracker::generate_new_write_ver(CephContext *cct) append_rand_alpha(cct, write_version.tag, write_version.tag, TAG_LEN); } +int RGWPutObjProcessor::complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs) +{ + int r = do_complete(etag, mtime, set_mtime, attrs); + if (r < 0) + return r; + + is_complete = true; + return 0; +} + +RGWPutObjProcessor::~RGWPutObjProcessor() +{ + if (is_complete) + return; + + list<rgw_obj>::iterator iter; + for (iter = objs.begin(); iter != objs.end(); ++iter) { + rgw_obj& obj = *iter; + int r = store->delete_obj(obj_ctx, obj); + if (r < 0 && r != -ENOENT) { + ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl; + } + } +} + +int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx) +{ + RGWPutObjProcessor::prepare(store, obj_ctx); + + obj.init(bucket, obj_str); + + return 0; +}; + +int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle) +{ + if (ofs != _ofs) + return -EINVAL; + + data.append(bl); + ofs += bl.length(); + + return 0; +} + +int RGWPutObjProcessor_Plain::do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs) +{ + RGWRados::PutObjMetaExtraParams params; + params.set_mtime = set_mtime; + params.mtime = mtime; + params.data = &data; + + int r = store->put_obj_meta(obj_ctx, obj, data.length(), attrs, + RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, + params); + return r; +} + + +int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle) +{ + if ((uint64_t)abs_ofs + bl.length() > obj_len) + obj_len = abs_ofs + bl.length(); + + // For the first call pass -1 as the offset to + // do a write_full. + int r = store->aio_put_obj_data(NULL, obj, + bl, + ((ofs != 0) ? ofs : -1), + false, phandle); + + return r; +} + +struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending() +{ + struct put_obj_aio_info info; + info = pending.front(); + pending.pop_front(); + return info; +} + +int RGWPutObjProcessor_Aio::wait_pending_front() +{ + struct put_obj_aio_info info = pop_pending(); + int ret = store->aio_wait(info.handle); + return ret; +} + +bool RGWPutObjProcessor_Aio::pending_has_completed() +{ + if (pending.empty()) + return false; + + struct put_obj_aio_info& info = pending.front(); + return store->aio_completed(info.handle); +} + +int RGWPutObjProcessor_Aio::drain_pending() +{ + int ret = 0; + while (!pending.empty()) { + int r = wait_pending_front(); + if (r < 0) + ret = r; + } + return ret; +} + +int RGWPutObjProcessor_Aio::throttle_data(void *handle) +{ + if (handle) { + struct put_obj_aio_info info; + info.handle = handle; + pending.push_back(info); + } + size_t orig_size = pending.size(); + while (pending_has_completed()) { + int r = wait_pending_front(); + if (r < 0) + return r; + } + + /* resize window in case messages are draining too fast */ + if (orig_size - pending.size() >= max_chunks) { + max_chunks++; + } + + if (pending.size() > max_chunks) { + int r = wait_pending_front(); + if (r < 0) + return r; + } + return 0; +} + +int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle) +{ + if (ofs >= next_part_ofs) + prepare_next_part(ofs); + + return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle); +} + +int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) { + *phandle = NULL; + if (extra_data_len) { + size_t extra_len = bl.length(); + if (extra_len > extra_data_len) + extra_len = extra_data_len; + + bufferlist extra; + bl.splice(0, extra_len, &extra); + extra_data_bl.append(extra); + + extra_data_len -= extra_len; + if (bl.length() == 0) { + return 0; + } + ofs = extra_data_bl.length(); + } + + pending_data_bl.claim_append(bl); + if (pending_data_bl.length() < RGW_MAX_CHUNK_SIZE) + return 0; + + pending_data_bl.splice(0, RGW_MAX_CHUNK_SIZE, &bl); + + if (!data_ofs && !immutable_head()) { + first_chunk.claim(bl); + obj_len = (uint64_t)first_chunk.length(); + prepare_next_part(first_chunk.length()); + data_ofs = obj_len; + return 0; + } + off_t write_ofs = data_ofs; + data_ofs = write_ofs + bl.length(); + return write_data(bl, write_ofs, phandle); +} + +int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx) +{ + RGWPutObjProcessor::prepare(store, obj_ctx); + + head_obj.init(bucket, obj_str); + + char buf[33]; + gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); + oid_prefix.append("_"); + oid_prefix.append(buf); + oid_prefix.append("_"); + + return 0; +} + +void RGWPutObjProcessor_Atomic::prepare_next_part(off_t ofs) { + int num_parts = manifest.objs.size(); + RGWObjManifestPart *part; + + /* first update manifest for written data */ + if (!num_parts) { + part = &manifest.objs[cur_part_ofs]; + part->loc = head_obj; + } else { + part = &manifest.objs[cur_part_ofs]; + part->loc = cur_obj; + } + part->loc_ofs = 0; + part->size = ofs - cur_part_ofs; + + if ((uint64_t)ofs > manifest.obj_size) + manifest.obj_size = ofs; + + /* now update params for next part */ + + cur_part_ofs = ofs; + next_part_ofs = cur_part_ofs + part_size; + char buf[16]; + + cur_part_id++; + snprintf(buf, sizeof(buf), "%d", cur_part_id); + string cur_oid = oid_prefix; + cur_oid.append(buf); + cur_obj.init_ns(bucket, cur_oid, shadow_ns); + + add_obj(cur_obj); +}; + +void RGWPutObjProcessor_Atomic::complete_parts() +{ + if (obj_len > (uint64_t)cur_part_ofs) + prepare_next_part(obj_len); +} + +int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs) +{ + if (pending_data_bl.length()) { + void *handle; + int r = write_data(pending_data_bl, data_ofs, &handle); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl; + return r; + } + r = throttle_data(handle); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl; + return r; + } + } + complete_parts(); + + store->set_atomic(obj_ctx, head_obj); + + RGWRados::PutObjMetaExtraParams extra_params; + + extra_params.data = &first_chunk; + extra_params.manifest = &manifest; + extra_params.ptag = &unique_tag; /* use req_id as operation tag */ + extra_params.mtime = mtime; + extra_params.set_mtime = set_mtime; + + int r = store->put_obj_meta(obj_ctx, head_obj, obj_len, attrs, + RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, + extra_params); + return r; +} + class RGWWatcher : public librados::WatchCtx { RGWRados *rados; public: @@ -525,7 +797,13 @@ void RGWRados::finalize() delete gc; gc = NULL; } - delete rest_conn; + delete rest_master_conn; + + map<string, RGWRESTConn *>::iterator iter; + for (iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) { + RGWRESTConn *conn = iter->second; + delete conn; + } } /** @@ -584,7 +862,18 @@ int RGWRados::init_complete() lderr(cct) << "ERROR: bad region map: inconsistent master region" << dendl; return -EINVAL; } - rest_conn = new RGWRegionConnection(cct, this, iter->second); + RGWRegion& region = iter->second; + rest_master_conn = new RGWRESTConn(cct, this, region.endpoints); + } + + map<string, RGWZone>::iterator ziter; + for (ziter = region.zones.begin(); ziter != region.zones.end(); ++ziter) { + const string& name = ziter->first; + if (name != zone.name) { + RGWZone& z = ziter->second; + ldout(cct, 20) << "generating connection object for zone " << name << dendl; + zone_conn_map[name] = new RGWRESTConn(cct, this, z.endpoints); + } } ret = open_root_pool_ctx(); @@ -1199,8 +1488,8 @@ int RGWRados::time_log_list(const string& oid, utime_t& start_time, utime_t& end int r = rados->ioctx_create(log_pool, io_ctx); if (r < 0) return r; - librados::ObjectReadOperation op; + cls_log_list(op, start_time, end_time, marker, max_entries, entries, &marker, truncated); bufferlist obl; @@ -1225,7 +1514,8 @@ int RGWRados::time_log_trim(const string& oid, utime_t& start_time, utime_t& end } -int RGWRados::lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration, string& owner_id) { +int RGWRados::lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration, + string& zone_id, string& owner_id) { librados::IoCtx io_ctx; const char *pool_name = pool.name.c_str(); @@ -1234,15 +1524,16 @@ int RGWRados::lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& durat if (r < 0) return r; - string lock_name = RGW_INDEX_LOCK_NAME; - rados::cls::lock::Lock l(lock_name); + rados::cls::lock::Lock l(log_lock_name); l.set_duration(duration); l.set_cookie(owner_id); + l.set_tag(zone_id); + l.set_renew(true); return l.lock_exclusive(&io_ctx, oid); } -int RGWRados::unlock(rgw_bucket& pool, const string& oid, string& owner_id) { +int RGWRados::unlock(rgw_bucket& pool, const string& oid, string& zone_id, string& owner_id) { librados::IoCtx io_ctx; const char *pool_name = pool.name.c_str(); @@ -1251,8 +1542,8 @@ int RGWRados::unlock(rgw_bucket& pool, const string& oid, string& owner_id) { if (r < 0) return r; - string lock_name = RGW_INDEX_LOCK_NAME; - rados::cls::lock::Lock l(lock_name); + rados::cls::lock::Lock l(log_lock_name); + l.set_tag(zone_id); l.set_cookie(owner_id); return l.unlock(&io_ctx, oid); @@ -1272,6 +1563,29 @@ int RGWRados::decode_policy(bufferlist& bl, ACLOwner *owner) return 0; } +int rgw_policy_from_attrset(CephContext *cct, map<string, bufferlist>& attrset, RGWAccessControlPolicy *policy) +{ + map<string, bufferlist>::iterator aiter = attrset.find(RGW_ATTR_ACL); + if (aiter == attrset.end()) + return -EIO; + + bufferlist& bl = aiter->second; + bufferlist::iterator iter = bl.begin(); + try { + policy->decode(iter); + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl; + return -EIO; + } + if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) { + RGWAccessControlPolicy_S3 *s3policy = static_cast<RGWAccessControlPolicy_S3 *>(policy); + ldout(cct, 15) << "Read AccessControlPolicy"; + s3policy->to_xml(*_dout); + *_dout << dendl; + } + return 0; +} + /** * get listing of the objects in a bucket. * bucket: bucket to list contents of @@ -1384,6 +1698,27 @@ int RGWRados::create_pool(rgw_bucket& bucket) return 0; } + +int RGWRados::init_bucket_index(rgw_bucket& bucket) +{ + librados::IoCtx index_ctx; // context for new bucket + + int r = open_bucket_index_ctx(bucket, index_ctx); + if (r < 0) + return r; + + string dir_oid = dir_oid_prefix; + dir_oid.append(bucket.marker); + + librados::ObjectWriteOperation op; + op.create(true); + r = cls_rgw_init_index(index_ctx, op, dir_oid); + if (r < 0 && r != -EEXIST) + return r; + + return 0; +} + /** * create a bucket with name bucket and the given list of attrs * returns 0 on success, -ERR# otherwise. @@ -1393,6 +1728,9 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket, map<std::string, bufferlist>& attrs, RGWObjVersionTracker& objv_tracker, obj_version *pobjv, + time_t creation_time, + rgw_bucket *pmaster_bucket, + RGWBucketInfo *pinfo, bool exclusive) { #define MAX_CREATE_RETRIES 20 /* need to bound retries */ @@ -1401,12 +1739,6 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket, ret = select_bucket_placement(bucket.name, bucket); if (ret < 0) return ret; - librados::IoCtx index_ctx; // context for new bucket - - int r = open_bucket_index_ctx(bucket, index_ctx); - if (r < 0) - return r; - bufferlist bl; uint32_t nop = 0; ::encode(nop, bl); @@ -1414,24 +1746,27 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket, const string& pool = zone.domain_root.name; const char *pool_str = pool.c_str(); librados::IoCtx id_io_ctx; - r = rados->ioctx_create(pool_str, id_io_ctx); + int r = rados->ioctx_create(pool_str, id_io_ctx); if (r < 0) return r; - uint64_t iid = instance_id(); - uint64_t bid = next_bucket_id(); - char buf[32]; - snprintf(buf, sizeof(buf), "%llu.%llu", (long long)iid, (long long)bid); - bucket.marker = buf; - bucket.bucket_id = bucket.marker; + if (!pmaster_bucket) { + uint64_t iid = instance_id(); + uint64_t bid = next_bucket_id(); + char buf[32]; + snprintf(buf, sizeof(buf), "%s.%llu.%llu", zone.name.c_str(), (long long)iid, (long long)bid); + bucket.marker = buf; + bucket.bucket_id = bucket.marker; + } else { + bucket.marker = pmaster_bucket->marker; + bucket.bucket_id = pmaster_bucket->bucket_id; + } string dir_oid = dir_oid_prefix; dir_oid.append(bucket.marker); - librados::ObjectWriteOperation op; - op.create(true); - r = cls_rgw_init_index(index_ctx, op, dir_oid); - if (r < 0 && r != -EEXIST) + r = init_bucket_index(bucket); + if (r < 0) return r; if (pobjv) { @@ -1444,11 +1779,20 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket, info.bucket = bucket; info.owner = owner; info.region = region_name; - ret = put_bucket_info(bucket.name, info, exclusive, &objv_tracker, &attrs); + if (!creation_time) + time(&info.creation_time); + else + info.creation_time = creation_time; + ret = put_bucket_info(bucket.name, info, exclusive, &objv_tracker, 0, &attrs); if (ret == -EEXIST) { + librados::IoCtx index_ctx; // context for new bucket + int r = open_bucket_index_ctx(bucket, index_ctx); + if (r < 0) + return r; + index_ctx.remove(dir_oid); /* we need this for this objv_tracker */ - int r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, NULL); + r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, NULL); if (r < 0) { if (r == -ENOENT) { continue; @@ -1457,6 +1801,8 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket, return r; } } + if (pinfo) + *pinfo = info; return ret; } @@ -1474,7 +1820,7 @@ int RGWRados::select_bucket_placement(string& bucket_name, rgw_bucket& bucket) rgw_obj obj(zone.domain_root, avail_pools); - int ret = rgw_get_system_obj(this, NULL, zone.domain_root, avail_pools, map_bl, NULL); + int ret = rgw_get_system_obj(this, NULL, zone.domain_root, avail_pools, map_bl, NULL, NULL); if (ret < 0) { goto read_omap; } @@ -1663,7 +2009,8 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, const string *ptag, list<string> *remove_objs, bool modify_version, - RGWObjVersionTracker *objv_tracker) + RGWObjVersionTracker *objv_tracker, + time_t set_mtime) { rgw_bucket bucket; std::string oid, key; @@ -1696,6 +2043,16 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, objv_tracker->prepare_op_for_write(&op); } + utime_t ut; + if (set_mtime) { + ut = utime_t(set_mtime, 0); + } else { + ut = ceph_clock_now(0); + set_mtime = ut.sec(); + } + + op.mtime(&set_mtime); + if (data) { /* if we want to overwrite the data, we also want to overwrite the xattrs, so just remove the object */ @@ -1749,7 +2106,6 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, string index_tag; uint64_t epoch; int64_t poolid; - utime_t ut; if (state) { index_tag = state->write_tag; @@ -1775,17 +2131,13 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, ldout(cct, 0) << "ERROR: complete_atomic_overwrite returned r=" << r << dendl; } - ut = ceph_clock_now(cct); r = complete_update_index(bucket, obj.object, index_tag, poolid, epoch, size, ut, etag, content_type, &acl_bl, category, remove_objs); if (r < 0) goto done_cancel; - if (mtime) { - r = io_ctx.stat(oid, NULL, mtime); - if (r < 0) - return r; + *mtime = set_mtime; } return 0; @@ -1870,6 +2222,65 @@ bool RGWRados::aio_completed(void *handle) AioCompletion *c = (AioCompletion *)handle; return c->is_complete(); } + +class RGWRadosPutObj : public RGWGetDataCB +{ + rgw_obj obj; + RGWPutObjProcessor_Atomic *processor; + RGWOpStateSingleOp *opstate; +public: + RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops) : processor(p), opstate(_ops) {} + int handle_data(bufferlist& bl, off_t ofs, off_t len) { + void *handle; + int ret = processor->handle_data(bl, ofs, &handle); + if (ret < 0) + return ret; + + if (opstate) { + /* need to update opstate repository with new state. This is ratelimited, so we're not + * really doing it every time + */ + ret = opstate->renew_state(); + if (ret < 0) { + /* could not renew state! might have been marked as cancelled */ + return ret; + } + } + + ret = processor->throttle_data(handle); + if (ret < 0) + return ret; + + return 0; + } + + void set_extra_data_len(uint64_t len) { + RGWGetDataCB::set_extra_data_len(len); + processor->set_extra_data_len(len); + } + + int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs) { + return processor->complete(etag, mtime, set_mtime, attrs); + } +}; + +/* + * prepare attrset, either replace it with new attrs, or keep it (other than acls). + */ +static void set_copy_attrs(map<string, bufferlist>& src_attrs, map<string, bufferlist>& attrs, bool replace_attrs, bool intra_region) +{ + if (replace_attrs) { + if (!attrs[RGW_ATTR_ETAG].length()) + attrs[RGW_ATTR_ETAG] = src_attrs[RGW_ATTR_ETAG]; + + src_attrs = attrs; + } else { + /* copying attrs from source, however acls should only be copied if it's intra-region operation */ + if (!intra_region) + src_attrs[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL]; + } +} + /** * Copy an object. * dest_obj: the object to copy into @@ -1880,9 +2291,14 @@ bool RGWRados::aio_completed(void *handle) */ int RGWRados::copy_obj(void *ctx, const string& user_id, + const string& client_id, + const string& op_id, + req_info *info, + const string& source_zone, rgw_obj& dest_obj, rgw_obj& src_obj, RGWBucketInfo& dest_bucket_info, + RGWBucketInfo& src_bucket_info, time_t *mtime, const time_t *mod_ptr, const time_t *unmod_ptr, @@ -1900,32 +2316,119 @@ int RGWRados::copy_obj(void *ctx, rgw_obj shadow_obj = dest_obj; string shadow_oid; + bool remote_src; + bool remote_dest; + append_rand_alpha(cct, dest_obj.object, shadow_oid, 32); shadow_obj.init_ns(dest_obj.bucket, shadow_oid, shadow_ns); + remote_dest = ((dest_bucket_info.region.empty() && !region.is_master) || + (dest_bucket_info.region != region.name)); + + remote_src = ((src_bucket_info.region.empty() && !region.is_master) || + (src_bucket_info.region != region.name)); + + if (remote_src && remote_dest) { + ldout(cct, 0) << "ERROR: can't copy object when both src and dest buckets are remote" << dendl; + return -EINVAL; + } + ldout(cct, 5) << "Copy object " << src_obj.bucket << ":" << src_obj.object << " => " << dest_obj.bucket << ":" << dest_obj.object << dendl; void *handle = NULL; - map<string, bufferlist> attrset; + map<string, bufferlist> src_attrs; off_t ofs = 0; off_t end = -1; - ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &attrset, - mod_ptr, unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &obj_size, NULL, &handle, err); + if (!remote_src && source_zone.empty()) { + ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &src_attrs, + mod_ptr, unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &obj_size, NULL, &handle, err); + if (ret < 0) + return ret; + } else { + /* source is in a different region, copy it there */ - if (ret < 0) - return ret; + RGWRESTStreamReadRequest *in_stream_req; + string tag; + append_rand_alpha(cct, tag, tag, 32); - if (replace_attrs) { - if (!attrs[RGW_ATTR_ETAG].length()) - attrs[RGW_ATTR_ETAG] = attrset[RGW_ATTR_ETAG]; + RGWPutObjProcessor_Atomic processor(dest_obj.bucket, dest_obj.object, + cct->_conf->rgw_obj_stripe_size, tag); + ret = processor.prepare(this, ctx); + if (ret < 0) + return ret; - attrset = attrs; - } else { - /* copying attrs from source, however acls should not be copied */ - attrset[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL]; + RGWRESTConn *conn; + if (source_zone.empty()) { + conn = rest_master_conn; + } else { + map<string, RGWRESTConn *>::iterator iter = zone_conn_map.find(source_zone); + if (iter == zone_conn_map.end()) { + ldout(cct, 0) << "could not find zone connection to zone: " << source_zone << dendl; + return -ENOENT; + } + conn = iter->second; + } + + string obj_name = dest_obj.bucket.name + "/" + dest_obj.object; + + RGWOpStateSingleOp opstate(this, client_id, op_id, obj_name); + + int ret = opstate.set_state(RGWOpState::OPSTATE_IN_PROGRESS); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl; + return ret; + } + RGWRadosPutObj cb(&processor, &opstate); + string etag; + map<string, string> req_headers; + time_t set_mtime; + + ret = conn->get_obj(user_id, info, src_obj, true, &cb, &in_stream_req); + if (ret < 0) + goto set_err_state; + + ret = conn->complete_request(in_stream_req, etag, &set_mtime, req_headers); + if (ret < 0) + goto set_err_state; + + { /* opening scope so that we can do goto, sorry */ + bufferlist& extra_data_bl = processor.get_extra_data(); + if (extra_data_bl.length()) { + JSONParser jp; + if (!jp.parse(extra_data_bl.c_str(), extra_data_bl.length())) { + ldout(cct, 0) << "failed to parse response extra data. len=" << extra_data_bl.length() << " data=" << extra_data_bl.c_str() << dendl; + goto set_err_state; + } + + JSONDecoder::decode_json("attrs", src_attrs, &jp); + + src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout + } + } + + set_copy_attrs(src_attrs, attrs, replace_attrs, !source_zone.empty()); + + ret = cb.complete(etag, mtime, set_mtime, src_attrs); + if (ret < 0) + goto set_err_state; + + ret = opstate.set_state(RGWOpState::OPSTATE_COMPLETE); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl; + } + + return 0; +set_err_state: + int r = opstate.set_state(RGWOpState::OPSTATE_ERROR); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl; + } + return ret; } + set_copy_attrs(src_attrs, attrs, replace_attrs, false); + RGWObjManifest manifest; RGWObjState *astate = NULL; RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx); @@ -1953,15 +2456,15 @@ int RGWRados::copy_obj(void *ctx, } - if ((dest_bucket_info.region.empty() && !region.is_master) || - (dest_bucket_info.region != region.name)) { + if (remote_dest) { /* dest is in a different region, copy it there */ map<string, bufferlist> src_attrs; + string etag; - RGWRESTStreamRequest *out_stream_req; - - int ret = rest_conn->put_obj_init(user_id, dest_obj, astate->size, attrset, &out_stream_req); + RGWRESTStreamWriteRequest *out_stream_req; + + int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req); if (ret < 0) return ret; @@ -1969,15 +2472,13 @@ int RGWRados::copy_obj(void *ctx, if (ret < 0) return ret; - ret = rest_conn->complete_request(out_stream_req); + ret = rest_master_conn->complete_request(out_stream_req, etag, mtime); if (ret < 0) return ret; return 0; - } - - if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */ - return copy_obj_data(ctx, handle, end, dest_obj, src_obj, mtime, attrset, category, ptag, err); + } else if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */ + return copy_obj_data(ctx, handle, end, dest_obj, src_obj, mtime, src_attrs, category, ptag, err); } map<uint64_t, RGWObjManifestPart>::iterator miter = astate->manifest.objs.begin(); @@ -2052,7 +2553,7 @@ int RGWRados::copy_obj(void *ctx, ep.manifest = pmanifest; ep.ptag = &tag; - ret = put_obj_meta(ctx, dest_obj, end + 1, attrset, category, PUT_OBJ_CREATE, ep); + ret = put_obj_meta(ctx, dest_obj, end + 1, src_attrs, category, PUT_OBJ_CREATE, ep); if (mtime) obj_stat(ctx, dest_obj, NULL, mtime, NULL, NULL, NULL, NULL); @@ -2220,7 +2721,7 @@ int RGWRados::set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner) RGWBucketInfo info; map<string, bufferlist> attrs; RGWObjVersionTracker objv_tracker; - int r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, &attrs); + int r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, NULL, &attrs); if (r < 0) { ldout(cct, 0) << "NOTICE: get_bucket_info on bucket=" << bucket.name << " returned err=" << r << dendl; return r; @@ -2228,7 +2729,7 @@ int RGWRados::set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner) info.owner = owner.get_id(); - r = put_bucket_info(bucket.name, info, false, &objv_tracker, &attrs); + r = put_bucket_info(bucket.name, info, false, &objv_tracker, 0, &attrs); if (r < 0) { ldout(cct, 0) << "NOTICE: put_bucket_info on bucket=" << bucket.name << " returned err=" << r << dendl; return r; @@ -2254,7 +2755,7 @@ int RGWRados::set_buckets_enabled(vector<rgw_bucket>& buckets, bool enabled) RGWBucketInfo info; RGWObjVersionTracker objv_tracker; map<string, bufferlist> attrs; - int r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, &attrs); + int r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, NULL, &attrs); if (r < 0) { ldout(cct, 0) << "NOTICE: get_bucket_info on bucket=" << bucket.name << " returned err=" << r << ", skipping bucket" << dendl; ret = r; @@ -2266,7 +2767,7 @@ int RGWRados::set_buckets_enabled(vector<rgw_bucket>& buckets, bool enabled) info.flags |= BUCKET_SUSPENDED; } - r = put_bucket_info(bucket.name, info, false, &objv_tracker, &attrs); + r = put_bucket_info(bucket.name, info, false, &objv_tracker, 0, &attrs); if (r < 0) { ldout(cct, 0) << "NOTICE: put_bucket_info on bucket=" << bucket.name << " returned err=" << r << ", skipping bucket" << dendl; ret = r; @@ -2279,7 +2780,7 @@ int RGWRados::set_buckets_enabled(vector<rgw_bucket>& buckets, bool enabled) int RGWRados::bucket_suspended(rgw_bucket& bucket, bool *suspended) { RGWBucketInfo bucket_info; - int ret = get_bucket_info(NULL, bucket.name, bucket_info, NULL); + int ret = get_bucket_info(NULL, bucket.name, bucket_info, NULL, NULL); if (ret < 0) { return ret; } @@ -3550,7 +4051,11 @@ void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg) for (iter = bl_list.begin(); iter != bl_list.end(); ++iter) { bufferlist& bl = *iter; - d->client_cb->handle_data(bl, 0, bl.length()); + int r = d->client_cb->handle_data(bl, 0, bl.length()); + if (r < 0) { + d->set_cancelled(r); + break; + } } done_unlock: @@ -3569,10 +4074,16 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate, RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx); ObjectReadOperation op; struct get_obj_data *d = (struct get_obj_data *)arg; + string oid, key; + rgw_bucket bucket; + bufferlist *pbl; + AioCompletion *c; + + int r; if (is_head_obj) { /* only when reading from the head object do we need to do the atomic test */ - int r = append_atomic_test(rctx, obj, op, &astate); + r = append_atomic_test(rctx, obj, op, &astate); if (r < 0) return r; @@ -3581,8 +4092,10 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate, unsigned chunk_len = min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len); d->data_lock.Lock(); - d->client_cb->handle_data(astate->data, obj_ofs, chunk_len); + r = d->client_cb->handle_data(astate->data, obj_ofs, chunk_len); d->data_lock.Unlock(); + if (r < 0) + return r; d->lock.Lock(); d->total_read += chunk_len; @@ -3596,33 +4109,35 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate, } } - string oid, key; - rgw_bucket bucket; get_obj_bucket_and_oid_key(obj, bucket, oid, key); - bufferlist *pbl; - AioCompletion *c; - - d->add_io(obj_ofs, len, &pbl, &c); - d->throttle.get(len); if (d->is_cancelled()) { return d->get_err_code(); } + /* add io after we check that we're not cancelled, otherwise we're going to have trouble + * cleaning up + */ + d->add_io(obj_ofs, len, &pbl, &c); + ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl; op.read(read_ofs, len, pbl, NULL); librados::IoCtx io_ctx(d->io_ctx); io_ctx.locator_set_key(key); - int r = io_ctx.aio_operate(oid, c, &op, NULL); + r = io_ctx.aio_operate(oid, c, &op, NULL); ldout(cct, 20) << "rados->aio_operate r=" << r << " bl.length=" << pbl->length() << dendl; + if (r < 0) + goto done_err; - if (r < 0) { - d->set_cancelled(r); - d->cancel_io(obj_ofs); - } + return 0; + +done_err: + ldout(cct, 20) << "cancelling io r=" << r << " obj_ofs=" << obj_ofs << dendl; + d->set_cancelled(r); + d->cancel_io(obj_ofs); return r; } @@ -3643,6 +4158,7 @@ int RGWRados::get_obj_iterate(void *ctx, void **handle, rgw_obj& obj, int r = iterate_obj(ctx, obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data); if (r < 0) { + data->cancel_all_io(); goto done; } @@ -3838,11 +4354,12 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_ return 0; } -int RGWRados::get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info, RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs) +int RGWRados::get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info, RGWObjVersionTracker *objv_tracker, + time_t *pmtime, map<string, bufferlist> *pattrs) { bufferlist bl; - int ret = rgw_get_system_obj(this, ctx, zone.domain_root, bucket_name, bl, objv_tracker, pattrs); + int ret = rgw_get_system_obj(this, ctx, zone.domain_root, bucket_name, bl, objv_tracker, pmtime, pattrs); if (ret < 0) { info.bucket.name = bucket_name; /* only init this field */ return ret; @@ -3861,13 +4378,14 @@ int RGWRados::get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& inf return 0; } -int RGWRados::put_bucket_info(string& bucket_name, RGWBucketInfo& info, bool exclusive, RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs) +int RGWRados::put_bucket_info(string& bucket_name, RGWBucketInfo& info, bool exclusive, RGWObjVersionTracker *objv_tracker, + time_t mtime, map<string, bufferlist> *pattrs) { bufferlist bl; ::encode(info, bl); - int ret = rgw_bucket_store_info(this, info.bucket.name, bl, exclusive, pattrs, objv_tracker); + int ret = rgw_bucket_store_info(this, info.bucket.name, bl, exclusive, pattrs, objv_tracker, mtime); return ret; } @@ -4730,6 +5248,271 @@ int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid, return ret; } + +void RGWStateLog::oid_str(int shard, string& oid) { + oid = RGW_STATELOG_OBJ_PREFIX + module_name + "."; + char buf[16]; + snprintf(buf, sizeof(buf), "%d", shard); + oid += buf; +} + +int RGWStateLog::get_shard_num(const string& object) { + uint32_t val = ceph_str_hash_linux(object.c_str(), object.length()); + return val % num_shards; +} + +string RGWStateLog::get_oid(const string& object) { + int shard = get_shard_num(object); + string oid; + oid_str(shard, oid); + return oid; +} + +int RGWStateLog::open_ioctx(librados::IoCtx& ioctx) { + string pool_name; + store->get_log_pool_name(pool_name); + int r = store->rados->ioctx_create(pool_name.c_str(), ioctx); + if (r < 0) { + lderr(store->ctx()) << "ERROR: could not open rados pool" << dendl; + return r; + } + return 0; +} + +int RGWStateLog::store_entry(const string& client_id, const string& op_id, const string& object, + uint32_t state, bufferlist *bl, uint32_t *check_state) +{ + if (client_id.empty() || + op_id.empty() || + object.empty()) { + ldout(store->ctx(), 0) << "client_id / op_id / object is empty" << dendl; + } + + librados::IoCtx ioctx; + int r = open_ioctx(ioctx); + if (r < 0) + return r; + + string oid = get_oid(object); + + librados::ObjectWriteOperation op; + if (check_state) { + cls_statelog_check_state(op, client_id, op_id, object, *check_state); + } + utime_t ts = ceph_clock_now(store->ctx()); + bufferlist nobl; + cls_statelog_add(op, client_id, op_id, object, ts, state, (bl ? *bl : nobl)); + r = ioctx.operate(oid, &op); + if (r < 0) { + return r; + } + + return 0; +} + +int RGWStateLog::remove_entry(const string& client_id, const string& op_id, const string& object) +{ + if (client_id.empty() || + op_id.empty() || + object.empty()) { + ldout(store->ctx(), 0) << "client_id / op_id / object is empty" << dendl; + } + + librados::IoCtx ioctx; + int r = open_ioctx(ioctx); + if (r < 0) + return r; + + string oid = get_oid(object); + + librados::ObjectWriteOperation op; + cls_statelog_remove_by_object(op, object, op_id); + r = ioctx.operate(oid, &op); + if (r < 0) { + return r; + } + + return 0; +} + +void RGWStateLog::init_list_entries(const string& client_id, const string& op_id, const string& object, + void **handle) +{ + list_state *state = new list_state; + state->client_id = client_id; + state->op_id = op_id; + state->object = object; + if (object.empty()) { + state->cur_shard = 0; + state->max_shard = num_shards - 1; + } else { + state->cur_shard = state->max_shard = get_shard_num(object); + } + *handle = (void *)state; +} + +int RGWStateLog::list_entries(void *handle, int max_entries, + list<cls_statelog_entry>& entries, + bool *done) +{ + list_state *state = (list_state *)handle; + + librados::IoCtx ioctx; + int r = open_ioctx(ioctx); + if (r < 0) + return r; + + entries.clear(); + + for (; state->cur_shard <= state->max_shard && max_entries > 0; ++state->cur_shard) { + string oid; + oid_str(state->cur_shard, oid); + + librados::ObjectReadOperation op; + list<cls_statelog_entry> ents; + bool truncated; + cls_statelog_list(op, state->client_id, state->op_id, state->object, state->marker, + max_entries, ents, &state->marker, &truncated); + bufferlist ibl; + r = ioctx.operate(oid, &op, &ibl); + if (r == -ENOENT) { + truncated = false; + r = 0; + } + if (r < 0) { + ldout(store->ctx(), 0) << "cls_statelog_list returned " << r << dendl; + return r; + } + + if (!truncated) { + state->marker.clear(); + } + + max_entries -= ents.size(); + + entries.splice(entries.end(), ents); + + if (truncated) + break; + } + + *done = (state->cur_shard > state->max_shard); + + return 0; +} + +void RGWStateLog::finish_list_entries(void *handle) +{ + list_state *state = (list_state *)handle; + delete state; +} + +void RGWStateLog::dump_entry(const cls_statelog_entry& entry, Formatter *f) +{ + f->open_object_section("statelog_entry"); + f->dump_string("client_id", entry.client_id); + f->dump_string("op_id", entry.op_id); + f->dump_string("object", entry.object); + entry.timestamp.gmtime(f->dump_stream("timestamp")); + if (!dump_entry_internal(entry, f)) { + f->dump_int("state", entry.state); + } + f->close_section(); +} + +RGWOpState::RGWOpState(RGWRados *_store) : RGWStateLog(_store, _store->ctx()->_conf->rgw_num_zone_opstate_shards, string("obj_opstate")) +{ +} + +bool RGWOpState::dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) +{ + string s; + switch ((OpState)entry.state) { + case OPSTATE_UNKNOWN: + s = "unknown"; + break; + case OPSTATE_IN_PROGRESS: + s = "in-progress"; + break; + case OPSTATE_COMPLETE: + s = "complete"; + break; + case OPSTATE_ERROR: + s = "error"; + break; + case OPSTATE_ABORT: + s = "abort"; + break; + case OPSTATE_CANCELLED: + s = "cancelled"; + break; + default: + s = "invalid"; + } + f->dump_string("state", s); + return true; +} + +int RGWOpState::state_from_str(const string& s, OpState *state) +{ + if (s == "unknown") { + *state = OPSTATE_UNKNOWN; + } else if (s == "in-progress") { + *state = OPSTATE_IN_PROGRESS; + } else if (s == "complete") { + *state = OPSTATE_COMPLETE; + } else if (s == "error") { + *state = OPSTATE_ERROR; + } else if (s == "abort") { + *state = OPSTATE_ABORT; + } else if (s == "cancelled") { + *state = OPSTATE_CANCELLED; + } else { + return -EINVAL; + } + + return 0; +} + +int RGWOpState::set_state(const string& client_id, const string& op_id, const string& object, OpState state) +{ + uint32_t s = (uint32_t)state; + return store_entry(client_id, op_id, object, s, NULL, NULL); +} + +int RGWOpState::renew_state(const string& client_id, const string& op_id, const string& object, OpState state) +{ + uint32_t s = (uint32_t)state; + return store_entry(client_id, op_id, object, s, NULL, &s); +} + +RGWOpStateSingleOp::RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid, + const string& obj) : os(store), client_id(cid), op_id(oid), object(obj) +{ + cct = store->ctx(); + cur_state = RGWOpState::OPSTATE_UNKNOWN; +} + +int RGWOpStateSingleOp::set_state(RGWOpState::OpState state) { + last_update = ceph_clock_now(cct); + cur_state = state; + return os.set_state(client_id, op_id, object, state); +} + +int RGWOpStateSingleOp::renew_state() { + utime_t now = ceph_clock_now(cct); + + int rate_limit_sec = cct->_conf->rgw_opstate_ratelimit_sec; + + if (rate_limit_sec && now - last_update < rate_limit_sec) { + return 0; + } + + last_update = now; + return os.renew_state(client_id, op_id, object, cur_state); +} + + uint64_t RGWRados::instance_id() { return rados->get_instance_id(); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index d6897823794..1a78c6d6a9f 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -8,6 +8,7 @@ #include "cls/rgw/cls_rgw_types.h" #include "cls/version/cls_version_types.h" #include "cls/log/cls_log_types.h" +#include "cls/statelog/cls_statelog_types.h" #include "rgw_log.h" #include "rgw_metadata.h" #include "rgw_rest_conn.h" @@ -24,7 +25,6 @@ class RGWGC; #define RGW_OBJ_NS_MULTIPART "multipart" #define RGW_OBJ_NS_SHADOW "shadow" -#define RGW_INDEX_LOCK_NAME "rgw_process" static inline void prepend_bucket_marker(rgw_bucket& bucket, string& orig_oid, string& oid) { @@ -44,6 +44,8 @@ static inline void get_obj_bucket_and_oid_key(rgw_obj& obj, rgw_bucket& bucket, prepend_bucket_marker(bucket, obj.key, key); } +int rgw_policy_from_attrset(CephContext *cct, map<string, bufferlist>& attrset, RGWAccessControlPolicy *policy); + struct RGWUsageBatch { map<utime_t, rgw_usage_log_entry> m; @@ -62,9 +64,15 @@ struct RGWUsageIter { }; class RGWGetDataCB { +protected: + uint64_t extra_data_len; public: virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0; + RGWGetDataCB() : extra_data_len(0) {} virtual ~RGWGetDataCB() {} + virtual void set_extra_data_len(uint64_t len) { + extra_data_len = len; + } }; class RGWAccessListFilter { @@ -170,6 +178,131 @@ struct RGWUploadPartInfo { }; WRITE_CLASS_ENCODER(RGWUploadPartInfo) +class RGWPutObjProcessor +{ +protected: + RGWRados *store; + void *obj_ctx; + bool is_complete; + + virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs) = 0; + + list<rgw_obj> objs; + + void add_obj(rgw_obj& obj) { + objs.push_back(obj); + } +public: + RGWPutObjProcessor() : store(NULL), obj_ctx(NULL), is_complete(false) {} + virtual ~RGWPutObjProcessor(); + virtual int prepare(RGWRados *_store, void *_o) { + store = _store; + obj_ctx = _o; + return 0; + }; + virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0; + virtual int throttle_data(void *handle) = 0; + virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs); +}; + +class RGWPutObjProcessor_Plain : public RGWPutObjProcessor +{ + rgw_bucket bucket; + string obj_str; + + bufferlist data; + rgw_obj obj; + off_t ofs; + +protected: + int prepare(RGWRados *store, void *obj_ctx); + int handle_data(bufferlist& bl, off_t ofs, void **phandle); + int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs); + +public: + int throttle_data(void *handle) { return 0; } + RGWPutObjProcessor_Plain(rgw_bucket& b, const string& o) : bucket(b), obj_str(o), ofs(0) {} +}; + +struct put_obj_aio_info { + void *handle; +}; + +class RGWPutObjProcessor_Aio : public RGWPutObjProcessor +{ + list<struct put_obj_aio_info> pending; + size_t max_chunks; + + struct put_obj_aio_info pop_pending(); + int wait_pending_front(); + bool pending_has_completed(); + int drain_pending(); + +protected: + uint64_t obj_len; + + int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle); + +public: + int throttle_data(void *handle); + + RGWPutObjProcessor_Aio() : max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {} + virtual ~RGWPutObjProcessor_Aio() { + drain_pending(); + } +}; + +class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio +{ + bufferlist first_chunk; + uint64_t part_size; + off_t cur_part_ofs; + off_t next_part_ofs; + int cur_part_id; + off_t data_ofs; + + uint64_t extra_data_len; + bufferlist extra_data_bl; + bufferlist pending_data_bl; +protected: + rgw_bucket bucket; + string obj_str; + + string unique_tag; + + string oid_prefix; + rgw_obj head_obj; + rgw_obj cur_obj; + RGWObjManifest manifest; + + virtual bool immutable_head() { return false; } + + int write_data(bufferlist& bl, off_t ofs, void **phandle); + virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs); + + void prepare_next_part(off_t ofs); + void complete_parts(); + +public: + ~RGWPutObjProcessor_Atomic() {} + RGWPutObjProcessor_Atomic(rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t) : part_size(_p), + cur_part_ofs(0), + next_part_ofs(_p), + cur_part_id(0), + data_ofs(0), + extra_data_len(0), + bucket(_b), + obj_str(_o), + unique_tag(_t) {} + int prepare(RGWRados *store, void *obj_ctx); + void set_extra_data_len(uint64_t len) { + extra_data_len = len; + } + int handle_data(bufferlist& bl, off_t ofs, void **phandle); + bufferlist& get_extra_data() { return extra_data_bl; } +}; + + struct RGWObjState { bool is_atomic; bool has_attrs; @@ -433,10 +566,118 @@ struct RGWRegionMap { WRITE_CLASS_ENCODER(RGWRegionMap); class RGWDataChangesLog; +class RGWReplicaLogger; +class RGWStateLog { + RGWRados *store; + int num_shards; + string module_name; + + void oid_str(int shard, string& oid); + int get_shard_num(const string& object); + string get_oid(const string& object); + int open_ioctx(librados::IoCtx& ioctx); + + struct list_state { + int cur_shard; + int max_shard; + string marker; + string client_id; + string op_id; + string object; + + list_state() : cur_shard(0), max_shard(0) {} + }; + +protected: + virtual bool dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) { + return false; + } + +public: + RGWStateLog(RGWRados *_store, int _num_shards, const string& _module_name) : + store(_store), num_shards(_num_shards), module_name(_module_name) {} + virtual ~RGWStateLog() {} + + int store_entry(const string& client_id, const string& op_id, const string& object, + uint32_t state, bufferlist *bl, uint32_t *check_state); + + int remove_entry(const string& client_id, const string& op_id, const string& object); + + void init_list_entries(const string& client_id, const string& op_id, const string& object, + void **handle); + + int list_entries(void *handle, int max_entries, list<cls_statelog_entry>& entries, bool *done); + + void finish_list_entries(void *handle); + + virtual void dump_entry(const cls_statelog_entry& entry, Formatter *f); +}; + +/* + * state transitions: + * + * unknown -> in-progress -> complete + * -> error + * + * user can try setting the 'abort' state, and it can only succeed if state is + * in-progress. + * + * state renewal cannot switch state (stays in the same state) + * + * rgw can switch from in-progress to complete + * rgw can switch from in-progress to error + * + * rgw can switch from abort to cancelled + * + */ + +class RGWOpState : public RGWStateLog { +protected: + bool dump_entry_internal(const cls_statelog_entry& entry, Formatter *f); +public: + + enum OpState { + OPSTATE_UNKNOWN = 0, + OPSTATE_IN_PROGRESS = 1, + OPSTATE_COMPLETE = 2, + OPSTATE_ERROR = 3, + OPSTATE_ABORT = 4, + OPSTATE_CANCELLED = 5, + }; + + RGWOpState(RGWRados *_store); + + int state_from_str(const string& s, OpState *state); + int set_state(const string& client_id, const string& op_id, const string& object, OpState state); + int renew_state(const string& client_id, const string& op_id, const string& object, OpState state); +}; + +class RGWOpStateSingleOp +{ + RGWOpState os; + string client_id; + string op_id; + string object; + + CephContext *cct; + + RGWOpState::OpState cur_state; + utime_t last_update; + +public: + RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid, const string& obj); + + int set_state(RGWOpState::OpState state); + int renew_state(); +}; + + class RGWRados { friend class RGWGC; + friend class RGWStateLog; + friend class RGWReplicaLogger; /** Open the pool used as root for this gateway */ int open_root_pool_ctx(); @@ -521,7 +762,6 @@ class RGWRados int complete_atomic_overwrite(RGWRadosCtx *rctx, RGWObjState *state, rgw_obj& obj); int update_placement_map(); - int select_bucket_placement(std::string& bucket_name, rgw_bucket& bucket); int store_bucket_info(RGWBucketInfo& info, map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker, bool exclusive); protected: @@ -541,7 +781,7 @@ public: bucket_id_lock("rados_bucket_id"), max_bucket_id(0), cct(NULL), rados(NULL), pools_initialized(false), - rest_conn(NULL), + rest_master_conn(NULL), meta_mgr(NULL), data_log(NULL) {} void set_context(CephContext *_cct) { @@ -559,7 +799,8 @@ public: RGWRegion region; RGWZoneParams zone; RGWRegionMap region_map; - RGWRegionConnection *rest_conn; + RGWRESTConn *rest_master_conn; + map<string, RGWRESTConn *> zone_conn_map; RGWMetadataManager *meta_mgr; @@ -643,11 +884,16 @@ public: * create a bucket with name bucket and the given list of attrs * returns 0 on success, -ERR# otherwise. */ + virtual int init_bucket_index(rgw_bucket& bucket); + int select_bucket_placement(std::string& bucket_name, rgw_bucket& bucket); virtual int create_bucket(string& owner, rgw_bucket& bucket, const string& region_name, map<std::string,bufferlist>& attrs, RGWObjVersionTracker& objv_tracker, obj_version *pobjv, + time_t creation_time, + rgw_bucket *master_bucket, + RGWBucketInfo *pinfo, bool exclusive = true); virtual int add_bucket_placement(std::string& new_pool); virtual int remove_bucket_placement(std::string& new_pool); @@ -663,11 +909,12 @@ public: list<string> *remove_objs; bool modify_version; RGWObjVersionTracker *objv_tracker; + time_t set_mtime; PutObjMetaExtraParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL), remove_objs(NULL), modify_version(false), - objv_tracker(NULL) {} + objv_tracker(NULL), set_mtime(0) {} }; /** Write/overwrite an object to the bucket storage. */ @@ -675,21 +922,22 @@ public: map<std::string, bufferlist>& attrs, RGWObjCategory category, int flags, map<std::string, bufferlist>* rmattrs, const bufferlist *data, RGWObjManifest *manifest, const string *ptag, list<string> *remove_objs, - bool modify_version, RGWObjVersionTracker *objv_tracker); + bool modify_version, RGWObjVersionTracker *objv_tracker, + time_t set_mtime /* 0 for don't set */); - virtual int put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, + virtual int put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, time_t *mtime, map<std::string, bufferlist>& attrs, RGWObjCategory category, int flags, const bufferlist *data = NULL) { - return put_obj_meta_impl(ctx, obj, size, NULL, attrs, category, flags, + return put_obj_meta_impl(ctx, obj, size, mtime, attrs, category, flags, NULL, data, NULL, NULL, NULL, - false, NULL); + false, NULL, 0); } virtual int put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, map<std::string, bufferlist>& attrs, RGWObjCategory category, int flags, PutObjMetaExtraParams& params) { return put_obj_meta_impl(ctx, obj, size, params.mtime, attrs, category, flags, params.rmattrs, params.data, params.manifest, params.ptag, params.remove_objs, - params.modify_version, params.objv_tracker); + params.modify_version, params.objv_tracker, params.set_mtime); } virtual int put_obj_data(void *ctx, rgw_obj& obj, const char *data, @@ -698,7 +946,8 @@ public: off_t ofs, bool exclusive, void **handle); /* note that put_obj doesn't set category on an object, only use it for none user objects */ int put_system_obj(void *ctx, rgw_obj& obj, const char *data, size_t len, bool exclusive, - time_t *mtime, map<std::string, bufferlist>& attrs, RGWObjVersionTracker *objv_tracker) { + time_t *mtime, map<std::string, bufferlist>& attrs, RGWObjVersionTracker *objv_tracker, + time_t set_mtime) { bufferlist bl; bl.append(data, len); int flags = PUT_OBJ_CREATE; @@ -710,6 +959,7 @@ public: ep.data = &bl; ep.modify_version = true; ep.objv_tracker = objv_tracker; + ep.set_mtime = set_mtime; int ret = put_obj_meta(ctx, obj, len, attrs, RGW_OBJ_CATEGORY_NONE, flags, ep); return ret; @@ -761,9 +1011,14 @@ public: */ virtual int copy_obj(void *ctx, const string& user_id, + const string& client_id, + const string& op_id, + req_info *info, + const string& source_zone, rgw_obj& dest_obj, rgw_obj& src_obj, RGWBucketInfo& dest_bucket_info, + RGWBucketInfo& src_bucket_info, time_t *mtime, const time_t *mod_ptr, const time_t *unmod_ptr, @@ -935,8 +1190,10 @@ public: int decode_policy(bufferlist& bl, ACLOwner *owner); int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWBucketStats>& stats); - virtual int get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info, RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs = NULL); - virtual int put_bucket_info(string& bucket_name, RGWBucketInfo& info, bool exclusive, RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs); + virtual int get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info, RGWObjVersionTracker *objv_tracker, + time_t *pmtime, map<string, bufferlist> *pattrs = NULL); + virtual int put_bucket_info(string& bucket_name, RGWBucketInfo& info, bool exclusive, RGWObjVersionTracker *objv_tracker, + time_t mtime, map<string, bufferlist> *pattrs); int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid); int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, @@ -984,8 +1241,8 @@ public: int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time, int max_entries, list<cls_log_entry>& entries, string& marker, bool *truncated); int time_log_trim(const string& oid, utime_t& start_time, utime_t& end_time); - int lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration, string& owner_id); - int unlock(rgw_bucket& pool, const string& oid, string& owner_id); + int lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration, string& zone_id, string& owner_id); + int unlock(rgw_bucket& pool, const string& oid, string& zone_id, string& owner_id); /// clean up/process any temporary objects older than given date[/time] int remove_temp_objects(string date, string time); @@ -1011,6 +1268,11 @@ public: return s; } + + void get_log_pool_name(string& name) { + name = zone.log_pool.name; + } + private: int process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge); diff --git a/src/rgw/rgw_replica_log.cc b/src/rgw/rgw_replica_log.cc new file mode 100644 index 00000000000..9934902aef9 --- /dev/null +++ b/src/rgw/rgw_replica_log.cc @@ -0,0 +1,107 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * Copyright 2013 Inktank + */ + +#include "rgw_replica_log.h" +#include "cls/replica_log/cls_replica_log_client.h" +#include "rgw_rados.h" + +RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) : + cct(_store->cct), store(_store) {} + +int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool) +{ + int r = store->rados->ioctx_create(pool.c_str(), ctx); + if (r < 0) { + lderr(cct) << "ERROR: could not open rados pool " + << pool << dendl; + } + return r; +} + +int RGWReplicaLogger::update_bound(const string& oid, const string& pool, + const string& daemon_id, + const string& marker, const utime_t& time, + const list<pair<string, utime_t> > *entries) +{ + cls_replica_log_progress_marker progress; + cls_replica_log_prepare_marker(progress, daemon_id, marker, time, + entries); + + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + librados::ObjectWriteOperation opw; + cls_replica_log_update_bound(opw, progress); + return ioctx.operate(oid, &opw); +} + +int RGWReplicaLogger::delete_bound(const string& oid, const string& pool, + const string& daemon_id) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + librados::ObjectWriteOperation opw; + cls_replica_log_delete_bound(opw, daemon_id); + return ioctx.operate(oid, &opw); +} + +int RGWReplicaLogger::get_bounds(const string& oid, const string& pool, + string& marker, utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + return cls_replica_log_get_bounds(ioctx, oid, marker, oldest_time, markers); +} + +void RGWReplicaLogger::get_bound_info( + const cls_replica_log_progress_marker& progress, + string& entity, string& marker, + utime_t time, + list<pair<string, utime_t> >& entries) { + cls_replica_log_extract_marker(progress, entity, marker, time, entries); +} + +RGWReplicaObjectLogger:: +RGWReplicaObjectLogger(RGWRados *_store, + const string& _pool, + const string& _prefix) : RGWReplicaLogger(_store), + pool(_pool), prefix(_prefix) { + if (pool.empty()) + store->get_log_pool_name(pool); +} + +int RGWReplicaObjectLogger::create_log_objects(int shards) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + for (int i = 0; i < shards; ++i) { + string oid; + get_shard_oid(i, oid); + r = ioctx.create(oid, false); + if (r < 0) + return r; + } + return r; +} diff --git a/src/rgw/rgw_replica_log.h b/src/rgw/rgw_replica_log.h new file mode 100644 index 00000000000..fd461c2340f --- /dev/null +++ b/src/rgw/rgw_replica_log.h @@ -0,0 +1,113 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef RGW_REPLICA_LOG_H_ +#define RGW_REPLICA_LOG_H_ + +#include <string> +#include "cls/replica_log/cls_replica_log_types.h" +#include "include/types.h" +#include "include/utime.h" +#include "include/rados/librados.hpp" +#include "rgw_common.h" + +class RGWRados; +class CephContext; + +using namespace std; + +#define META_REPLICA_LOG_OBJ_PREFIX "meta.replicalog." +#define DATA_REPLICA_LOG_OBJ_PREFIX "data.replicalog." + +class RGWReplicaLogger { +protected: + CephContext *cct; + RGWRados *store; + int open_ioctx(librados::IoCtx& ctx, const string& pool); + + RGWReplicaLogger(RGWRados *_store); + + int update_bound(const string& oid, const string& pool, + const string& daemon_id, const string& marker, + const utime_t& time, + const list<pair<string, utime_t> > *entries); + int delete_bound(const string& oid, const string& pool, + const string& daemon_id); + int get_bounds(const string& oid, const string& pool, + string& marker, utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers); + +public: + static void get_bound_info(const cls_replica_log_progress_marker& progress, + string& entity, string& marker, + utime_t time, + list<pair<string, utime_t> >& entries); +}; + +class RGWReplicaObjectLogger : private RGWReplicaLogger { + string pool; + string prefix; + + void get_shard_oid(int id, string& oid) { + char buf[16]; + snprintf(buf, sizeof(buf), "%d", id); + oid = prefix + buf; + } + +public: + RGWReplicaObjectLogger(RGWRados *_store, + const string& _pool, + const string& _prefix); + + int create_log_objects(int shards); + int update_bound(int shard, const string& daemon_id, const string& marker, + const utime_t& time, + const list<pair<string, utime_t> > *entries) { + string oid; + get_shard_oid(shard, oid); + return RGWReplicaLogger::update_bound(oid, pool, + daemon_id, marker, time, entries); + } + int delete_bound(int shard, const string& daemon_id) { + string oid; + get_shard_oid(shard, oid); + return RGWReplicaLogger::delete_bound(oid, pool, + daemon_id); + } + int get_bounds(int shard, string& marker, utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers) { + string oid; + get_shard_oid(shard, oid); + return RGWReplicaLogger::get_bounds(oid, pool, + marker, oldest_time, markers); + } +}; + +class RGWReplicaBucketLogger : private RGWReplicaLogger { +public: + RGWReplicaBucketLogger(RGWRados *_store) : + RGWReplicaLogger(_store) {} + int update_bound(const rgw_bucket& bucket, const string& daemon_id, + const string& marker, const utime_t& time, + const list<pair<string, utime_t> > *entries) { + return RGWReplicaLogger::update_bound(bucket.name, bucket.index_pool, + daemon_id, marker, time, entries); + } + int delete_bound(const rgw_bucket& bucket, const string& daemon_id) { + return RGWReplicaLogger::delete_bound(bucket.name, bucket.index_pool, + daemon_id); + } + int get_bounds(const rgw_bucket& bucket, string& marker, utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers) { + return RGWReplicaLogger::get_bounds(bucket.name, bucket.index_pool, + marker, oldest_time, markers); + } +}; + +#endif /* RGW_REPLICA_LOG_H_ */ diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index 76461433774..b1578989d4b 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -302,7 +302,7 @@ void dump_redirect(struct req_state *s, const string& redirect) s->cio->print("Location: %s\n", redirect.c_str()); } -void dump_last_modified(struct req_state *s, time_t t) +static void dump_time_header(struct req_state *s, const char *name, time_t t) { char timestr[TIME_BUF_SIZE]; @@ -314,7 +314,23 @@ void dump_last_modified(struct req_state *s, time_t t) if (strftime(timestr, sizeof(timestr), "%a, %d %b %Y %H:%M:%S %Z", tmp) == 0) return; - int r = s->cio->print("Last-Modified: %s\n", timestr); + int r = s->cio->print("%s: %s\n", name, timestr); + if (r < 0) { + ldout(s->cct, 0) << "ERROR: s->cio->print() returned err=" << r << dendl; + } +} + +void dump_last_modified(struct req_state *s, time_t t) +{ + dump_time_header(s, "Last-Modified", t); +} + +void dump_epoch_header(struct req_state *s, const char *name, time_t t) +{ + char buf[32]; + snprintf(buf, sizeof(buf), "%lld", (long long)t); + + int r = s->cio->print("%s: %s\n", name, buf); if (r < 0) { ldout(s->cct, 0) << "ERROR: s->cio->print() returned err=" << r << dendl; } @@ -943,111 +959,6 @@ int RGWRESTOp::verify_permission() return check_caps(s->user.caps); } -static void line_unfold(const char *line, string& sdest) -{ - char dest[strlen(line) + 1]; - const char *p = line; - char *d = dest; - - while (isspace(*p)) - ++p; - - bool last_space = false; - - while (*p) { - switch (*p) { - case '\n': - case '\r': - *d = ' '; - if (!last_space) - ++d; - last_space = true; - break; - default: - *d = *p; - ++d; - last_space = false; - break; - } - ++p; - } - *d = 0; - sdest = dest; -} - -struct str_len { - const char *str; - int len; -}; - -#define STR_LEN_ENTRY(s) { s, sizeof(s) - 1 } - -struct str_len meta_prefixes[] = { STR_LEN_ENTRY("HTTP_X_AMZ"), - STR_LEN_ENTRY("HTTP_X_GOOG"), - STR_LEN_ENTRY("HTTP_X_DHO"), - STR_LEN_ENTRY("HTTP_X_RGW"), - STR_LEN_ENTRY("HTTP_X_OBJECT"), - STR_LEN_ENTRY("HTTP_X_CONTAINER"), - {NULL, 0} }; - -static int init_meta_info(struct req_state *s) -{ - const char *p; - - s->info.x_meta_map.clear(); - - const char **envp = s->cio->envp(); - - for (int i=0; (p = envp[i]); ++i) { - const char *prefix; - for (int prefix_num = 0; (prefix = meta_prefixes[prefix_num].str) != NULL; prefix_num++) { - int len = meta_prefixes[prefix_num].len; - if (strncmp(p, prefix, len) == 0) { - dout(10) << "meta>> " << p << dendl; - const char *name = p+len; /* skip the prefix */ - const char *eq = strchr(name, '='); - if (!eq) /* shouldn't happen! */ - continue; - int name_len = eq - name; - - if (strncmp(name, "_META_", name_len) == 0) - s->has_bad_meta = true; - - char name_low[meta_prefixes[0].len + name_len + 1]; - snprintf(name_low, meta_prefixes[0].len - 5 + name_len + 1, "%s%s", meta_prefixes[0].str + 5 /* skip HTTP_ */, name); // normalize meta prefix - int j; - for (j = 0; name_low[j]; j++) { - if (name_low[j] != '_') - name_low[j] = tolower(name_low[j]); - else - name_low[j] = '-'; - } - name_low[j] = 0; - string val; - line_unfold(eq + 1, val); - - map<string, string>::iterator iter; - iter = s->info.x_meta_map.find(name_low); - if (iter != s->info.x_meta_map.end()) { - string old = iter->second; - int pos = old.find_last_not_of(" \t"); /* get rid of any whitespaces after the value */ - old = old.substr(0, pos + 1); - old.append(","); - old.append(val); - s->info.x_meta_map[name_low] = old; - } else { - s->info.x_meta_map[name_low] = val; - } - } - } - } - map<string, string>::iterator iter; - for (iter = s->info.x_meta_map.begin(); iter != s->info.x_meta_map.end(); ++iter) { - dout(10) << "x>> " << iter->first << ":" << iter->second << dendl; - } - - return 0; -} int RGWHandler_ObjStore::allocate_formatter(struct req_state *s, int default_type, bool configurable) { @@ -1290,7 +1201,7 @@ int RGWREST::preprocess(struct req_state *s, RGWClientIO *cio) } s->op = op_from_method(info.method); - init_meta_info(s); + info.init_meta_info(&s->has_bad_meta); return 0; } diff --git a/src/rgw/rgw_rest.h b/src/rgw/rgw_rest.h index e3b20bd7160..0b7204fe9cb 100644 --- a/src/rgw/rgw_rest.h +++ b/src/rgw/rgw_rest.h @@ -314,6 +314,7 @@ extern void list_all_buckets_start(struct req_state *s); extern void dump_owner(struct req_state *s, string& id, string& name, const char *section = NULL); extern void dump_content_length(struct req_state *s, uint64_t len); extern void dump_etag(struct req_state *s, const char *etag); +extern void dump_epoch_header(struct req_state *s, const char *name, time_t t); extern void dump_last_modified(struct req_state *s, time_t t); extern void abort_early(struct req_state *s, int err); extern void dump_range(struct req_state *s, uint64_t ofs, uint64_t end, uint64_t total_size); diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index c7e11a95841..f7479effa76 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -6,6 +6,7 @@ #include "common/ceph_crypto_cms.h" #include "common/armor.h" +#include "common/strtol.h" #define dout_subsys ceph_subsys_rgw @@ -33,17 +34,29 @@ int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len) l++; if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) { - status = atoi(l); + http_status = atoi(l); + if (http_status == 100) /* 100-continue response */ + continue; + status = rgw_http_error_to_errno(http_status); } else { /* convert header field name to upper case */ char *src = tok; char buf[len + 1]; size_t i; for (i = 0; i < len && *src; ++i, ++src) { - buf[i] = toupper(*src); + switch (*src) { + case '-': + buf[i] = '_'; + break; + default: + buf[i] = toupper(*src); + } } buf[i] = '\0'; out_headers[buf] = l; + int r = handle_header(buf, l); + if (r < 0) + return r; } } } @@ -100,7 +113,7 @@ int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const c if (r < 0) return r; - return rgw_http_error_to_errno(status); + return status; } int RGWRESTSimpleRequest::send_data(void *ptr, size_t len) @@ -245,13 +258,13 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz outbl->claim(response); } - return rgw_http_error_to_errno(status); + return status; } class RGWRESTStreamOutCB : public RGWGetDataCB { - RGWRESTStreamRequest *req; + RGWRESTStreamWriteRequest *req; public: - RGWRESTStreamOutCB(RGWRESTStreamRequest *_req) : req(_req) {} + RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {} int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len); /* callback for object iteration when sending data */ }; @@ -269,14 +282,19 @@ int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) return req->add_output_data(new_bl); } -RGWRESTStreamRequest::~RGWRESTStreamRequest() +RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest() { delete cb; } -int RGWRESTStreamRequest::add_output_data(bufferlist& bl) +int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl) { lock.Lock(); + if (status < 0) { + int ret = status; + lock.Unlock(); + return ret; + } pending_send.push_back(bl); lock.Unlock(); @@ -284,7 +302,77 @@ int RGWRESTStreamRequest::add_output_data(bufferlist& bl) return process_request(handle, false, &done); } -int RGWRESTStreamRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs) +static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant) +{ + string& s = grants_by_type[perm]; + + if (!s.empty()) + s.append(", "); + + string id_type_str; + ACLGranteeType& type = grant.get_type(); + switch (type.get_type()) { + case ACL_TYPE_GROUP: + id_type_str = "uri"; + break; + case ACL_TYPE_EMAIL_USER: + id_type_str = "emailAddress"; + break; + default: + id_type_str = "id"; + } + string id; + grant.get_id(id); + s.append(id_type_str + "=\"" + id + "\""); +} + +struct grant_type_to_header { + int type; + const char *header; +}; + +struct grant_type_to_header grants_headers_def[] = { + { RGW_PERM_FULL_CONTROL, "x-amz-grant-full-control"}, + { RGW_PERM_READ, "x-amz-grant-read"}, + { RGW_PERM_WRITE, "x-amz-grant-write"}, + { RGW_PERM_READ_ACP, "x-amz-grant-read-acp"}, + { RGW_PERM_WRITE_ACP, "x-amz-grant-write-acp"}, + { 0, NULL} +}; + +static bool grants_by_type_check_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant, int check_perm) +{ + if ((perm & check_perm) == perm) { + grants_by_type_add_one_grant(grants_by_type, check_perm, grant); + return true; + } + return false; +} + +static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant) +{ + struct grant_type_to_header *t; + + for (t = grants_headers_def; t->header; t++) { + if (grants_by_type_check_perm(grants_by_type, perm, grant, t->type)) + return; + } +} + +static void add_grants_headers(map<int, string>& grants, map<string, string>& attrs, map<string, string>& meta_map) +{ + struct grant_type_to_header *t; + + for (t = grants_headers_def; t->header; t++) { + map<int, string>::iterator iter = grants.find(t->type); + if (iter != grants.end()) { + attrs[t->header] = iter->second; + meta_map[t->header] = iter->second; + } + } +} + +int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs) { string resource = obj.bucket.name + "/" + obj.object; string new_url = url; @@ -311,12 +399,6 @@ int RGWRESTStreamRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t new_info.script_uri.append(resource); new_info.request_uri = new_info.script_uri; - int ret = sign_request(key, new_env, new_info); - if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to sign request" << dendl; - return ret; - } - map<string, string>& m = new_env.get_map(); map<string, bufferlist>::iterator bliter; @@ -324,13 +406,38 @@ int RGWRESTStreamRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t for (bliter = attrs.begin(); bliter != attrs.end(); ++bliter) { bufferlist& bl = bliter->second; const string& name = bliter->first; - string val(bl.c_str(), bl.length()); + string val = bl.c_str(); if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) { string header_name = RGW_AMZ_META_PREFIX; header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1)); m[header_name] = val; + new_info.x_meta_map[header_name] = val; } } + RGWAccessControlPolicy policy; + int ret = rgw_policy_from_attrset(cct, attrs, &policy); + if (ret < 0) { + ldout(cct, 0) << "ERROR: couldn't get policy ret=" << ret << dendl; + return ret; + } + + /* update acl headers */ + RGWAccessControlList& acl = policy.get_acl(); + multimap<string, ACLGrant>& grant_map = acl.get_grant_map(); + multimap<string, ACLGrant>::iterator giter; + map<int, string> grants_by_type; + for (giter = grant_map.begin(); giter != grant_map.end(); ++giter) { + ACLGrant& grant = giter->second; + ACLPermission& perm = grant.get_permission(); + grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant); + } + add_grants_headers(grants_by_type, m, new_info.x_meta_map); + ret = sign_request(key, new_env, new_info); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to sign request" << dendl; + return ret; + } + map<string, string>::iterator iter; for (iter = m.begin(); iter != m.end(); ++iter) { headers.push_back(make_pair<string, string>(iter->first, iter->second)); @@ -347,15 +454,15 @@ int RGWRESTStreamRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t return 0; } -int RGWRESTStreamRequest::send_data(void *ptr, size_t len) +int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len) { uint64_t sent = 0; - dout(20) << "RGWRESTStreamRequest::send_data()" << dendl; + dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl; lock.Lock(); - if (pending_send.empty()) { + if (pending_send.empty() || status < 0) { lock.Unlock(); - return 0; + return status; } list<bufferlist>::iterator iter = pending_send.begin(); @@ -392,7 +499,162 @@ int RGWRESTStreamRequest::send_data(void *ptr, size_t len) } -int RGWRESTStreamRequest::complete() +void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str) +{ + map<string, string>::iterator iter = out_headers.find(header_name); + if (iter != out_headers.end()) { + str = iter->second; + } else { + str.clear(); + } +} + +int RGWRESTStreamWriteRequest::complete(string& etag, time_t *mtime) { - return complete_request(handle); + int ret = complete_request(handle); + if (ret < 0) + return ret; + + set_str_from_headers(out_headers, "ETAG", etag); + if (mtime) { + string mtime_str; + set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str); + string err; + long t = strict_strtol(mtime_str.c_str(), 10, &err); + if (!err.empty()) { + ldout(cct, 0) << "ERROR: failed converting mtime (" << mtime_str << ") to int " << dendl; + return -EINVAL; + } + *mtime = (time_t)t; + } + + return status; } + +int RGWRESTStreamReadRequest::get_obj(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj) +{ + string resource = obj.bucket.name + "/" + obj.object; + string new_url = url; + if (new_url[new_url.size() - 1] != '/') + new_url.append("/"); + + string date_str; + get_new_date_str(cct, date_str); + + RGWEnv new_env; + req_info new_info(cct, &new_env); + + string params_str; + map<string, string>& args = new_info.args.get_params(); + get_params_str(args, params_str); + + new_url.append(resource + params_str); + + new_env.set("HTTP_DATE", date_str.c_str()); + + for (map<string, string>::iterator iter = extra_headers.begin(); + iter != extra_headers.end(); ++iter) { + new_env.set(iter->first.c_str(), iter->second.c_str()); + } + + new_info.method = "GET"; + + new_info.script_uri = "/"; + new_info.script_uri.append(resource); + new_info.request_uri = new_info.script_uri; + + new_info.init_meta_info(NULL); + + int ret = sign_request(key, new_env, new_info); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to sign request" << dendl; + return ret; + } + + map<string, string>& m = new_env.get_map(); + map<string, string>::iterator iter; + for (iter = m.begin(); iter != m.end(); ++iter) { + headers.push_back(make_pair<string, string>(iter->first, iter->second)); + } + + int r = process(new_info.method, new_url.c_str()); + if (r < 0) + return r; + + return 0; +} + +int RGWRESTStreamReadRequest::complete(string& etag, time_t *mtime, map<string, string>& attrs) +{ + set_str_from_headers(out_headers, "ETAG", etag); + if (mtime) { + string mtime_str; + set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str); + if (!mtime_str.empty()) { + string err; + long t = strict_strtol(mtime_str.c_str(), 10, &err); + if (!err.empty()) { + ldout(cct, 0) << "ERROR: failed converting mtime (" << mtime_str << ") to int " << dendl; + return -EINVAL; + } + *mtime = (time_t)t; + } + } + + map<string, string>::iterator iter; + for (iter = out_headers.begin(); iter != out_headers.end(); ++iter) { + const string& attr_name = iter->first; + if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) { + string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1); + const char *src = name.c_str(); + char buf[name.size() + 1]; + char *dest = buf; + for (; *src; ++src, ++dest) { + switch(*src) { + case '_': + *dest = '-'; + break; + default: + *dest = tolower(*src); + } + } + *dest = '\0'; + attrs[buf] = iter->second; + } + } + return status; +} + +int RGWRESTStreamReadRequest::handle_header(const string& name, const string& val) +{ + if (name == "RGWX_EMBEDDED_METADATA_LEN") { + string err; + long len = strict_strtol(val.c_str(), 10, &err); + if (!err.empty()) { + ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl; + return -EINVAL; + } + + cb->set_extra_data_len(len); + } + return 0; +} + +int RGWRESTStreamReadRequest::receive_data(void *ptr, size_t len) +{ + bufferptr bp((const char *)ptr, len); + bufferlist bl; + bl.append(bp); + int ret = cb->handle_data(bl, ofs, len); + if (ret < 0) + return ret; + ofs += len; + return len; +} + +int RGWRESTStreamReadRequest::send_data(void *ptr, size_t len) +{ + /* not sending any data */ + return 0; +} + diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index f3f9f7ff91c..b709fd49a1a 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -11,6 +11,7 @@ class RGWRESTSimpleRequest : public RGWHTTPClient { protected: CephContext *cct; + int http_status; int status; string url; @@ -23,14 +24,16 @@ protected: size_t max_response; /* we need this as we don't stream out response */ bufferlist response; + virtual int handle_header(const string& name, const string& val) { return 0; } void append_param(string& dest, const string& name, const string& val); void get_params_str(map<string, string>& extra_args, string& dest); int sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info); public: RGWRESTSimpleRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers, - list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL), - max_response(0) { + list<pair<string, string> > *_params) : cct(_cct), http_status(0), status(0), + url(_url), send_iter(NULL), + max_response(0) { if (_headers) headers = *_headers; @@ -51,7 +54,7 @@ public: }; -class RGWRESTStreamRequest : public RGWRESTSimpleRequest { +class RGWRESTStreamWriteRequest : public RGWRESTSimpleRequest { Mutex lock; list<bufferlist> pending_send; void *handle; @@ -60,15 +63,38 @@ public: int add_output_data(bufferlist& bl); int send_data(void *ptr, size_t len); - RGWRESTStreamRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers, + RGWRESTStreamWriteRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers, list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params), - lock("RGWRESTStreamRequest"), handle(NULL), cb(NULL) {} - ~RGWRESTStreamRequest(); + lock("RGWRESTStreamWriteRequest"), handle(NULL), cb(NULL) {} + ~RGWRESTStreamWriteRequest(); int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs); - int complete(); + int complete(string& etag, time_t *mtime); RGWGetDataCB *get_out_cb() { return cb; } }; +class RGWRESTStreamReadRequest : public RGWRESTSimpleRequest { + Mutex lock; + RGWGetDataCB *cb; + bufferlist in_data; + size_t chunk_ofs; + size_t ofs; +protected: + int handle_header(const string& name, const string& val); +public: + int send_data(void *ptr, size_t len); + int receive_data(void *ptr, size_t len); + + RGWRESTStreamReadRequest(CephContext *_cct, string& _url, RGWGetDataCB *_cb, list<pair<string, string> > *_headers, + list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params), + lock("RGWRESTStreamReadRequest"), cb(_cb), + chunk_ofs(0), ofs(0) {} + ~RGWRESTStreamReadRequest() {} + int get_obj(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj); + int complete(string& etag, time_t *mtime, map<string, string>& attrs); + + void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; } +}; + #endif diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 8dc2c502d53..5caf3ce0bcd 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -3,18 +3,18 @@ #define dout_subsys ceph_subsys_rgw -RGWRegionConnection::RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream) : cct(_cct) +RGWRESTConn::RGWRESTConn(CephContext *_cct, RGWRados *store, list<string>& remote_endpoints) : cct(_cct) { list<string>::iterator iter; int i; - for (i = 0, iter = upstream.endpoints.begin(); iter != upstream.endpoints.end(); ++iter, ++i) { + for (i = 0, iter = remote_endpoints.begin(); iter != remote_endpoints.end(); ++iter, ++i) { endpoints[i] = *iter; } key = store->zone.system_key; region = store->region.name; } -int RGWRegionConnection::get_url(string& endpoint) +int RGWRESTConn::get_url(string& endpoint) { if (endpoints.empty()) { ldout(cct, 0) << "ERROR: endpoints not configured for upstream zone" << dendl; @@ -27,7 +27,7 @@ int RGWRegionConnection::get_url(string& endpoint) return 0; } -int RGWRegionConnection::forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl) +int RGWRESTConn::forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl) { string url; int ret = get_url(url); @@ -46,8 +46,8 @@ public: StreamObjData(rgw_obj& _obj) : obj(_obj) {} }; -int RGWRegionConnection::put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size, - map<string, bufferlist>& attrs, RGWRESTStreamRequest **req) +int RGWRESTConn::put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size, + map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req) { string url; int ret = get_url(url); @@ -57,14 +57,57 @@ int RGWRegionConnection::put_obj_init(const string& uid, rgw_obj& obj, uint64_t list<pair<string, string> > params; params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid)); params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region)); - *req = new RGWRESTStreamRequest(cct, url, NULL, ¶ms); + *req = new RGWRESTStreamWriteRequest(cct, url, NULL, ¶ms); return (*req)->put_obj_init(key, obj, obj_size, attrs); } -int RGWRegionConnection::complete_request(RGWRESTStreamRequest *req) +int RGWRESTConn::complete_request(RGWRESTStreamWriteRequest *req, string& etag, time_t *mtime) { - int ret = req->complete(); + int ret = req->complete(etag, mtime); delete req; return ret; } + +int RGWRESTConn::get_obj(const string& uid, req_info *info /* optional */, rgw_obj& obj, bool prepend_metadata, + RGWGetDataCB *cb, RGWRESTStreamReadRequest **req) +{ + string url; + int ret = get_url(url); + if (ret < 0) + return ret; + + list<pair<string, string> > params; + params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid)); + params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region)); + if (prepend_metadata) { + params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "prepend-metadata", region)); + } + *req = new RGWRESTStreamReadRequest(cct, url, cb, NULL, ¶ms); + map<string, string> extra_headers; + if (info) { + map<string, string>& orig_map = info->env->get_map(); + + /* add original headers that start with HTTP_X_AMZ_ */ +#define SEARCH_AMZ_PREFIX "HTTP_X_AMZ_" + for (map<string, string>::iterator iter = orig_map.lower_bound(SEARCH_AMZ_PREFIX); iter != orig_map.end(); ++iter) { + const string& name = iter->first; + if (name == "HTTP_X_AMZ_DATE") /* dont forward date from original request */ + continue; + if (name.compare(0, sizeof(SEARCH_AMZ_PREFIX) - 1, "HTTP_X_AMZ_") != 0) + break; + extra_headers[iter->first] = iter->second; + } + } + return (*req)->get_obj(key, extra_headers, obj); +} + +int RGWRESTConn::complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime, map<string, string>& attrs) +{ + int ret = req->complete(etag, mtime, attrs); + delete req; + + return ret; +} + + diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 5119cdc250e..6fe572d2cf7 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -5,10 +5,9 @@ class CephContext; class RGWRados; -class RGWRegion; class RGWGetObjData; -class RGWRegionConnection +class RGWRESTConn { CephContext *cct; map<int, string> endpoints; @@ -17,7 +16,7 @@ class RGWRegionConnection atomic_t counter; public: - RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream); + RGWRESTConn(CephContext *_cct, RGWRados *store, list<string>& endpoints); int get_url(string& endpoint); /* sync request */ @@ -25,8 +24,11 @@ public: /* async request */ int put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size, - map<string, bufferlist>& attrs, RGWRESTStreamRequest **req); - int complete_request(RGWRESTStreamRequest *req); + map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req); + int complete_request(RGWRESTStreamWriteRequest *req, string& etag, time_t *mtime); + + int get_obj(const string& uid, req_info *info /* optional */, rgw_obj& obj, bool prepend_metadata, RGWGetDataCB *cb, RGWRESTStreamReadRequest **req); + int complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime, map<string, string>& attrs); }; #endif diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index b37c88a3ad4..43bb84fc763 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -20,6 +20,7 @@ #include "rgw_client_io.h" #include "common/errno.h" +#define LOG_CLASS_LIST_MAX_ENTRIES (1000) #define dout_subsys ceph_subsys_rgw static int parse_date_str(string& in, utime_t& out) { @@ -37,16 +38,17 @@ static int parse_date_str(string& in, utime_t& out) { void RGWOp_MDLog_List::execute() { string shard = s->info.args.get("id"); - + string max_entries_str = s->info.args.get("max-entries"); string st = s->info.args.get("start-time"), et = s->info.args.get("end-time"), + marker = s->info.args.get("marker"), err; utime_t ut_st, ut_et; void *handle; - int shard_id; + unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES; - shard_id = strict_strtol(shard.c_str(), 10, &err); + shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err); if (!err.empty()) { dout(5) << "Error parsing shard_id " << shard << dendl; http_ret = -EINVAL; @@ -63,13 +65,28 @@ void RGWOp_MDLog_List::execute() { return; } + if (!max_entries_str.empty()) { + max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err); + if (!err.empty()) { + dout(5) << "Error parsing max-entries " << max_entries_str << dendl; + http_ret = -EINVAL; + return; + } + } + RGWMetadataLog *meta_log = store->meta_mgr->get_log(); - meta_log->init_list_entries(shard_id, ut_st, ut_et, &handle); + meta_log->init_list_entries(shard_id, ut_st, ut_et, marker, &handle); bool truncated; + do { + http_ret = meta_log->list_entries(handle, max_entries, entries, &truncated); + if (http_ret < 0) + break; - http_ret = meta_log->list_entries(handle, 1000, entries, &truncated); + if (!max_entries_str.empty()) + max_entries -= entries.size(); + } while (truncated && (max_entries > 0)); } void RGWOp_MDLog_List::send_response() { @@ -114,11 +131,11 @@ void RGWOp_MDLog_Delete::execute() { err; utime_t ut_st, ut_et; - int shard_id; + unsigned shard_id; http_ret = 0; - shard_id = strict_strtol(shard.c_str(), 10, &err); + shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err); if (!err.empty()) { dout(5) << "Error parsing shard_id " << shard << dendl; http_ret = -EINVAL; @@ -144,25 +161,27 @@ void RGWOp_MDLog_Delete::execute() { } void RGWOp_MDLog_Lock::execute() { - string shard_id_str, duration_str, lock_id; - int shard_id; + string shard_id_str, duration_str, locker_id, zone_id; + unsigned shard_id; http_ret = 0; shard_id_str = s->info.args.get("id"); duration_str = s->info.args.get("length"); - lock_id = s->info.args.get("lock_id"); + locker_id = s->info.args.get("locker-id"); + zone_id = s->info.args.get("zone-id"); if (shard_id_str.empty() || (duration_str.empty()) || - lock_id.empty()) { + locker_id.empty() || + zone_id.empty()) { dout(5) << "Error invalid parameter list" << dendl; http_ret = -EINVAL; return; } string err; - shard_id = strict_strtol(shard_id_str.c_str(), 10, &err); + shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err); if (!err.empty()) { dout(5) << "Error parsing shard_id param " << shard_id_str << dendl; http_ret = -EINVAL; @@ -170,35 +189,37 @@ void RGWOp_MDLog_Lock::execute() { } RGWMetadataLog *meta_log = store->meta_mgr->get_log(); - int dur; - dur = strict_strtol(duration_str.c_str(), 10, &err); + unsigned dur; + dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err); if (!err.empty() || dur <= 0) { dout(5) << "invalid length param " << duration_str << dendl; http_ret = -EINVAL; return; } utime_t time(dur, 0); - http_ret = meta_log->lock_exclusive(shard_id, time, lock_id); + http_ret = meta_log->lock_exclusive(shard_id, time, zone_id, locker_id); } void RGWOp_MDLog_Unlock::execute() { - string shard_id_str, lock_id; - int shard_id; + string shard_id_str, locker_id, zone_id; + unsigned shard_id; http_ret = 0; shard_id_str = s->info.args.get("id"); - lock_id = s->info.args.get("lock_id"); + locker_id = s->info.args.get("locker-id"); + zone_id = s->info.args.get("zone-id"); if (shard_id_str.empty() || - lock_id.empty()) { + locker_id.empty() || + zone_id.empty()) { dout(5) << "Error invalid parameter list" << dendl; http_ret = -EINVAL; return; } string err; - shard_id = strict_strtol(shard_id_str.c_str(), 10, &err); + shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err); if (!err.empty()) { dout(5) << "Error parsing shard_id param " << shard_id_str << dendl; http_ret = -EINVAL; @@ -206,7 +227,7 @@ void RGWOp_MDLog_Unlock::execute() { } RGWMetadataLog *meta_log = store->meta_mgr->get_log(); - http_ret = meta_log->unlock(shard_id, lock_id); + http_ret = meta_log->unlock(shard_id, zone_id, locker_id); } void RGWOp_BILog_List::execute() { @@ -214,7 +235,7 @@ void RGWOp_BILog_List::execute() { marker = s->info.args.get("marker"), max_entries_str = s->info.args.get("max-entries"); RGWBucketInfo bucket_info; - int max_entries; + unsigned max_entries; if (bucket_name.empty()) { dout(5) << "ERROR: bucket not specified" << dendl; @@ -222,19 +243,19 @@ void RGWOp_BILog_List::execute() { return; } - http_ret = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL); + http_ret = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL, NULL); if (http_ret < 0) { dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl; return; } bool truncated; - int count = 0; + unsigned count = 0; string err; - max_entries = strict_strtol(max_entries_str.c_str(), 10, &err); + max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err); if (!err.empty()) - max_entries = 1000; + max_entries = LOG_CLASS_LIST_MAX_ENTRIES; send_response(); do { @@ -301,7 +322,7 @@ void RGWOp_BILog_Delete::execute() { http_ret = -EINVAL; return; } - http_ret = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL); + http_ret = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL, NULL); if (http_ret < 0) { dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl; return; @@ -318,12 +339,14 @@ void RGWOp_DATALog_List::execute() { string st = s->info.args.get("start-time"), et = s->info.args.get("end-time"), + max_entries_str = s->info.args.get("max-entries"), + marker = s->info.args.get("marker"), err; utime_t ut_st, ut_et; - int shard_id; + unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES; - shard_id = strict_strtol(shard.c_str(), 10, &err); + shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err); if (!err.empty()) { dout(5) << "Error parsing shard_id " << shard << dendl; http_ret = -EINVAL; @@ -340,12 +363,25 @@ void RGWOp_DATALog_List::execute() { return; } - string marker; + if (!max_entries_str.empty()) { + max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err); + if (!err.empty()) { + dout(5) << "Error parsing max-entries " << max_entries_str << dendl; + http_ret = -EINVAL; + return; + } + } + bool truncated; -#define DATALOG_LIST_MAX_ENTRIES 1000 - - http_ret = store->data_log->list_entries(shard_id, ut_st, ut_et, - DATALOG_LIST_MAX_ENTRIES, entries, marker, &truncated); + do { + http_ret = store->data_log->list_entries(shard_id, ut_st, ut_et, + max_entries, entries, marker, &truncated); + if (http_ret < 0) + break; + + if (!max_entries_str.empty()) + max_entries -= entries.size(); + } while (truncated && (max_entries > 0)); } void RGWOp_DATALog_List::send_response() { @@ -385,67 +421,71 @@ void RGWOp_DATALog_GetShardsInfo::send_response() { } void RGWOp_DATALog_Lock::execute() { - string shard_id_str, duration_str, lock_id; - int shard_id; + string shard_id_str, duration_str, locker_id, zone_id; + unsigned shard_id; http_ret = 0; shard_id_str = s->info.args.get("id"); duration_str = s->info.args.get("length"); - lock_id = s->info.args.get("lock_id"); + locker_id = s->info.args.get("locker-id"); + zone_id = s->info.args.get("zone-id"); if (shard_id_str.empty() || (duration_str.empty()) || - lock_id.empty()) { + locker_id.empty() || + zone_id.empty()) { dout(5) << "Error invalid parameter list" << dendl; http_ret = -EINVAL; return; } string err; - shard_id = strict_strtol(shard_id_str.c_str(), 10, &err); + shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err); if (!err.empty()) { dout(5) << "Error parsing shard_id param " << shard_id_str << dendl; http_ret = -EINVAL; return; } - int dur; - dur = strict_strtol(duration_str.c_str(), 10, &err); + unsigned dur; + dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err); if (!err.empty() || dur <= 0) { dout(5) << "invalid length param " << duration_str << dendl; http_ret = -EINVAL; return; } utime_t time(dur, 0); - http_ret = store->data_log->lock_exclusive(shard_id, time, lock_id); + http_ret = store->data_log->lock_exclusive(shard_id, time, zone_id, locker_id); } void RGWOp_DATALog_Unlock::execute() { - string shard_id_str, lock_id; - int shard_id; + string shard_id_str, locker_id, zone_id; + unsigned shard_id; http_ret = 0; shard_id_str = s->info.args.get("id"); - lock_id = s->info.args.get("lock_id"); + locker_id = s->info.args.get("locker-id"); + zone_id = s->info.args.get("zone-id"); if (shard_id_str.empty() || - lock_id.empty()) { + locker_id.empty() || + zone_id.empty()) { dout(5) << "Error invalid parameter list" << dendl; http_ret = -EINVAL; return; } string err; - shard_id = strict_strtol(shard_id_str.c_str(), 10, &err); + shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err); if (!err.empty()) { dout(5) << "Error parsing shard_id param " << shard_id_str << dendl; http_ret = -EINVAL; return; } - http_ret = store->data_log->unlock(shard_id, lock_id); + http_ret = store->data_log->unlock(shard_id, zone_id, locker_id); } void RGWOp_DATALog_Delete::execute() { @@ -455,11 +495,11 @@ void RGWOp_DATALog_Delete::execute() { err; utime_t ut_st, ut_et; - int shard_id; + unsigned shard_id; http_ret = 0; - shard_id = strict_strtol(shard.c_str(), 10, &err); + shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err); if (!err.empty()) { dout(5) << "Error parsing shard_id " << shard << dendl; http_ret = -EINVAL; diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 07cd55718fd..bfb23f0be38 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -4,6 +4,7 @@ #include "common/ceph_crypto.h" #include "common/Formatter.h" #include "common/utf8.h" +#include "common/ceph_json.h" #include "rgw_rest.h" #include "rgw_rest_s3.h" @@ -34,7 +35,7 @@ void dump_bucket(struct req_state *s, RGWBucketEnt& obj) { s->formatter->open_object_section("Bucket"); s->formatter->dump_string("Name", obj.bucket.name); - dump_time(s, "CreationDate", &obj.mtime); + dump_time(s, "CreationDate", &obj.creation_time); s->formatter->close_section(); } @@ -73,6 +74,7 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs, off_ string content_type_str; map<string, string> response_attrs; map<string, string>::iterator riter; + bufferlist metadata_bl; if (ret) goto done; @@ -83,6 +85,27 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs, off_ if (range_str) dump_range(s, start, end, s->obj_size); + if (s->system_request && + s->info.args.exists(RGW_SYS_PARAM_PREFIX "prepend-metadata")) { + + /* JSON encode object metadata */ + JSONFormatter jf; + jf.open_object_section("obj_metadata"); + encode_json("attrs", attrs, &jf); + encode_json("mtime", lastmod, &jf); + jf.close_section(); + stringstream ss; + jf.flush(ss); + metadata_bl.append(ss.str()); + s->cio->print("Rgwx-Embedded-Metadata-Len: %lld\r\n", (long long)metadata_bl.length()); + total_len += metadata_bl.length(); + } + + if (s->system_request && lastmod) { + /* we end up dumping mtime in two different methods, a bit redundant */ + dump_epoch_header(s, "Rgwx-Mtime", lastmod); + } + dump_content_length(s, total_len); dump_last_modified(s, lastmod); @@ -143,6 +166,10 @@ done: if (!content_type) content_type = "binary/octet-stream"; end_header(s, content_type); + + if (metadata_bl.length()) { + s->cio->write(metadata_bl.c_str(), metadata_bl.length()); + } sent_header = true; send_data: @@ -417,6 +444,7 @@ void RGWCreateBucket_ObjStore_S3::send_response() f.open_object_section("info"); encode_json("object_ver", objv_tracker.read_version, &f); + encode_json("bucket_info", info, &f); f.close_section(); rgw_flush_formatter_and_reset(s, &f); } @@ -480,59 +508,13 @@ void RGWPutObj_ObjStore_S3::send_response() dump_etag(s, etag.c_str()); dump_content_length(s, 0); } + if (s->system_request && mtime) { + dump_epoch_header(s, "Rgwx-Mtime", mtime); + } dump_errno(s); end_header(s); } -string trim_whitespace(const string& src) -{ - if (src.empty()) { - return string(); - } - - int start = 0; - for (; start != (int)src.size(); start++) { - if (!isspace(src[start])) - break; - } - - int end = src.size() - 1; - if (end <= start) { - return string(); - } - - for (; end > start; end--) { - if (!isspace(src[end])) - break; - } - - return src.substr(start, end - start + 1); -} - -string trim_quotes(const string& val) -{ - string s = trim_whitespace(val); - if (s.size() < 2) - return s; - - int start = 0; - int end = s.size() - 1; - int quotes_count = 0; - - if (s[start] == '"') { - start++; - quotes_count++; - } - if (s[end] == '"') { - end--; - quotes_count++; - } - if (quotes_count == 2) { - return s.substr(start, end - start + 1); - } - return s; -} - /* * parses params in the format: 'first; param1=foo; param2=bar' */ @@ -540,11 +522,11 @@ static void parse_params(const string& params_str, string& first, map<string, st { int pos = params_str.find(';'); if (pos < 0) { - first = trim_whitespace(params_str); + first = rgw_trim_whitespace(params_str); return; } - first = trim_whitespace(params_str.substr(0, pos)); + first = rgw_trim_whitespace(params_str.substr(0, pos)); pos++; @@ -557,11 +539,11 @@ static void parse_params(const string& params_str, string& first, map<string, st int eqpos = param.find('='); if (eqpos > 0) { - string param_name = trim_whitespace(param.substr(0, eqpos)); - string val = trim_quotes(param.substr(eqpos + 1)); + string param_name = rgw_trim_whitespace(param.substr(0, eqpos)); + string val = rgw_trim_quotes(param.substr(eqpos + 1)); params[param_name] = val; } else { - params[trim_whitespace(param)] = ""; + params[rgw_trim_whitespace(param)] = ""; } pos = end + 1; @@ -739,7 +721,7 @@ int RGWPostObj_ObjStore_S3::read_form_part_header(struct post_form_part *part, /* * iterate through fields */ - string line = trim_whitespace(string(bl.c_str(), bl.length())); + string line = rgw_trim_whitespace(string(bl.c_str(), bl.length())); if (line.empty()) break; @@ -774,7 +756,7 @@ bool RGWPostObj_ObjStore_S3::part_str(const string& name, string *val) bufferlist& data = iter->second.data; string str = string(data.c_str(), data.length()); - *val = trim_whitespace(str); + *val = rgw_trim_whitespace(str); return true; } @@ -1258,31 +1240,53 @@ int RGWCopyObj_ObjStore_S3::get_params() if_nomatch = s->info.env->get("HTTP_X_AMZ_COPY_IF_NONE_MATCH"); const char *req_src = s->copy_source; - if (!req_src) + if (!req_src) { + ldout(s->cct, 0) << "copy source is NULL" << dendl; return -EINVAL; + } ret = parse_copy_location(req_src, src_bucket_name, src_object); - if (!ret) - return -EINVAL; + if (!ret) { + ldout(s->cct, 0) << "failed to parse copy location" << dendl; + return -EINVAL; + } dest_bucket_name = s->bucket.name; dest_object = s->object_str; + if (s->system_request) { + source_zone = s->info.args.get(RGW_SYS_PARAM_PREFIX "source-zone"); + if (!source_zone.empty()) { + client_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "client-id"); + op_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "op-id"); + + if (client_id.empty() || op_id.empty()) { + ldout(s->cct, 0) << RGW_SYS_PARAM_PREFIX "client-id or " RGW_SYS_PARAM_PREFIX "op-id were not provided, required for intra-region copy" << dendl; + return -EINVAL; + } + } + } + const char *md_directive = s->info.env->get("HTTP_X_AMZ_METADATA_DIRECTIVE"); if (md_directive) { if (strcasecmp(md_directive, "COPY") == 0) { replace_attrs = false; } else if (strcasecmp(md_directive, "REPLACE") == 0) { replace_attrs = true; + } else if (!source_zone.empty()) { + replace_attrs = false; // default for intra-region copy } else { + ldout(s->cct, 0) << "invalid metadata directive" << dendl; return -EINVAL; } } - if ((dest_bucket_name.compare(src_bucket_name) == 0) && + if (source_zone.empty() && + (dest_bucket_name.compare(src_bucket_name) == 0) && (dest_object.compare(src_object) == 0) && !replace_attrs) { /* can only copy object into itself if replacing attrs */ + ldout(s->cct, 0) << "can't copy object into itself if not replacing attrs" << dendl; return -ERR_INVALID_REQUEST; } return 0; diff --git a/src/rgw/rgw_swift.cc b/src/rgw/rgw_swift.cc index 56933b3d722..d42bc579e76 100644 --- a/src/rgw/rgw_swift.cc +++ b/src/rgw/rgw_swift.cc @@ -511,7 +511,7 @@ int RGWSwift::update_user_info(RGWRados *store, struct rgw_swift_auth_info *info user_info.user_id = info->user; user_info.display_name = info->display_name; - int ret = rgw_store_user_info(store, user_info, NULL, NULL, true); + int ret = rgw_store_user_info(store, user_info, NULL, NULL, 0, true); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to store new user's info: ret=" << ret << dendl; return ret; diff --git a/src/rgw/rgw_tools.cc b/src/rgw/rgw_tools.cc index 3f77c52a0c1..95ae52b7941 100644 --- a/src/rgw/rgw_tools.cc +++ b/src/rgw/rgw_tools.cc @@ -15,7 +15,7 @@ static map<string, string> ext_mime_map; int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, const char *data, size_t size, bool exclusive, - RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs) + RGWObjVersionTracker *objv_tracker, time_t set_mtime, map<string, bufferlist> *pattrs) { map<string,bufferlist> no_attrs; if (!pattrs) @@ -23,19 +23,19 @@ int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, cons rgw_obj obj(bucket, oid); - int ret = rgwstore->put_system_obj(NULL, obj, data, size, exclusive, NULL, *pattrs, objv_tracker); + int ret = rgwstore->put_system_obj(NULL, obj, data, size, exclusive, NULL, *pattrs, objv_tracker, set_mtime); if (ret == -ENOENT) { ret = rgwstore->create_pool(bucket); if (ret >= 0) - ret = rgwstore->put_system_obj(NULL, obj, data, size, exclusive, NULL, *pattrs, objv_tracker); + ret = rgwstore->put_system_obj(NULL, obj, data, size, exclusive, NULL, *pattrs, objv_tracker, set_mtime); } return ret; } int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, string& key, bufferlist& bl, - RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs) + RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs) { int ret; struct rgw_err err; @@ -46,7 +46,7 @@ int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, string do { ret = rgwstore->prepare_get_obj(ctx, obj, NULL, NULL, pattrs, NULL, - NULL, NULL, NULL, NULL, NULL, NULL, objv_tracker, &handle, &err); + NULL, pmtime, NULL, NULL, NULL, NULL, objv_tracker, &handle, &err); if (ret < 0) return ret; diff --git a/src/rgw/rgw_tools.h b/src/rgw/rgw_tools.h index 5c79525ad8f..0c3ba0559c7 100644 --- a/src/rgw/rgw_tools.h +++ b/src/rgw/rgw_tools.h @@ -12,9 +12,9 @@ class RGWObjVersionTracker; struct obj_version; int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, const char *data, size_t size, bool exclusive, - RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs = NULL); + RGWObjVersionTracker *objv_tracker, time_t set_mtime, map<string, bufferlist> *pattrs = NULL); int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, string& key, bufferlist& bl, - RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs = NULL); + RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs = NULL); int rgw_tools_init(CephContext *cct); void rgw_tools_cleanup(); diff --git a/src/rgw/rgw_user.cc b/src/rgw/rgw_user.cc index d5e341d4c8f..23cfc418674 100644 --- a/src/rgw/rgw_user.cc +++ b/src/rgw/rgw_user.cc @@ -45,7 +45,8 @@ bool rgw_user_is_authenticated(RGWUserInfo& info) * Save the given user information to storage. * Returns: 0 on success, -ERR# on failure. */ -int rgw_store_user_info(RGWRados *store, RGWUserInfo& info, RGWUserInfo *old_info, RGWObjVersionTracker *objv_tracker, bool exclusive) +int rgw_store_user_info(RGWRados *store, RGWUserInfo& info, RGWUserInfo *old_info, + RGWObjVersionTracker *objv_tracker, time_t mtime, bool exclusive) { bufferlist bl; info.encode(bl); @@ -106,7 +107,7 @@ int rgw_store_user_info(RGWRados *store, RGWUserInfo& info, RGWUserInfo *old_inf ::encode(ui, data_bl); ::encode(info, data_bl); - ret = store->meta_mgr->put_entry(user_meta_handler, info.user_id, data_bl, exclusive, &ot); + ret = store->meta_mgr->put_entry(user_meta_handler, info.user_id, data_bl, exclusive, &ot, mtime); if (ret < 0) return ret; @@ -114,7 +115,7 @@ int rgw_store_user_info(RGWRados *store, RGWUserInfo& info, RGWUserInfo *old_inf if (!old_info || old_info->user_email.compare(info.user_email) != 0) { /* only if new index changed */ ret = rgw_put_system_obj(store, store->zone.user_email_pool, info.user_email, - link_bl.c_str(), link_bl.length(), exclusive, NULL); + link_bl.c_str(), link_bl.length(), exclusive, NULL, 0); if (ret < 0) return ret; } @@ -129,7 +130,7 @@ int rgw_store_user_info(RGWRados *store, RGWUserInfo& info, RGWUserInfo *old_inf ret = rgw_put_system_obj(store, store->zone.user_keys_pool, k.id, link_bl.c_str(), link_bl.length(), exclusive, - NULL); + NULL, 0); if (ret < 0) return ret; } @@ -143,7 +144,7 @@ int rgw_store_user_info(RGWRados *store, RGWUserInfo& info, RGWUserInfo *old_inf ret = rgw_put_system_obj(store, store->zone.user_swift_pool, k.id, link_bl.c_str(), link_bl.length(), exclusive, - NULL); + NULL, 0); if (ret < 0) return ret; } @@ -151,12 +152,13 @@ int rgw_store_user_info(RGWRados *store, RGWUserInfo& info, RGWUserInfo *old_inf return ret; } -int rgw_get_user_info_from_index(RGWRados *store, string& key, rgw_bucket& bucket, RGWUserInfo& info, RGWObjVersionTracker *objv_tracker) +int rgw_get_user_info_from_index(RGWRados *store, string& key, rgw_bucket& bucket, RGWUserInfo& info, + RGWObjVersionTracker *objv_tracker, time_t *pmtime) { bufferlist bl; RGWUID uid; - int ret = rgw_get_system_obj(store, NULL, bucket, key, bl, NULL); + int ret = rgw_get_system_obj(store, NULL, bucket, key, bl, NULL, pmtime); if (ret < 0) return ret; @@ -176,12 +178,13 @@ int rgw_get_user_info_from_index(RGWRados *store, string& key, rgw_bucket& bucke * Given a uid, finds the user info associated with it. * returns: 0 on success, -ERR# on failure (including nonexistence) */ -int rgw_get_user_info_by_uid(RGWRados *store, string& uid, RGWUserInfo& info, RGWObjVersionTracker *objv_tracker) +int rgw_get_user_info_by_uid(RGWRados *store, string& uid, RGWUserInfo& info, + RGWObjVersionTracker *objv_tracker, time_t *pmtime) { bufferlist bl; RGWUID user_id; - int ret = rgw_get_system_obj(store, NULL, store->zone.user_uid_pool, uid, bl, objv_tracker); + int ret = rgw_get_system_obj(store, NULL, store->zone.user_uid_pool, uid, bl, objv_tracker, pmtime); if (ret < 0) return ret; @@ -207,27 +210,30 @@ int rgw_get_user_info_by_uid(RGWRados *store, string& uid, RGWUserInfo& info, RG * Given an email, finds the user info associated with it. * returns: 0 on success, -ERR# on failure (including nonexistence) */ -int rgw_get_user_info_by_email(RGWRados *store, string& email, RGWUserInfo& info, RGWObjVersionTracker *objv_tracker) +int rgw_get_user_info_by_email(RGWRados *store, string& email, RGWUserInfo& info, + RGWObjVersionTracker *objv_tracker, time_t *pmtime) { - return rgw_get_user_info_from_index(store, email, store->zone.user_email_pool, info, objv_tracker); + return rgw_get_user_info_from_index(store, email, store->zone.user_email_pool, info, objv_tracker, pmtime); } /** * Given an swift username, finds the user_info associated with it. * returns: 0 on success, -ERR# on failure (including nonexistence) */ -extern int rgw_get_user_info_by_swift(RGWRados *store, string& swift_name, RGWUserInfo& info, RGWObjVersionTracker *objv_tracker) +extern int rgw_get_user_info_by_swift(RGWRados *store, string& swift_name, RGWUserInfo& info, + RGWObjVersionTracker *objv_tracker, time_t *pmtime) { - return rgw_get_user_info_from_index(store, swift_name, store->zone.user_swift_pool, info, objv_tracker); + return rgw_get_user_info_from_index(store, swift_name, store->zone.user_swift_pool, info, objv_tracker, pmtime); } /** * Given an access key, finds the user info associated with it. * returns: 0 on success, -ERR# on failure (including nonexistence) */ -extern int rgw_get_user_info_by_access_key(RGWRados *store, string& access_key, RGWUserInfo& info, RGWObjVersionTracker *objv_tracker) +extern int rgw_get_user_info_by_access_key(RGWRados *store, string& access_key, RGWUserInfo& info, + RGWObjVersionTracker *objv_tracker, time_t *pmtime) { - return rgw_get_user_info_from_index(store, access_key, store->zone.user_keys_pool, info, objv_tracker); + return rgw_get_user_info_from_index(store, access_key, store->zone.user_keys_pool, info, objv_tracker, pmtime); } int rgw_remove_key_index(RGWRados *store, RGWAccessKey& access_key) @@ -241,7 +247,7 @@ int rgw_remove_uid_index(RGWRados *store, string& uid) { RGWObjVersionTracker objv_tracker; RGWUserInfo info; - int ret = rgw_get_user_info_by_uid(store, uid, info, &objv_tracker); + int ret = rgw_get_user_info_by_uid(store, uid, info, &objv_tracker, NULL); if (ret < 0) return ret; @@ -1564,7 +1570,7 @@ int RGWUser::update(RGWUserAdminOpState& op_state, std::string *err_msg) } if (is_populated()) { - ret = rgw_store_user_info(store, user_info, &old_info, &op_state.objv, false); + ret = rgw_store_user_info(store, user_info, &old_info, &op_state.objv, 0, false); if (ret < 0) { set_err_msg(err_msg, "unable to store user info"); return ret; @@ -1576,7 +1582,7 @@ int RGWUser::update(RGWUserAdminOpState& op_state, std::string *err_msg) return ret; } } else { - ret = rgw_store_user_info(store, user_info, NULL, &op_state.objv, false); + ret = rgw_store_user_info(store, user_info, NULL, &op_state.objv, 0, false); if (ret < 0) { set_err_msg(err_msg, "unable to store user info"); return ret; @@ -2250,8 +2256,9 @@ int RGWUserAdminOp_Caps::remove(RGWRados *store, RGWUserAdminOpState& op_state, class RGWUserMetadataObject : public RGWMetadataObject { RGWUserInfo info; public: - RGWUserMetadataObject(RGWUserInfo& i, obj_version& v) : info(i) { + RGWUserMetadataObject(RGWUserInfo& i, obj_version& v, time_t m) : info(i) { objv = v; + mtime = m; } void dump(Formatter *f) const { @@ -2267,19 +2274,20 @@ public: RGWUserInfo info; RGWObjVersionTracker objv_tracker; + time_t mtime; - int ret = rgw_get_user_info_by_uid(store, entry, info, &objv_tracker); + int ret = rgw_get_user_info_by_uid(store, entry, info, &objv_tracker, &mtime); if (ret < 0) return ret; - RGWUserMetadataObject *mdo = new RGWUserMetadataObject(info, objv_tracker.read_version); + RGWUserMetadataObject *mdo = new RGWUserMetadataObject(info, objv_tracker.read_version, mtime); *obj = mdo; return 0; } - int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, JSONObj *obj) { + int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, time_t mtime, JSONObj *obj) { RGWUserInfo info; decode_json_obj(info, obj); @@ -2290,7 +2298,7 @@ public: return ret; - ret = rgw_store_user_info(store, info, &old_info, &objv_tracker, false); + ret = rgw_store_user_info(store, info, &old_info, &objv_tracker, mtime, false); if (ret < 0) return ret; diff --git a/src/rgw/rgw_user.h b/src/rgw/rgw_user.h index 42f8b3e6988..9f21c42e554 100644 --- a/src/rgw/rgw_user.h +++ b/src/rgw/rgw_user.h @@ -51,27 +51,32 @@ extern bool rgw_user_is_authenticated(RGWUserInfo& info); * Save the given user information to storage. * Returns: 0 on success, -ERR# on failure. */ -extern int rgw_store_user_info(RGWRados *store, RGWUserInfo& info, RGWUserInfo *old_info, RGWObjVersionTracker *objv_tracker, bool exclusive); +extern int rgw_store_user_info(RGWRados *store, RGWUserInfo& info, RGWUserInfo *old_info, + RGWObjVersionTracker *objv_tracker, time_t mtime, bool exclusive); /** * Given an email, finds the user info associated with it. * returns: 0 on success, -ERR# on failure (including nonexistence) */ -extern int rgw_get_user_info_by_uid(RGWRados *store, string& user_id, RGWUserInfo& info, RGWObjVersionTracker *objv_tracker = NULL); +extern int rgw_get_user_info_by_uid(RGWRados *store, string& user_id, RGWUserInfo& info, + RGWObjVersionTracker *objv_tracker = NULL, time_t *pmtime = NULL); /** * Given an swift username, finds the user info associated with it. * returns: 0 on success, -ERR# on failure (including nonexistence) */ -extern int rgw_get_user_info_by_email(RGWRados *store, string& email, RGWUserInfo& info, RGWObjVersionTracker *objv_tracker = NULL); +extern int rgw_get_user_info_by_email(RGWRados *store, string& email, RGWUserInfo& info, + RGWObjVersionTracker *objv_tracker = NULL, time_t *pmtime = NULL); /** * Given an swift username, finds the user info associated with it. * returns: 0 on success, -ERR# on failure (including nonexistence) */ -extern int rgw_get_user_info_by_swift(RGWRados *store, string& swift_name, RGWUserInfo& info, RGWObjVersionTracker *objv_tracker = NULL); +extern int rgw_get_user_info_by_swift(RGWRados *store, string& swift_name, RGWUserInfo& info, + RGWObjVersionTracker *objv_tracker = NULL, time_t *pmtime = NULL); /** * Given an access key, finds the user info associated with it. * returns: 0 on success, -ERR# on failure (including nonexistence) */ -extern int rgw_get_user_info_by_access_key(RGWRados *store, string& access_key, RGWUserInfo& info, RGWObjVersionTracker *objv_tracker = NULL); +extern int rgw_get_user_info_by_access_key(RGWRados *store, string& access_key, RGWUserInfo& info, + RGWObjVersionTracker *objv_tracker = NULL, time_t *pmtime = NULL); /** * Given an RGWUserInfo, deletes the user and its bucket ACLs. */ diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index c44cc6e0d3d..dae5e563a84 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -56,6 +56,17 @@ bilog trim trim bucket index log (use start-marker, end-marker) datalog list list data log datalog trim trim data log + opstate list list stateful operations entries (use client_id, + op_id, object) + opstate set set state on an entry (use client_id, op_id, object) + opstate renewstate renew state on an entry (use client_id, op_id, object) + opstate rmstate remove entry (use client_id, op_id, object) + replicamdlog get get the replica metadata log + replicamdlog delete delete the replica metadata log + replicadatalog get get the replica data log + replicadatalog delete delete the replica data log + replicabucketlog get get the replica bucket log + replicabucketlog delete delete the replica bucket log options: --uid=<id> user id --subuser=<name> subuser name diff --git a/src/test/cls_replica_log/test_cls_replica_log.cc b/src/test/cls_replica_log/test_cls_replica_log.cc new file mode 100644 index 00000000000..eabe0b3860d --- /dev/null +++ b/src/test/cls_replica_log/test_cls_replica_log.cc @@ -0,0 +1,153 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * Copyright 2013 Inktank + */ + +#include "gtest/gtest.h" +#include "test/librados/test.h" + +#include "cls/replica_log/cls_replica_log_client.h" +#include "cls/replica_log/cls_replica_log_types.h" + +#define SETUP_DATA \ + librados::Rados rados; \ + librados::IoCtx ioctx; \ + string pool_name = get_temp_pool_name(); \ + ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); \ + ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); \ + string oid = "obj"; \ + ASSERT_EQ(0, ioctx.create(oid, true)); + +#define ADD_MARKER \ + string entity = "tester_entity"; \ + string marker = "tester_marker1"; \ + utime_t time; \ + time.set_from_double(10); \ + list<pair<string, utime_t> > entries; \ + entries.push_back(make_pair("tester_obj1", time)); \ + time.set_from_double(20); \ + cls_replica_log_progress_marker progress; \ + cls_replica_log_prepare_marker(progress, entity, marker, time, &entries); \ + librados::ObjectWriteOperation opw; \ + cls_replica_log_update_bound(opw, progress); \ + ASSERT_EQ(0, ioctx.operate(oid, &opw)); + +TEST(cls_replica_log, test_set_get_marker) +{ + SETUP_DATA + + ADD_MARKER + + string reply_position_marker; + utime_t reply_time; + list<cls_replica_log_progress_marker> return_progress_list; + ASSERT_EQ(0, cls_replica_log_get_bounds(ioctx, oid, reply_position_marker, + reply_time, return_progress_list)); + + ASSERT_EQ(reply_position_marker, marker); + ASSERT_EQ((double)10, (double)reply_time); + string response_entity; + string response_marker; + utime_t response_time; + list<pair<string, utime_t> > response_item_list; + + cls_replica_log_extract_marker(return_progress_list.front(), + response_entity, response_marker, + response_time, response_item_list); + ASSERT_EQ(response_entity, entity); + ASSERT_EQ(response_marker, marker); + ASSERT_EQ(response_time, time); + ASSERT_EQ((unsigned)1, response_item_list.size()); + ASSERT_EQ("tester_obj1", response_item_list.front().first); +} + +TEST(cls_replica_log, test_bad_update) +{ + SETUP_DATA + + ADD_MARKER + + time.set_from_double(15); + cls_replica_log_progress_marker bad_marker; + cls_replica_log_prepare_marker(bad_marker, entity, marker, time, &entries); + librados::ObjectWriteOperation badw; + cls_replica_log_update_bound(badw, bad_marker); + ASSERT_EQ(-EINVAL, ioctx.operate(oid, &badw)); +} + +TEST(cls_replica_log, test_bad_delete) +{ + SETUP_DATA + + ADD_MARKER + + librados::ObjectWriteOperation badd; + cls_replica_log_delete_bound(badd, entity); + ASSERT_EQ(-ENOTEMPTY, ioctx.operate(oid, &badd)); +} + +TEST(cls_replica_log, test_good_delete) +{ + SETUP_DATA + + ADD_MARKER + + librados::ObjectWriteOperation opc; + progress.items.clear(); + cls_replica_log_update_bound(opc, progress); + ASSERT_EQ(0, ioctx.operate(oid, &opc)); + librados::ObjectWriteOperation opd; + cls_replica_log_delete_bound(opd, entity); + ASSERT_EQ(0, ioctx.operate(oid, &opd)); + + string reply_position_marker; + utime_t reply_time; + list<cls_replica_log_progress_marker> return_progress_list; + ASSERT_EQ(0, cls_replica_log_get_bounds(ioctx, oid, reply_position_marker, + reply_time, return_progress_list)); + ASSERT_EQ((unsigned)0, return_progress_list.size()); +} + +TEST(cls_replica_log, test_bad_get) +{ + SETUP_DATA + + string reply_position_marker; + utime_t reply_time; + list<cls_replica_log_progress_marker> return_progress_list; + ASSERT_EQ(-ENOENT, + cls_replica_log_get_bounds(ioctx, oid, reply_position_marker, + reply_time, return_progress_list)); +} + +TEST(cls_replica_log, test_double_delete) +{ + SETUP_DATA + + ADD_MARKER + + librados::ObjectWriteOperation opc; + progress.items.clear(); + cls_replica_log_update_bound(opc, progress); + ASSERT_EQ(0, ioctx.operate(oid, &opc)); + librados::ObjectWriteOperation opd; + cls_replica_log_delete_bound(opd, entity); + ASSERT_EQ(0, ioctx.operate(oid, &opd)); + + librados::ObjectWriteOperation opd2; + cls_replica_log_delete_bound(opd2, entity); + ASSERT_EQ(0, ioctx.operate(oid, &opd2)); + + string reply_position_marker; + utime_t reply_time; + list<cls_replica_log_progress_marker> return_progress_list; + ASSERT_EQ(0, cls_replica_log_get_bounds(ioctx, oid, reply_position_marker, + reply_time, return_progress_list)); + ASSERT_EQ((unsigned)0, return_progress_list.size()); + +} diff --git a/src/test/cls_statelog/test_cls_statelog.cc b/src/test/cls_statelog/test_cls_statelog.cc new file mode 100644 index 00000000000..294b528f5db --- /dev/null +++ b/src/test/cls_statelog/test_cls_statelog.cc @@ -0,0 +1,207 @@ +// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" +#include "cls/statelog/cls_statelog_types.h" +#include "cls/statelog/cls_statelog_client.h" + +#include "include/utime.h" +#include "common/Clock.h" +#include "global/global_context.h" + +#include "gtest/gtest.h" +#include "test/librados/test.h" + +#include <errno.h> +#include <string> +#include <vector> + +static librados::ObjectWriteOperation *new_op() { + return new librados::ObjectWriteOperation(); +} + +static librados::ObjectReadOperation *new_rop() { + return new librados::ObjectReadOperation(); +} + +static void reset_op(librados::ObjectWriteOperation **pop) { + delete *pop; + *pop = new_op(); +} +static void reset_rop(librados::ObjectReadOperation **pop) { + delete *pop; + *pop = new_rop(); +} + +void add_log(librados::ObjectWriteOperation *op, const string& client_id, const string& op_id, string& obj, uint32_t state) +{ + bufferlist bl; + ::encode(state, bl); + + utime_t ts = ceph_clock_now(g_ceph_context); + + cls_statelog_add(*op, client_id, op_id, obj, ts, state, bl); +} + +void next_op_id(string& op_id, int *id) +{ + char buf[16]; + snprintf(buf, sizeof(buf), "%d", *id); + op_id = buf; + (*id)++; +} + +static string get_obj_name(int num) +{ + char buf[16]; + snprintf(buf, sizeof(buf), "obj-%d", num); + return string(buf); +} + +static void get_entries_by_object(librados::IoCtx& ioctx, string& oid, + list<cls_statelog_entry>& entries, string& object, string& op_id, int expected) +{ + /* search everything */ + string empty_str, marker; + + librados::ObjectReadOperation *rop = new_rop(); + bufferlist obl; + bool truncated; + cls_statelog_list(*rop, empty_str, op_id, object, marker, 0, entries, &marker, &truncated); + ASSERT_EQ(0, ioctx.operate(oid, rop, &obl)); + ASSERT_EQ(expected, (int)entries.size()); +} + +static void get_entries_by_client_id(librados::IoCtx& ioctx, string& oid, + list<cls_statelog_entry>& entries, string& client_id, string& op_id, int expected) +{ + /* search everything */ + string empty_str, marker; + + librados::ObjectReadOperation *rop = new_rop(); + bufferlist obl; + bool truncated; + cls_statelog_list(*rop, client_id, op_id, empty_str, marker, 0, entries, &marker, &truncated); + ASSERT_EQ(0, ioctx.operate(oid, rop, &obl)); + ASSERT_EQ(expected, (int)entries.size()); +} + +static void get_all_entries(librados::IoCtx& ioctx, string& oid, list<cls_statelog_entry>& entries, int expected) +{ + /* search everything */ + string object, op_id; + get_entries_by_object(ioctx, oid, entries, object, op_id, expected); +} + +TEST(cls_rgw, test_statelog_basic) +{ + librados::Rados rados; + librados::IoCtx ioctx; + string pool_name = get_temp_pool_name(); + + /* create pool */ + ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); + ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); + + string oid = "obj"; + + /* create object */ + ASSERT_EQ(0, ioctx.create(oid, true)); + + int id = 0; + string client_id[] = { "client-1", "client-2" }; + + int num_ops = 10; + string op_ids[num_ops]; + + librados::ObjectWriteOperation *op = new_op(); + + for (int i = 0; i < num_ops; i++) { + next_op_id(op_ids[i], &id); + string obj = get_obj_name(i / 2); + string cid = client_id[i / (num_ops / 2)]; + add_log(op, cid, op_ids[i], obj, i /* just for testing */); + } + ASSERT_EQ(0, ioctx.operate(oid, op)); + + librados::ObjectReadOperation *rop = new_rop(); + + list<cls_statelog_entry> entries; + bool truncated; + + /* check list by client_id */ + + int total_count = 0; + for (int j = 0; j < 2; j++) { + string marker; + string obj; + string cid = client_id[j]; + string op_id; + + bufferlist obl; + + cls_statelog_list(*rop, cid, op_id, obj, marker, 1, entries, &marker, &truncated); + ASSERT_EQ(0, ioctx.operate(oid, rop, &obl)); + ASSERT_EQ(1, (int)entries.size()); + + reset_rop(&rop); + marker.clear(); + cls_statelog_list(*rop, cid, op_id, obj, marker, 0, entries, &marker, &truncated); + obl.clear(); + ASSERT_EQ(0, ioctx.operate(oid, rop, &obl)); + + ASSERT_EQ(5, (int)entries.size()); + ASSERT_EQ(0, (int)truncated); + + map<string, string> emap; + for (list<cls_statelog_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) { + ASSERT_EQ(cid, iter->client_id); + emap[iter->op_id] = iter->object; + } + ASSERT_EQ(5, (int)emap.size()); + /* verify correct object */ + for (int i = 0; i < num_ops / 2; i++, total_count++) { + string ret_obj = emap[op_ids[total_count]]; + string obj = get_obj_name(total_count / 2); + ASSERT_EQ(0, ret_obj.compare(obj)); + } + } + + entries.clear(); + /* now search by object */ + total_count = 0; + for (int i = 0; i < num_ops; i++) { + string marker; + string obj = get_obj_name(i / 2); + string cid; + string op_id; + bufferlist obl; + + reset_rop(&rop); + cls_statelog_list(*rop, cid, op_id, obj, marker, 0, entries, &marker, &truncated); + ASSERT_EQ(0, ioctx.operate(oid, rop, &obl)); + ASSERT_EQ(2, (int)entries.size()); + } + + /* search everything */ + + get_all_entries(ioctx, oid, entries, 10); + + /* now remove an entry */ + cls_statelog_entry e = entries.front(); + entries.pop_front(); + + reset_op(&op); + cls_statelog_remove_by_client(*op, e.client_id, e.op_id); + ASSERT_EQ(0, ioctx.operate(oid, op)); + + get_all_entries(ioctx, oid, entries, 9); + + get_entries_by_object(ioctx, oid, entries, e.object, e.op_id, 0); + get_entries_by_client_id(ioctx, oid, entries, e.client_id, e.op_id, 0); + + string empty_str; + get_entries_by_client_id(ioctx, oid, entries, e.client_id, empty_str, 4); + get_entries_by_object(ioctx, oid, entries, e.object, empty_str, 1); +} + diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index a1d2773a946..969f43d0cb0 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -236,6 +236,15 @@ TYPE(cls_lock_get_info_op) TYPE(cls_lock_get_info_reply) TYPE(cls_lock_list_locks_reply) +#include "cls/replica_log/cls_replica_log_types.h" +TYPE(cls_replica_log_item_marker) +TYPE(cls_replica_log_progress_marker) +TYPE(cls_replica_log_bound) +#include "cls/replica_log/cls_replica_log_ops.h" +TYPE(cls_replica_log_delete_marker_op) +TYPE(cls_replica_log_set_marker_op) +TYPE(cls_replica_log_get_bounds_op) +TYPE(cls_replica_log_get_bounds_ret) // --- messages --- #include "messages/MAuth.h" diff --git a/src/test/test_rgw_admin_log.cc b/src/test/test_rgw_admin_log.cc index 40d0f70c3bc..7c419e7d3c4 100644 --- a/src/test/test_rgw_admin_log.cc +++ b/src/test/test_rgw_admin_log.cc @@ -799,6 +799,16 @@ TEST(TestRGWAdmin, datalog_list) { } ss.str(""); + ss << "/admin/log?type=data&id=" << shard_id << "&start-time=" << start_time + << "&max-entries=1"; + rest_req = ss.str(); + g_test->send_request(string("GET"), rest_req); + EXPECT_EQ(200U, g_test->get_resp_code()); + entries.clear(); + get_datalog_list(entries); + EXPECT_EQ(1U, entries.size()); + + ss.str(""); ss << "/admin/log?type=data&id=" << shard_id << "&start-time=" << start_time << "&end-time=" << end_time; rest_req = ss.str(); @@ -837,92 +847,104 @@ TEST(TestRGWAdmin, datalog_lock_unlock) { ASSERT_EQ(0, user_create(uid, display_name)); ASSERT_EQ(0, caps_add(cname, perm)); - rest_req = "/admin/log?type=data&lock&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=data&lock&length=3&locker-id=ceph&zone-id=1"; + g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); + EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ + + rest_req = "/admin/log?type=data&lock&id=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=data&lock&id=3&lock_id=ceph"; + rest_req = "/admin/log?type=data&lock&length=3&id=1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=data&lock&length=3&id=1"; + rest_req = "/admin/log?type=data&lock&length=3&id=1&locker-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=data&unlock&id=1"; + rest_req = "/admin/log?type=data&unlock&id=1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=data&unlock&lock_id=ceph"; + rest_req = "/admin/log?type=data&unlock&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=data&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=data&unlock&locker-id=ceph&id=1"; + g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); + EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ + + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=data&unlock&id=1&lock_id=ceph"; + rest_req = "/admin/log?type=data&unlock&id=1&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=data&lock&id=1&length=3&lock_id=ceph1"; + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=data&unlock&id=1&lock_id=ceph1"; + rest_req = "/admin/log?type=data&unlock&id=1&locker-id=ceph1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=data&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); utime_t sleep_time(3, 0); - rest_req = "/admin/log?type=data&lock&id=1&length=3&lock_id=ceph1"; + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph1&zone-id=1"; + g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); + EXPECT_EQ(500U, g_test->get_resp_code()); + + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph1&zone-id=2"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(500U, g_test->get_resp_code()); - rest_req = "/admin/log?type=data&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); - EXPECT_EQ(409U, g_test->get_resp_code()); + EXPECT_EQ(200U, g_test->get_resp_code()); sleep_time.sleep(); - rest_req = "/admin/log?type=data&lock&id=1&length=3&lock_id=ceph1"; + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=data&unlock&id=1&lock_id=ceph1"; + rest_req = "/admin/log?type=data&unlock&id=1&locker-id=ceph1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); ASSERT_EQ(0, caps_rm(cname, perm)); perm = "read"; ASSERT_EQ(0, caps_add(cname, perm)); - rest_req = "/admin/log?type=data&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(403U, g_test->get_resp_code()); - rest_req = "/admin/log?type=data&unlock&id=1&lock_id=ceph"; + rest_req = "/admin/log?type=data&unlock&id=1&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(403U, g_test->get_resp_code()); ASSERT_EQ(0, caps_rm(cname, perm)); perm = "write"; ASSERT_EQ(0, caps_add(cname, perm)); - rest_req = "/admin/log?type=data&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=data&unlock&id=1&lock_id=ceph"; + rest_req = "/admin/log?type=data&unlock&id=1&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); ASSERT_EQ(0, caps_rm(cname, perm)); - rest_req = "/admin/log?type=data&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=data&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(403U, g_test->get_resp_code()); - rest_req = "/admin/log?type=data&unlock&id=1&lock_id=ceph"; + rest_req = "/admin/log?type=data&unlock&id=1&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(403U, g_test->get_resp_code()); @@ -1157,6 +1179,25 @@ TEST(TestRGWAdmin, mdlog_list) { EXPECT_EQ(get_log_list(entries), 0); EXPECT_EQ(entries.size(), 14U); + ss.str(""); + ss << "/admin/log?type=metadata&id=" << shard_id << "&start-time=" << start_time + << "&max-entries=" << 1; + rest_req = ss.str(); + g_test->send_request(string("GET"), rest_req); + EXPECT_EQ(200U, g_test->get_resp_code()); + entries.clear(); + EXPECT_EQ(get_log_list(entries), 0); + EXPECT_EQ(entries.size(), 1U); + + ss.str(""); + ss << "/admin/log?type=metadata&id=" << shard_id << "&start-time=" << start_time + << "&max-entries=" << 6; + rest_req = ss.str(); + g_test->send_request(string("GET"), rest_req); + EXPECT_EQ(200U, g_test->get_resp_code()); + entries.clear(); + EXPECT_EQ(get_log_list(entries), 0); + EXPECT_EQ(entries.size(), 6U); ASSERT_EQ(0, caps_rm(cname, perm)); ss.str(""); @@ -1247,92 +1288,104 @@ TEST(TestRGWAdmin, mdlog_lock_unlock) { ASSERT_EQ(0, user_create(uid, display_name)); ASSERT_EQ(0, caps_add(cname, perm)); - rest_req = "/admin/log?type=metadata&lock&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&lock&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=metadata&lock&id=3&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&lock&id=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=metadata&lock&length=3&id=1"; + rest_req = "/admin/log?type=metadata&lock&length=3&id=1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=metadata&unlock&id=1"; + rest_req = "/admin/log?type=metadata&lock&id=3&locker-id=ceph&length=1"; + g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); + EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ + + rest_req = "/admin/log?type=metadata&unlock&id=1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=metadata&unlock&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&unlock&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ - rest_req = "/admin/log?type=metadata&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&unlock&locker-id=ceph&id=1"; + g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); + EXPECT_EQ(400U, g_test->get_resp_code()); /*Bad request*/ + + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=metadata&unlock&id=1&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&unlock&id=1&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=metadata&lock&id=1&length=3&lock_id=ceph1"; + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=metadata&unlock&id=1&lock_id=ceph1"; + rest_req = "/admin/log?type=metadata&unlock&id=1&locker-id=ceph1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=metadata&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); utime_t sleep_time(3, 0); - rest_req = "/admin/log?type=metadata&lock&id=1&length=3&lock_id=ceph1"; + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(500U, g_test->get_resp_code()); - rest_req = "/admin/log?type=metadata&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph&zone-id=2"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); - EXPECT_EQ(409U, g_test->get_resp_code()); + EXPECT_EQ(500U, g_test->get_resp_code()); + + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph&zone-id=1"; + g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); + EXPECT_EQ(200U, g_test->get_resp_code()); sleep_time.sleep(); - rest_req = "/admin/log?type=metadata&lock&id=1&length=3&lock_id=ceph1"; + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=metadata&unlock&id=1&lock_id=ceph1"; + rest_req = "/admin/log?type=metadata&unlock&id=1&locker-id=ceph1&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); ASSERT_EQ(0, caps_rm(cname, perm)); perm = "read"; ASSERT_EQ(0, caps_add(cname, perm)); - rest_req = "/admin/log?type=metadata&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(403U, g_test->get_resp_code()); - rest_req = "/admin/log?type=metadata&unlock&id=1&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&unlock&id=1&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(403U, g_test->get_resp_code()); ASSERT_EQ(0, caps_rm(cname, perm)); perm = "write"; ASSERT_EQ(0, caps_add(cname, perm)); - rest_req = "/admin/log?type=metadata&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); - rest_req = "/admin/log?type=metadata&unlock&id=1&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&unlock&id=1&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(200U, g_test->get_resp_code()); ASSERT_EQ(0, caps_rm(cname, perm)); - rest_req = "/admin/log?type=metadata&lock&id=1&length=3&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&lock&id=1&length=3&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(403U, g_test->get_resp_code()); - rest_req = "/admin/log?type=metadata&unlock&id=1&lock_id=ceph"; + rest_req = "/admin/log?type=metadata&unlock&id=1&locker-id=ceph&zone-id=1"; g_test->send_request(string("POST"), rest_req, read_dummy_post, NULL, sizeof(int)); EXPECT_EQ(403U, g_test->get_resp_code()); |