diff options
-rwxr-xr-x | qa/workunits/cephtool/test.sh | 8 | ||||
-rw-r--r-- | src/auth/KeyRing.cc | 3 | ||||
-rwxr-xr-x | src/ceph.in | 6 | ||||
-rw-r--r-- | src/client/Client.cc | 3 | ||||
-rw-r--r-- | src/common/cmdparse.cc | 6 | ||||
-rw-r--r-- | src/common/cmdparse.h | 6 | ||||
-rw-r--r-- | src/common/config_opts.h | 11 | ||||
-rw-r--r-- | src/mon/AuthMonitor.cc | 1 | ||||
-rw-r--r-- | src/mon/MonCommands.h | 7 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 7 | ||||
-rw-r--r-- | src/mon/OSDMonitor.cc | 17 | ||||
-rw-r--r-- | src/mon/PGMonitor.cc | 1 | ||||
-rw-r--r-- | src/osd/OSD.cc | 20 | ||||
-rw-r--r-- | src/osd/OSD.h | 10 | ||||
-rw-r--r-- | src/osd/OpRequest.cc | 16 | ||||
-rw-r--r-- | src/osd/OpRequest.h | 12 | ||||
-rw-r--r-- | src/osd/PG.cc | 6 | ||||
-rw-r--r-- | src/osd/PG.h | 14 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 42 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 20 | ||||
-rw-r--r-- | src/osdc/Objecter.cc | 3 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 2 |
22 files changed, 140 insertions, 81 deletions
diff --git a/qa/workunits/cephtool/test.sh b/qa/workunits/cephtool/test.sh index fe790c2ded8..7915e48a6ed 100755 --- a/qa/workunits/cephtool/test.sh +++ b/qa/workunits/cephtool/test.sh @@ -155,6 +155,14 @@ ceph osd scrub 0 ceph osd deep-scrub 0 ceph osd repair 0 +for f in noup nodown noin noout noscrub nodeep-scrub nobackfill norecover +do + ceph osd set $f + ceph osd unset $f +done +expect_false ceph osd set bogus +expect_false ceph osd unset bogus + ceph osd set noup ceph osd down 0 ceph osd dump | grep 'osd.0 down' diff --git a/src/auth/KeyRing.cc b/src/auth/KeyRing.cc index 56655392bae..5a8b2288e6a 100644 --- a/src/auth/KeyRing.cc +++ b/src/auth/KeyRing.cc @@ -157,8 +157,7 @@ void KeyRing::encode_formatted(string label, Formatter *f, bufferlist& bl) f->close_section(); /* auth_entities */ } f->close_section(); /* auth_dump */ - f->flush(os); - bl.append(os.str()); + f->flush(bl); } void KeyRing::decode_plaintext(bufferlist::iterator& bli) diff --git a/src/ceph.in b/src/ceph.in index 102796dcf6b..38150ed07d7 100755 --- a/src/ceph.in +++ b/src/ceph.in @@ -321,7 +321,7 @@ def admin_socket(asok_path, cmd, format=''): sigdict = parse_json_funcsigs(cmd_json, 'cli') valid_dict = validate_command(sigdict, cmd) if not valid_dict: - return -errno.EINVAL + raise RuntimeError('invalid command') if format: valid_dict['format'] = format @@ -520,6 +520,7 @@ def main(): print admin_socket(childargs[1], childargs[2:], format) except Exception as e: print >> sys.stderr, 'admin_socket: {0}'.format(e) + return errno.EINVAL return 0 else: # try resolve daemon name @@ -528,10 +529,11 @@ def main(): print admin_socket(path, childargs[2:], format) except Exception as e: print >> sys.stderr, 'admin_socket: {0}'.format(e) + return errno.EINVAL return 0 else: print >> sys.stderr, 'Daemon requires at least 2 arguments' - return 1 + return errno.EINVAL # handle any 'generic' ceph arguments that we didn't parse here global cluster_handle diff --git a/src/client/Client.cc b/src/client/Client.cc index af465cb78bc..4b10cf5c1ba 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -117,8 +117,7 @@ bool Client::CommandHook::call(std::string command, cmdmap_t& cmdmap, else assert(0 == "bad command registered"); m_client->client_lock.Unlock(); - f->flush(ss); - out.append(ss); + f->flush(out); delete f; return true; } diff --git a/src/common/cmdparse.cc b/src/common/cmdparse.cc index 370591cb3bb..16c62349c40 100644 --- a/src/common/cmdparse.cc +++ b/src/common/cmdparse.cc @@ -27,7 +27,7 @@ using namespace std; */ void -dump_cmd_to_json(JSONFormatter *f, const string& cmd) +dump_cmd_to_json(Formatter *f, const string& cmd) { // put whole command signature in an already-opened container // elements are: "name", meaning "the typeless name that means a literal" @@ -77,7 +77,7 @@ dump_cmd_to_json(JSONFormatter *f, const string& cmd) } void -dump_cmd_and_help_to_json(JSONFormatter *jf, +dump_cmd_and_help_to_json(Formatter *jf, const string& secname, const string& cmdsig, const string& helptext) @@ -91,7 +91,7 @@ dump_cmd_and_help_to_json(JSONFormatter *jf, } void -dump_cmddesc_to_json(JSONFormatter *jf, +dump_cmddesc_to_json(Formatter *jf, const string& secname, const string& cmdsig, const string& helptext, diff --git a/src/common/cmdparse.h b/src/common/cmdparse.h index 58c66b46052..10e43ab0abe 100644 --- a/src/common/cmdparse.h +++ b/src/common/cmdparse.h @@ -20,12 +20,12 @@ class CephContext; typedef boost::variant<std::string, bool, int64_t, double, std::vector<std::string> > cmd_vartype; typedef std::map<std::string, cmd_vartype> cmdmap_t; -void dump_cmd_to_json(ceph::JSONFormatter *f, const std::string& cmd); -void dump_cmd_and_help_to_json(ceph::JSONFormatter *f, +void dump_cmd_to_json(ceph::Formatter *f, const std::string& cmd); +void dump_cmd_and_help_to_json(ceph::Formatter *f, const std::string& secname, const std::string& cmd, const std::string& helptext); -void dump_cmddesc_to_json(JSONFormatter *jf, +void dump_cmddesc_to_json(ceph::Formatter *jf, const std::string& secname, const std::string& cmdsig, const std::string& helptext, diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 1c7a917602a..f67d0d1237d 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -426,7 +426,8 @@ OPTION(osd_default_data_pool_replay_window, OPT_INT, 45) OPTION(osd_preserve_trimmed_log, OPT_BOOL, false) OPTION(osd_auto_mark_unfound_lost, OPT_BOOL, false) OPTION(osd_recovery_delay_start, OPT_FLOAT, 0) -OPTION(osd_recovery_max_active, OPT_INT, 5) +OPTION(osd_recovery_max_active, OPT_INT, 60) +OPTION(osd_recovery_max_single_start, OPT_INT, 10) OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object OPTION(osd_max_push_cost, OPT_U64, 8<<20) // max size of push message @@ -489,9 +490,13 @@ OPTION(osd_leveldb_log, OPT_STR, "") // enable OSD leveldb log file * osd_client_op_priority/osd_recovery_op_priority determines the ratio of * available io between client and recovery. Each option may be set between * 1..63. + * + * osd_recovery_op_warn_multiple scales the normal warning threshhold, + * osd_op_complaint_time, so that slow recovery ops won't cause noise */ -OPTION(osd_client_op_priority, OPT_INT, 63) -OPTION(osd_recovery_op_priority, OPT_INT, 10) +OPTION(osd_client_op_priority, OPT_U32, 63) +OPTION(osd_recovery_op_priority, OPT_U32, 10) +OPTION(osd_recovery_op_warn_multiple, OPT_U32, 16) // Max time to wait between notifying mon of shutdown and shutting down OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5) diff --git a/src/mon/AuthMonitor.cc b/src/mon/AuthMonitor.cc index 629451b5eac..f165b8c9fc7 100644 --- a/src/mon/AuthMonitor.cc +++ b/src/mon/AuthMonitor.cc @@ -626,7 +626,6 @@ bool AuthMonitor::preprocess_command(MMonCommand *m) } else if (prefix == "auth list") { if (f) { mon->key_server.encode_formatted("auth", f.get(), rdata); - f->flush(rdata); } else { mon->key_server.encode_plaintext(rdata); if (rdata.length() > 0) diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index aa7b8959f06..8e9c2bb333b 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -418,8 +418,7 @@ COMMAND("osd crush rule create-simple " \ "create crush rule <name> in <root> of type <type>", \ "osd", "rw", "cli,rest") COMMAND("osd crush rule rm " \ - "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] " \ - "name=name,type=CephString", \ + "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] ", \ "remove crush rule <name>", "osd", "rw", "cli,rest") COMMAND("osd setmaxosd " \ "name=newmax,type=CephInt,range=0", \ @@ -427,10 +426,10 @@ COMMAND("osd setmaxosd " \ COMMAND("osd pause", "pause osd", "osd", "rw", "cli,rest") COMMAND("osd unpause", "unpause osd", "osd", "rw", "cli,rest") COMMAND("osd set " \ - "name=key,type=CephChoices,strings=pause|noup|nodown|noout|noin|nobackfile|norecover", \ + "name=key,type=CephChoices,strings=pause|noup|nodown|noout|noin|nobackfill|norecover|noscrub|nodeep-scrub", \ "set <key>", "osd", "rw", "cli,rest") COMMAND("osd unset " \ - "name=key,type=CephChoices,strings=pause|noup|nodown|noout|noin|nobackfile|norecover", \ + "name=key,type=CephChoices,strings=pause|noup|nodown|noout|noin|nobackfill|norecover|noscrub|nodeep-scrub", \ "unset <key>", "osd", "rw", "cli,rest") COMMAND("osd cluster_snap", "take cluster snapshot (disabled)", \ "osd", "r", "") diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 118cf6f4a1e..a9d3e48a3be 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -1937,7 +1937,7 @@ void Monitor::handle_command(MMonCommand *m) cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); if (prefix == "get_command_descriptions") { int cmdnum = 0; - JSONFormatter *f = new JSONFormatter(); + Formatter *f = new_formatter("json"); f->open_object_section("command_descriptions"); for (MonCommand *cp = mon_commands; cp < &mon_commands[ARRAY_SIZE(mon_commands)]; cp++) { @@ -1952,9 +1952,8 @@ void Monitor::handle_command(MMonCommand *m) f->close_section(); // command_descriptions bufferlist rdata; - f->flush(ds); + f->flush(rdata); delete f; - rdata.append(ds); reply_command(m, 0, "", rdata, 0); return; } @@ -2016,13 +2015,13 @@ void Monitor::handle_command(MMonCommand *m) } if (prefix == "fsid") { - ds << monmap->fsid; if (f) { f->open_object_section("fsid"); f->dump_stream("fsid") << monmap->fsid; f->close_section(); f->flush(rdata); } else { + ds << monmap->fsid; rdata.append(ds); } reply_command(m, 0, "", rdata, 0); diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index c6db052a591..07022aec73b 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -1966,8 +1966,9 @@ bool OSDMonitor::preprocess_command(MMonCommand *m) if (prefix == "osd stat") { osdmap.print_summary(f.get(), ds); if (f) - f->flush(ds); - rdata.append(ds); + f->flush(rdata); + else + rdata.append(ds); } else if (prefix == "osd dump" || prefix == "osd tree" || @@ -2080,9 +2081,7 @@ bool OSDMonitor::preprocess_command(MMonCommand *m) f->dump_string(p->first.c_str(), p->second); f->close_section(); f->close_section(); - f->flush(ds); - ds << "\n"; - rdata.append(ds); + f->flush(rdata); } else if (prefix == "osd map") { string poolstr, objstr, namespacestr; cmd_getval(g_ceph_context, cmdmap, "pool", poolstr); @@ -3072,6 +3071,10 @@ bool OSDMonitor::prepare_command(MMonCommand *m) return prepare_set_flag(m, CEPH_OSDMAP_NOBACKFILL); else if (key == "norecover") return prepare_set_flag(m, CEPH_OSDMAP_NORECOVER); + else if (key == "noscrub") + return prepare_set_flag(m, CEPH_OSDMAP_NOSCRUB); + else if (key == "nodeep-scrub") + return prepare_set_flag(m, CEPH_OSDMAP_NODEEP_SCRUB); } else if (prefix == "osd unset") { string key; @@ -3090,6 +3093,10 @@ bool OSDMonitor::prepare_command(MMonCommand *m) return prepare_unset_flag(m, CEPH_OSDMAP_NOBACKFILL); else if (key == "norecover") return prepare_unset_flag(m, CEPH_OSDMAP_NORECOVER); + else if (key == "noscrub") + return prepare_unset_flag(m, CEPH_OSDMAP_NOSCRUB); + else if (key == "nodeep-scrub") + return prepare_unset_flag(m, CEPH_OSDMAP_NODEEP_SCRUB); } else if (prefix == "osd cluster_snap") { // ** DISABLE THIS FOR NOW ** diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index ab4a5b30447..29c77dbe0ed 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -1358,7 +1358,6 @@ bool PGMonitor::preprocess_command(MMonCommand *m) } else { ds << pg_map; } - rdata.append(ds); r = 0; } else if (prefix == "pg getmap") { pg_map.encode(rdata); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 69c181862cc..1a77dae730a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3935,7 +3935,6 @@ void OSD::do_command(Connection *con, tid_t tid, vector<string>& cmd, bufferlist f->close_section(); // command_descriptions f->flush(ds); - odata.append(ds); delete f; goto out; } @@ -6633,11 +6632,12 @@ bool OSD::_recover_now() return true; } -void OSD::do_recovery(PG *pg) +void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle) { // see how many we should try to start. note that this is a bit racy. recovery_wq.lock(); - int max = g_conf->osd_recovery_max_active - recovery_ops_active; + int max = MAX(g_conf->osd_recovery_max_active - recovery_ops_active, + g_conf->osd_recovery_max_single_start); if (max > 0) { dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops)" << dendl; @@ -6653,7 +6653,7 @@ void OSD::do_recovery(PG *pg) recovery_wq.queue(pg); return; } else { - pg->lock(); + pg->lock_suspend_timeout(handle); if (pg->deleting || !(pg->is_active() && pg->is_primary())) { pg->unlock(); goto out; @@ -6665,7 +6665,7 @@ void OSD::do_recovery(PG *pg) #endif PG::RecoveryCtx rctx = create_context(); - int started = pg->start_recovery_ops(max, &rctx); + int started = pg->start_recovery_ops(max, &rctx, handle); dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl; /* @@ -7053,7 +7053,7 @@ void OSD::OpWQ::_process(PGRef pg, ThreadPool::TPHandle &handle) if (!(pg_for_processing[&*pg].size())) pg_for_processing.erase(&*pg); } - osd->dequeue_op(pg, op); + osd->dequeue_op(pg, op, handle); pg->unlock(); } @@ -7066,7 +7066,9 @@ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued) /* * NOTE: dequeue called in worker thread, with pg lock */ -void OSD::dequeue_op(PGRef pg, OpRequestRef op) +void OSD::dequeue_op( + PGRef pg, OpRequestRef op, + ThreadPool::TPHandle &handle) { utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp(); dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority() @@ -7079,7 +7081,7 @@ void OSD::dequeue_op(PGRef pg, OpRequestRef op) op->mark_reached_pg(); - pg->do_request(op); + pg->do_request(op, handle); // finish dout(10) << "dequeue_op " << op << " finish" << dendl; @@ -7131,7 +7133,7 @@ void OSD::process_peering_events( ++i) { set<boost::intrusive_ptr<PG> > split_pgs; PG *pg = *i; - pg->lock(); + pg->lock_suspend_timeout(handle); curmap = service.get_osdmap(); if (pg->deleting) { pg->unlock(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 5196a1dc1f3..82a251d9a80 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -915,7 +915,9 @@ private: } op_wq; void enqueue_op(PG *pg, OpRequestRef op); - void dequeue_op(PGRef pg, OpRequestRef op); + void dequeue_op( + PGRef pg, OpRequestRef op, + ThreadPool::TPHandle &handle); // -- peering queue -- struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> { @@ -1371,8 +1373,8 @@ protected: osd->recovery_queue.push_front(&pg->recovery_item); } } - void _process(PG *pg) { - osd->do_recovery(pg); + void _process(PG *pg, ThreadPool::TPHandle &handle) { + osd->do_recovery(pg, handle); pg->put("RecoveryWQ"); } void _clear() { @@ -1386,7 +1388,7 @@ protected: void start_recovery_op(PG *pg, const hobject_t& soid); void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue); - void do_recovery(PG *pg); + void do_recovery(PG *pg, ThreadPool::TPHandle &handle); bool _recover_now(); // replay / delayed pg activation diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index a6cdc9ecffb..c694362a8a5 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -20,6 +20,22 @@ static ostream& _prefix(std::ostream* _dout) return *_dout << "--OSD::tracker-- "; } +OpRequest::OpRequest(Message *req, OpTracker *tracker) : + request(req), xitem(this), + rmw_flags(0), + warn_interval_multiplier(1), + lock("OpRequest::lock"), + tracker(tracker), + hit_flag_points(0), latest_flag_point(0), + seq(0) { + received_time = request->get_recv_stamp(); + tracker->register_inflight_op(&xitem); + if (req->get_priority() < g_conf->osd_client_op_priority) { + // don't warn as quickly for low priority ops + warn_interval_multiplier = g_conf->osd_recovery_op_warn_multiple; + } +} + void OpHistory::on_shutdown() { arrived.clear(); diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index a2014472432..e72f03d1d77 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -156,17 +156,7 @@ private: static const uint8_t flag_sub_op_sent = 1 << 4; static const uint8_t flag_commit_sent = 1 << 5; - OpRequest(Message *req, OpTracker *tracker) : - request(req), xitem(this), - rmw_flags(0), - warn_interval_multiplier(1), - lock("OpRequest::lock"), - tracker(tracker), - hit_flag_points(0), latest_flag_point(0), - seq(0) { - received_time = request->get_recv_stamp(); - tracker->register_inflight_op(&xitem); - } + OpRequest(Message *req, OpTracker *tracker); public: ~OpRequest() { assert(request); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 63e760e3b21..8e78eaa7a16 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1397,7 +1397,9 @@ void PG::queue_op(OpRequestRef op) osd->op_wq.queue(make_pair(PGRef(this), op)); } -void PG::do_request(OpRequestRef op) +void PG::do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle) { // do any pending flush do_pending_flush(); @@ -1435,7 +1437,7 @@ void PG::do_request(OpRequestRef op) break; case MSG_OSD_PG_SCAN: - do_scan(op); + do_scan(op, handle); break; case MSG_OSD_PG_BACKFILL: diff --git a/src/osd/PG.h b/src/osd/PG.h index 8f572c75e19..d4679ce4fd8 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -645,7 +645,9 @@ public: virtual void check_local() = 0; - virtual int start_recovery_ops(int max, RecoveryCtx *prctx) = 0; + virtual int start_recovery_ops( + int max, RecoveryCtx *prctx, + ThreadPool::TPHandle &handle) = 0; void purge_strays(); @@ -1804,12 +1806,18 @@ public: // abstract bits - void do_request(OpRequestRef op); + void do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle + ); virtual void do_op(OpRequestRef op) = 0; virtual void do_sub_op(OpRequestRef op) = 0; virtual void do_sub_op_reply(OpRequestRef op) = 0; - virtual void do_scan(OpRequestRef op) = 0; + virtual void do_scan( + OpRequestRef op, + ThreadPool::TPHandle &handle + ) = 0; virtual void do_backfill(OpRequestRef op) = 0; virtual void do_push(OpRequestRef op) = 0; virtual void do_pull(OpRequestRef op) = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 658ea7cb746..ab9c8099a44 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1252,7 +1252,9 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op) sub_op_modify_reply(op); } -void ReplicatedPG::do_scan(OpRequestRef op) +void ReplicatedPG::do_scan( + OpRequestRef op, + ThreadPool::TPHandle &handle) { MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request); assert(m->get_header().type == MSG_OSD_PG_SCAN); @@ -1278,7 +1280,9 @@ void ReplicatedPG::do_scan(OpRequestRef op) BackfillInterval bi; osr->flush(); - scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi); + scan_range( + m->begin, g_conf->osd_backfill_scan_min, + g_conf->osd_backfill_scan_max, &bi, handle); MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST, get_osdmap()->get_epoch(), m->query_epoch, info.pgid, bi.begin, bi.end); @@ -6875,7 +6879,9 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap) } -int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) +int ReplicatedPG::start_recovery_ops( + int max, RecoveryCtx *prctx, + ThreadPool::TPHandle &handle) { int started = 0; assert(is_primary()); @@ -6898,15 +6904,15 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) if (num_missing == num_unfound) { // All of the missing objects we have are unfound. // Recover the replicas. - started = recover_replicas(max); + started = recover_replicas(max, handle); } if (!started) { // We still have missing objects that we should grab from replicas. - started += recover_primary(max); + started += recover_primary(max, handle); } if (!started && num_unfound != get_num_unfound()) { // second chance to recovery replicas - started = recover_replicas(max); + started = recover_replicas(max, handle); } bool deferred_backfill = false; @@ -6931,7 +6937,7 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) } deferred_backfill = true; } else { - started += recover_backfill(max - started); + started += recover_backfill(max - started, handle); } } @@ -6993,7 +6999,7 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) * do one recovery op. * return true if done, false if nothing left to do. */ -int ReplicatedPG::recover_primary(int max) +int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle) { assert(is_primary()); @@ -7012,6 +7018,7 @@ int ReplicatedPG::recover_primary(int max) map<version_t, hobject_t>::const_iterator p = missing.rmissing.lower_bound(pg_log.get_log().last_requested); while (p != missing.rmissing.end()) { + handle.reset_tp_timeout(); hobject_t soid; version_t v = p->first; @@ -7204,7 +7211,7 @@ int ReplicatedPG::prep_object_replica_pushes( return 1; } -int ReplicatedPG::recover_replicas(int max) +int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle) { dout(10) << __func__ << "(" << max << ")" << dendl; int started = 0; @@ -7226,6 +7233,7 @@ int ReplicatedPG::recover_replicas(int max) for (map<version_t, hobject_t>::const_iterator p = m.rmissing.begin(); p != m.rmissing.end() && started < max; ++p) { + handle.reset_tp_timeout(); const hobject_t soid(p->second); if (pushing.count(soid)) { @@ -7275,7 +7283,9 @@ int ReplicatedPG::recover_replicas(int max) * peer_info[backfill_target].last_backfill = MIN(peer_backfill_info.begin, * backfill_info.begin, backfills_in_flight) */ -int ReplicatedPG::recover_backfill(int max) +int ReplicatedPG::recover_backfill( + int max, + ThreadPool::TPHandle &handle) { dout(10) << "recover_backfill (" << max << ")" << dendl; assert(backfill_target >= 0); @@ -7305,7 +7315,7 @@ int ReplicatedPG::recover_backfill(int max) dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl; backfill_info.clear(); osr->flush(); - scan_range(backfill_pos, local_min, local_max, &backfill_info); + scan_range(backfill_pos, local_min, local_max, &backfill_info, handle); int ops = 0; map<hobject_t, pair<eversion_t, eversion_t> > to_push; @@ -7319,7 +7329,8 @@ int ReplicatedPG::recover_backfill(int max) if (backfill_info.begin <= pbi.begin && !backfill_info.extends_to_end() && backfill_info.empty()) { osr->flush(); - scan_range(backfill_info.end, local_min, local_max, &backfill_info); + scan_range(backfill_info.end, local_min, local_max, &backfill_info, + handle); backfill_info.trim(); } backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin; @@ -7407,6 +7418,7 @@ int ReplicatedPG::recover_backfill(int max) for (map<hobject_t, eversion_t>::iterator i = to_remove.begin(); i != to_remove.end(); ++i) { + handle.reset_tp_timeout(); send_remove_op(i->first, i->second, backfill_target); } @@ -7414,6 +7426,7 @@ int ReplicatedPG::recover_backfill(int max) for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin(); i != to_push.end(); ++i) { + handle.reset_tp_timeout(); prep_backfill_object_push( i->first, i->second.first, i->second.second, backfill_target, &pushes); } @@ -7480,7 +7493,9 @@ void ReplicatedPG::prep_backfill_object_push( put_object_context(obc); } -void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi) +void ReplicatedPG::scan_range( + hobject_t begin, int min, int max, BackfillInterval *bi, + ThreadPool::TPHandle &handle) { assert(is_locked()); dout(10) << "scan_range from " << begin << dendl; @@ -7496,6 +7511,7 @@ void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterva dout(20) << ls << dendl; for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) { + handle.reset_tp_timeout(); ObjectContext *obc = NULL; if (is_primary()) obc = _lookup_object_context(*p); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 7b70b4381ea..41c8106ea00 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -759,10 +759,13 @@ protected: void _clear_recovery_state(); void queue_for_recovery(); - int start_recovery_ops(int max, RecoveryCtx *prctx); - int recover_primary(int max); - int recover_replicas(int max); - int recover_backfill(int max); + int start_recovery_ops( + int max, RecoveryCtx *prctx, + ThreadPool::TPHandle &handle); + + int recover_primary(int max, ThreadPool::TPHandle &handle); + int recover_replicas(int max, ThreadPool::TPHandle &handle); + int recover_backfill(int max, ThreadPool::TPHandle &handle); /** * scan a (hash) range of objects in the current pg @@ -772,7 +775,10 @@ protected: * @max return no more than this many items * @bi [out] resulting map of objects to eversion_t's */ - void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi); + void scan_range( + hobject_t begin, int min, int max, BackfillInterval *bi, + ThreadPool::TPHandle &handle + ); void prep_backfill_object_push( hobject_t oid, eversion_t v, eversion_t have, int peer, @@ -939,7 +945,9 @@ public: void do_pg_op(OpRequestRef op); void do_sub_op(OpRequestRef op); void do_sub_op_reply(OpRequestRef op); - void do_scan(OpRequestRef op); + void do_scan( + OpRequestRef op, + ThreadPool::TPHandle &handle); void do_backfill(OpRequestRef op); void _do_push(OpRequestRef op); void _do_pull_response(OpRequestRef op); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 2b1e7f9e41b..88336cacce3 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -2348,9 +2348,8 @@ bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap, m_objecter->client_lock.Lock(); m_objecter->dump_requests(f); m_objecter->client_lock.Unlock(); - f->flush(ss); + f->flush(out); delete f; - out.append(ss); return true; } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index aba5cdf0ee2..222b79a7d2e 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2551,7 +2551,7 @@ int RGWRados::copy_obj(void *ctx, conn = rest_master_conn; } else { map<string, RGWRESTConn *>::iterator iter = region_conn_map.find(src_bucket_info.region); - if (iter == zone_conn_map.end()) { + if (iter == region_conn_map.end()) { ldout(cct, 0) << "could not find region connection to region: " << source_zone << dendl; return -ENOENT; } |