diff options
author | Samuel Just <sam.just@inktank.com> | 2012-09-19 20:15:04 -0700 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2012-12-06 22:51:56 -0800 |
commit | 5f8a3634c49d96e7d38039a294dc291948ce60f4 (patch) | |
tree | 503f5c451ef046f39f9d15a227dabf071f2970d1 | |
parent | 9981bee565dae5921f45df80fe2744dc7fd50db2 (diff) | |
download | ceph-5f8a3634c49d96e7d38039a294dc291948ce60f4.tar.gz |
PG: split ops for child objects into child
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/osd/OSD.cc | 11 | ||||
-rw-r--r-- | src/osd/OSD.h | 23 | ||||
-rw-r--r-- | src/osd/PG.cc | 71 | ||||
-rw-r--r-- | src/osd/PG.h | 4 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 4 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 1 |
6 files changed, 111 insertions, 3 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index ec3035c0a66..7e8506a5b6c 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5994,7 +5994,10 @@ void OSD::OpWQ::_process(PGRef pg) OpRequestRef op; { Mutex::Locker l(qlock); - assert(pg_for_processing.count(&*pg)); + if (!pg_for_processing.count(&*pg)) { + pg->unlock(); + return; + } assert(pg_for_processing[&*pg].size()); op = pg_for_processing[&*pg].front(); pg_for_processing[&*pg].pop_front(); @@ -6005,6 +6008,12 @@ void OSD::OpWQ::_process(PGRef pg) pg->unlock(); } + +void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued) +{ + osd->op_wq.dequeue(pg, dequeued); +} + /* * NOTE: dequeue called in worker thread, with pg lock */ diff --git a/src/osd/OSD.h b/src/osd/OSD.h index a87431a69d1..7afa1f9ac21 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -184,6 +184,8 @@ public: ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq; ClassHandler *&class_handler; + void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued); + // -- superblock -- Mutex publish_lock, pre_publish_lock; OSDSuperblock superblock; @@ -644,9 +646,26 @@ private: return op.first == pg; } }; - void dequeue(PG *pg) { + void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) { lock(); - pqueue.remove_by_filter(Pred(pg)); + if (!dequeued) { + pqueue.remove_by_filter(Pred(pg)); + pg_for_processing.erase(pg); + } else { + list<pair<PGRef, OpRequestRef> > _dequeued; + pqueue.remove_by_filter(Pred(pg), &_dequeued); + for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin(); + i != _dequeued.end(); + ++i) { + dequeued->push_back(i->second); + } + if (pg_for_processing.count(pg)) { + dequeued->splice( + dequeued->begin(), + pg_for_processing[pg]); + pg_for_processing.erase(pg); + } + } unlock(); } bool _empty() { diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 2323b988f70..36deae95fa4 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1948,6 +1948,56 @@ void PG::IndexedLog::split_into( index(); } +static void split_list( + list<OpRequestRef> *from, + list<OpRequestRef> *to, + unsigned match, + unsigned bits) +{ + for (list<OpRequestRef>::iterator i = from->begin(); + i != from->end(); + ) { + if (PG::split_request(*i, match, bits)) { + to->push_back(*i); + from->erase(i++); + } else { + ++i; + } + } +} + +static void split_replay_queue( + map<eversion_t, OpRequestRef> *from, + map<eversion_t, OpRequestRef> *to, + unsigned match, + unsigned bits) +{ + for (map<eversion_t, OpRequestRef>::iterator i = from->begin(); + i != from->end(); + ) { + if (PG::split_request(i->second, match, bits)) { + to->insert(*i); + from->erase(i++); + } else { + ++i; + } + } +} + +void PG::split_ops(PG *child, unsigned split_bits) { + unsigned match = child->info.pgid.m_seed; + assert(waiting_for_map.empty()); + assert(waiting_for_all_missing.empty()); + assert(waiting_for_missing_object.empty()); + assert(waiting_for_degraded_object.empty()); + assert(waiting_for_ack.empty()); + assert(waiting_for_ondisk.empty()); + split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits); + + osd->dequeue_pg(this, &waiting_for_active); + split_list(&waiting_for_active, &(child->waiting_for_active), match, split_bits); +} + void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits) { child->osdmap_ref = osdmap_ref; @@ -1983,6 +2033,9 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits) // History child->past_intervals = past_intervals; + + split_ops(child, split_bits); + _split_into(child_pgid, child, split_bits); } void PG::defer_recovery() @@ -4791,6 +4844,24 @@ bool PG::can_discard_request(OpRequestRef op) return true; } +bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits) +{ + unsigned mask = ~((~0)<<bits); + switch (op->request->get_type()) { + case CEPH_MSG_OSD_OP: + return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match; + case MSG_OSD_SUBOP: + return false; + case MSG_OSD_SUBOPREPLY: + return false; + case MSG_OSD_PG_SCAN: + return false; + case MSG_OSD_PG_BACKFILL: + return false; + } + return false; +} + bool PG::must_delay_request(OpRequestRef op) { switch (op->request->get_type()) { diff --git a/src/osd/PG.h b/src/osd/PG.h index cc2e112f48d..f0e57eb120f 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -658,6 +658,7 @@ protected: waiting_for_degraded_object; map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk; map<eversion_t,OpRequestRef> replay_queue; + void split_ops(PG *child, unsigned split_bits); void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m); void requeue_ops(list<OpRequestRef> &l); @@ -795,6 +796,7 @@ public: void finish_recovery_op(const hobject_t& soid, bool dequeue=false); void split_into(pg_t child_pgid, PG *child, unsigned split_bits); + virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0; loff_t get_log_write_pos() { return 0; @@ -1748,6 +1750,8 @@ public: bool must_delay_request(OpRequestRef op); + static bool split_request(OpRequestRef op, unsigned match, unsigned bits); + bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch); bool old_peering_evt(CephPeeringEvtRef evt) { return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested()); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 76ad5089493..d3ea2a51935 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -6063,6 +6063,10 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs) unlock(); } +void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits) +{ + assert(repop_queue.empty()); +} /* * pg status change notification diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 5abc8e53657..fbc1b65571c 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -930,6 +930,7 @@ protected: virtual void _scrub_finish(); object_stat_collection_t scrub_cstat; + virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits); void apply_and_flush_repops(bool requeue); void calc_trim_to(); |