summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2012-09-19 20:15:04 -0700
committerSamuel Just <sam.just@inktank.com>2012-12-06 22:51:56 -0800
commit5f8a3634c49d96e7d38039a294dc291948ce60f4 (patch)
tree503f5c451ef046f39f9d15a227dabf071f2970d1
parent9981bee565dae5921f45df80fe2744dc7fd50db2 (diff)
downloadceph-5f8a3634c49d96e7d38039a294dc291948ce60f4.tar.gz
PG: split ops for child objects into child
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/osd/OSD.cc11
-rw-r--r--src/osd/OSD.h23
-rw-r--r--src/osd/PG.cc71
-rw-r--r--src/osd/PG.h4
-rw-r--r--src/osd/ReplicatedPG.cc4
-rw-r--r--src/osd/ReplicatedPG.h1
6 files changed, 111 insertions, 3 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index ec3035c0a66..7e8506a5b6c 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -5994,7 +5994,10 @@ void OSD::OpWQ::_process(PGRef pg)
OpRequestRef op;
{
Mutex::Locker l(qlock);
- assert(pg_for_processing.count(&*pg));
+ if (!pg_for_processing.count(&*pg)) {
+ pg->unlock();
+ return;
+ }
assert(pg_for_processing[&*pg].size());
op = pg_for_processing[&*pg].front();
pg_for_processing[&*pg].pop_front();
@@ -6005,6 +6008,12 @@ void OSD::OpWQ::_process(PGRef pg)
pg->unlock();
}
+
+void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+{
+ osd->op_wq.dequeue(pg, dequeued);
+}
+
/*
* NOTE: dequeue called in worker thread, with pg lock
*/
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index a87431a69d1..7afa1f9ac21 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -184,6 +184,8 @@ public:
ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
ClassHandler *&class_handler;
+ void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued);
+
// -- superblock --
Mutex publish_lock, pre_publish_lock;
OSDSuperblock superblock;
@@ -644,9 +646,26 @@ private:
return op.first == pg;
}
};
- void dequeue(PG *pg) {
+ void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
lock();
- pqueue.remove_by_filter(Pred(pg));
+ if (!dequeued) {
+ pqueue.remove_by_filter(Pred(pg));
+ pg_for_processing.erase(pg);
+ } else {
+ list<pair<PGRef, OpRequestRef> > _dequeued;
+ pqueue.remove_by_filter(Pred(pg), &_dequeued);
+ for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
+ i != _dequeued.end();
+ ++i) {
+ dequeued->push_back(i->second);
+ }
+ if (pg_for_processing.count(pg)) {
+ dequeued->splice(
+ dequeued->begin(),
+ pg_for_processing[pg]);
+ pg_for_processing.erase(pg);
+ }
+ }
unlock();
}
bool _empty() {
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 2323b988f70..36deae95fa4 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1948,6 +1948,56 @@ void PG::IndexedLog::split_into(
index();
}
+static void split_list(
+ list<OpRequestRef> *from,
+ list<OpRequestRef> *to,
+ unsigned match,
+ unsigned bits)
+{
+ for (list<OpRequestRef>::iterator i = from->begin();
+ i != from->end();
+ ) {
+ if (PG::split_request(*i, match, bits)) {
+ to->push_back(*i);
+ from->erase(i++);
+ } else {
+ ++i;
+ }
+ }
+}
+
+static void split_replay_queue(
+ map<eversion_t, OpRequestRef> *from,
+ map<eversion_t, OpRequestRef> *to,
+ unsigned match,
+ unsigned bits)
+{
+ for (map<eversion_t, OpRequestRef>::iterator i = from->begin();
+ i != from->end();
+ ) {
+ if (PG::split_request(i->second, match, bits)) {
+ to->insert(*i);
+ from->erase(i++);
+ } else {
+ ++i;
+ }
+ }
+}
+
+void PG::split_ops(PG *child, unsigned split_bits) {
+ unsigned match = child->info.pgid.m_seed;
+ assert(waiting_for_map.empty());
+ assert(waiting_for_all_missing.empty());
+ assert(waiting_for_missing_object.empty());
+ assert(waiting_for_degraded_object.empty());
+ assert(waiting_for_ack.empty());
+ assert(waiting_for_ondisk.empty());
+ split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits);
+
+ osd->dequeue_pg(this, &waiting_for_active);
+ split_list(&waiting_for_active, &(child->waiting_for_active), match, split_bits);
+}
+
void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
{
child->osdmap_ref = osdmap_ref;
@@ -1983,6 +2033,9 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
// History
child->past_intervals = past_intervals;
+
+ split_ops(child, split_bits);
+ _split_into(child_pgid, child, split_bits);
}
void PG::defer_recovery()
@@ -4791,6 +4844,24 @@ bool PG::can_discard_request(OpRequestRef op)
return true;
}
+bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits)
+{
+ unsigned mask = ~((~0)<<bits);
+ switch (op->request->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match;
+ case MSG_OSD_SUBOP:
+ return false;
+ case MSG_OSD_SUBOPREPLY:
+ return false;
+ case MSG_OSD_PG_SCAN:
+ return false;
+ case MSG_OSD_PG_BACKFILL:
+ return false;
+ }
+ return false;
+}
+
bool PG::must_delay_request(OpRequestRef op)
{
switch (op->request->get_type()) {
diff --git a/src/osd/PG.h b/src/osd/PG.h
index cc2e112f48d..f0e57eb120f 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -658,6 +658,7 @@ protected:
waiting_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
map<eversion_t,OpRequestRef> replay_queue;
+ void split_ops(PG *child, unsigned split_bits);
void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
void requeue_ops(list<OpRequestRef> &l);
@@ -795,6 +796,7 @@ public:
void finish_recovery_op(const hobject_t& soid, bool dequeue=false);
void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
+ virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0;
loff_t get_log_write_pos() {
return 0;
@@ -1748,6 +1750,8 @@ public:
bool must_delay_request(OpRequestRef op);
+ static bool split_request(OpRequestRef op, unsigned match, unsigned bits);
+
bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
bool old_peering_evt(CephPeeringEvtRef evt) {
return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 76ad5089493..d3ea2a51935 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -6063,6 +6063,10 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
unlock();
}
+void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits)
+{
+ assert(repop_queue.empty());
+}
/*
* pg status change notification
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 5abc8e53657..fbc1b65571c 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -930,6 +930,7 @@ protected:
virtual void _scrub_finish();
object_stat_collection_t scrub_cstat;
+ virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits);
void apply_and_flush_repops(bool requeue);
void calc_trim_to();