diff options
author | Sage Weil <sage@inktank.com> | 2012-12-04 14:52:22 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2012-12-04 14:52:22 -0800 |
commit | f3bd3564fa26efa5c5d6664464d247fe974db902 (patch) | |
tree | bd677641b5967d642bf76117747e75295770743b | |
parent | 727c37a7122da3b647e79cede7fc5ba4bcd377d7 (diff) | |
parent | 8dcc6c399cfe761ea66c9d8fda61c0dc8542b3d3 (diff) | |
download | ceph-f3bd3564fa26efa5c5d6664464d247fe974db902.tar.gz |
Merge branch 'wip-msgr-delay-queue' into next
-rw-r--r-- | src/common/config_opts.h | 3 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 92 | ||||
-rw-r--r-- | src/msg/Pipe.h | 58 |
3 files changed, 133 insertions, 20 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 0cdb16c3ebf..eedb6d6a2f5 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -106,6 +106,9 @@ OPTION(ms_bind_port_max, OPT_INT, 7100) OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10) OPTION(ms_tcp_read_timeout, OPT_U64, 900) OPTION(ms_inject_socket_failures, OPT_U64, 0) +OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed +OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds +OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id") OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index e273ad49f2b..1ebf2854473 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -54,6 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) { Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) : reader_thread(this), writer_thread(this), + delay_thread(NULL), msgr(r), conn_id(r->dispatch_queue.get_id()), sd(-1), port(0), @@ -94,6 +95,7 @@ Pipe::~Pipe() if (connection_state) connection_state->put(); delete session_security; + delete delay_thread; } void Pipe::handle_ack(uint64_t seq) @@ -127,6 +129,16 @@ void Pipe::start_reader() reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes); } +void Pipe::maybe_start_delay_thread() +{ + if (!delay_thread && + msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) { + lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl; + delay_thread = new DelayedDelivery(this); + delay_thread->create(); + } +} + void Pipe::start_writer() { assert(pipe_lock.is_locked()); @@ -146,14 +158,54 @@ void Pipe::join_reader() reader_needs_join = false; } +void Pipe::DelayedDelivery::discard() +{ + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::discard" << dendl; + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + pipe->msgr->dispatch_throttle_release(m->get_dispatch_throttle_size()); + m->put(); + delay_queue.pop_front(); + } +} -void Pipe::queue_received(Message *m, int priority) +void Pipe::DelayedDelivery::flush() { - assert(pipe_lock.is_locked()); - in_q->enqueue(m, priority, conn_id); + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl; + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + delay_queue.pop_front(); + pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + } } +void *Pipe::DelayedDelivery::entry() +{ + Mutex::Locker locker(delay_lock); + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry start" << dendl; + while (!stop_delayed_delivery) { + if (delay_queue.empty()) { + lgeneric_subdout(pipe->msgr->cct, ms, 30) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl; + delay_cond.Wait(delay_lock); + continue; + } + utime_t release = delay_queue.front().first; + if (release > ceph_clock_now(pipe->msgr->cct)) { + lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl; + delay_cond.WaitUntil(delay_lock, release); + continue; + } + Message *m = delay_queue.front().second; + lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl; + delay_queue.pop_front(); + pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + } + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl; + return NULL; +} int Pipe::accept() { @@ -503,7 +555,11 @@ int Pipe::accept() // make existing Connection reference us existing->connection_state->reset_pipe(this); - + + // flush/queue any existing delayed messages + if (existing->delay_thread) + existing->delay_thread->flush(); + // steal incoming queue uint64_t replaced_conn_id = conn_id; conn_id = existing->conn_id; @@ -587,6 +643,9 @@ int Pipe::accept() } ldout(msgr->cct,20) << "accept done" << dendl; pipe_lock.Unlock(); + + maybe_start_delay_thread(); + return 0; // success. fail_unlocked: @@ -936,6 +995,7 @@ int Pipe::connect() ldout(msgr->cct,20) << "connect starting reader" << dendl; start_reader(); } + maybe_start_delay_thread(); delete authorizer; return 0; } @@ -1020,7 +1080,6 @@ void Pipe::discard_out_queue() out_q.clear(); } - void Pipe::fault(bool onread) { const md_config_t *conf = msgr->cct->_conf; @@ -1057,6 +1116,8 @@ void Pipe::fault(bool onread) msgr->lock.Unlock(); in_q->discard_queue(conn_id); + if (delay_thread) + delay_thread->discard(); discard_out_queue(); // disconnect from Connection, and mark it failed. future messages @@ -1068,6 +1129,10 @@ void Pipe::fault(bool onread) return; } + // queue delayed items immediately + if (delay_thread) + delay_thread->flush(); + // requeue sent items requeue_sent(); @@ -1075,7 +1140,7 @@ void Pipe::fault(bool onread) ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl; state = STATE_STANDBY; return; - } + } if (state != STATE_CONNECTING) { if (policy.server) { @@ -1122,6 +1187,8 @@ void Pipe::was_session_reset() ldout(msgr->cct,10) << "was_session_reset" << dendl; in_q->discard_queue(conn_id); + if (delay_thread) + delay_thread->discard(); discard_out_queue(); msgr->dispatch_queue.queue_remote_reset(connection_state); @@ -1243,7 +1310,18 @@ void Pipe::reader() ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; - queue_received(m); + + if (delay_thread) { + utime_t release; + if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { + release = m->get_recv_stamp(); + release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; + lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; + } + delay_thread->queue(release, m); + } else { + in_q->enqueue(m, m->get_priority(), conn_id); + } } else if (tag == CEPH_MSGR_TAG_CLOSE) { diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index cc92b2f6f8b..1bcc8263f4a 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -60,6 +60,46 @@ class DispatchQueue; } writer_thread; friend class Writer; + /** + * The DelayedDelivery is for injecting delays into Message delivery off + * the socket. It is only enabled if delays are requested, and if they + * are then it pulls Messages off the DelayQueue and puts them into the + * in_q (SimpleMessenger::dispatch_queue). + * Please note that this probably has issues with Pipe shutdown and + * replacement semantics. I've tried, but no guarantees. + */ + class DelayedDelivery: public Thread { + Pipe *pipe; + std::deque< pair<utime_t,Message*> > delay_queue; + Mutex delay_lock; + Cond delay_cond; + bool stop_delayed_delivery; + + public: + DelayedDelivery(Pipe *p) + : pipe(p), + delay_lock("Pipe::DelayedDelivery::delay_lock"), + stop_delayed_delivery(false) { } + ~DelayedDelivery() { + discard(); + } + void *entry(); + void queue(utime_t release, Message *m) { + Mutex::Locker l(delay_lock); + delay_queue.push_back(make_pair(release, m)); + delay_cond.Signal(); + } + void discard(); + void flush(); + void stop() { + delay_lock.Lock(); + stop_delayed_delivery = true; + delay_cond.Signal(); + delay_lock.Unlock(); + } + } *delay_thread; + friend class DelayedDelivery; + public: Pipe(SimpleMessenger *r, int st, Connection *con); ~Pipe(); @@ -166,25 +206,13 @@ class DispatchQueue; void start_reader(); void start_writer(); + void maybe_start_delay_thread(); void join_reader(); // public constructors static const Pipe& Server(int s); static const Pipe& Client(const entity_addr_t& pi); - //we have two queue_received's to allow local signal delivery - // via Message * (that doesn't actually point to a Message) - void queue_received(Message *m, int priority); - - void queue_received(Message *m) { - // this is just to make sure that a changeset is working - // properly; if you start using the refcounting more and have - // multiple people hanging on to a message, ditch the assert! - assert(m->nref.read() == 1); - - queue_received(m, m->get_priority()); - } - __u32 get_out_seq() { return out_seq; } bool is_queued() { return !out_q.empty() || keepalive; } @@ -208,6 +236,10 @@ class DispatchQueue; writer_thread.join(); if (reader_thread.is_started()) reader_thread.join(); + if (delay_thread) { + delay_thread->stop(); + delay_thread->join(); + } } void stop(); |