diff options
author | Sage Weil <sage.weil@dreamhost.com> | 2011-11-21 13:23:59 -0800 |
---|---|---|
committer | Sage Weil <sage.weil@dreamhost.com> | 2011-11-21 13:23:59 -0800 |
commit | b47347bd7c377037f7fbc199f0c88b447c9626d1 (patch) | |
tree | f8213c789df896291d988b9184e64549cf12e40f | |
parent | 70dfe8e9a075cb003f9c9e07bc295c3c5b3e1288 (diff) | |
download | ceph-b47347bd7c377037f7fbc199f0c88b447c9626d1.tar.gz |
osd: protect handle_osd_map requeueing with queue lock
pending_ops was protected by osd_lock, but it tracks something in the
queue, which has it's own lock. Messy. Also, useless, since
wait_for_no_ops had a single caller in shutdown() that op_wq.drain() can
do for us.
Rip it out, and track queue size under the queue lock.
Fixes: #1727
Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
-rw-r--r-- | src/osd/OSD.cc | 75 | ||||
-rw-r--r-- | src/osd/OSD.h | 22 |
2 files changed, 38 insertions, 59 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 492f7881626..aeba6b8a513 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -527,6 +527,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, heartbeat_dispatcher(this), stat_lock("OSD::stat_lock"), finished_lock("OSD::finished_lock"), + op_queue_len(0), op_wq(this, g_conf->osd_op_thread_timeout, &op_tp), map_lock("OSD::map_lock"), peer_map_epoch_lock("OSD::peer_map_epoch_lock"), @@ -558,8 +559,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, map_in_progress_cond = new Cond(); - pending_ops = 0; - waiting_for_no_ops = false; } OSD::~OSD() @@ -817,7 +816,7 @@ int OSD::shutdown() command_tp.stop(); // finish ops - wait_for_no_ops(); + op_wq.drain(); dout(10) << "no ops" << dendl; recovery_tp.stop(); @@ -3173,20 +3172,18 @@ void OSD::handle_osd_map(MOSDMap *m) op_wq.lock(); list<Message*> rq; - while (!op_queue.empty()) { - PG *pg = op_queue.back(); + while (true) { + PG *pg = op_wq._dequeue(); + if (!pg) + break; pg->lock(); - op_queue.pop_back(); - pending_ops--; - Message *mess = pg->op_queue.back(); - pg->op_queue.pop_back(); + Message *mess = pg->op_queue.front(); + pg->op_queue.pop_front(); pg->unlock(); pg->put(); dout(15) << " will requeue " << *mess << dendl; - rq.push_front(mess); + rq.push_back(mess); } - assert(pending_ops == 0); // we paused the wq, and just emptied out the queue - logger->set(l_osd_opq, pending_ops); push_waiters(rq); // requeue under osd_lock! op_wq.unlock(); @@ -5423,12 +5420,30 @@ void OSD::enqueue_op(PG *pg, Message *op) // add to pg's op_queue pg->op_queue.push_back(op); - pending_ops++; - logger->set(l_osd_opq, pending_ops); op_wq.queue(pg); } +bool OSD::OpWQ::_enqueue(PG *pg) +{ + pg->get(); + osd->op_queue.push_back(pg); + osd->op_queue_len++; + osd->logger->set(l_osd_opq, osd->op_queue_len); + return true; +} + +PG *OSD::OpWQ::_dequeue() +{ + if (osd->op_queue.empty()) + return NULL; + PG *pg = osd->op_queue.front(); + osd->op_queue.pop_front(); + osd->op_queue_len--; + osd->logger->set(l_osd_opq, osd->op_queue_len); + return pg; +} + /* * requeue ops at _front_ of queue. these are previously queued * operations that need to get requeued ahead of anything the dispatch @@ -5440,6 +5455,9 @@ void OSD::requeue_ops(PG *pg, list<Message*>& ls) dout(15) << *pg << " requeue_ops " << ls << dendl; assert(pg->is_locked()); + // you can't call this on pg->op_queue! + assert(&ls != &pg->op_queue); + // set current queue contents aside.. list<Message*> orig_queue; orig_queue.swap(pg->op_queue); @@ -5476,9 +5494,7 @@ void OSD::dequeue_op(PG *pg) op = pg->op_queue.front(); pg->op_queue.pop_front(); - dout(10) << "dequeue_op " << *op << " pg " << *pg - << ", " << (pending_ops-1) << " more pending" - << dendl; + dout(10) << "dequeue_op " << *op << " pg " << *pg << dendl; // share map? // do this preemptively while we hold osd_lock and pg->lock @@ -5509,30 +5525,7 @@ void OSD::dequeue_op(PG *pg) //scrub_wq.queue(pg); // finish - osd_lock.Lock(); - { - dout(10) << "dequeue_op " << op << " finish" << dendl; - assert(pending_ops > 0); - - pending_ops--; - logger->set(l_osd_opq, pending_ops); - if (pending_ops == 0 && waiting_for_no_ops) - no_pending_ops.Signal(); - } - osd_lock.Unlock(); -} - -void OSD::wait_for_no_ops() -{ - if (pending_ops > 0) { - dout(7) << "wait_for_no_ops - waiting for " << pending_ops << dendl; - waiting_for_no_ops = true; - while (pending_ops > 0) - no_pending_ops.Wait(osd_lock); - waiting_for_no_ops = false; - assert(pending_ops == 0); - } - dout(7) << "wait_for_no_ops - none" << dendl; + dout(10) << "dequeue_op " << op << " finish" << dendl; } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 7d94035a717..8a5bffaaaf5 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -324,30 +324,21 @@ private: // -- op queue -- deque<PG*> op_queue; - + int op_queue_len; + struct OpWQ : public ThreadPool::WorkQueue<PG> { OSD *osd; OpWQ(OSD *o, time_t ti, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, ti*10, tp), osd(o) {} - bool _enqueue(PG *pg) { - pg->get(); - osd->op_queue.push_back(pg); - return true; - } + bool _enqueue(PG *pg); void _dequeue(PG *pg) { assert(0); } bool _empty() { return osd->op_queue.empty(); } - PG *_dequeue() { - if (osd->op_queue.empty()) - return NULL; - PG *pg = osd->op_queue.front(); - osd->op_queue.pop_front(); - return pg; - } + PG *_dequeue(); void _process(PG *pg) { osd->dequeue_op(pg); } @@ -356,11 +347,6 @@ private: } } op_wq; - int pending_ops; - bool waiting_for_no_ops; - Cond no_pending_ops; - - void wait_for_no_ops(); void enqueue_op(PG *pg, Message *op); void requeue_ops(PG *pg, list<Message*>& ls); void dequeue_op(PG *pg); |