diff options
author | Josh Durgin <josh.durgin@inktank.com> | 2013-04-10 10:45:07 -0700 |
---|---|---|
committer | Josh Durgin <josh.durgin@inktank.com> | 2013-04-10 10:45:07 -0700 |
commit | 1ef0ecfcb40970641784c1ba214dd5a72ff6d5e3 (patch) | |
tree | 4a044a8ac71e161ab91224424c8eb4ae77557721 | |
parent | dc1af54cc50b7d042d3d3bf92e51297ec96d9743 (diff) | |
parent | 3888a12385aa6fcf35c9cdce9ad82a2cdd3377b7 (diff) | |
download | ceph-1ef0ecfcb40970641784c1ba214dd5a72ff6d5e3.tar.gz |
Merge branch 'next'
-rw-r--r-- | src/ceph_mon.cc | 6 | ||||
-rw-r--r-- | src/ceph_osd.cc | 14 | ||||
-rw-r--r-- | src/common/ceph_argparse.cc | 52 | ||||
-rw-r--r-- | src/common/ceph_argparse.h | 5 | ||||
-rw-r--r-- | src/common/config_opts.h | 1 | ||||
-rw-r--r-- | src/global/global_init.cc | 1 | ||||
-rw-r--r-- | src/mds/Server.cc | 3 | ||||
-rw-r--r-- | src/mon/MDSMonitor.cc | 3 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 2 | ||||
-rw-r--r-- | src/mon/MonmapMonitor.cc | 3 | ||||
-rw-r--r-- | src/mon/OSDMonitor.cc | 3 | ||||
-rw-r--r-- | src/mon/PGMonitor.cc | 7 | ||||
-rw-r--r-- | src/msg/Message.h | 50 | ||||
-rw-r--r-- | src/msg/Messenger.h | 13 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 46 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 13 | ||||
-rw-r--r-- | src/os/FileJournal.cc | 4 | ||||
-rw-r--r-- | src/os/FileJournal.h | 17 | ||||
-rw-r--r-- | src/osdc/Journaler.cc | 18 | ||||
-rw-r--r-- | src/osdc/Journaler.h | 4 | ||||
-rw-r--r-- | src/test/cli-integration/rbd/formatted-output.t | 16 | ||||
-rw-r--r-- | src/test/mon/test_mon_workloadgen.cc | 4 |
22 files changed, 199 insertions, 86 deletions
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 4a4df8942e9..72354e18876 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -407,15 +407,15 @@ int main(int argc, const char **argv) // throttle client traffic Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes", g_conf->mon_client_bytes); - messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, client_throttler); + messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, client_throttler, NULL); // throttle daemon traffic // NOTE: actual usage on the leader may multiply by the number of // monitors if they forward large update messages from daemons. Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes", g_conf->mon_daemon_bytes); - messenger->set_policy_throttler(entity_name_t::TYPE_OSD, daemon_throttler); - messenger->set_policy_throttler(entity_name_t::TYPE_MDS, daemon_throttler); + messenger->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler, NULL); + messenger->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler, NULL); cout << "starting " << g_conf->name << " rank " << rank << " at " << ipaddr diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 5a90abd6125..33a107c1dc0 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -338,9 +338,12 @@ int main(int argc, const char **argv) "(no journal)" : g_conf->osd_journal) << std::endl; - boost::scoped_ptr<Throttle> client_throttler( + boost::scoped_ptr<Throttle> client_byte_throttler( new Throttle(g_ceph_context, "osd_client_bytes", g_conf->osd_client_message_size_cap)); + boost::scoped_ptr<Throttle> client_msg_throttler( + new Throttle(g_ceph_context, "osd_client_messages", + g_conf->osd_client_message_cap)); uint64_t supported = CEPH_FEATURE_UID | @@ -349,9 +352,9 @@ int main(int argc, const char **argv) CEPH_FEATURE_MSG_AUTH; client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0)); - client_messenger->set_policy_throttler( - entity_name_t::TYPE_CLIENT, - client_throttler.get()); // default, actually + client_messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, + client_byte_throttler.get(), + client_msg_throttler.get()); client_messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID | @@ -462,7 +465,8 @@ int main(int argc, const char **argv) delete messenger_hbclient; delete messenger_hbserver; delete cluster_messenger; - client_throttler.reset(); + client_byte_throttler.reset(); + client_msg_throttler.reset(); g_ceph_context->put(); // cd on exit, so that gmon.out (if any) goes into a separate directory for each node. diff --git a/src/common/ceph_argparse.cc b/src/common/ceph_argparse.cc index 889c713d736..2ff520e45c9 100644 --- a/src/common/ceph_argparse.cc +++ b/src/common/ceph_argparse.cc @@ -252,7 +252,8 @@ bool ceph_argparse_binary_flag(std::vector<const char*> &args, } static bool va_ceph_argparse_witharg(std::vector<const char*> &args, - std::vector<const char*>::iterator &i, std::string *ret, va_list ap) + std::vector<const char*>::iterator &i, std::string *ret, bool cli, + va_list ap) { const char *first = *i; char tmp[strlen(first)+1]; @@ -278,8 +279,12 @@ static bool va_ceph_argparse_witharg(std::vector<const char*> &args, else if (first[strlen_a] == '\0') { // find second part (or not) if (i+1 == args.end()) { - cerr << "Option " << *i << " requires an argument." << std::endl; - _exit(1); + if (cli) { + cerr << "Option " << *i << " requires an argument." << std::endl; + _exit(1); + } else { + return false; + } } i = args.erase(i); *ret = *i; @@ -296,11 +301,21 @@ bool ceph_argparse_witharg(std::vector<const char*> &args, bool r; va_list ap; va_start(ap, ret); - r = va_ceph_argparse_witharg(args, i, ret, ap); + r = va_ceph_argparse_witharg(args, i, ret, false, ap); va_end(ap); return r; } +bool ceph_argparse_witharg_daemon(std::vector<const char*> &args, + std::vector<const char*>::iterator &i, std::string *ret, ...) +{ + bool r; + va_list ap; + va_start(ap, ret); + r = va_ceph_argparse_witharg(args, i, ret, false, ap); + va_end(ap); + return r; +} bool ceph_argparse_withint(std::vector<const char*> &args, std::vector<const char*>::iterator &i, int *ret, std::ostream *oss, ...) @@ -309,7 +324,30 @@ bool ceph_argparse_withint(std::vector<const char*> &args, va_list ap; std::string str; va_start(ap, oss); - r = va_ceph_argparse_witharg(args, i, &str, ap); + r = va_ceph_argparse_witharg(args, i, &str, true, ap); + va_end(ap); + if (!r) { + return false; + } + + std::string err; + int myret = strict_strtol(str.c_str(), 10, &err); + *ret = myret; + if (!err.empty()) { + *oss << err; + } + return true; +} + +bool ceph_argparse_withint_daemon(std::vector<const char*> &args, + std::vector<const char*>::iterator &i, int *ret, + std::ostream *oss, ...) +{ + bool r; + va_list ap; + std::string str; + va_start(ap, oss); + r = va_ceph_argparse_witharg(args, i, &str, false, ap); va_end(ap); if (!r) { return false; @@ -332,7 +370,7 @@ bool ceph_argparse_withlonglong(std::vector<const char*> &args, va_list ap; std::string str; va_start(ap, oss); - r = va_ceph_argparse_witharg(args, i, &str, ap); + r = va_ceph_argparse_witharg(args, i, &str, false, ap); va_end(ap); if (!r) { return false; @@ -355,7 +393,7 @@ bool ceph_argparse_withfloat(std::vector<const char*> &args, va_list ap; std::string str; va_start(ap, oss); - r = va_ceph_argparse_witharg(args, i, &str, ap); + r = va_ceph_argparse_witharg(args, i, &str, false, ap); va_end(ap); if (!r) { return false; diff --git a/src/common/ceph_argparse.h b/src/common/ceph_argparse.h index 3ef0251abeb..5249704c0b8 100644 --- a/src/common/ceph_argparse.h +++ b/src/common/ceph_argparse.h @@ -57,6 +57,8 @@ bool ceph_argparse_flag(std::vector<const char*> &args, std::vector<const char*>::iterator &i, ...); bool ceph_argparse_witharg(std::vector<const char*> &args, std::vector<const char*>::iterator &i, std::string *ret, ...); +bool ceph_argparse_witharg_daemon(std::vector<const char*> &args, + std::vector<const char*>::iterator &i, std::string *ret, ...); bool ceph_argparse_binary_flag(std::vector<const char*> &args, std::vector<const char*>::iterator &i, int *ret, std::ostream *oss, ...); @@ -66,6 +68,9 @@ extern CephInitParameters ceph_argparse_early_args extern bool ceph_argparse_withint(std::vector<const char*> &args, std::vector<const char*>::iterator &i, int *ret, std::ostream *oss, ...); +extern bool ceph_argparse_withint_daemon(std::vector<const char*> &args, + std::vector<const char*>::iterator &i, int *ret, + std::ostream *oss, ...); extern bool ceph_argparse_withfloat(std::vector<const char*> &args, std::vector<const char*>::iterator &i, float *ret, std::ostream *oss, ...); diff --git a/src/common/config_opts.h b/src/common/config_opts.h index cb2fd391fc9..19f3082ec4b 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -349,6 +349,7 @@ OPTION(osd_journal_size, OPT_INT, 5120) // in mb OPTION(osd_max_write_size, OPT_INT, 90) OPTION(osd_max_pgls, OPT_U64, 1024) // max number of pgls entries to return OPTION(osd_client_message_size_cap, OPT_U64, 500*1024L*1024L) // client data allowed in-memory (in bytes) +OPTION(osd_client_message_cap, OPT_U64, 100) // num client messages allowed in-memory OPTION(osd_pg_bits, OPT_INT, 6) // bits per osd OPTION(osd_pgp_bits, OPT_INT, 6) // bits per osd OPTION(osd_crush_chooseleaf_type, OPT_INT, 1) // 1 = host diff --git a/src/global/global_init.cc b/src/global/global_init.cc index 43ce0909565..0f7179c39df 100644 --- a/src/global/global_init.cc +++ b/src/global/global_init.cc @@ -148,6 +148,7 @@ void global_init_daemonize(CephContext *cct, int flags) return; // stop log thread + g_ceph_context->_log->flush(); g_ceph_context->_log->stop(); int ret = daemon(1, 1); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 293640e4870..dc7ea23f763 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -2684,11 +2684,12 @@ public: mds->balancer->hit_inode(mdr->now, newi, META_POP_IWR); + mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool); + MClientReply *reply = new MClientReply(mdr->client_request, 0); reply->set_extra_bl(mdr->reply_extra_bl); mds->server->reply_request(mdr, reply); - mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool); assert(g_conf->mds_kill_openc_at != 1); } }; diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index 0ca0f78bf45..3ea6d860039 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -548,7 +548,8 @@ bool MDSMonitor::preprocess_command(MMonCommand *m) for (std::vector<const char*>::iterator i = args.begin()+1; i != args.end(); ) { if (ceph_argparse_double_dash(args, i)) break; - else if (ceph_argparse_witharg(args, i, &val, "-f", "--format", (char*)NULL)) + else if (ceph_argparse_witharg_daemon(args, i, &val, "-f", "--format", + (char*)NULL)) format = val; else if (!epoch) { long l = parse_pos_long(*i++, &ss); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index d8d4579f67b..a49d08a02af 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -2483,7 +2483,7 @@ void Monitor::handle_command(MMonCommand *m) JSONFormatter *jf = NULL; for (vector<const char*>::iterator i = args.begin(); i != args.end();) { string val; - if (ceph_argparse_witharg(args, i, &val, + if (ceph_argparse_witharg_daemon(args, i, &val, "-f", "--format", (char*)NULL)) { format = val; } else { diff --git a/src/mon/MonmapMonitor.cc b/src/mon/MonmapMonitor.cc index af8d7299227..f1c6dfa3325 100644 --- a/src/mon/MonmapMonitor.cc +++ b/src/mon/MonmapMonitor.cc @@ -222,7 +222,8 @@ bool MonmapMonitor::preprocess_command(MMonCommand *m) for (std::vector<const char*>::iterator i = args.begin()+1; i != args.end(); ) { if (ceph_argparse_double_dash(args, i)) break; - else if (ceph_argparse_witharg(args, i, &val, "-f", "--format", (char*)NULL)) + else if (ceph_argparse_witharg_daemon(args, i, &val, "-f", "--format", + (char*)NULL)) format = val; else if (!epoch) { long l = parse_pos_long(*i++, &ss); diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index a9d68075cd4..40103ec402f 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -1894,7 +1894,8 @@ bool OSDMonitor::preprocess_command(MMonCommand *m) for (std::vector<const char*>::iterator i = args.begin()+1; i != args.end(); ) { if (ceph_argparse_double_dash(args, i)) break; - else if (ceph_argparse_witharg(args, i, &val, "-f", "--format", (char*)NULL)) + else if (ceph_argparse_witharg_daemon(args, i, &val, "-f", "--format", + (char*)NULL)) format = val; else if (!epoch) { long l = parse_pos_long(*i++, &ss); diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index c05897215cc..fe50b34da8c 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -1096,7 +1096,8 @@ bool PGMonitor::preprocess_command(MMonCommand *m) for (std::vector<const char*>::iterator i = args.begin()+1; i != args.end(); ) { if (ceph_argparse_double_dash(args, i)) { break; - } else if (ceph_argparse_witharg(args, i, &val, "-f", "--format", (char*)NULL)) { + } else if (ceph_argparse_witharg_daemon(args, i, &val, "-f", + "--format", (char*)NULL)) { format = val; } else { what = *i++; @@ -1572,14 +1573,14 @@ int PGMonitor::dump_stuck_pg_stats(ostream& ss, i != args.end(); ) { if (ceph_argparse_double_dash(args, i)) { break; - } else if (ceph_argparse_witharg(args, i, &val, + } else if (ceph_argparse_witharg_daemon(args, i, &val, "-f", "--format", (char*)NULL)) { if (val != "json" && val != "plain") { ss << "format must be json or plain"; return -EINVAL; } format = val; - } else if (ceph_argparse_withint(args, i, &seconds, &err, + } else if (ceph_argparse_withint_daemon(args, i, &seconds, &err, "-t", "--threshold", (char*)NULL)) { if (!err.str().empty()) { ss << err.str(); diff --git a/src/msg/Message.h b/src/msg/Message.h index 1bf28e36f2d..33d26b2e7da 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -299,7 +299,10 @@ protected: // release our size in bytes back to this throttler when our payload // is adjusted or when we are destroyed. - Throttle *throttler; + Throttle *byte_throttler; + + // release a count back to this throttler when we are destroyed + Throttle *msg_throttler; // keep track of how big this message was when we reserved space in // the msgr dispatch_throttler, so that we can properly release it @@ -313,14 +316,16 @@ protected: public: Message() : connection(NULL), - throttler(NULL), + byte_throttler(NULL), + msg_throttler(NULL), dispatch_throttle_size(0) { memset(&header, 0, sizeof(header)); memset(&footer, 0, sizeof(footer)); }; Message(int t, int version=1, int compat_version=0) : connection(NULL), - throttler(NULL), + byte_throttler(NULL), + msg_throttler(NULL), dispatch_throttle_size(0) { memset(&header, 0, sizeof(header)); header.type = t; @@ -340,8 +345,10 @@ protected: assert(nref.read() == 0); if (connection) connection->put(); - if (throttler) - throttler->put(payload.length() + middle.length() + data.length()); + if (byte_throttler) + byte_throttler->put(payload.length() + middle.length() + data.length()); + if (msg_throttler) + msg_throttler->put(); } public: Connection *get_connection() { return connection; } @@ -350,8 +357,10 @@ public: connection->put(); connection = c; } - void set_throttler(Throttle *t) { throttler = t; } - Throttle *get_throttler() { return throttler; } + void set_byte_throttler(Throttle *t) { byte_throttler = t; } + Throttle *get_byte_throttler() { return byte_throttler; } + void set_message_throttler(Throttle *t) { msg_throttler = t; } + Throttle *get_message_throttler() { return msg_throttler; } void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; } uint64_t get_dispatch_throttle_size() { return dispatch_throttle_size; } @@ -369,39 +378,48 @@ public: */ void clear_payload() { - if (throttler) throttler->put(payload.length() + middle.length()); + if (byte_throttler) + byte_throttler->put(payload.length() + middle.length()); payload.clear(); middle.clear(); } void clear_data() { - if (throttler) throttler->put(data.length()); + if (byte_throttler) + byte_throttler->put(data.length()); data.clear(); } bool empty_payload() { return payload.length() == 0; } bufferlist& get_payload() { return payload; } void set_payload(bufferlist& bl) { - if (throttler) throttler->put(payload.length()); + if (byte_throttler) + byte_throttler->put(payload.length()); payload.claim(bl); - if (throttler) throttler->take(payload.length()); + if (byte_throttler) + byte_throttler->take(payload.length()); } void set_middle(bufferlist& bl) { - if (throttler) throttler->put(payload.length()); + if (byte_throttler) + byte_throttler->put(payload.length()); middle.claim(bl); - if (throttler) throttler->take(payload.length()); + if (byte_throttler) + byte_throttler->take(payload.length()); } bufferlist& get_middle() { return middle; } void set_data(const bufferlist &d) { - if (throttler) throttler->put(data.length()); + if (byte_throttler) + byte_throttler->put(data.length()); data = d; - if (throttler) throttler->take(data.length()); + if (byte_throttler) + byte_throttler->take(data.length()); } bufferlist& get_data() { return data; } void claim_data(bufferlist& bl) { - if (throttler) throttler->put(data.length()); + if (byte_throttler) + byte_throttler->put(data.length()); bl.claim(data); } off_t get_data_len() { return data.length(); } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 7205940c118..b08fdaa7f30 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -74,7 +74,8 @@ public: * the associated Connection(s). When reading in a new Message, the Messenger * will call throttler->throttle() for the size of the new Message. */ - Throttle *throttler; + Throttle *throttler_bytes; + Throttle *throttler_messages; /// Specify features supported locally by the endpoint. uint64_t features_supported; @@ -82,12 +83,16 @@ public: uint64_t features_required; Policy() - : lossy(false), server(false), standby(false), resetcheck(true), throttler(NULL), + : lossy(false), server(false), standby(false), resetcheck(true), + throttler_bytes(NULL), + throttler_messages(NULL), features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), features_required(0) {} private: Policy(bool l, bool s, bool st, bool r, uint64_t sup, uint64_t req) - : lossy(l), server(s), standby(st), resetcheck(r), throttler(NULL), + : lossy(l), server(s), standby(st), resetcheck(r), + throttler_bytes(NULL), + throttler_messages(NULL), features_supported(sup | CEPH_FEATURES_SUPPORTED_DEFAULT), features_required(req) {} @@ -266,7 +271,7 @@ public: * ownership of this pointer, but you must not destroy it before * you destroy the Messenger. */ - virtual void set_policy_throttler(int type, Throttle *t) = 0; + virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0; /** * Set the default send priority * diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index ae94a6a340c..f4100bc483b 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -311,6 +311,10 @@ int Pipe::accept() // http://ceph.newdream.net/wiki/Messaging_protocol int reply_tag = 0; uint64_t existing_seq = -1; + + // used for reading in the remote acked seq on connect + uint64_t newly_acked_seq = 0; + while (1) { if (tcp_read((char*)&connect, sizeof(connect)) < 0) { ldout(msgr->cct,10) << "accept couldn't read connect" << dendl; @@ -639,7 +643,6 @@ int Pipe::accept() } if (reply_tag == CEPH_MSGR_TAG_SEQ) { - uint64_t newly_acked_seq = 0; if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) { ldout(msgr->cct,2) << "accept write error on in_seq" << dendl; goto fail_registered; @@ -648,10 +651,10 @@ int Pipe::accept() ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl; goto fail_registered; } - discard_requeued_up_to(newly_acked_seq); } pipe_lock.Lock(); + discard_requeued_up_to(newly_acked_seq); if (state != STATE_CLOSED) { ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl; start_writer(); @@ -1661,14 +1664,20 @@ int Pipe::read_message(Message **pm) Message *message; utime_t recv_stamp = ceph_clock_now(msgr->cct); + if (policy.throttler_messages) { + ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + policy.throttler_messages->get(); + } + uint64_t message_size = header.front_len + header.middle_len + header.data_len; if (message_size) { - bool waited_on_throttle = false; - if (policy.throttler) { - ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler " - << policy.throttler->get_current() << "/" - << policy.throttler->get_max() << dendl; - waited_on_throttle = policy.throttler->get(message_size); + if (policy.throttler_bytes) { + ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->get(message_size); } // throttle total bytes waiting for dispatch. do this _after_ the @@ -1678,7 +1687,7 @@ int Pipe::read_message(Message **pm) ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler " << msgr->dispatch_throttler.get_current() << "/" << msgr->dispatch_throttler.get_max() << dendl; - waited_on_throttle |= msgr->dispatch_throttler.get(message_size); + msgr->dispatch_throttler.get(message_size); } utime_t throttle_stamp = ceph_clock_now(msgr->cct); @@ -1807,7 +1816,8 @@ int Pipe::read_message(Message **pm) } } - message->set_throttler(policy.throttler); + message->set_byte_throttler(policy.throttler_bytes); + message->set_message_throttler(policy.throttler_messages); // store reservation size in message, so we don't get confused // by messages entering the dispatch queue through other paths. @@ -1822,12 +1832,18 @@ int Pipe::read_message(Message **pm) out_dethrottle: // release bytes reserved from the throttlers on failure + if (policy.throttler_messages) { + ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + policy.throttler_messages->put(); + } if (message_size) { - if (policy.throttler) { - ldout(msgr->cct,10) << "reader releasing " << message_size << " to policy throttler " - << policy.throttler->get_current() << "/" - << policy.throttler->get_max() << dendl; - policy.throttler->put(message_size); + if (policy.throttler_bytes) { + ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->put(message_size); } msgr->dispatch_throttle_release(message_size); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index cc946e3d25a..d837a4496ae 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -162,12 +162,15 @@ public: * ownership of this pointer, but you must not destroy it before * you destroy SimpleMessenger. */ - void set_policy_throttler(int type, Throttle *t) { + void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) { Mutex::Locker l(policy_lock); - if (policy_map.count(type)) - policy_map[type].throttler = t; - else - default_policy.throttler = t; + if (policy_map.count(type)) { + policy_map[type].throttler_bytes = byte_throttle; + policy_map[type].throttler_messages = msg_throttle; + } else { + default_policy.throttler_bytes = byte_throttle; + default_policy.throttler_messages = msg_throttle; + } } /** * Bind the SimpleMessenger to a specific address. If bind_addr diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc index f5e8f34137e..29139c12bca 100644 --- a/src/os/FileJournal.cc +++ b/src/os/FileJournal.cc @@ -389,9 +389,7 @@ int FileJournal::create() header.alignment = 16; // at least stay word aligned on 64bit machines... header.start = get_top(); - /* FileStore::mkfs initializes the fs op sequence file at 1. Therefore, - * the first entry written must be at sequence 2.*/ - header.start_seq = 2; + header.start_seq = 0; print_header(); diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h index 7b1777928cd..0e826fb4940 100644 --- a/src/os/FileJournal.h +++ b/src/os/FileJournal.h @@ -109,7 +109,22 @@ public: int64_t max_size; // max size of journal ring buffer int64_t start; // offset of first entry uint64_t committed_up_to; // committed up to - uint64_t start_seq; // entry at header.start + + /** + * start_seq + * + * entry at header.start has sequence >= start_seq + * + * Generally, the entry at header.start will have sequence + * start_seq if it exists. The only exception is immediately + * after journal creation since the first sequence number is + * not known. + * + * If the first read on open fails, we can assume corruption + * if start_seq > committed_up_thru because the entry would have + * a sequence >= start_seq and therefore > committed_up_thru. + */ + uint64_t start_seq; header_t() : flags(0), block_size(0), alignment(0), max_size(0), start(0), diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 544ac8c49c4..cd9b9edc4c7 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -64,7 +64,6 @@ void Journaler::set_layout(ceph_file_layout *l) if (periods < 2) periods = 2; // we need at least 2 periods to make progress. fetch_len = layout.fl_stripe_count * layout.fl_object_size * periods; - prefetch_from = fetch_len / 2; } @@ -835,6 +834,7 @@ void Journaler::_issue_read(uint64_t len) void Journaler::_prefetch() { + ldout(cct, 10) << "_prefetch" << dendl; // prefetch uint64_t pf; if (temp_fetch_len) { @@ -847,9 +847,11 @@ void Journaler::_prefetch() uint64_t raw_target = read_pos + pf; - // only read full log segments + // read full log segments, so increase if necessary uint64_t period = get_layout_period(); - uint64_t target = raw_target - (raw_target % period); + uint64_t remainder = raw_target % period; + uint64_t adjustment = remainder ? period - remainder : 0; + uint64_t target = raw_target + adjustment; // don't read past the log tail if (target > write_pos) @@ -883,7 +885,9 @@ bool Journaler::_is_readable() read_buf.length() >= sizeof(s) + s) return true; // yep, next entry is ready. - // darn it! + ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length() + << ", but need " << s + sizeof(s) + << " for next entry; fetch_len is " << fetch_len << dendl; // partial fragment at the end? if (received_pos == write_pos) { @@ -902,12 +906,14 @@ bool Journaler::_is_readable() return false; } - uint64_t need = (sizeof(s)+s-read_buf.length()); + uint64_t need = sizeof(s) + s; if (need > fetch_len) { + temp_fetch_len = sizeof(s) + s; ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len << " for len " << s << " entry" << dendl; - temp_fetch_len = need; } + + ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl; return false; } diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 81a39d21a5b..dfc2115c18b 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -194,7 +194,6 @@ private: uint64_t fetch_len; // how much to read at a time uint64_t temp_fetch_len; - uint64_t prefetch_from; // how far from end do we read next chunk // for wait_for_readable() Context *on_readable; @@ -251,7 +250,7 @@ public: prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), safe_pos(0), waiting_for_zero(false), read_pos(0), requested_pos(0), received_pos(0), - fetch_len(0), temp_fetch_len(0), prefetch_from(0), + fetch_len(0), temp_fetch_len(0), on_readable(0), on_write_error(NULL), expire_pos(0), trimming_pos(0), trimmed_pos(0) { @@ -273,7 +272,6 @@ public: requested_pos = 0; received_pos = 0; fetch_len = 0; - prefetch_from = 0; assert(!on_readable); expire_pos = 0; trimming_pos = 0; diff --git a/src/test/cli-integration/rbd/formatted-output.t b/src/test/cli-integration/rbd/formatted-output.t index ef0b7042c8a..7be9c594a44 100644 --- a/src/test/cli-integration/rbd/formatted-output.t +++ b/src/test/cli-integration/rbd/formatted-output.t @@ -19,7 +19,7 @@ clone $ rbd snap protect bar@snap $ rbd clone bar@snap data/child $ rbd snap create data/child@snap - $ rbd flatten data/child > /dev/null + $ rbd flatten data/child 2> /dev/null lock ==== @@ -687,10 +687,10 @@ whenever it is run. grep -v to ignore it, but still work on other distros. # cleanup $ rbd snap remove data/child@snap $ rbd snap unprotect bar@snap - $ rbd snap purge bar > /dev/null - $ rbd snap purge foo > /dev/null - $ rbd rm data/child > /dev/null - $ rbd rm foo > /dev/null - $ rbd rm bar > /dev/null - $ rbd rm quux > /dev/null - $ rbd rm baz > /dev/null + $ rbd snap purge bar 2> /dev/null + $ rbd snap purge foo 2> /dev/null + $ rbd rm data/child 2> /dev/null + $ rbd rm foo 2> /dev/null + $ rbd rm bar 2> /dev/null + $ rbd rm quux 2> /dev/null + $ rbd rm baz 2> /dev/null diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index 216e6288b1f..07f999180a3 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -366,8 +366,8 @@ class OSDStub : public TestStub messenger->set_default_policy( Messenger::Policy::stateless_server(supported, 0)); - messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, - &throttler); + messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, + &throttler, NULL); messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID | CEPH_FEATURE_PGID64 | |