diff options
author | Sage Weil <sage@inktank.com> | 2013-04-03 21:30:51 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-04-06 08:17:01 -0700 |
commit | f7070e956859357fefede892886db98a7958d71a (patch) | |
tree | f695b0beca6c86ed586f7a4457a674fbca634abd | |
parent | 79b71441f8c2a1b282fa0e85badcb7d410c8005d (diff) | |
download | ceph-f7070e956859357fefede892886db98a7958d71a.tar.gz |
msgr: add second per-message throttler to message policy
We already have a throttler that lets of limit the amount of memory
consumed by messages from a given source. Currently this is based only
on the size of the message payload. Add a second throttler that limits
the number of messages so that we can effectively throttle small requests
as well.
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/ceph_mon.cc | 6 | ||||
-rw-r--r-- | src/ceph_osd.cc | 6 | ||||
-rw-r--r-- | src/msg/Message.h | 50 | ||||
-rw-r--r-- | src/msg/Messenger.h | 13 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 39 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 13 | ||||
-rw-r--r-- | src/test/mon/test_mon_workloadgen.cc | 4 |
7 files changed, 85 insertions, 46 deletions
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 4a4df8942e9..72354e18876 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -407,15 +407,15 @@ int main(int argc, const char **argv) // throttle client traffic Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes", g_conf->mon_client_bytes); - messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, client_throttler); + messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, client_throttler, NULL); // throttle daemon traffic // NOTE: actual usage on the leader may multiply by the number of // monitors if they forward large update messages from daemons. Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes", g_conf->mon_daemon_bytes); - messenger->set_policy_throttler(entity_name_t::TYPE_OSD, daemon_throttler); - messenger->set_policy_throttler(entity_name_t::TYPE_MDS, daemon_throttler); + messenger->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler, NULL); + messenger->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler, NULL); cout << "starting " << g_conf->name << " rank " << rank << " at " << ipaddr diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 5a90abd6125..d7735a7a83a 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -349,9 +349,9 @@ int main(int argc, const char **argv) CEPH_FEATURE_MSG_AUTH; client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0)); - client_messenger->set_policy_throttler( - entity_name_t::TYPE_CLIENT, - client_throttler.get()); // default, actually + client_messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, + client_throttler.get(), + NULL); client_messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID | diff --git a/src/msg/Message.h b/src/msg/Message.h index 1bf28e36f2d..33d26b2e7da 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -299,7 +299,10 @@ protected: // release our size in bytes back to this throttler when our payload // is adjusted or when we are destroyed. - Throttle *throttler; + Throttle *byte_throttler; + + // release a count back to this throttler when we are destroyed + Throttle *msg_throttler; // keep track of how big this message was when we reserved space in // the msgr dispatch_throttler, so that we can properly release it @@ -313,14 +316,16 @@ protected: public: Message() : connection(NULL), - throttler(NULL), + byte_throttler(NULL), + msg_throttler(NULL), dispatch_throttle_size(0) { memset(&header, 0, sizeof(header)); memset(&footer, 0, sizeof(footer)); }; Message(int t, int version=1, int compat_version=0) : connection(NULL), - throttler(NULL), + byte_throttler(NULL), + msg_throttler(NULL), dispatch_throttle_size(0) { memset(&header, 0, sizeof(header)); header.type = t; @@ -340,8 +345,10 @@ protected: assert(nref.read() == 0); if (connection) connection->put(); - if (throttler) - throttler->put(payload.length() + middle.length() + data.length()); + if (byte_throttler) + byte_throttler->put(payload.length() + middle.length() + data.length()); + if (msg_throttler) + msg_throttler->put(); } public: Connection *get_connection() { return connection; } @@ -350,8 +357,10 @@ public: connection->put(); connection = c; } - void set_throttler(Throttle *t) { throttler = t; } - Throttle *get_throttler() { return throttler; } + void set_byte_throttler(Throttle *t) { byte_throttler = t; } + Throttle *get_byte_throttler() { return byte_throttler; } + void set_message_throttler(Throttle *t) { msg_throttler = t; } + Throttle *get_message_throttler() { return msg_throttler; } void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; } uint64_t get_dispatch_throttle_size() { return dispatch_throttle_size; } @@ -369,39 +378,48 @@ public: */ void clear_payload() { - if (throttler) throttler->put(payload.length() + middle.length()); + if (byte_throttler) + byte_throttler->put(payload.length() + middle.length()); payload.clear(); middle.clear(); } void clear_data() { - if (throttler) throttler->put(data.length()); + if (byte_throttler) + byte_throttler->put(data.length()); data.clear(); } bool empty_payload() { return payload.length() == 0; } bufferlist& get_payload() { return payload; } void set_payload(bufferlist& bl) { - if (throttler) throttler->put(payload.length()); + if (byte_throttler) + byte_throttler->put(payload.length()); payload.claim(bl); - if (throttler) throttler->take(payload.length()); + if (byte_throttler) + byte_throttler->take(payload.length()); } void set_middle(bufferlist& bl) { - if (throttler) throttler->put(payload.length()); + if (byte_throttler) + byte_throttler->put(payload.length()); middle.claim(bl); - if (throttler) throttler->take(payload.length()); + if (byte_throttler) + byte_throttler->take(payload.length()); } bufferlist& get_middle() { return middle; } void set_data(const bufferlist &d) { - if (throttler) throttler->put(data.length()); + if (byte_throttler) + byte_throttler->put(data.length()); data = d; - if (throttler) throttler->take(data.length()); + if (byte_throttler) + byte_throttler->take(data.length()); } bufferlist& get_data() { return data; } void claim_data(bufferlist& bl) { - if (throttler) throttler->put(data.length()); + if (byte_throttler) + byte_throttler->put(data.length()); bl.claim(data); } off_t get_data_len() { return data.length(); } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 7205940c118..b08fdaa7f30 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -74,7 +74,8 @@ public: * the associated Connection(s). When reading in a new Message, the Messenger * will call throttler->throttle() for the size of the new Message. */ - Throttle *throttler; + Throttle *throttler_bytes; + Throttle *throttler_messages; /// Specify features supported locally by the endpoint. uint64_t features_supported; @@ -82,12 +83,16 @@ public: uint64_t features_required; Policy() - : lossy(false), server(false), standby(false), resetcheck(true), throttler(NULL), + : lossy(false), server(false), standby(false), resetcheck(true), + throttler_bytes(NULL), + throttler_messages(NULL), features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), features_required(0) {} private: Policy(bool l, bool s, bool st, bool r, uint64_t sup, uint64_t req) - : lossy(l), server(s), standby(st), resetcheck(r), throttler(NULL), + : lossy(l), server(s), standby(st), resetcheck(r), + throttler_bytes(NULL), + throttler_messages(NULL), features_supported(sup | CEPH_FEATURES_SUPPORTED_DEFAULT), features_required(req) {} @@ -266,7 +271,7 @@ public: * ownership of this pointer, but you must not destroy it before * you destroy the Messenger. */ - virtual void set_policy_throttler(int type, Throttle *t) = 0; + virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0; /** * Set the default send priority * diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index ae94a6a340c..343f975225b 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -1661,14 +1661,20 @@ int Pipe::read_message(Message **pm) Message *message; utime_t recv_stamp = ceph_clock_now(msgr->cct); + if (policy.throttler_messages) { + ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + policy.throttler_messages->get(); + } + uint64_t message_size = header.front_len + header.middle_len + header.data_len; if (message_size) { - bool waited_on_throttle = false; - if (policy.throttler) { - ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler " - << policy.throttler->get_current() << "/" - << policy.throttler->get_max() << dendl; - waited_on_throttle = policy.throttler->get(message_size); + if (policy.throttler_bytes) { + ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->get(message_size); } // throttle total bytes waiting for dispatch. do this _after_ the @@ -1678,7 +1684,7 @@ int Pipe::read_message(Message **pm) ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler " << msgr->dispatch_throttler.get_current() << "/" << msgr->dispatch_throttler.get_max() << dendl; - waited_on_throttle |= msgr->dispatch_throttler.get(message_size); + msgr->dispatch_throttler.get(message_size); } utime_t throttle_stamp = ceph_clock_now(msgr->cct); @@ -1807,7 +1813,8 @@ int Pipe::read_message(Message **pm) } } - message->set_throttler(policy.throttler); + message->set_byte_throttler(policy.throttler_bytes); + message->set_message_throttler(policy.throttler_messages); // store reservation size in message, so we don't get confused // by messages entering the dispatch queue through other paths. @@ -1822,12 +1829,18 @@ int Pipe::read_message(Message **pm) out_dethrottle: // release bytes reserved from the throttlers on failure + if (policy.throttler_messages) { + ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + policy.throttler_messages->put(); + } if (message_size) { - if (policy.throttler) { - ldout(msgr->cct,10) << "reader releasing " << message_size << " to policy throttler " - << policy.throttler->get_current() << "/" - << policy.throttler->get_max() << dendl; - policy.throttler->put(message_size); + if (policy.throttler_bytes) { + ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + policy.throttler_bytes->put(message_size); } msgr->dispatch_throttle_release(message_size); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index cc946e3d25a..d837a4496ae 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -162,12 +162,15 @@ public: * ownership of this pointer, but you must not destroy it before * you destroy SimpleMessenger. */ - void set_policy_throttler(int type, Throttle *t) { + void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) { Mutex::Locker l(policy_lock); - if (policy_map.count(type)) - policy_map[type].throttler = t; - else - default_policy.throttler = t; + if (policy_map.count(type)) { + policy_map[type].throttler_bytes = byte_throttle; + policy_map[type].throttler_messages = msg_throttle; + } else { + default_policy.throttler_bytes = byte_throttle; + default_policy.throttler_messages = msg_throttle; + } } /** * Bind the SimpleMessenger to a specific address. If bind_addr diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index 216e6288b1f..07f999180a3 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -366,8 +366,8 @@ class OSDStub : public TestStub messenger->set_default_policy( Messenger::Policy::stateless_server(supported, 0)); - messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, - &throttler); + messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, + &throttler, NULL); messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID | CEPH_FEATURE_PGID64 | |