diff options
-rw-r--r-- | src/msg/Pipe.cc | 27 | ||||
-rw-r--r-- | src/msg/Pipe.h | 1 |
2 files changed, 23 insertions, 5 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index e49658412c0..609df8e7dd2 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -54,7 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) { Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) : reader_thread(this), writer_thread(this), - dispatch_thread(NULL), delay_queue(NULL), + dispatch_thread(NULL), delay_queue(NULL), delay_until(NULL), delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true), msgr(r), conn_id(r->dispatch_queue.get_id()), @@ -100,6 +100,8 @@ Pipe::~Pipe() delete dispatch_thread; assert(delay_queue->empty()); delete delay_queue; + assert(delay_until->empty()); + delete delay_until; assert(!delay_lock->is_locked()); delete delay_lock; delete delay_cond; @@ -141,6 +143,7 @@ void Pipe::start_reader() lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl; dispatch_thread = new DelayedDelivery(this); delay_queue = new std::deque< Message * >(); + delay_until = new std::deque< utime_t>(); delay_lock = new Mutex("delay_lock"); delay_cond = new Cond(); } else @@ -184,6 +187,9 @@ void Pipe::queue_received(Message *m, int priority) lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl; Mutex::Locker locker(*delay_lock); delay_queue->push_back(m); + utime_t delay = ceph_clock_now(msgr->cct); + delay += 1.0; + delay_until->push_back(delay); delay_cond->Signal(); return; } @@ -192,13 +198,23 @@ void Pipe::queue_received(Message *m, int priority) void Pipe::delayed_delivery() { Mutex::Locker locker(*delay_lock); - if (delay_queue->empty()) - lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl; - delay_cond->Wait(*delay_lock); while (!stop_delayed_delivery) { + if (delay_queue->empty()) { + lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl; + delay_cond->Wait(*delay_lock); + continue; + } + if (delay_until->front() > ceph_clock_now(msgr->cct)) { + lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond until message " << delay_queue->front() + << " delay passes" << dendl; + delay_cond->WaitUntil(*delay_lock, delay_until->front()); + continue; + } Message *m = delay_queue->front(); - lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delayed delivery" << dendl; + lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delivery because delay until " + << delay_until->front() << " has passed" << dendl; delay_queue->pop_front(); + delay_until->pop_front(); in_q->enqueue(m, m->get_priority(), conn_id); if (delay_queue->empty()) { lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond" << dendl; @@ -1200,6 +1216,7 @@ void Pipe::stop() while (!delay_queue->empty()) { delay_queue->front()->put(); delay_queue->pop_front(); + delay_until->pop_front(); } delay_cond->Signal(); } diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index 648e2e87c9f..58df8e76d24 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -79,6 +79,7 @@ class DispatchQueue; DelayedDelivery *dispatch_thread; // TODO: clean up the delay_queue better on shutdown std::deque< Message * > *delay_queue; + std::deque< utime_t > *delay_until; Mutex *delay_lock; Cond *delay_cond; bool stop_delayed_delivery; |