summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage.weil@dreamhost.com>2011-11-21 13:23:59 -0800
committerSage Weil <sage.weil@dreamhost.com>2011-11-21 13:23:59 -0800
commitb47347bd7c377037f7fbc199f0c88b447c9626d1 (patch)
treef8213c789df896291d988b9184e64549cf12e40f
parent70dfe8e9a075cb003f9c9e07bc295c3c5b3e1288 (diff)
downloadceph-b47347bd7c377037f7fbc199f0c88b447c9626d1.tar.gz
osd: protect handle_osd_map requeueing with queue lock
pending_ops was protected by osd_lock, but it tracks something in the queue, which has it's own lock. Messy. Also, useless, since wait_for_no_ops had a single caller in shutdown() that op_wq.drain() can do for us. Rip it out, and track queue size under the queue lock. Fixes: #1727 Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
-rw-r--r--src/osd/OSD.cc75
-rw-r--r--src/osd/OSD.h22
2 files changed, 38 insertions, 59 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 492f7881626..aeba6b8a513 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -527,6 +527,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
finished_lock("OSD::finished_lock"),
+ op_queue_len(0),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
@@ -558,8 +559,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
map_in_progress_cond = new Cond();
- pending_ops = 0;
- waiting_for_no_ops = false;
}
OSD::~OSD()
@@ -817,7 +816,7 @@ int OSD::shutdown()
command_tp.stop();
// finish ops
- wait_for_no_ops();
+ op_wq.drain();
dout(10) << "no ops" << dendl;
recovery_tp.stop();
@@ -3173,20 +3172,18 @@ void OSD::handle_osd_map(MOSDMap *m)
op_wq.lock();
list<Message*> rq;
- while (!op_queue.empty()) {
- PG *pg = op_queue.back();
+ while (true) {
+ PG *pg = op_wq._dequeue();
+ if (!pg)
+ break;
pg->lock();
- op_queue.pop_back();
- pending_ops--;
- Message *mess = pg->op_queue.back();
- pg->op_queue.pop_back();
+ Message *mess = pg->op_queue.front();
+ pg->op_queue.pop_front();
pg->unlock();
pg->put();
dout(15) << " will requeue " << *mess << dendl;
- rq.push_front(mess);
+ rq.push_back(mess);
}
- assert(pending_ops == 0); // we paused the wq, and just emptied out the queue
- logger->set(l_osd_opq, pending_ops);
push_waiters(rq); // requeue under osd_lock!
op_wq.unlock();
@@ -5423,12 +5420,30 @@ void OSD::enqueue_op(PG *pg, Message *op)
// add to pg's op_queue
pg->op_queue.push_back(op);
- pending_ops++;
- logger->set(l_osd_opq, pending_ops);
op_wq.queue(pg);
}
+bool OSD::OpWQ::_enqueue(PG *pg)
+{
+ pg->get();
+ osd->op_queue.push_back(pg);
+ osd->op_queue_len++;
+ osd->logger->set(l_osd_opq, osd->op_queue_len);
+ return true;
+}
+
+PG *OSD::OpWQ::_dequeue()
+{
+ if (osd->op_queue.empty())
+ return NULL;
+ PG *pg = osd->op_queue.front();
+ osd->op_queue.pop_front();
+ osd->op_queue_len--;
+ osd->logger->set(l_osd_opq, osd->op_queue_len);
+ return pg;
+}
+
/*
* requeue ops at _front_ of queue. these are previously queued
* operations that need to get requeued ahead of anything the dispatch
@@ -5440,6 +5455,9 @@ void OSD::requeue_ops(PG *pg, list<Message*>& ls)
dout(15) << *pg << " requeue_ops " << ls << dendl;
assert(pg->is_locked());
+ // you can't call this on pg->op_queue!
+ assert(&ls != &pg->op_queue);
+
// set current queue contents aside..
list<Message*> orig_queue;
orig_queue.swap(pg->op_queue);
@@ -5476,9 +5494,7 @@ void OSD::dequeue_op(PG *pg)
op = pg->op_queue.front();
pg->op_queue.pop_front();
- dout(10) << "dequeue_op " << *op << " pg " << *pg
- << ", " << (pending_ops-1) << " more pending"
- << dendl;
+ dout(10) << "dequeue_op " << *op << " pg " << *pg << dendl;
// share map?
// do this preemptively while we hold osd_lock and pg->lock
@@ -5509,30 +5525,7 @@ void OSD::dequeue_op(PG *pg)
//scrub_wq.queue(pg);
// finish
- osd_lock.Lock();
- {
- dout(10) << "dequeue_op " << op << " finish" << dendl;
- assert(pending_ops > 0);
-
- pending_ops--;
- logger->set(l_osd_opq, pending_ops);
- if (pending_ops == 0 && waiting_for_no_ops)
- no_pending_ops.Signal();
- }
- osd_lock.Unlock();
-}
-
-void OSD::wait_for_no_ops()
-{
- if (pending_ops > 0) {
- dout(7) << "wait_for_no_ops - waiting for " << pending_ops << dendl;
- waiting_for_no_ops = true;
- while (pending_ops > 0)
- no_pending_ops.Wait(osd_lock);
- waiting_for_no_ops = false;
- assert(pending_ops == 0);
- }
- dout(7) << "wait_for_no_ops - none" << dendl;
+ dout(10) << "dequeue_op " << op << " finish" << dendl;
}
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 7d94035a717..8a5bffaaaf5 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -324,30 +324,21 @@ private:
// -- op queue --
deque<PG*> op_queue;
-
+ int op_queue_len;
+
struct OpWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
OpWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, ti*10, tp), osd(o) {}
- bool _enqueue(PG *pg) {
- pg->get();
- osd->op_queue.push_back(pg);
- return true;
- }
+ bool _enqueue(PG *pg);
void _dequeue(PG *pg) {
assert(0);
}
bool _empty() {
return osd->op_queue.empty();
}
- PG *_dequeue() {
- if (osd->op_queue.empty())
- return NULL;
- PG *pg = osd->op_queue.front();
- osd->op_queue.pop_front();
- return pg;
- }
+ PG *_dequeue();
void _process(PG *pg) {
osd->dequeue_op(pg);
}
@@ -356,11 +347,6 @@ private:
}
} op_wq;
- int pending_ops;
- bool waiting_for_no_ops;
- Cond no_pending_ops;
-
- void wait_for_no_ops();
void enqueue_op(PG *pg, Message *op);
void requeue_ops(PG *pg, list<Message*>& ls);
void dequeue_op(PG *pg);