diff options
author | Greg Farnum <greg@inktank.com> | 2012-11-21 10:54:06 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2012-11-29 16:09:44 -0800 |
commit | 90f66980bfb1f2541dcb11be2c358a9832a291b1 (patch) | |
tree | 82b7372bf5a5c1e354e7d7f73aa3861609aca912 | |
parent | 77711ddee37de154c0d5d452c4f84dae36eb3e3a (diff) | |
download | ceph-90f66980bfb1f2541dcb11be2c358a9832a291b1.tar.gz |
messenger: add the shell of a system to delay incoming Message delivery
When ms_inject_delay_type matches that of the incoming Connection,
the Pipe sets up a delay queue that it shuttles all Messages through.
This lets us check cleanup and some notification code but doesn't
actually generate any delays.
Signed-off-by: Greg Farnum <greg@inktank.com>
-rw-r--r-- | src/common/config_opts.h | 3 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 56 | ||||
-rw-r--r-- | src/msg/Pipe.h | 32 |
3 files changed, 90 insertions, 1 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 8699d789164..a067934c2f1 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -106,6 +106,9 @@ OPTION(ms_bind_port_max, OPT_INT, 7100) OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10) OPTION(ms_tcp_read_timeout, OPT_U64, 900) OPTION(ms_inject_socket_failures, OPT_U64, 0) +OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed +OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds +OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id") OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index e273ad49f2b..695be3b3e8e 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -54,6 +54,8 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) { Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) : reader_thread(this), writer_thread(this), + dispatch_thread(NULL), delay_queue(NULL), + delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true), msgr(r), conn_id(r->dispatch_queue.get_id()), sd(-1), port(0), @@ -85,6 +87,15 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms if (msgr->timeout == 0) msgr->timeout = -1; + + if (msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) + != string::npos) { + lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl; + dispatch_thread = new DelayedDelivery(this); + delay_queue = new std::deque< Message * >(); + delay_lock = new Mutex("delay_lock"); + delay_cond = new Cond(); + } } Pipe::~Pipe() @@ -94,6 +105,14 @@ Pipe::~Pipe() if (connection_state) connection_state->put(); delete session_security; + if (dispatch_thread) { + delete dispatch_thread; + assert(delay_queue->empty()); + delete delay_queue; + assert(!delay_lock->is_locked()); + delete delay_lock; + delete delay_cond; + } } void Pipe::handle_ack(uint64_t seq) @@ -125,6 +144,13 @@ void Pipe::start_reader() } reader_running = true; reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes); + if (dispatch_thread && stop_delayed_delivery) { + lsubdout(msgr->cct, ms, 1) << "running delayed dispatch thread on Pipe " << this << dendl; + delay_lock->Lock(); + stop_delayed_delivery = false; + dispatch_thread->create(); + delay_lock->Unlock(); + } } void Pipe::start_writer() @@ -150,10 +176,32 @@ void Pipe::join_reader() void Pipe::queue_received(Message *m, int priority) { assert(pipe_lock.is_locked()); + if (delay_queue) { + lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl; + Mutex::Locker locker(*delay_lock); + delay_queue->push_back(m); + delay_cond->Signal(); + return; + } in_q->enqueue(m, priority, conn_id); } - +void Pipe::delayed_delivery() { + Mutex::Locker locker(*delay_lock); + if (delay_queue->empty()) + lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl; + delay_cond->Wait(*delay_lock); + while (!stop_delayed_delivery) { + Message *m = delay_queue->front(); + lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delayed delivery" << dendl; + delay_queue->pop_front(); + in_q->enqueue(m, m->get_priority(), conn_id); + if (delay_queue->empty()) { + lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond" << dendl; + delay_cond->Wait(*delay_lock); + } + } +} int Pipe::accept() { @@ -1141,6 +1189,12 @@ void Pipe::stop() state = STATE_CLOSED; cond.Signal(); shutdown_socket(); + if (dispatch_thread) { + lsubdout(msgr->cct, ms, 1) << "signalling to stop delayed dispatch thread and clear out messages" << dendl; + Mutex::Locker locker(*delay_lock); + stop_delayed_delivery = true; + delay_cond->Signal(); + } } diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index cc92b2f6f8b..648e2e87c9f 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -60,6 +60,29 @@ class DispatchQueue; } writer_thread; friend class Writer; + /** + * The DelayedDelivery is for injecting delays into Message delivery off + * the socket. It is only enabled if delays are requested, and if they + * are then it pulls Messages off the DelayQueue and puts them into the + * in_q (SimpleMessenger::dispatch_queue). + * Please note that this probably has issues with Pipe shutdown and + * replacement semantics. I've tried, but no guarantees. + */ + class DelayedDelivery: public Thread { + Pipe *pipe; + public: + DelayedDelivery(Pipe *p) : pipe(p) {} + void *entry() { pipe->delayed_delivery(); return 0; } + }; + friend class DelayedDelivery; + + DelayedDelivery *dispatch_thread; + // TODO: clean up the delay_queue better on shutdown + std::deque< Message * > *delay_queue; + Mutex *delay_lock; + Cond *delay_cond; + bool stop_delayed_delivery; + public: Pipe(SimpleMessenger *r, int st, Connection *con); ~Pipe(); @@ -185,6 +208,8 @@ class DispatchQueue; queue_received(m, m->get_priority()); } + void delayed_delivery(); + __u32 get_out_seq() { return out_seq; } bool is_queued() { return !out_q.empty() || keepalive; } @@ -208,6 +233,13 @@ class DispatchQueue; writer_thread.join(); if (reader_thread.is_started()) reader_thread.join(); + if (dispatch_thread && dispatch_thread->is_started()) { + delay_lock->Lock(); + stop_delayed_delivery = true; + delay_cond->Signal(); + delay_lock->Unlock(); + dispatch_thread->join(); + } } void stop(); |