diff options
author | Gary Lowell <gary.lowell@inktank.com> | 2013-07-22 23:21:33 -0700 |
---|---|---|
committer | Gary Lowell <gary.lowell@inktank.com> | 2013-07-22 23:21:33 -0700 |
commit | 98816e696bdf219c127d4e43755f70e00abdc34f (patch) | |
tree | 8e049b9cde0af82dd4b4a6eb41fedb6c623ea2e9 | |
parent | 835dd973012e4e3ffc3dd15bc45bea3a3045f30c (diff) | |
parent | 3f31540448e20fd6391773d4f3d46d76060b31c8 (diff) | |
download | ceph-98816e696bdf219c127d4e43755f70e00abdc34f.tar.gz |
Merge branch 'next' of github.com:ceph/ceph into next
32 files changed, 308 insertions, 78 deletions
diff --git a/PendingReleaseNotes b/PendingReleaseNotes index a9880942b5a..7a9adf7293e 100644 --- a/PendingReleaseNotes +++ b/PendingReleaseNotes @@ -19,3 +19,9 @@ v0.67 commandline tool. ceph_rest_api.py can be used as a WSGI application for deployment in a more-capable web server. See ceph-rest-api.8 for more. + +* rgw copy object operation may return extra progress info during the + operation. At this point it will only happen when doing cross zone + copy operations. The S3 response will now return extra <Progress> + field under the <CopyResult> container. The Swift response will + now send the progress as a json array. diff --git a/qa/workunits/rest/test.py b/qa/workunits/rest/test.py index 0c99891bad3..272760c8289 100755 --- a/qa/workunits/rest/test.py +++ b/qa/workunits/rest/test.py @@ -141,11 +141,9 @@ if __name__ == '__main__': # XXX no ceph -w equivalent yet expect('mds/cluster_down', 'PUT', 200, '') - # failure if down - expect('mds/cluster_down', 'PUT', 400, '') + expect('mds/cluster_down', 'PUT', 200, '') + expect('mds/cluster_up', 'PUT', 200, '') expect('mds/cluster_up', 'PUT', 200, '') - # failure if up - expect('mds/cluster_up', 'PUT', 400, '') expect('mds/compat/rm_incompat?feature=4', 'PUT', 200, '') expect('mds/compat/rm_incompat?feature=4', 'PUT', 200, '') diff --git a/src/ceph.in b/src/ceph.in index 6ba92c99b18..dbb7fb5a8cd 100755 --- a/src/ceph.in +++ b/src/ceph.in @@ -118,6 +118,8 @@ def parse_cmdargs(args=None, target=''): parser.add_argument('--admin-daemon', dest='admin_socket', help='submit admin-socket commands (\"help\" for help') + parser.add_argument('--admin-socket', dest='admin_socket_nope', + help='you probably mean --admin-daemon') parser.add_argument('-s', '--status', action='store_true', help='show cluster status') @@ -489,6 +491,11 @@ def main(): global verbose verbose = parsed_args.verbose + if parsed_args.admin_socket_nope: + print >> sys.stderr, '--admin-socket is used by daemons; '\ + 'you probably mean --admin-daemon/daemon' + return 1 + # pass on --id, --name, --conf name = 'client.admin' if parsed_args.client_id: diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index edb48bd96d8..88b807b1b24 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -274,12 +274,6 @@ int main(int argc, const char **argv) messenger->start(); - // set up signal handlers, now that we've daemonized/forked. - init_async_signal_handler(); - register_async_signal_handler(SIGHUP, sighup_handler); - register_async_signal_handler_oneshot(SIGINT, handle_mds_signal); - register_async_signal_handler_oneshot(SIGTERM, handle_mds_signal); - // start mds mds = new MDS(g_conf->name.get_id().c_str(), messenger, &mc); @@ -291,16 +285,26 @@ int main(int argc, const char **argv) r = mds->init(shadow); else r = mds->init(); + if (r < 0) + goto shutdown; - if (r >= 0) { - messenger->wait(); - } + // set up signal handlers, now that we've daemonized/forked. + init_async_signal_handler(); + register_async_signal_handler(SIGHUP, sighup_handler); + register_async_signal_handler_oneshot(SIGINT, handle_mds_signal); + register_async_signal_handler_oneshot(SIGTERM, handle_mds_signal); + + if (g_conf->inject_early_sigterm) + kill(getpid(), SIGTERM); + + messenger->wait(); unregister_async_signal_handler(SIGHUP, sighup_handler); unregister_async_signal_handler(SIGINT, handle_mds_signal); unregister_async_signal_handler(SIGTERM, handle_mds_signal); shutdown_async_signal_handler(); + shutdown: // yuck: grab the mds lock, so we can be sure that whoever in *mds // called shutdown finishes what they were doing. mds->mds_lock.Lock(); @@ -313,14 +317,15 @@ int main(int argc, const char **argv) if (mds->is_stopped()) delete mds; + g_ceph_context->put(); + // cd on exit, so that gmon.out (if any) goes into a separate directory for each node. char s[20]; snprintf(s, sizeof(s), "gmon/%d", getpid()); if ((mkdir(s, 0755) == 0) && (chdir(s) == 0)) { - dout(0) << "ceph-mds: gmon.out should be in " << s << dendl; + cerr << "ceph-mds: gmon.out should be in " << s << std::endl; } - generic_dout(0) << "stopped." << dendl; return 0; } diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 6ac22ba20e5..35ed56a7985 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -542,15 +542,18 @@ int main(int argc, const char **argv) if (g_conf->daemonize) prefork.daemonize(); + messenger->start(); + + mon->init(); + // set up signal handlers, now that we've daemonized/forked. init_async_signal_handler(); register_async_signal_handler(SIGHUP, sighup_handler); register_async_signal_handler_oneshot(SIGINT, handle_mon_signal); register_async_signal_handler_oneshot(SIGTERM, handle_mon_signal); - messenger->start(); - - mon->init(); + if (g_conf->inject_early_sigterm) + kill(getpid(), SIGTERM); messenger->wait(); diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index b485133514e..d8590bff817 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -451,12 +451,6 @@ int main(int argc, const char **argv) messenger_hb_back_server->start(); cluster_messenger->start(); - // install signal handlers - init_async_signal_handler(); - register_async_signal_handler(SIGHUP, sighup_handler); - register_async_signal_handler_oneshot(SIGINT, handle_osd_signal); - register_async_signal_handler_oneshot(SIGTERM, handle_osd_signal); - // start osd err = osd->init(); if (err < 0) { @@ -465,6 +459,15 @@ int main(int argc, const char **argv) return 1; } + // install signal handlers + init_async_signal_handler(); + register_async_signal_handler(SIGHUP, sighup_handler); + register_async_signal_handler_oneshot(SIGINT, handle_osd_signal); + register_async_signal_handler_oneshot(SIGTERM, handle_osd_signal); + + if (g_conf->inject_early_sigterm) + kill(getpid(), SIGTERM); + client_messenger->wait(); messenger_hbclient->wait(); messenger_hb_front_server->wait(); diff --git a/src/common/config_opts.h b/src/common/config_opts.h index defb71ee514..b43808e211c 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -123,6 +123,8 @@ OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds +OPTION(inject_early_sigterm, OPT_BOOL, false) + OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id") OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster OPTION(mon_sync_fs_threshold, OPT_INT, 5) // sync() when writing this many objects; 0 to disable. @@ -207,6 +209,7 @@ OPTION(paxos_trim_min, OPT_INT, 250) // number of extra proposals tolerated bef OPTION(paxos_trim_max, OPT_INT, 500) // max number of extra proposals to trim at a time OPTION(paxos_service_trim_min, OPT_INT, 250) // minimum amount of versions to trigger a trim (0 disables it) OPTION(paxos_service_trim_max, OPT_INT, 500) // maximum amount of versions to trim during a single proposal (0 disables it) +OPTION(paxos_kill_at, OPT_INT, 0) OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc OPTION(auth_cluster_required, OPT_STR, "cephx") // required of mon, mds, osd daemons OPTION(auth_service_required, OPT_STR, "cephx") // required by daemons of clients @@ -675,6 +678,8 @@ OPTION(rgw_md_log_max_shards, OPT_INT, 64) // max shards for metadata log OPTION(rgw_num_zone_opstate_shards, OPT_INT, 128) // max shards for keeping inter-region copy progress info OPTION(rgw_opstate_ratelimit_sec, OPT_INT, 30) // min time between opstate updates on a single upload (0 for disabling ratelimit) OPTION(rgw_curl_wait_timeout_ms, OPT_INT, 1000) // timeout for certain curl calls +OPTION(rgw_copy_obj_progress, OPT_BOOL, true) // should dump progress during long copy operations? +OPTION(rgw_copy_obj_progress_every_bytes, OPT_INT, 1024 * 1024) // min bytes between copy progress output OPTION(rgw_data_log_window, OPT_INT, 30) // data log entries window (in seconds) OPTION(rgw_data_log_changes_size, OPT_INT, 1000) // number of in-memory entries to hold for data changes log diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index a867961ccf3..e70d6fd4dff 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -1628,19 +1628,18 @@ void MDS::suicide() } timer.cancel_all_events(); //timer.join(); + timer.shutdown(); // shut down cache mdcache->shutdown(); if (objecter->initialized) objecter->shutdown_locked(); - - // shut down messenger - messenger->shutdown(); monc->shutdown(); - timer.shutdown(); + // shut down messenger + messenger->shutdown(); } void MDS::respawn() diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 7e484e8db6b..f537c915945 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -605,13 +605,13 @@ void Monitor::shutdown() finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED); finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED); - timer.shutdown(); + remove_all_sessions(); + // unlock before msgr shutdown... lock.Unlock(); - remove_all_sessions(); messenger->shutdown(); // last thing! ceph_mon.cc will delete mon. } diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index ee2ba3b6fdb..508669deef5 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -103,11 +103,21 @@ void Paxos::collect(version_t oldpn) // look for uncommitted value if (get_store()->exists(get_name(), last_committed+1)) { + version_t v = get_store()->get(get_name(), "pending_v"); + version_t pn = get_store()->get(get_name(), "pending_pn"); + if (v && pn && v == last_committed + 1) { + uncommitted_pn = pn; + } else { + dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn + << " and crossing our fingers" << dendl; + uncommitted_pn = accepted_pn; + } uncommitted_v = last_committed+1; - uncommitted_pn = accepted_pn; + get_store()->get(get_name(), last_committed+1, uncommitted_value); assert(uncommitted_value.length()); dout(10) << "learned uncommitted " << (last_committed+1) + << " pn " << uncommitted_pn << " (" << uncommitted_value.length() << " bytes) from myself" << dendl; } @@ -164,6 +174,8 @@ void Paxos::handle_collect(MMonPaxos *collect) last->last_committed = last_committed; last->first_committed = first_committed; + version_t previous_pn = accepted_pn; + // can we accept this pn? if (collect->pn > accepted_pn) { // ok, accept it @@ -198,13 +210,25 @@ void Paxos::handle_collect(MMonPaxos *collect) // do we have an accepted but uncommitted value? // (it'll be at last_committed+1) bufferlist bl; - if (get_store()->exists(get_name(), last_committed+1)) { + if (collect->last_committed == last_committed && + get_store()->exists(get_name(), last_committed+1)) { get_store()->get(get_name(), last_committed+1, bl); assert(bl.length() > 0); dout(10) << " sharing our accepted but uncommitted value for " << last_committed+1 << " (" << bl.length() << " bytes)" << dendl; last->values[last_committed+1] = bl; - last->uncommitted_pn = accepted_pn; + + version_t v = get_store()->get(get_name(), "pending_v"); + version_t pn = get_store()->get(get_name(), "pending_pn"); + if (v && pn && v == last_committed + 1) { + last->uncommitted_pn = pn; + } else { + // previously we didn't record which pn a value was accepted + // under! use the pn value we just had... :( + dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn + << " and crossing our fingers" << dendl; + last->uncommitted_pn = previous_pn; + } } // send reply @@ -370,9 +394,13 @@ void Paxos::handle_last(MMonPaxos *last) return; } + assert(g_conf->paxos_kill_at != 1); + // store any committed values if any are specified in the message store_state(last); + assert(g_conf->paxos_kill_at != 2); + // do they accept your pn? if (last->pn > accepted_pn) { // no, try again. @@ -390,15 +418,23 @@ void Paxos::handle_last(MMonPaxos *last) << num_last << " peons" << dendl; // did this person send back an accepted but uncommitted value? - if (last->uncommitted_pn && - last->uncommitted_pn > uncommitted_pn) { - uncommitted_v = last->last_committed+1; - uncommitted_pn = last->uncommitted_pn; - uncommitted_value = last->values[uncommitted_v]; - dout(10) << "we learned an uncommitted value for " << uncommitted_v - << " pn " << uncommitted_pn - << " " << uncommitted_value.length() << " bytes" - << dendl; + if (last->uncommitted_pn) { + if (last->uncommitted_pn > uncommitted_pn && + last->last_committed >= last_committed && + last->last_committed + 1 >= uncommitted_v) { + uncommitted_v = last->last_committed+1; + uncommitted_pn = last->uncommitted_pn; + uncommitted_value = last->values[uncommitted_v]; + dout(10) << "we learned an uncommitted value for " << uncommitted_v + << " pn " << uncommitted_pn + << " " << uncommitted_value.length() << " bytes" + << dendl; + } else { + dout(10) << "ignoring uncommitted value for " << (last->last_committed+1) + << " pn " << last->uncommitted_pn + << " " << last->values[last->last_committed+1].length() << " bytes" + << dendl; + } } // is that everyone? @@ -502,6 +538,10 @@ void Paxos::begin(bufferlist& v) MonitorDBStore::Transaction t; t.put(get_name(), last_committed+1, new_value); + // note which pn this pending value is for. + t.put(get_name(), "pending_v", last_committed + 1); + t.put(get_name(), "pending_pn", accepted_pn); + dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); t.dump(&f); @@ -516,6 +556,8 @@ void Paxos::begin(bufferlist& v) get_store()->apply_transaction(t); + assert(g_conf->paxos_kill_at != 3); + if (mon->get_quorum().size() == 1) { // we're alone, take it easy commit(); @@ -566,6 +608,8 @@ void Paxos::handle_begin(MMonPaxos *begin) assert(begin->pn == accepted_pn); assert(begin->last_committed == last_committed); + assert(g_conf->paxos_kill_at != 4); + // set state. state = STATE_UPDATING; lease_expire = utime_t(); // cancel lease @@ -578,6 +622,10 @@ void Paxos::handle_begin(MMonPaxos *begin) MonitorDBStore::Transaction t; t.put(get_name(), v, begin->values[v]); + // note which pn this pending value is for. + t.put(get_name(), "pending_v", v); + t.put(get_name(), "pending_pn", accepted_pn); + dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); t.dump(&f); @@ -586,6 +634,8 @@ void Paxos::handle_begin(MMonPaxos *begin) get_store()->apply_transaction(t); + assert(g_conf->paxos_kill_at != 5); + // reply MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, ceph_clock_now(g_ceph_context)); @@ -620,6 +670,8 @@ void Paxos::handle_accept(MMonPaxos *accept) accepted.insert(from); dout(10) << " now " << accepted << " have accepted" << dendl; + assert(g_conf->paxos_kill_at != 6); + // new majority? if (accepted.size() == (unsigned)mon->monmap->size()/2+1) { // yay, commit! @@ -643,6 +695,8 @@ void Paxos::handle_accept(MMonPaxos *accept) // yay! extend_lease(); + assert(g_conf->paxos_kill_at != 10); + finish_round(); // wake people up @@ -673,6 +727,8 @@ void Paxos::commit() // leader still got a majority and committed with out us.) lease_expire = utime_t(); // cancel lease + assert(g_conf->paxos_kill_at != 7); + MonitorDBStore::Transaction t; // commit locally @@ -692,6 +748,8 @@ void Paxos::commit() get_store()->apply_transaction(t); + assert(g_conf->paxos_kill_at != 8); + // refresh first_committed; this txn may have trimmed. first_committed = get_store()->get(get_name(), "first_committed"); @@ -713,6 +771,8 @@ void Paxos::commit() mon->messenger->send_message(commit, mon->monmap->get_inst(*p)); } + assert(g_conf->paxos_kill_at != 9); + // get ready for a new round. new_value.clear(); diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index cab27f289a8..69419e64ab9 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -290,8 +290,9 @@ private: */ version_t accepted_pn; /** - * @todo This has something to do with the last_committed version. Not sure - * about what it entails, tbh. + * The last_committed epoch of the leader at the time we accepted the last pn. + * + * This has NO SEMANTIC MEANING, and is there only for the debug output. */ version_t accepted_pn_from; /** @@ -1114,7 +1115,7 @@ public: * @param t The transaction to which we will append the operations * @param bl A bufferlist containing an encoded transaction */ - void decode_append_transaction(MonitorDBStore::Transaction& t, + static void decode_append_transaction(MonitorDBStore::Transaction& t, bufferlist& bl) { MonitorDBStore::Transaction vt; bufferlist::iterator it = bl.begin(); diff --git a/src/os/FDCache.h b/src/os/FDCache.h index cf07f860aa5..f0f40e7bbf4 100644 --- a/src/os/FDCache.h +++ b/src/os/FDCache.h @@ -28,6 +28,7 @@ * FD Cache */ class FDCache : public md_config_obs_t { +public: /** * FD * @@ -47,8 +48,10 @@ class FDCache : public md_config_obs_t { } }; +private: SharedLRU<hobject_t, FD> registry; CephContext *cct; + public: FDCache(CephContext *cct) : cct(cct) { assert(cct); diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 10f2b1f2aad..17105c11d69 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -220,7 +220,8 @@ int FileStore::lfn_open(coll_t cid, r = get_index(cid, index); } Mutex::Locker l(fdcache_lock); - *outfd = fdcache.lookup(oid); + if (!replaying) + *outfd = fdcache.lookup(oid); if (*outfd) { return 0; } @@ -258,7 +259,10 @@ int FileStore::lfn_open(coll_t cid, goto fail; } } - *outfd = fdcache.add(oid, fd); + if (!replaying) + *outfd = fdcache.add(oid, fd); + else + *outfd = FDRef(new FDCache::FD(fd)); return 0; fail: @@ -3060,7 +3064,8 @@ int FileStore::_write(coll_t cid, const hobject_t& oid, r = bl.length(); // flush? - wbthrottle.queue_wb(fd, oid, offset, len, replica); + if (!replaying) + wbthrottle.queue_wb(fd, oid, offset, len, replica); lfn_close(fd); out: diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 464ed770df2..3f226cec95d 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4739,11 +4739,12 @@ bool OSDService::prepare_to_stop() if (state != NOT_STOPPING) return false; - if (get_osdmap()->is_up(whoami)) { + OSDMapRef osdmap = get_osdmap(); + if (osdmap && osdmap->is_up(whoami)) { state = PREPARING_TO_STOP; monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(), - get_osdmap()->get_inst(whoami), - get_osdmap()->get_epoch(), + osdmap->get_inst(whoami), + osdmap->get_epoch(), false )); utime_t now = ceph_clock_now(g_ceph_context); diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc index 6ba08362dad..dac1f33fd91 100644 --- a/src/osd/PGLog.cc +++ b/src/osd/PGLog.cc @@ -375,7 +375,6 @@ void PGLog::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead } assert(p->version > newhead); dout(10) << "rewind_divergent_log future divergent " << *p << dendl; - log.unindex(*p); } log.head = newhead; @@ -383,6 +382,7 @@ void PGLog::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead if (info.last_complete > newhead) info.last_complete = newhead; + log.index(); for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d) merge_old_entry(t, *d, info, remove_snap); @@ -505,7 +505,6 @@ void PGLog::merge_log(ObjectStore::Transaction& t, break; dout(10) << "merge_log divergent " << oe << dendl; divergent.push_front(oe); - log.unindex(oe); log.log.pop_back(); } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 9c8d42dbf3c..298d38d6ace 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -7767,6 +7767,14 @@ void ReplicatedPG::_scrub_finish() #undef dout_prefix #define dout_prefix *_dout << pg->gen_prefix() +ReplicatedPG::SnapTrimmer::~SnapTrimmer() +{ + while (!repops.empty()) { + (*repops.begin())->put(); + repops.erase(repops.begin()); + } +} + void ReplicatedPG::SnapTrimmer::log_enter(const char *state_name) { dout(20) << "enter " << state_name << dendl; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 0d4867f6e6d..9dafe23faa1 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -982,6 +982,7 @@ private: bool need_share_pg_info; bool requeue; SnapTrimmer(ReplicatedPG *pg) : pg(pg), need_share_pg_info(false), requeue(false) {} + ~SnapTrimmer(); void log_enter(const char *state_name); void log_exit(const char *state_name, utime_t duration); } snap_trimmer_machine; diff --git a/src/rgw/rgw_auth_s3.cc b/src/rgw/rgw_auth_s3.cc index bdd458e68b6..c93de7cd58a 100644 --- a/src/rgw/rgw_auth_s3.cc +++ b/src/rgw/rgw_auth_s3.cc @@ -190,8 +190,14 @@ bool rgw_create_s3_canonical_header(req_info& info, utime_t *header_time, string map<string, string>& meta_map = info.x_meta_map; map<string, string>& sub_resources = info.args.get_sub_resources(); + string request_uri; + if (info.effective_uri.empty()) + request_uri = info.request_uri; + else + request_uri = info.effective_uri; + rgw_create_s3_canonical_header(info.method, content_md5, content_type, date.c_str(), - meta_map, info.request_uri.c_str(), sub_resources, + meta_map, request_uri.c_str(), sub_resources, dest); return true; diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index aea396bf3de..8a281775d07 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -109,7 +109,12 @@ void req_info::rebuild_from(req_info& src) { method = src.method; script_uri = src.script_uri; - request_uri = src.request_uri; + if (src.effective_uri.empty()) { + request_uri = src.request_uri; + } else { + request_uri = src.effective_uri; + } + effective_uri.clear(); host = src.host; x_meta_map = src.x_meta_map; diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 1d3596d4418..7f224a798f5 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -764,6 +764,7 @@ struct req_info { const char *method; string script_uri; string request_uri; + string effective_uri; string request_params; req_info(CephContext *cct, RGWEnv *_env); @@ -780,7 +781,7 @@ struct req_state { int format; ceph::Formatter *formatter; string decoded_uri; - string effective_uri; + string relative_uri; const char *length; uint64_t content_length; map<string, string> generic_attrs; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 45477486ccc..7760a2f5c52 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1654,6 +1654,25 @@ int RGWCopyObj::init_common() return 0; } +static void copy_obj_progress_cb(off_t ofs, void *param) +{ + RGWCopyObj *op = static_cast<RGWCopyObj *>(param); + op->progress_cb(ofs); +} + +void RGWCopyObj::progress_cb(off_t ofs) +{ + if (!s->cct->_conf->rgw_copy_obj_progress) + return; + + if (ofs - last_ofs < s->cct->_conf->rgw_copy_obj_progress_every_bytes) + return; + + send_partial_response(ofs); + + last_ofs = ofs; +} + void RGWCopyObj::execute() { rgw_obj src_obj, dst_obj; @@ -1685,7 +1704,9 @@ void RGWCopyObj::execute() replace_attrs, attrs, RGW_OBJ_CATEGORY_MAIN, &s->req_id, /* use req_id as tag */ - &s->err); + &s->err, + copy_obj_progress_cb, (void *)this + ); } int RGWGetACLs::verify_permission() diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index e107b90a155..5da2e4f472c 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -438,6 +438,8 @@ protected: string client_id; string op_id; + off_t last_ofs; + int init_common(); @@ -460,6 +462,7 @@ public: ret = 0; mtime = 0; replace_attrs = false; + last_ofs = 0; } virtual void init(RGWRados *store, struct req_state *s, RGWHandler *h) { @@ -468,9 +471,11 @@ public: } int verify_permission(); void execute(); + void progress_cb(off_t ofs); virtual int init_dest_policy() { return 0; } virtual int get_params() = 0; + virtual void send_partial_response(off_t ofs) {} virtual void send_response() = 0; virtual const string name() { return "copy_obj"; } virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 0c7b22a42d3..8af03b03a8f 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2397,9 +2397,16 @@ class RGWRadosPutObj : public RGWGetDataCB rgw_obj obj; RGWPutObjProcessor_Atomic *processor; RGWOpStateSingleOp *opstate; + void (*progress_cb)(off_t, void *); + void *progress_data; public: - RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops) : processor(p), opstate(_ops) {} + RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops, + void (*_progress_cb)(off_t, void *), void *_progress_data) : processor(p), opstate(_ops), + progress_cb(_progress_cb), + progress_data(_progress_data) {} int handle_data(bufferlist& bl, off_t ofs, off_t len) { + progress_cb(ofs, progress_data); + void *handle; int ret = processor->handle_data(bl, ofs, &handle); if (ret < 0) @@ -2477,7 +2484,9 @@ int RGWRados::copy_obj(void *ctx, map<string, bufferlist>& attrs, RGWObjCategory category, string *ptag, - struct rgw_err *err) + struct rgw_err *err, + void (*progress_cb)(off_t, void *), + void *progress_data) { int ret; uint64_t total_len, obj_size; @@ -2545,7 +2554,7 @@ int RGWRados::copy_obj(void *ctx, ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl; return ret; } - RGWRadosPutObj cb(&processor, &opstate); + RGWRadosPutObj cb(&processor, &opstate, progress_cb, progress_data); string etag; map<string, string> req_headers; time_t set_mtime; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index c9924e0dc56..bcc40900299 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1121,7 +1121,9 @@ public: map<std::string, bufferlist>& attrs, RGWObjCategory category, string *ptag, - struct rgw_err *err); + struct rgw_err *err, + void (*progress_cb)(off_t, void *), + void *progress_data); int copy_obj_data(void *ctx, void *handle, off_t end, diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index 0f9e61d1740..e4933a67a39 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -1242,7 +1242,7 @@ RGWHandler *RGWREST::get_handler(RGWRados *store, struct req_state *s, RGWClient if (*init_error < 0) return NULL; - RGWRESTMgr *m = mgr.get_resource_mgr(s, s->decoded_uri, &s->effective_uri); + RGWRESTMgr *m = mgr.get_resource_mgr(s, s->decoded_uri, &s->relative_uri); if (!m) { *init_error = -ERR_METHOD_NOT_ALLOWED; return NULL; diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 2075e535525..ea80b5b84f8 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -403,6 +403,7 @@ int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uin new_info.script_uri = "/"; new_info.script_uri.append(resource); new_info.request_uri = new_info.script_uri; + new_info.effective_uri = new_info.effective_uri; map<string, string>& m = new_env.get_map(); map<string, bufferlist>::iterator bliter; @@ -568,6 +569,7 @@ int RGWRESTStreamReadRequest::get_obj(RGWAccessKey& key, map<string, string>& ex new_info.script_uri = "/"; new_info.script_uri.append(resource); new_info.request_uri = new_info.script_uri; + new_info.effective_uri = new_info.effective_uri; new_info.init_meta_info(NULL); diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 66f6652ec6a..6c1738218e6 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -1300,15 +1300,33 @@ int RGWCopyObj_ObjStore_S3::get_params() return 0; } -void RGWCopyObj_ObjStore_S3::send_response() +void RGWCopyObj_ObjStore_S3::send_partial_response(off_t ofs) { - if (ret) + if (!sent_header) { + if (ret) set_req_state_err(s, ret); - dump_errno(s); + dump_errno(s); + + end_header(s, "binary/octet-stream"); + if (ret == 0) { + s->formatter->open_object_section("CopyObjectResult"); + } + sent_header = true; + } else { + /* Send progress field. Note that this diverge from the original S3 + * spec. We do this in order to keep connection alive. + */ + s->formatter->dump_int("Progress", (uint64_t)ofs); + } + rgw_flush_formatter(s, s->formatter); +} + +void RGWCopyObj_ObjStore_S3::send_response() +{ + if (!sent_header) + send_partial_response(0); - end_header(s, "binary/octet-stream"); if (ret == 0) { - s->formatter->open_object_section("CopyObjectResult"); dump_time(s, "LastModified", &mtime); map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_ETAG); if (iter != attrs.end()) { @@ -1801,7 +1819,7 @@ int RGWHandler_ObjStore_S3::init_from_header(struct req_state *s, int default_fo string req; string first; - const char *req_name = s->effective_uri.c_str(); + const char *req_name = s->relative_uri.c_str(); const char *p; if (*req_name == '?') { diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index e2a1b0b92eb..a0af4eac9fd 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -143,12 +143,14 @@ public: }; class RGWCopyObj_ObjStore_S3 : public RGWCopyObj_ObjStore { + bool sent_header; public: - RGWCopyObj_ObjStore_S3() {} + RGWCopyObj_ObjStore_S3() : sent_header(false) {} ~RGWCopyObj_ObjStore_S3() {} int init_dest_policy(); int get_params(); + void send_partial_response(off_t ofs); void send_response(); }; diff --git a/src/rgw/rgw_rest_swift.cc b/src/rgw/rgw_rest_swift.cc index 157158e7ed7..b4f830830f9 100644 --- a/src/rgw/rgw_rest_swift.cc +++ b/src/rgw/rgw_rest_swift.cc @@ -288,6 +288,8 @@ int RGWCreateBucket_ObjStore_SWIFT::get_params() { policy.create_default(s->user.user_id, s->user.display_name); + location_constraint = store->region.api_name; + return 0; } @@ -475,13 +477,40 @@ int RGWCopyObj_ObjStore_SWIFT::get_params() return 0; } +void RGWCopyObj_ObjStore_SWIFT::send_partial_response(off_t ofs) +{ + if (!sent_header) { + if (!ret) + ret = STATUS_CREATED; + set_req_state_err(s, ret); + dump_errno(s); + end_header(s); + + /* Send progress information. Note that this diverge from the original swift + * spec. We do this in order to keep connection alive. + */ + if (ret == 0) { + s->formatter->open_array_section("progress"); + } + sent_header = true; + } else { + s->formatter->dump_int("ofs", (uint64_t)ofs); + } + rgw_flush_formatter(s, s->formatter); +} + void RGWCopyObj_ObjStore_SWIFT::send_response() { - if (!ret) - ret = STATUS_CREATED; - set_req_state_err(s, ret); - dump_errno(s); - end_header(s); + if (!sent_header) { + if (!ret) + ret = STATUS_CREATED; + set_req_state_err(s, ret); + dump_errno(s); + end_header(s); + } else { + s->formatter->close_section(); + rgw_flush_formatter(s, s->formatter); + } } int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) @@ -829,11 +858,16 @@ int RGWHandler_ObjStore_SWIFT::init_from_header(struct req_state *s) s->bucket_name_str = first; s->bucket_name = strdup(s->bucket_name_str.c_str()); + + s->info.effective_uri = "/" + s->bucket_name_str; + if (req.size()) { s->object_str = req; s->object = strdup(s->object_str.c_str()); + s->info.effective_uri.append("/" + s->object_str); } + return 0; } diff --git a/src/rgw/rgw_rest_swift.h b/src/rgw/rgw_rest_swift.h index e4b6f0bccee..1c23ab29204 100644 --- a/src/rgw/rgw_rest_swift.h +++ b/src/rgw/rgw_rest_swift.h @@ -100,13 +100,15 @@ public: }; class RGWCopyObj_ObjStore_SWIFT : public RGWCopyObj_ObjStore { + bool sent_header; public: - RGWCopyObj_ObjStore_SWIFT() {} + RGWCopyObj_ObjStore_SWIFT() : sent_header(false) {} ~RGWCopyObj_ObjStore_SWIFT() {} int init_dest_policy(); int get_params(); void send_response(); + void send_partial_response(off_t ofs); }; class RGWGetACLs_ObjStore_SWIFT : public RGWGetACLs_ObjStore { diff --git a/src/test/osd/TestPGLog.cc b/src/test/osd/TestPGLog.cc index d8ec8d03df2..e0863f726a0 100644 --- a/src/test/osd/TestPGLog.cc +++ b/src/test/osd/TestPGLog.cc @@ -82,6 +82,10 @@ TEST_F(PGLogTest, rewind_divergent_log) { hobject_t divergent_object; eversion_t divergent_version; eversion_t newhead; + + hobject_t divergent; + divergent.hash = 0x9; + { pg_log_entry_t e; @@ -90,16 +94,16 @@ TEST_F(PGLogTest, rewind_divergent_log) { log.tail = e.version; log.log.push_back(e); e.version = newhead = eversion_t(1, 4); - e.soid.hash = 0x9; + e.soid = divergent; e.op = pg_log_entry_t::MODIFY; log.log.push_back(e); - log.index(); e.version = divergent_version = eversion_t(1, 5); - e.soid.hash = 0x9; + e.soid = divergent; divergent_object = e.soid; e.op = pg_log_entry_t::DELETE; log.log.push_back(e); log.head = e.version; + log.index(); info.last_update = log.head; info.last_complete = log.head; @@ -118,6 +122,7 @@ TEST_F(PGLogTest, rewind_divergent_log) { rewind_divergent_log(t, newhead, info, remove_snap, dirty_info, dirty_big_info); + EXPECT_TRUE(log.objects.count(divergent)); EXPECT_TRUE(missing.is_missing(divergent_object)); EXPECT_EQ(1U, log.objects.count(divergent_object)); EXPECT_EQ(2U, log.log.size()); diff --git a/src/tools/ceph-monstore-tool.cc b/src/tools/ceph-monstore-tool.cc index ae608a302f2..f361266aff0 100644 --- a/src/tools/ceph-monstore-tool.cc +++ b/src/tools/ceph-monstore-tool.cc @@ -31,6 +31,7 @@ #include "global/global_init.h" #include "os/LevelDBStore.h" #include "mon/MonitorDBStore.h" +#include "mon/Paxos.h" #include "common/Formatter.h" namespace po = boost::program_options; @@ -246,6 +247,19 @@ int main(int argc, char **argv) { goto done; } bl.write_fd(fd); + } else if (cmd == "dump-paxos") { + for (version_t v = dstart; v <= dstop; ++v) { + bufferlist bl; + st.get("paxos", v, bl); + if (bl.length() == 0) + break; + cout << "\n--- " << v << " ---" << std::endl; + MonitorDBStore::Transaction tx; + Paxos::decode_append_transaction(tx, bl); + JSONFormatter f(true); + tx.dump(&f); + f.flush(cout); + } } else if (cmd == "dump-trace") { if (tfile.empty()) { std::cerr << "Need trace_file" << std::endl; |