diff options
31 files changed, 751 insertions, 286 deletions
diff --git a/configure.ac b/configure.ac index 6e54e58f4c2..bdea8b18ceb 100644 --- a/configure.ac +++ b/configure.ac @@ -8,7 +8,7 @@ AC_PREREQ(2.59) # VERSION define is not used by the code. It gets a version string # from 'git describe'; see src/ceph_ver.[ch] -AC_INIT([ceph], [0.68], [ceph-devel@vger.kernel.org]) +AC_INIT([ceph], [0.69], [ceph-devel@vger.kernel.org]) # Create release string. Used with VERSION for RPMs. RPM_RELEASE=0 diff --git a/debian/changelog b/debian/changelog index 874f85b1500..ce73472f9eb 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ceph (0.69-1) precise; urgency=low + + * New upstream release + + -- Gary Lowell <gary.lowell@inktank.com> Wed, 18 Sep 2013 01:39:47 +0000 + ceph (0.68-1) precise; urgency=low * New upstream release diff --git a/doc/install/rpm.rst b/doc/install/rpm.rst index 72934cc8d5e..ea96d394c7a 100644 --- a/doc/install/rpm.rst +++ b/doc/install/rpm.rst @@ -202,9 +202,7 @@ Installing Ceph Object Storage ServerName {FQDN} -#. Create an Apache httpd virtual host in ``/etc/httpd/conf.d/rgw.conf``. - -.. code-block:: ini +#. Create an Apache httpd virtual host in ``/etc/httpd/conf.d/rgw.conf``. :: FastCgiExternalServer /var/www/s3gw.fcgi -socket /tmp/radosgw.sock <VirtualHost *:80> @@ -235,7 +233,7 @@ Installing Ceph Object Storage #FastCgiWrapper On -#. Add a ``fastcgi`` script. :: +#. Add a ``fastcgi`` script with the following path ``/var/www/s3gw.fcgi``. :: #!/bin/sh exec /usr/bin/radosgw -c /etc/ceph/ceph.conf -n client.radosgw.gateway @@ -243,7 +241,7 @@ Installing Ceph Object Storage #. Make ``s3gw.fcgi`` executable:: - chmod +x /var/www/rgw/s3gw.fcgi + chmod +x /var/www/s3gw.fcgi #. Create a user key. :: diff --git a/doc/rados/configuration/ceph-conf.rst b/doc/rados/configuration/ceph-conf.rst index fc316478034..e0633483120 100644 --- a/doc/rados/configuration/ceph-conf.rst +++ b/doc/rados/configuration/ceph-conf.rst @@ -504,12 +504,8 @@ See `General Settings`_, `OSD Settings`_, `Monitor Settings`_, `MDS Settings`_, .. _RGW Settings: ../../../radosgw/config-ref/ .. _Log Settings: ../log-and-debug-ref -When deploying the Ceph configuration file, ensure that you use the cluster name -in your command line syntax. For example:: - ssh myserver01 sudo tee /etc/ceph/openstack.conf < /etc/ceph/openstack.conf - -When creating default directories or files, you should also use the cluster +When creating default directories or files, you should use the cluster name at the appropriate places in the path. For example:: sudo mkdir /var/lib/ceph/osd/openstack-0 @@ -520,10 +516,10 @@ name at the appropriate places in the path. For example:: have monitors using port 6789, use a different port for your other cluster(s). To invoke a cluster other than the default ``ceph`` cluster, use the -``--cluster=clustername`` option with the ``ceph`` command. For example:: - - ceph --cluster=openstack health +``-c {filename}.conf`` option with the ``ceph`` command. For example:: + ceph -c {cluster-name}.conf health + ceph -c openstack.conf health .. _Hardware Recommendations: ../../../install/hardware-recommendations diff --git a/doc/release-notes.rst b/doc/release-notes.rst index accc61e112d..604b4fa296b 100644 --- a/doc/release-notes.rst +++ b/doc/release-notes.rst @@ -2,6 +2,56 @@ Release Notes =============== +v0.69 +----- + +Upgrading +~~~~~~~~~ + +* Users of the librados C++ API should replace users of get_version() + with get_version64() as the old method only returns a 32-bit value + for a 64-bit field. The existing 32-bit get_version() method is now + deprecated. + +* The OSDs are now more picky that request payload match their + declared size. A write operation across N bytes that includes M + bytes of data will now be rejected. No known clients do this, but + the because the server-side behavior has changed it is possible that + an application misusing the interface may now get errors. + +Notable Changes +~~~~~~~~~~~~~~~ + +* build cleanly under clang (Christophe Courtaut) +* common: migrate SharedPtrRegistry to use boost::shared_ptr<> (Loic Dachary) +* doc: erasure coding design notes (Loic Dachary) +* improved intel-optimized crc32c support (~8x faster on my laptop!) +* librados: get_version64() method for C++ API +* mds: fix locking deadlock (David Disseldorp) +* mon, osd: initial CLI for configuring tiering +* mon: allow cap strings with . to be unquoted +* mon: continue to discover peer addr info during election phase +* mon: fix 'osd crush move ...' command for buckets (Joao Luis) +* mon: warn when mon data stores grow very large (Joao Luis) +* objecter, librados: redirect requests based on cache tier config +* osd, librados: add new COPY_FROM rados operation +* osd, librados: add new COPY_GET rados operations (used by COPY_FROM) +* osd: add 'osd heartbeat min healthy ratio' configurable (was hard-coded at 33%) +* osd: add option to disable pg log debug code (which burns CPU) +* osd: allow cap strings with . to be unquoted +* osd: fix version value returned by various operations (Greg Farnum) +* osd: infrastructure to copy objects from other OSDs +* osd: use fdatasync(2) instead of fsync(2) to improve performance (Sam Just) +* rgw: fix major CPU utilization bug with internal caching (Yehuda Sadeh, Mark Nelson) +* rgw: fix ordering of write operations (preventing data loss on crash) (Yehuda Sadeh) +* rgw: fix ordering of writes for mulitpart upload (Yehuda Sadeh) +* rgw: fix various CORS bugs (Yehuda Sadeh) +* rgw: improve help output (Christophe Courtaut) +* rgw: validate S3 tokens against keystone (Roald J. van Loon) +* rgw: wildcard support for keystone roles (Christophe Courtaut) +* sysvinit radosgw: fix status return code (Danny Al-Gaaf) +* sysvinit rbdmap: fix error 'service rbdmap stop' (Laurent Barbe) + v0.68 ----- diff --git a/qa/workunits/mon/crush_ops.sh b/qa/workunits/mon/crush_ops.sh index 4f66e552153..09e49acfbf6 100755 --- a/qa/workunits/mon/crush_ops.sh +++ b/qa/workunits/mon/crush_ops.sh @@ -64,4 +64,8 @@ ceph osd crush rm host2 ceph osd crush rm osd.$o1 ceph osd crush rm osd.$o2 +ceph osd crush add-bucket foo host +ceph osd crush move foo root=default rack=localrack +ceph osd crush rm foo + echo OK diff --git a/src/crush/mapper.c b/src/crush/mapper.c index 3215564172a..ce23ef7c711 100644 --- a/src/crush/mapper.c +++ b/src/crush/mapper.c @@ -562,7 +562,7 @@ int crush_do_rule(const struct crush_map *map, /* copy final _leaf_ values to output set */ memcpy(o, c, osize*sizeof(*o)); - /* swap t and w arrays */ + /* swap o and w arrays */ tmp = o; o = w; w = tmp; diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index 50b1a926957..4169e01325e 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -25,7 +25,7 @@ class MOSDSubOp : public Message { - static const int HEAD_VERSION = 7; + static const int HEAD_VERSION = 8; static const int COMPAT_VERSION = 1; public: @@ -86,6 +86,9 @@ public: // indicates that we must fix hobject_t encoding bool hobject_incorrect_pool; + hobject_t new_temp_oid; ///< new temp object that we must now start tracking + hobject_t discard_temp_oid; ///< previously used temp object that we can now stop tracking + int get_cost() const { if (ops.size() == 1 && ops[0].op.op == CEPH_OSD_OP_PULL) return ops[0].op.extent.length; @@ -150,6 +153,11 @@ public: poid.pool = pgid.pool(); hobject_incorrect_pool = true; } + + if (header.version >= 8) { + ::decode(new_temp_oid, p); + ::decode(discard_temp_oid, p); + } } virtual void encode_payload(uint64_t features) { @@ -194,6 +202,8 @@ public: ::encode(current_progress, payload); ::encode(omap_entries, payload); ::encode(omap_header, payload); + ::encode(new_temp_oid, payload); + ::encode(discard_temp_oid, payload); } MOSDSubOp() diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index 53d7f75e699..365fd28b64e 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -384,7 +384,7 @@ COMMAND("osd crush create-or-move " \ "create entry or move existing entry for <name> <weight> at/to location <args>", \ "osd", "rw", "cli,rest") COMMAND("osd crush move " \ - "name=id,type=CephOsdName " \ + "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] " \ "name=args,type=CephString,n=N,goodchars=[A-Za-z0-9-_.=]", \ "move existing entry for <name> to location <args>", \ "osd", "rw", "cli,rest") diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 8eb88a829b1..36fe6d345f2 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -2846,6 +2846,7 @@ bool OSDMonitor::prepare_command(MMonCommand *m) string args; vector<string> argvec; + cmd_getval(g_ceph_context, cmdmap, "name", name); cmd_getval(g_ceph_context, cmdmap, "args", argvec); map<string,string> loc; parse_loc_map(argvec, &loc); diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 7ef415213b0..5d9e9d1482d 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -246,14 +246,14 @@ void FileStore::lfn_close(FDRef fd) { } -int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) +int FileStore::lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobject_t& newoid) { Index index_new, index_old; IndexedPath path_new, path_old; int exist; int r; - if (c < cid) { - r = get_index(cid, &index_new); + if (c < newcid) { + r = get_index(newcid, &index_new); if (r < 0) return r; r = get_index(c, &index_old); @@ -263,7 +263,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) r = get_index(c, &index_old); if (r < 0) return r; - r = get_index(cid, &index_new); + r = get_index(newcid, &index_new); if (r < 0) return r; } @@ -276,7 +276,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) if (!exist) return -ENOENT; - r = index_new->lookup(o, &path_new, &exist); + r = index_new->lookup(newoid, &path_new, &exist); if (r < 0) { assert(!m_filestore_fail_eio || r != -EIO); return r; @@ -290,7 +290,7 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) if (r < 0) return -errno; - r = index_new->created(o, path_new->path()); + r = index_new->created(newoid, path_new->path()); if (r < 0) { assert(!m_filestore_fail_eio || r != -EIO); return r; @@ -299,7 +299,8 @@ int FileStore::lfn_link(coll_t c, coll_t cid, const hobject_t& o) } int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, - const SequencerPosition &spos) + const SequencerPosition &spos, + bool force_clear_omap) { Index index; int r = get_index(cid, &index); @@ -315,14 +316,17 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, return r; } - struct stat st; - r = ::stat(path->path(), &st); - if (r < 0) { - r = -errno; - assert(!m_filestore_fail_eio || r != -EIO); - return r; + if (!force_clear_omap) { + struct stat st; + r = ::stat(path->path(), &st); + if (r < 0) { + r = -errno; + assert(!m_filestore_fail_eio || r != -EIO); + return r; + } + force_clear_omap = true; } - if (st.st_nlink == 1) { + if (force_clear_omap) { dout(20) << __func__ << ": clearing omap on " << o << " in cid " << cid << dendl; r = object_map->clear(o, &spos); @@ -2176,6 +2180,16 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n } break; + case Transaction::OP_COLL_MOVE_RENAME: + { + coll_t oldcid = i.get_cid(); + hobject_t oldoid = i.get_oid(); + coll_t newcid = i.get_cid(); + hobject_t newoid = i.get_oid(); + r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos); + } + break; + case Transaction::OP_COLL_SETATTR: { coll_t cid = i.get_cid(); @@ -4086,7 +4100,7 @@ int FileStore::_destroy_collection(coll_t c) int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, - const SequencerPosition& spos) + const SequencerPosition& spos) { dout(15) << "collection_add " << c << "/" << o << " from " << oldcid << "/" << o << dendl; @@ -4116,7 +4130,7 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, _set_replay_guard(**fd, spos, &o, true); } - r = lfn_link(oldcid, c, o); + r = lfn_link(oldcid, c, o, o); if (replaying && !backend->can_checkpoint() && r == -EEXIST) // crashed between link() and set_replay_guard() r = 0; @@ -4133,6 +4147,73 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, return r; } +int FileStore::_collection_move_rename(coll_t oldcid, const hobject_t& oldoid, + coll_t c, const hobject_t& o, + const SequencerPosition& spos) +{ + dout(15) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl; + int r = 0; + int dstcmp, srccmp; + + dstcmp = _check_replay_guard(c, o, spos); + if (dstcmp < 0) + goto out_rm_src; + + // check the src name too; it might have a newer guard, and we don't + // want to clobber it + srccmp = _check_replay_guard(oldcid, oldoid, spos); + if (srccmp < 0) + return 0; + + { + // open guard on object so we don't any previous operations on the + // new name that will modify the source inode. + FDRef fd; + r = lfn_open(oldcid, oldoid, 0, &fd); + if (r < 0) { + // the source collection/object does not exist. If we are replaying, we + // should be safe, so just return 0 and move on. + assert(replaying); + dout(10) << __func__ << " " << c << "/" << o << " from " + << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl; + return 0; + } + if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress" + _set_replay_guard(**fd, spos, &o, true); + } + + r = lfn_link(oldcid, c, oldoid, o); + if (replaying && !backend->can_checkpoint() && + r == -EEXIST) // crashed between link() and set_replay_guard() + r = 0; + + _inject_failure(); + + // the name changed; link the omap content + r = object_map->clone(oldoid, o, &spos); + if (r == -ENOENT) + r = 0; + + _inject_failure(); + + // close guard on object so we don't do this again + if (r == 0) { + _close_replay_guard(**fd, spos); + } + lfn_close(fd); + } + + out_rm_src: + // remove source + if (_check_replay_guard(oldcid, oldoid, spos) > 0) { + r = lfn_unlink(oldcid, oldoid, spos, true); + } + + dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid + << " = " << r << dendl; + return r; +} + void FileStore::_inject_failure() { if (m_filestore_kill_at.read()) { diff --git a/src/os/FileStore.h b/src/os/FileStore.h index c603949b399..4f58df4d698 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -298,8 +298,9 @@ public: IndexedPath *path = 0, Index *index = 0); void lfn_close(FDRef fd); - int lfn_link(coll_t c, coll_t cid, const hobject_t& o) ; - int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos); + int lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobject_t& newoid) ; + int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos, + bool force_clear_omap=false); public: FileStore(const std::string &base, const std::string &jdev, const char *internal_name = "filestore", bool update_to=false); @@ -499,6 +500,9 @@ public: int _destroy_collection(coll_t c); int _collection_add(coll_t c, coll_t ocid, const hobject_t& o, const SequencerPosition& spos); + int _collection_move_rename(coll_t oldcid, const hobject_t& oldoid, + coll_t c, const hobject_t& o, + const SequencerPosition& spos); void dump_start(const std::string& file); void dump_stop(); void dump_transactions(list<ObjectStore::Transaction*>& ls, uint64_t seq, OpSequencer *osr); diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 655afee004f..7e8f6ce43bf 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -159,6 +159,7 @@ public: OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination doesn't create the destination */ OP_OMAP_RMKEYRANGE = 37, // cid, oid, firstkey, lastkey + OP_COLL_MOVE_RENAME = 38, // oldcid, oldoid, newcid, newoid }; private: @@ -554,6 +555,15 @@ public: collection_remove(oldcid, oid); return; } + void collection_move_rename(coll_t oldcid, const hobject_t& oldoid, + coll_t cid, const hobject_t& oid) { + __u32 op = OP_COLL_MOVE_RENAME; + ::encode(op, tbl); + ::encode(oldcid, tbl); + ::encode(oldoid, tbl); + ::encode(cid, tbl); + ::encode(oid, tbl); + } void collection_setattr(coll_t cid, const char* name, bufferlist& val) { string n(name); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 20aef0301ec..ff1276969d8 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -134,8 +134,6 @@ static ostream& _prefix(std::ostream* _dout, int whoami, OSDMapRef osdmap) { << " "; } -const coll_t coll_t::META_COLL("meta"); - static CompatSet get_osd_compat_set() { CompatSet::FeatureSet ceph_osd_feature_compat; CompatSet::FeatureSet ceph_osd_feature_ro_compat; @@ -2541,7 +2539,12 @@ void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat) time_t now = ceph_clock_gettime(NULL); - float ratio = ((float)osd_stat.kb_used) / ((float)osd_stat.kb); + // We base ratio on kb_avail rather than kb_used because they can + // differ significantly e.g. on btrfs volumes with a large number of + // chunks reserved for metadata, and for our purposes (avoiding + // completely filling the disk) it's far more important to know how + // much space is available to use than how much we've already used. + float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb); float nearfull_ratio = get_nearfull_ratio(); float full_ratio = get_full_ratio(); cur_ratio = ratio; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index f5f50be4af8..c2f45196870 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -179,8 +179,6 @@ class HistoricOpsSocketHook; class TestOpsSocketHook; struct C_CompleteSplits; -extern const coll_t meta_coll; - typedef std::tr1::shared_ptr<ObjectStore::Sequencer> SequencerRef; class DeletingState { diff --git a/src/osd/PG.h b/src/osd/PG.h index cbafd0f43d9..cdbe827a4a9 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -523,7 +523,8 @@ protected: list<OpRequestRef> waiting_for_active; list<OpRequestRef> waiting_for_all_missing; map<hobject_t, list<OpRequestRef> > waiting_for_missing_object, - waiting_for_degraded_object; + waiting_for_degraded_object, + waiting_for_blocked_object; // Callbacks should assume pg (and nothing else) is locked map<hobject_t, list<Context*> > callbacks_for_degraded_object; map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 3cd5a7ef865..a92403ae370 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -50,6 +50,7 @@ #include "include/compat.h" #include "common/cmdparse.h" +#include "mon/MonClient.h" #include "osdc/Objecter.h" #include "json_spirit/json_spirit_value.h" @@ -191,6 +192,13 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef op->mark_delayed("waiting for degraded object"); } +void ReplicatedPG::wait_for_blocked_object(const hobject_t& soid, OpRequestRef op) +{ + dout(10) << __func__ << " " << soid << " " << op << dendl; + waiting_for_blocked_object[soid].push_back(op); + op->mark_delayed("waiting for blocked object"); +} + void ReplicatedPG::wait_for_backfill_pos(OpRequestRef op) { waiting_for_backfill_pos.push_back(op); @@ -622,7 +630,9 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap, PG(o, curmap, _pool, p, oid, ioid), snapset_contexts_lock("ReplicatedPG::snapset_contexts"), temp_created(false), - temp_coll(coll_t::make_temp_coll(p)), snap_trimmer_machine(this) + temp_coll(coll_t::make_temp_coll(p)), + temp_seq(0), + snap_trimmer_machine(this) { snap_trimmer_machine.initiate(); } @@ -737,7 +747,7 @@ void ReplicatedPG::do_op(OpRequestRef op) osd->reply_op_error(op, r); return; } - + // make sure locator is consistent object_locator_t oloc(obc->obs.oi.soid); if (m->get_object_locator() != oloc) { @@ -748,6 +758,12 @@ void ReplicatedPG::do_op(OpRequestRef op) << " op " << *m << "\n"; } + // io blocked on obc? + if (obc->is_blocked()) { + wait_for_blocked_object(obc->obs.oi.soid, op); + return; + } + if ((op->may_read()) && (obc->obs.oi.lost)) { // This object is lost. Reading from it returns an error. dout(20) << __func__ << ": object " << obc->obs.oi.soid @@ -1028,12 +1044,6 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) // note my stats utime_t now = ceph_clock_now(cct); - // note some basic context for op replication that prepare_transaction may clobber - eversion_t old_last_update = pg_log.get_head(); - bool old_exists = obc->obs.exists; - uint64_t old_size = obc->obs.oi.size; - eversion_t old_version = obc->obs.oi.version; - if (op->may_read()) { dout(10) << " taking ondisk_read_lock" << dendl; obc->ondisk_read_lock(); @@ -1152,7 +1162,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) repop->src_obc.swap(src_obc); // and src_obc. - issue_repop(repop, now, old_last_update, old_exists, old_size, old_version); + issue_repop(repop, now); eval_repop(repop); repop->put(); @@ -3490,7 +3500,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } ::encode(out_omap, osd_op.outdata); - dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl; + dout(20) << " cursor.is_complete=" << cursor.is_complete() + << " " << out_attrs.size() << " attrs" + << " " << bl.length() << " bytes" + << " " << out_omap.size() << " keys" + << dendl; ::encode(cursor, osd_op.outdata); result = 0; } @@ -3511,44 +3525,20 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) result = -EINVAL; goto fail; } - pg_t raw_pg; - get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg); - hobject_t src(src_name, src_oloc.key, src_snapid, - raw_pg.ps(), raw_pg.pool(), - src_oloc.nspace); if (!ctx->copy_op) { // start + pg_t raw_pg; + get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg); + hobject_t src(src_name, src_oloc.key, src_snapid, + raw_pg.ps(), raw_pg.pool(), + src_oloc.nspace); result = start_copy(ctx, src, src_oloc, src_version, &ctx->copy_op); if (result < 0) goto fail; result = -EINPROGRESS; } else { // finish - CopyOpRef cop = ctx->copy_op; - - if (!obs.exists) { - ctx->delta_stats.num_objects++; - obs.exists = true; - } else { - t.remove(coll, soid); - } - t.write(coll, soid, 0, cop->data.length(), cop->data); - for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p) - t.setattr(coll, soid, string("_") + p->first, p->second); - t.omap_setkeys(coll, soid, cop->omap); - - interval_set<uint64_t> ch; - if (oi.size > 0) - ch.insert(0, oi.size); - ctx->modified_ranges.union_of(ch); - - if (cop->data.length() != oi.size) { - ctx->delta_stats.num_bytes -= oi.size; - oi.size = cop->data.length(); - ctx->delta_stats.num_bytes += oi.size; - } - ctx->delta_stats.num_wr++; - ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10); + result = finish_copy(ctx); } } break; @@ -3987,6 +3977,15 @@ coll_t ReplicatedPG::get_temp_coll(ObjectStore::Transaction *t) return temp_coll; } +hobject_t ReplicatedPG::generate_temp_object() +{ + ostringstream ss; + ss << "temp_" << info.pgid << "_" << get_role() << "_" << osd->monc->get_global_id() << "_" << (++temp_seq); + hobject_t hoid(object_t(ss.str()), "", CEPH_NOSNAP, 0, -1, ""); + dout(20) << __func__ << " " << hoid << dendl; + return hoid; +} + int ReplicatedPG::prepare_transaction(OpContext *ctx) { assert(!ctx->ops.empty()); @@ -4185,7 +4184,13 @@ void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop) { dout(10) << __func__ << " " << ctx << " " << cop << dendl; ObjectOperation op; - op.assert_version(cop->version); + if (cop->version) { + op.assert_version(cop->version); + } else { + // we should learn the version after the first chunk, if we didn't know + // it already! + assert(cop->cursor.is_initial()); + } op.copy_get(&cop->cursor, cct->_conf->osd_copyfrom_max_chunk, &cop->size, &cop->mtime, &cop->attrs, &cop->data, &cop->omap, @@ -4195,10 +4200,11 @@ void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop) get_last_peering_reset()); osd->objecter_lock.Lock(); tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op, - cop->src.snap, NULL, 0, - new C_OnFinisher(fin, - &osd->objecter_finisher), - NULL); + cop->src.snap, NULL, 0, + new C_OnFinisher(fin, + &osd->objecter_finisher), + // discover the object version if we don't know it yet + cop->version ? NULL : &cop->version); fin->tid = tid; cop->objecter_tid = tid; osd->objecter_lock.Unlock(); @@ -4223,14 +4229,33 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) if (r < 0) { copy_ops.erase(ctx->obc->obs.oi.soid); --ctx->obc->copyfrom_readside; + kick_object_context_blocked(ctx->obc); reply_ctx(ctx, r); return; } assert(cop->rval >= 0); - // FIXME: this is accumulating the entire object in memory. - if (!cop->cursor.is_complete()) { + // write out what we have so far + vector<OSDOp> ops; + tid_t rep_tid = osd->get_tid(); + osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); + OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &ctx->obc->obs, ctx->obc->ssc, this); + tctx->mtime = ceph_clock_now(g_ceph_context); + RepGather *repop = new_repop(tctx, ctx->obc, rep_tid); + + if (cop->temp_cursor.is_initial()) { + cop->temp_coll = get_temp_coll(&tctx->local_t); + cop->temp_oid = generate_temp_object(); + temp_contents.insert(cop->temp_oid); + repop->ctx->new_temp_oid = cop->temp_oid; + } + + _write_copy_chunk(cop, &tctx->op_t); + + issue_repop(repop, repop->ctx->mtime); + eval_repop(repop); + dout(10) << __func__ << " fetching more" << dendl; _copy_some(ctx, cop); return; @@ -4242,6 +4267,73 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) copy_ops.erase(ctx->obc->obs.oi.soid); --ctx->obc->copyfrom_readside; ctx->copy_op.reset(); + kick_object_context_blocked(ctx->obc); +} + +void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t) +{ + dout(20) << __func__ << " " << cop + << " " << cop->attrs.size() << " attrs" + << " " << cop->data.length() << " bytes" + << " " << cop->omap.size() << " keys" + << dendl; + if (!cop->temp_cursor.attr_complete) { + t->touch(cop->temp_coll, cop->temp_oid); + for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p) + t->setattr(cop->temp_coll, cop->temp_oid, string("_") + p->first, p->second); + cop->attrs.clear(); + } + if (!cop->temp_cursor.data_complete) { + t->write(cop->temp_coll, cop->temp_oid, cop->temp_cursor.data_offset, cop->data.length(), cop->data); + cop->data.clear(); + } + if (!cop->temp_cursor.omap_complete) { + t->omap_setkeys(cop->temp_coll, cop->temp_oid, cop->omap); + cop->omap.clear(); + } + cop->temp_cursor = cop->cursor; +} + +int ReplicatedPG::finish_copy(OpContext *ctx) +{ + CopyOpRef cop = ctx->copy_op; + ObjectState& obs = ctx->new_obs; + ObjectStore::Transaction& t = ctx->op_t; + + if (!obs.exists) { + ctx->delta_stats.num_objects++; + obs.exists = true; + } else { + t.remove(coll, obs.oi.soid); + } + + if (cop->temp_cursor.is_initial()) { + // write directly to final object + cop->temp_coll = coll; + cop->temp_oid = obs.oi.soid; + _write_copy_chunk(cop, &t); + } else { + // finish writing to temp object, then move into place + _write_copy_chunk(cop, &t); + t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, obs.oi.soid); + temp_contents.erase(cop->temp_oid); + ctx->discard_temp_oid = cop->temp_oid; + } + + interval_set<uint64_t> ch; + if (obs.oi.size > 0) + ch.insert(0, obs.oi.size); + ctx->modified_ranges.union_of(ch); + + if (cop->cursor.data_offset != obs.oi.size) { + ctx->delta_stats.num_bytes -= obs.oi.size; + ctx->delta_stats.num_bytes += obs.oi.size; + obs.oi.size = cop->cursor.data_offset; + } + ctx->delta_stats.num_wr++; + ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(obs.oi.size, 10); + + return 0; } void ReplicatedPG::cancel_copy(CopyOpRef cop) @@ -4261,6 +4353,8 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop) --ctx->obc->copyfrom_readside; ctx->copy_op.reset(); + kick_object_context_blocked(ctx->obc); + delete ctx; } @@ -4366,9 +4460,11 @@ void ReplicatedPG::op_applied(RepGather *repop) repop->waitfor_disk.count(whoami) == 0); // commit before ondisk repop->waitfor_ack.erase(whoami); - assert(info.last_update >= repop->v); - assert(last_update_applied < repop->v); - last_update_applied = repop->v; + if (repop->v != eversion_t()) { + assert(info.last_update >= repop->v); + assert(last_update_applied < repop->v); + last_update_applied = repop->v; + } // chunky scrub if (scrubber.active && scrubber.is_chunky) { @@ -4415,9 +4511,10 @@ void ReplicatedPG::op_commit(RepGather *repop) // is no separate reply sent. repop->waitfor_ack.erase(whoami); - last_update_ondisk = repop->v; - - last_complete_ondisk = repop->pg_local_last_complete; + if (repop->v != eversion_t()) { + last_update_ondisk = repop->v; + last_complete_ondisk = repop->pg_local_last_complete; + } eval_repop(repop); } @@ -4568,8 +4665,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) } } -void ReplicatedPG::issue_repop(RepGather *repop, utime_t now, - eversion_t old_last_update, bool old_exists, uint64_t old_size, eversion_t old_version) +void ReplicatedPG::issue_repop(RepGather *repop, utime_t now) { OpContext *ctx = repop->ctx; const hobject_t& soid = ctx->obs->oi.soid; @@ -4607,34 +4703,31 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now, ((static_cast<MOSDOp *>(ctx->op->request))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) { // replicate original op for parallel execution on replica assert(0 == "broken implementation, do not use"); - wr->oloc = object_locator_t(repop->ctx->obs->oi.soid); - wr->ops = repop->ctx->ops; - wr->mtime = repop->ctx->mtime; - wr->old_exists = old_exists; - wr->old_size = old_size; - wr->old_version = old_version; - wr->snapset = repop->obc->ssc->snapset; - wr->snapc = repop->ctx->snapc; - wr->set_data(repop->ctx->op->request->get_data()); // _copy_ bufferlist - } else { - // ship resulting transaction, log entries, and pg_stats - if (peer == backfill_target && soid >= backfill_pos) { - dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos " - << backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl; - ObjectStore::Transaction t; - ::encode(t, wr->get_data()); - } else { - ::encode(repop->ctx->op_t, wr->get_data()); - } - ::encode(repop->ctx->log, wr->logbl); + } - if (backfill_target >= 0 && backfill_target == peer) - wr->pg_stats = pinfo.stats; // reflects backfill progress - else - wr->pg_stats = info.stats; + // ship resulting transaction, log entries, and pg_stats + if (peer == backfill_target && soid >= backfill_pos && + soid.pool == (int64_t)info.pgid.pool()) { // only skip normal (not temp pool=-1) objects + dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos " + << backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl; + ObjectStore::Transaction t; + ::encode(t, wr->get_data()); + } else { + ::encode(repop->ctx->op_t, wr->get_data()); } + + ::encode(repop->ctx->log, wr->logbl); + + if (backfill_target >= 0 && backfill_target == peer) + wr->pg_stats = pinfo.stats; // reflects backfill progress + else + wr->pg_stats = info.stats; wr->pg_trim_to = pg_trim_to; + + wr->new_temp_oid = repop->ctx->new_temp_oid; + wr->discard_temp_oid = repop->ctx->discard_temp_oid; + osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch()); // keep peer_info up to date @@ -4870,12 +4963,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) 0, osd_reqid_t(), ctx->mtime)); - eversion_t old_last_update = pg_log.get_head(); - bool old_exists = repop->obc->obs.exists; - uint64_t old_size = repop->obc->obs.oi.size; - eversion_t old_version = repop->obc->obs.oi.version; - - obc->obs.oi.prior_version = old_version; + obc->obs.oi.prior_version = repop->obc->obs.oi.version; obc->obs.oi.version = ctx->at_version; bufferlist bl; ::encode(obc->obs.oi, bl); @@ -4884,8 +4972,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch) append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t); // obc ref swallowed by repop! - issue_repop(repop, repop->ctx->mtime, old_last_update, old_exists, - old_size, old_version); + issue_repop(repop, repop->ctx->mtime); eval_repop(repop); } @@ -5129,6 +5216,24 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t pgstat->stats.cat_sum[oi.category].add(stat); } +void ReplicatedPG::kick_object_context_blocked(ObjectContextRef obc) +{ + const hobject_t& soid = obc->obs.oi.soid; + map<hobject_t, list<OpRequestRef> >::iterator p = waiting_for_blocked_object.find(soid); + if (p == waiting_for_blocked_object.end()) + return; + + if (obc->is_blocked()) { + dout(10) << __func__ << " " << soid << " still blocked" << dendl; + return; + } + + list<OpRequestRef>& ls = waiting_for_blocked_object[soid]; + dout(10) << __func__ << " " << soid << " requeuing " << ls.size() << " requests" << dendl; + requeue_ops(ls); + waiting_for_blocked_object.erase(soid); +} + SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid) { Mutex::Locker l(snapset_contexts_lock); @@ -5238,6 +5343,16 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) bufferlist::iterator p = m->get_data().begin(); + if (m->new_temp_oid != hobject_t()) { + dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl; + temp_contents.insert(m->new_temp_oid); + get_temp_coll(&rm->localt); + } + if (m->discard_temp_oid != hobject_t()) { + dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl; + temp_contents.erase(m->discard_temp_oid); + } + ::decode(rm->opt, p); if (!(m->get_connection()->get_features() & CEPH_FEATURE_OSD_SNAPMAPPER)) rm->opt.set_tolerate_collection_add_enoent(); @@ -5309,9 +5424,11 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch()); } - assert(info.last_update >= m->version); - assert(last_update_applied < m->version); - last_update_applied = m->version; + if (m->version != eversion_t()) { + assert(info.last_update >= m->version); + assert(last_update_applied < m->version); + last_update_applied = m->version; + } if (scrubber.active_rep_scrub) { if (last_update_applied == scrubber.active_rep_scrub->scrub_to) { osd->rep_scrub_wq.queue(scrubber.active_rep_scrub); @@ -7072,6 +7189,14 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t) p->second.clear(); finish_degraded_object(p->first); } + for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_blocked_object.begin(); + p != waiting_for_blocked_object.end(); + waiting_for_blocked_object.erase(p++)) { + if (is_primary()) + requeue_ops(p->second); + else + p->second.clear(); + } if (is_primary()) requeue_ops(waiting_for_all_missing); @@ -8203,15 +8328,10 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&) dout(10) << "TrimmingObjects react trimming " << pos << dendl; RepGather *repop = pg->trim_object(pos); assert(repop); - repop->queue_snap_trimmer = true; - eversion_t old_last_update = pg->pg_log.get_head(); - bool old_exists = repop->obc->obs.exists; - uint64_t old_size = repop->obc->obs.oi.size; - eversion_t old_version = repop->obc->obs.oi.version; pg->append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t); - pg->issue_repop(repop, repop->ctx->mtime, old_last_update, old_exists, old_size, old_version); + pg->issue_repop(repop, repop->ctx->mtime); pg->eval_repop(repop); repops.insert(repop); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index fef3814d93a..e880bdecade 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -109,6 +109,10 @@ public: map<string,bufferlist> omap; int rval; + coll_t temp_coll; + hobject_t temp_oid; + object_copy_cursor_t temp_cursor; + CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v) : ctx(c), src(s), oloc(l), version(v), objecter_tid(0), @@ -180,6 +184,8 @@ public: CopyOpRef copy_op; + hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking + OpContext(const OpContext& other); const OpContext& operator=(const OpContext& other); @@ -279,8 +285,7 @@ protected: void op_applied(RepGather *repop); void op_commit(RepGather *repop); void eval_repop(RepGather*); - void issue_repop(RepGather *repop, utime_t now, - eversion_t old_last_update, bool old_exists, uint64_t old_size, eversion_t old_version); + void issue_repop(RepGather *repop, utime_t now); RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, tid_t rep_tid); void remove_repop(RepGather *repop); void repop_ack(RepGather *repop, @@ -419,9 +424,6 @@ protected: }; map<hobject_t, PullInfo> pulling; - // Track contents of temp collection, clear on reset - set<hobject_t> temp_contents; - ObjectRecoveryInfo recalc_subsets(const ObjectRecoveryInfo& recovery_info); static void trim_pushed_data(const interval_set<uint64_t> ©_subset, const interval_set<uint64_t> &intervals_received, @@ -793,7 +795,9 @@ protected: int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version, CopyOpRef *pcop); void process_copy_chunk(hobject_t oid, tid_t tid, int r); + void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t); void _copy_some(OpContext *ctx, CopyOpRef cop); + int finish_copy(OpContext *ctx); void cancel_copy(CopyOpRef cop); void requeue_cancel_copy_ops(bool requeue=true); @@ -855,7 +859,10 @@ public: private: bool temp_created; coll_t temp_coll; + set<hobject_t> temp_contents; ///< contents of temp collection, clear on reset + uint64_t temp_seq; ///< last id for naming temp objects coll_t get_temp_coll(ObjectStore::Transaction *t); + hobject_t generate_temp_object(); ///< generate a new temp object name public: bool have_temp_coll(); coll_t get_temp_coll() { @@ -932,6 +939,9 @@ public: bool is_degraded_object(const hobject_t& oid); void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op); + void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op); + void kick_object_context_blocked(ObjectContextRef obc); + void mark_all_unfound_lost(int what); eversion_t pick_newest_available(const hobject_t& oid); ObjectContextRef mark_object_lost(ObjectStore::Transaction *t, diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index e94fd02a5ad..aa20dc592fa 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -393,6 +393,8 @@ ostream& operator<<(ostream& out, const pg_t &pg) // -- coll_t -- +const coll_t coll_t::META_COLL("meta"); + bool coll_t::is_temp(pg_t& pgid) const { const char *cstr(str.c_str()); diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 901d9dbb488..091b2b95e8f 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2197,6 +2197,11 @@ public: if (destructor_callback) destructor_callback->complete(0); } + + bool is_blocked() const { + return copyfrom_readside > 0; + } + // do simple synchronous mutual exclusion, for now. now waitqueues or anything fancy. void ondisk_write_lock() { lock.Lock(); diff --git a/src/rgw/rgw_cache.cc b/src/rgw/rgw_cache.cc index 5b96eb45b08..d0afdcd389c 100644 --- a/src/rgw/rgw_cache.cc +++ b/src/rgw/rgw_cache.cc @@ -107,7 +107,7 @@ void ObjectCache::remove(string& name) void ObjectCache::touch_lru(string& name, std::list<string>::iterator& lru_iter) { - while (lru.size() > (size_t)cct->_conf->rgw_cache_lru_size) { + while (lru_size > (size_t)cct->_conf->rgw_cache_lru_size) { list<string>::iterator iter = lru.begin(); if ((*iter).compare(name) == 0) { /* @@ -121,10 +121,12 @@ void ObjectCache::touch_lru(string& name, std::list<string>::iterator& lru_iter) if (map_iter != cache_map.end()) cache_map.erase(map_iter); lru.pop_front(); + lru_size--; } if (lru_iter == lru.end()) { lru.push_back(name); + lru_size++; lru_iter--; ldout(cct, 10) << "adding " << name << " to cache LRU end" << dendl; } else { @@ -142,6 +144,7 @@ void ObjectCache::remove_lru(string& name, std::list<string>::iterator& lru_iter return; lru.erase(lru_iter); + lru_size--; lru_iter = lru.end(); } diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index 601fcdfc963..68720d0e6ac 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -131,13 +131,14 @@ struct ObjectCacheEntry { class ObjectCache { std::map<string, ObjectCacheEntry> cache_map; std::list<string> lru; + unsigned long lru_size; Mutex lock; CephContext *cct; void touch_lru(string& name, std::list<string>::iterator& lru_iter); void remove_lru(string& name, std::list<string>::iterator& lru_iter); public: - ObjectCache() : lock("ObjectCache"), cct(NULL) { } + ObjectCache() : lru_size(0), lock("ObjectCache"), cct(NULL) { } int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask); void put(std::string& name, ObjectCacheInfo& bl); void remove(std::string& name); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 0c2119ecf9d..bada7d22d1b 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -761,6 +761,11 @@ int RGWPutObjProcessor_Atomic::complete_writing_data() } } complete_parts(); + + int r = drain_pending(); + if (r < 0) + return r; + return 0; } @@ -2611,6 +2616,7 @@ int RGWRados::copy_obj(void *ctx, { /* opening scope so that we can do goto, sorry */ bufferlist& extra_data_bl = processor.get_extra_data(); if (extra_data_bl.length()) { + extra_data_bl.push_back((char)0); JSONParser jp; if (!jp.parse(extra_data_bl.c_str(), extra_data_bl.length())) { ldout(cct, 0) << "failed to parse response extra data. len=" << extra_data_bl.length() << " data=" << extra_data_bl.c_str() << dendl; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index ef98ec1f9fb..a55f1c1f94c 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -238,11 +238,11 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor struct put_obj_aio_info pop_pending(); int wait_pending_front(); bool pending_has_completed(); - int drain_pending(); protected: uint64_t obj_len; + int drain_pending(); int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle); public: diff --git a/src/rgw/rgw_replica_log.cc b/src/rgw/rgw_replica_log.cc index 483d256377b..f80ebf88525 100644 --- a/src/rgw/rgw_replica_log.cc +++ b/src/rgw/rgw_replica_log.cc @@ -34,6 +34,15 @@ RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) : int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool) { int r = store->rados->ioctx_create(pool.c_str(), ctx); + if (r == -ENOENT) { + rgw_bucket p(pool.c_str()); + r = store->create_pool(p); + if (r < 0) + return r; + + // retry + r = store->rados->ioctx_create(pool.c_str(), ctx); + } if (r < 0) { lderr(cct) << "ERROR: could not open rados pool " << pool << dendl; } diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index 196bd29e99b..4aa1d401211 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -709,7 +709,7 @@ int RGWPutObj_ObjStore::get_data(bufferlist& bl) int r = s->cio->read(bp.c_str(), cl, &read_len); len = read_len; if (r < 0) - return ret; + return r; bl.append(bp, 0, len); } diff --git a/src/test/Makefile.am b/src/test/Makefile.am index 80ec69425ca..9ce4a246673 100644 --- a/src/test/Makefile.am +++ b/src/test/Makefile.am @@ -384,6 +384,16 @@ unittest_bufferlist_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL) unittest_bufferlist_CXXFLAGS = $(UNITTEST_CXXFLAGS) check_PROGRAMS += unittest_bufferlist +unittest_crc32c_SOURCES = test/common/test_crc32c.cc +unittest_crc32c_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL) +unittest_crc32c_CXXFLAGS = $(UNITTEST_CXXFLAGS) +check_PROGRAMS += unittest_crc32c + +unittest_arch_SOURCES = test/test_arch.c +unittest_arch_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL) +unittest_arch_CXXFLAGS = $(UNITTEST_CXXFLAGS) +check_PROGRAMS += unittest_arch + unittest_crypto_SOURCES = test/crypto.cc unittest_crypto_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL) unittest_crypto_CXXFLAGS = $(UNITTEST_CXXFLAGS) diff --git a/src/test/filestore/store_test.cc b/src/test/filestore/store_test.cc index 80c775052ec..92104960127 100644 --- a/src/test/filestore/store_test.cc +++ b/src/test/filestore/store_test.cc @@ -898,6 +898,65 @@ TEST_F(StoreTest, TwoHash) { ASSERT_EQ(r, 0); } +TEST_F(StoreTest, MoveRename) { + coll_t temp_cid("mytemp"); + hobject_t temp_oid("tmp_oid", "", CEPH_NOSNAP, 0, 0, ""); + coll_t cid("dest"); + hobject_t oid("dest_oid", "", CEPH_NOSNAP, 0, 0, ""); + int r; + { + ObjectStore::Transaction t; + t.create_collection(cid); + t.touch(cid, oid); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + ASSERT_TRUE(store->exists(cid, oid)); + bufferlist data, attr; + map<string, bufferlist> omap; + data.append("data payload"); + attr.append("attr value"); + omap["omap_key"].append("omap value"); + { + ObjectStore::Transaction t; + t.create_collection(temp_cid); + t.touch(temp_cid, temp_oid); + t.write(temp_cid, temp_oid, 0, data.length(), data); + t.setattr(temp_cid, temp_oid, "attr", attr); + t.omap_setkeys(temp_cid, temp_oid, omap); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + ASSERT_TRUE(store->exists(temp_cid, temp_oid)); + { + ObjectStore::Transaction t; + t.remove(cid, oid); + t.collection_move_rename(temp_cid, temp_oid, cid, oid); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + ASSERT_TRUE(store->exists(cid, oid)); + ASSERT_FALSE(store->exists(temp_cid, temp_oid)); + { + bufferlist newdata; + r = store->read(cid, oid, 0, 1000, newdata); + ASSERT_GE(r, 0); + ASSERT_TRUE(newdata.contents_equal(data)); + bufferlist newattr; + r = store->getattr(cid, oid, "attr", newattr); + ASSERT_GE(r, 0); + ASSERT_TRUE(newattr.contents_equal(attr)); + set<string> keys; + keys.insert("omap_key"); + map<string, bufferlist> newomap; + r = store->omap_get_values(cid, oid, keys, &newomap); + ASSERT_GE(r, 0); + ASSERT_EQ(1u, newomap.size()); + ASSERT_TRUE(newomap.count("omap_key")); + ASSERT_TRUE(newomap["omap_key"].contents_equal(omap["omap_key"])); + } +} + // // support tests for qa/workunits/filestore/filestore.sh // diff --git a/src/test/librados/misc.cc b/src/test/librados/misc.cc index af17847aeab..803c8b1cc77 100644 --- a/src/test/librados/misc.cc +++ b/src/test/librados/misc.cc @@ -581,18 +581,33 @@ TEST(LibRadosMisc, CopyPP) { ASSERT_EQ(0, ioctx.write_full("foo", blc)); ASSERT_EQ(0, ioctx.setxattr("foo", "myattr", xc)); - ObjectWriteOperation op; - op.copy_from("foo", ioctx, ioctx.get_last_version()); - ASSERT_EQ(0, ioctx.operate("foo.copy", &op)); + { + ObjectWriteOperation op; + op.copy_from("foo", ioctx, ioctx.get_last_version()); + ASSERT_EQ(0, ioctx.operate("foo.copy", &op)); + + bufferlist bl2, x2; + ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0)); + ASSERT_TRUE(bl.contents_equal(bl2)); + ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2)); + ASSERT_TRUE(x.contents_equal(x2)); + } - bufferlist bl2, x2; - ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0)); - ASSERT_TRUE(bl.contents_equal(bl2)); - ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2)); - ASSERT_TRUE(x.contents_equal(x2)); + // small object without a version + { + ObjectWriteOperation op; + op.copy_from("foo", ioctx, 0); + ASSERT_EQ(0, ioctx.operate("foo.copy2", &op)); + + bufferlist bl2, x2; + ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy2", bl2, 10000, 0)); + ASSERT_TRUE(bl.contents_equal(bl2)); + ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy2", "myattr", x2)); + ASSERT_TRUE(x.contents_equal(x2)); + } // do a big object - bl.append(buffer::create(8000000)); + bl.append(buffer::create(g_conf->osd_copyfrom_max_chunk * 3)); bl.zero(); bl.append("tail"); blc = bl; @@ -600,15 +615,29 @@ TEST(LibRadosMisc, CopyPP) { ASSERT_EQ(0, ioctx.write_full("big", blc)); ASSERT_EQ(0, ioctx.setxattr("big", "myattr", xc)); - ObjectWriteOperation op2; - op.copy_from("big", ioctx, ioctx.get_last_version()); - ASSERT_EQ(0, ioctx.operate("big.copy", &op)); + { + ObjectWriteOperation op; + op.copy_from("big", ioctx, ioctx.get_last_version()); + ASSERT_EQ(0, ioctx.operate("big.copy", &op)); + + bufferlist bl2, x2; + ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0)); + ASSERT_TRUE(bl.contents_equal(bl2)); + ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2)); + ASSERT_TRUE(x.contents_equal(x2)); + } - bl2.clear(); - ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0)); - ASSERT_TRUE(bl.contents_equal(bl2)); - ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2)); - ASSERT_TRUE(x.contents_equal(x2)); + { + ObjectWriteOperation op; + op.copy_from("big", ioctx, 0); + ASSERT_EQ(0, ioctx.operate("big.copy2", &op)); + + bufferlist bl2, x2; + ASSERT_EQ((int)bl.length(), ioctx.read("big.copy2", bl2, bl.length(), 0)); + ASSERT_TRUE(bl.contents_equal(bl2)); + ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy2", "myattr", x2)); + ASSERT_TRUE(x.contents_equal(x2)); + } ioctx.close(); ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); diff --git a/src/test/osd/RadosModel.h b/src/test/osd/RadosModel.h index 4f1aa4c3582..3a73ac33faf 100644 --- a/src/test/osd/RadosModel.h +++ b/src/test/osd/RadosModel.h @@ -83,14 +83,16 @@ public: class TestOp { public: + int num; RadosTestContext *context; TestOpStat *stat; bool done; - TestOp(RadosTestContext *context, - TestOpStat *stat = 0) : - context(context), - stat(stat), - done(0) + TestOp(int n, RadosTestContext *context, + TestOpStat *stat = 0) + : num(n), + context(context), + stat(stat), + done(0) {} virtual ~TestOp() {}; @@ -230,6 +232,7 @@ public: for (list<TestOp*>::iterator i = inflight.begin(); i != inflight.end();) { if ((*i)->finished()) { + cout << (*i)->num << ": done (" << (inflight.size()-1) << " left)" << std::endl; delete *i; inflight.erase(i++); } else { @@ -238,7 +241,7 @@ public: } if (inflight.size() >= (unsigned) max_in_flight || (!next && !inflight.empty())) { - cout << "Waiting on " << inflight.size() << std::endl; + cout << " waiting on " << inflight.size() << std::endl; wait(); } else { break; @@ -488,11 +491,11 @@ public: librados::ObjectWriteOperation op; librados::AioCompletion *comp; bool done; - RemoveAttrsOp(RadosTestContext *context, + RemoveAttrsOp(int n, RadosTestContext *context, const string &oid, - TestOpStat *stat) : - TestOp(context, stat), oid(oid), comp(NULL), done(false) - {} + TestOpStat *stat) + : TestOp(n, context, stat), oid(oid), comp(NULL), done(false) + {} void _begin() { @@ -577,11 +580,12 @@ public: librados::ObjectWriteOperation op; librados::AioCompletion *comp; bool done; - TmapPutOp(RadosTestContext *context, - const string &oid, - TestOpStat *stat) : - TestOp(context, stat), oid(oid), comp(NULL), done(false) - {} + TmapPutOp(int n, + RadosTestContext *context, + const string &oid, + TestOpStat *stat) + : TestOp(n, context, stat), oid(oid), comp(NULL), done(false) + {} void _begin() { @@ -647,7 +651,7 @@ public: assert(0); } done = true; - context->update_object_version(oid, comp->get_version()); + context->update_object_version(oid, comp->get_version64()); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); @@ -670,11 +674,13 @@ public: librados::ObjectWriteOperation op; librados::AioCompletion *comp; bool done; - SetAttrsOp(RadosTestContext *context, - const string &oid, - TestOpStat *stat) : - TestOp(context, stat), oid(oid), comp(NULL), done(false) - {} + SetAttrsOp(int n, + RadosTestContext *context, + const string &oid, + TestOpStat *stat) + : TestOp(n, context, stat), + oid(oid), comp(NULL), done(false) + {} void _begin() { @@ -738,7 +744,7 @@ public: assert(0); } done = true; - context->update_object_version(oid, comp->get_version()); + context->update_object_version(oid, comp->get_version64()); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); @@ -763,11 +769,12 @@ public: uint64_t waiting_on; uint64_t last_acked_tid; - WriteOp(RadosTestContext *context, + WriteOp(int n, + RadosTestContext *context, const string &oid, - TestOpStat *stat = 0) : - TestOp(context, stat), - oid(oid), waiting_on(0), last_acked_tid(0) + TestOpStat *stat = 0) + : TestOp(n, context, stat), + oid(oid), waiting_on(0), last_acked_tid(0) {} void _begin() @@ -796,16 +803,17 @@ public: } interval_set<uint64_t> ranges; context->cont_gen.get_ranges(cont, ranges); + std::cout << num << ": seq_num " << context->seq_num << " ranges " << ranges << std::endl; context->state_lock.Unlock(); int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset); if (r) { - cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl; + cerr << " r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl; assert(0); } waiting_on = ranges.num_intervals(); - cout << "waiting_on = " << waiting_on << std::endl; + //cout << " waiting_on = " << waiting_on << std::endl; ContentsGenerator::iterator gen_pos = context->cont_gen.get_iterator(cont); uint64_t tid = 1; for (interval_set<uint64_t>::iterator i = ranges.begin(); @@ -818,9 +826,8 @@ public: } assert(to_write.length() == i.get_len()); assert(to_write.length() > 0); - std::cout << "Writing " << context->prefix+oid << " from " << i.get_start() - << " to " << i.get_len() + i.get_start() << " tid " << tid - << " ranges are " << ranges << std::endl; + std::cout << num << ": writing " << context->prefix+oid << " from " << i.get_start() + << " to " << i.get_len() + i.get_start() << " tid " << tid << std::endl; pair<TestOp*, TestOp::CallbackInfo*> *cb_arg = new pair<TestOp*, TestOp::CallbackInfo*>(this, new TestOp::CallbackInfo(tid)); @@ -838,7 +845,7 @@ public: context->state_lock.Lock(); uint64_t tid = info->id; - cout << "finishing write tid " << tid << " to " << context->prefix + oid << std::endl; + cout << num << ": finishing write tid " << tid << " to " << context->prefix + oid << std::endl; if (tid <= last_acked_tid) { cerr << "Error: finished tid " << tid @@ -889,10 +896,11 @@ class DeleteOp : public TestOp { public: string oid; - DeleteOp(RadosTestContext *context, + DeleteOp(int n, + RadosTestContext *context, const string &oid, - TestOpStat *stat = 0) : - TestOp(context, stat), oid(oid) + TestOpStat *stat = 0) + : TestOp(n, context, stat), oid(oid) {} void _begin() @@ -970,16 +978,17 @@ public: bufferlist header; map<string, bufferlist> xattrs; - ReadOp(RadosTestContext *context, + ReadOp(int n, + RadosTestContext *context, const string &oid, - TestOpStat *stat = 0) : - TestOp(context, stat), - completion(NULL), - oid(oid), - old_value(&context->cont_gen), - snap(0), - retval(0), - attrretval(0) + TestOpStat *stat = 0) + : TestOp(n, context, stat), + completion(NULL), + oid(oid), + old_value(&context->cont_gen), + snap(0), + retval(0), + attrretval(0) {} void _begin() @@ -1002,9 +1011,9 @@ public: if (ctx) { assert(old_value.exists); TestAlarm alarm; - std::cerr << "about to start" << std::endl; + std::cerr << num << ": about to start" << std::endl; ctx->start(); - std::cerr << "started" << std::endl; + std::cerr << num << ": started" << std::endl; bufferlist bl; context->io_ctx.set_notify_timeout(600); int r = context->io_ctx.notify(context->prefix+oid, 0, bl); @@ -1012,7 +1021,7 @@ public: std::cerr << "r is " << r << std::endl; assert(0); } - std::cerr << "notified, waiting" << std::endl; + std::cerr << num << ": notified, waiting" << std::endl; ctx->wait(); } if (snap >= 0) { @@ -1059,7 +1068,7 @@ public: uint64_t version = completion->get_version64(); if (int err = completion->get_return_value()) { if (!(err == -ENOENT && old_value.deleted())) { - cerr << "Error: oid " << oid << " read returned error code " + cerr << num << ": Error: oid " << oid << " read returned error code " << err << std::endl; } } else { @@ -1068,16 +1077,16 @@ public: ContDesc to_check; bufferlist::iterator p = result.begin(); if (!context->cont_gen.read_header(p, to_check)) { - cerr << "Unable to decode oid " << oid << " at snap " << context->current_snap << std::endl; + cerr << num << ": Unable to decode oid " << oid << " at snap " << context->current_snap << std::endl; context->errors++; } if (to_check != old_value.most_recent()) { - cerr << "Found incorrect object contents " << to_check + cerr << num << ": Found incorrect object contents " << to_check << ", expected " << old_value.most_recent() << " oid " << oid << std::endl; context->errors++; } if (!old_value.check(result)) { - cerr << "Object " << oid << " contents " << to_check << " corrupt" << std::endl; + cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl; context->errors++; } if (context->errors) assert(0); @@ -1085,28 +1094,28 @@ public: // Attributes if (!(old_value.header == header)) { - cerr << "oid: " << oid << " header does not match, old size: " + cerr << num << ": oid " << oid << " header does not match, old size: " << old_value.header.length() << " new size " << header.length() << std::endl; assert(old_value.header == header); } if (omap.size() != old_value.attrs.size()) { - cerr << "oid: " << oid << " tmap.size() is " << omap.size() + cerr << num << ": oid " << oid << " tmap.size() is " << omap.size() << " and old is " << old_value.attrs.size() << std::endl; assert(omap.size() == old_value.attrs.size()); } if (omap_keys.size() != old_value.attrs.size()) { - cerr << "oid: " << oid << " tmap.size() is " << omap_keys.size() + cerr << num << ": oid " << oid << " tmap.size() is " << omap_keys.size() << " and old is " << old_value.attrs.size() << std::endl; assert(omap_keys.size() == old_value.attrs.size()); } if (xattrs.size() != old_value.attrs.size()) { - cerr << "oid: " << oid << " xattrs.size() is " << xattrs.size() + cerr << num << ": oid " << oid << " xattrs.size() is " << xattrs.size() << " and old is " << old_value.attrs.size() << std::endl; assert(xattrs.size() == old_value.attrs.size()); } if (version != old_value.version) { - cerr << "oid: " << oid << " version is " << version + cerr << num << ": oid " << oid << " version is " << version << " and expected " << old_value.version << std::endl; assert(version == old_value.version); } @@ -1163,9 +1172,10 @@ public: class SnapCreateOp : public TestOp { public: - SnapCreateOp(RadosTestContext *context, - TestOpStat *stat = 0) : - TestOp(context, stat) + SnapCreateOp(int n, + RadosTestContext *context, + TestOpStat *stat = 0) + : TestOp(n, context, stat) {} void _begin() @@ -1201,11 +1211,11 @@ public: class SnapRemoveOp : public TestOp { public: int to_remove; - SnapRemoveOp(RadosTestContext *context, + SnapRemoveOp(int n, RadosTestContext *context, int snap, - TestOpStat *stat = 0) : - TestOp(context, stat), - to_remove(snap) + TestOpStat *stat = 0) + : TestOp(n, context, stat), + to_remove(snap) {} void _begin() @@ -1241,11 +1251,12 @@ public: class WatchOp : public TestOp { string oid; public: - WatchOp(RadosTestContext *context, - const string &_oid, - TestOpStat *stat = 0) : - TestOp(context, stat), - oid(_oid) + WatchOp(int n, + RadosTestContext *context, + const string &_oid, + TestOpStat *stat = 0) + : TestOp(n, context, stat), + oid(_oid) {} void _begin() @@ -1318,13 +1329,14 @@ public: librados::ObjectWriteOperation op; librados::AioCompletion *comp; - RollbackOp(RadosTestContext *context, + RollbackOp(int n, + RadosTestContext *context, const string &_oid, int snap, - TestOpStat *stat = 0) : - TestOp(context, stat), - oid(_oid), - roll_back_to(snap), done(false) + TestOpStat *stat = 0) + : TestOp(n, context, stat), + oid(_oid), + roll_back_to(snap), done(false) {} void _begin() @@ -1369,7 +1381,7 @@ public: assert(0); } done = true; - context->update_object_version(oid, comp->get_version()); + context->update_object_version(oid, comp->get_version64()); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); @@ -1392,16 +1404,20 @@ public: ObjectDesc src_value; librados::ObjectWriteOperation op; librados::AioCompletion *comp; + librados::AioCompletion *comp_racing_read; int snap; - bool done; - tid_t tid; - CopyFromOp(RadosTestContext *context, + int done; + uint64_t version; + int r; + CopyFromOp(int n, + RadosTestContext *context, const string &oid, const string &oid_src, TestOpStat *stat) - : TestOp(context, stat), oid(oid), oid_src(oid_src), + : TestOp(n, context, stat), + oid(oid), oid_src(oid_src), src_value(&context->cont_gen), - comp(NULL), done(false), tid(0) + comp(NULL), done(0), version(0), r(0) {} void _begin() @@ -1433,35 +1449,67 @@ public: new TestOp::CallbackInfo(0)); comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback, NULL); - tid = context->io_ctx.aio_operate(context->prefix+oid, comp, &op); + context->io_ctx.aio_operate(context->prefix+oid, comp, &op); + + // queue up a racing read, too. + pair<TestOp*, TestOp::CallbackInfo*> *read_cb_arg = + new pair<TestOp*, TestOp::CallbackInfo*>(this, + new TestOp::CallbackInfo(1)); + comp_racing_read = context->rados.aio_create_completion((void*) read_cb_arg, &write_callback, + NULL); + context->io_ctx.aio_stat(context->prefix+oid, comp_racing_read, NULL, NULL); } void _finish(CallbackInfo *info) { Mutex::Locker l(context->state_lock); - done = true; - int r; - assert(comp->is_complete()); - cout << "finishing copy_from tid " << tid << " to " << context->prefix + oid << std::endl; - if ((r = comp->get_return_value())) { - if (!(r == -ENOENT && src_value.deleted())) { - cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code " - << r << std::endl; + + // note that the read can (and atm will) come back before the + // write reply, but will reflect the update and the versions will + // match. + + if (info->id == 0) { + // copy_from + assert(comp->is_complete()); + cout << num << ": finishing copy_from to " << context->prefix + oid << std::endl; + if ((r = comp->get_return_value())) { + if (!(r == -ENOENT && src_value.deleted())) { + cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code " + << r << std::endl; + } + } else { + assert(!version || comp->get_version64() == version); + version = comp->get_version64(); + context->update_object_full(oid, src_value); + context->update_object_version(oid, comp->get_version64()); } - } else { - context->update_object_full(oid, src_value); - context->update_object_version(oid, comp->get_version()); + context->oid_in_use.erase(oid_src); + context->oid_not_in_use.insert(oid_src); + context->kick(); + } else if (info->id == 1) { + // racing read + assert(comp_racing_read->is_complete()); + cout << num << ": finishing copy_from racing read to " << context->prefix + oid << std::endl; + if ((r = comp_racing_read->get_return_value())) { + if (!(r == -ENOENT && src_value.deleted())) { + cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code " + << r << std::endl; + } + } else { + assert(comp_racing_read->get_return_value() == 0); + assert(!version || comp_racing_read->get_version64() == version); + version = comp_racing_read->get_version64(); + } + context->oid_in_use.erase(oid); + context->oid_not_in_use.insert(oid); + context->kick(); } - context->oid_in_use.erase(oid); - context->oid_not_in_use.insert(oid); - context->oid_in_use.erase(oid_src); - context->oid_not_in_use.insert(oid_src); - context->kick(); + ++done; } bool finished() { - return done; + return done == 2; } string getType() diff --git a/src/test/osd/TestRados.cc b/src/test/osd/TestRados.cc index 1deee23aa2c..1b6bd073a12 100644 --- a/src/test/osd/TestRados.cc +++ b/src/test/osd/TestRados.cc @@ -48,8 +48,8 @@ public: if (m_op <= m_objects) { stringstream oid; oid << m_op; - cout << m_op << ": Writing initial " << oid.str() << std::endl; - return new WriteOp(&context, oid.str()); + cout << m_op << ": write initial oid " << oid.str() << std::endl; + return new WriteOp(m_op, &context, oid.str()); } else if (m_op >= m_ops) { return NULL; } @@ -71,7 +71,6 @@ public: it != m_weight_sums.end(); ++it) { if (rand_val < it->second) { - cout << m_op << ": "; retval = gen_op(context, it->first); break; } @@ -85,37 +84,39 @@ private: TestOp *gen_op(RadosTestContext &context, TestOpType type) { string oid, oid2; - cout << "oids not in use " << context.oid_not_in_use.size() << std::endl; + //cout << "oids not in use " << context.oid_not_in_use.size() << std::endl; assert(context.oid_not_in_use.size()); + + cout << m_op << ": "; switch (type) { case TEST_OP_READ: oid = *(rand_choose(context.oid_not_in_use)); - cout << "Reading " << oid << std::endl; - return new ReadOp(&context, oid, m_stats); + cout << "read oid " << oid << std::endl; + return new ReadOp(m_op, &context, oid, m_stats); case TEST_OP_WRITE: oid = *(rand_choose(context.oid_not_in_use)); - cout << "Writing " << oid << " current snap is " + cout << "write oid " << oid << " current snap is " << context.current_snap << std::endl; - return new WriteOp(&context, oid, m_stats); + return new WriteOp(m_op, &context, oid, m_stats); case TEST_OP_DELETE: oid = *(rand_choose(context.oid_not_in_use)); - cout << "Deleting " << oid << " current snap is " + cout << "delete oid " << oid << " current snap is " << context.current_snap << std::endl; - return new DeleteOp(&context, oid, m_stats); + return new DeleteOp(m_op, &context, oid, m_stats); case TEST_OP_SNAP_CREATE: - cout << "Snapping" << std::endl; - return new SnapCreateOp(&context, m_stats); + cout << "snap_create" << std::endl; + return new SnapCreateOp(m_op, &context, m_stats); case TEST_OP_SNAP_REMOVE: if (context.snaps.empty()) { return NULL; } else { int snap = rand_choose(context.snaps)->first; - cout << "RemovingSnap " << snap << std::endl; - return new SnapRemoveOp(&context, snap, m_stats); + cout << "snap_remove snap " << snap << std::endl; + return new SnapRemoveOp(m_op, &context, snap, m_stats); } case TEST_OP_ROLLBACK: @@ -124,40 +125,40 @@ private: } else { int snap = rand_choose(context.snaps)->first; string oid = *(rand_choose(context.oid_not_in_use)); - cout << "RollingBack " << oid << " to " << snap << std::endl; - return new RollbackOp(&context, oid, snap); + cout << "rollback oid " << oid << " to " << snap << std::endl; + return new RollbackOp(m_op, &context, oid, snap); } case TEST_OP_SETATTR: oid = *(rand_choose(context.oid_not_in_use)); - cout << "Setting attrs on " << oid + cout << "setattr oid " << oid << " current snap is " << context.current_snap << std::endl; - return new SetAttrsOp(&context, oid, m_stats); + return new SetAttrsOp(m_op, &context, oid, m_stats); case TEST_OP_RMATTR: oid = *(rand_choose(context.oid_not_in_use)); - cout << "Removing attrs on " << oid + cout << "rmattr oid " << oid << " current snap is " << context.current_snap << std::endl; - return new RemoveAttrsOp(&context, oid, m_stats); + return new RemoveAttrsOp(m_op, &context, oid, m_stats); case TEST_OP_TMAPPUT: oid = *(rand_choose(context.oid_not_in_use)); - cout << "Setting tmap on " << oid + cout << "tmapput oid " << oid << " current snap is " << context.current_snap << std::endl; - return new TmapPutOp(&context, oid, m_stats); + return new TmapPutOp(m_op, &context, oid, m_stats); case TEST_OP_WATCH: oid = *(rand_choose(context.oid_not_in_use)); - cout << "Watching " << oid + cout << "watch oid " << oid << " current snap is " << context.current_snap << std::endl; - return new WatchOp(&context, oid, m_stats); + return new WatchOp(m_op, &context, oid, m_stats); case TEST_OP_COPY_FROM: oid = *(rand_choose(context.oid_not_in_use)); oid2 = *(rand_choose(context.oid_not_in_use)); - cout << "copy_from " << oid << " from " << oid2 + cout << "copy_from oid " << oid << " from oid " << oid2 << " current snap is " << context.current_snap << std::endl; - return new CopyFromOp(&context, oid, oid2, m_stats); + return new CopyFromOp(m_op, &context, oid, oid2, m_stats); default: cerr << "Invalid op type " << type << std::endl; |