summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2012-12-04 14:52:22 -0800
committerSage Weil <sage@inktank.com>2012-12-04 14:52:22 -0800
commitf3bd3564fa26efa5c5d6664464d247fe974db902 (patch)
treebd677641b5967d642bf76117747e75295770743b
parent727c37a7122da3b647e79cede7fc5ba4bcd377d7 (diff)
parent8dcc6c399cfe761ea66c9d8fda61c0dc8542b3d3 (diff)
downloadceph-f3bd3564fa26efa5c5d6664464d247fe974db902.tar.gz
Merge branch 'wip-msgr-delay-queue' into next
-rw-r--r--src/common/config_opts.h3
-rw-r--r--src/msg/Pipe.cc92
-rw-r--r--src/msg/Pipe.h58
3 files changed, 133 insertions, 20 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 0cdb16c3ebf..eedb6d6a2f5 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -106,6 +106,9 @@ OPTION(ms_bind_port_max, OPT_INT, 7100)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
OPTION(ms_inject_socket_failures, OPT_U64, 0)
+OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed
+OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds
+OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id")
OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index e273ad49f2b..1ebf2854473 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -54,6 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
: reader_thread(this), writer_thread(this),
+ delay_thread(NULL),
msgr(r),
conn_id(r->dispatch_queue.get_id()),
sd(-1), port(0),
@@ -94,6 +95,7 @@ Pipe::~Pipe()
if (connection_state)
connection_state->put();
delete session_security;
+ delete delay_thread;
}
void Pipe::handle_ack(uint64_t seq)
@@ -127,6 +129,16 @@ void Pipe::start_reader()
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
}
+void Pipe::maybe_start_delay_thread()
+{
+ if (!delay_thread &&
+ msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) {
+ lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
+ delay_thread = new DelayedDelivery(this);
+ delay_thread->create();
+ }
+}
+
void Pipe::start_writer()
{
assert(pipe_lock.is_locked());
@@ -146,14 +158,54 @@ void Pipe::join_reader()
reader_needs_join = false;
}
+void Pipe::DelayedDelivery::discard()
+{
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::discard" << dendl;
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ pipe->msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
+ m->put();
+ delay_queue.pop_front();
+ }
+}
-void Pipe::queue_received(Message *m, int priority)
+void Pipe::DelayedDelivery::flush()
{
- assert(pipe_lock.is_locked());
- in_q->enqueue(m, priority, conn_id);
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl;
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ delay_queue.pop_front();
+ pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ }
}
+void *Pipe::DelayedDelivery::entry()
+{
+ Mutex::Locker locker(delay_lock);
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry start" << dendl;
+ while (!stop_delayed_delivery) {
+ if (delay_queue.empty()) {
+ lgeneric_subdout(pipe->msgr->cct, ms, 30) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
+ delay_cond.Wait(delay_lock);
+ continue;
+ }
+ utime_t release = delay_queue.front().first;
+ if (release > ceph_clock_now(pipe->msgr->cct)) {
+ lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
+ delay_cond.WaitUntil(delay_lock, release);
+ continue;
+ }
+ Message *m = delay_queue.front().second;
+ lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
+ delay_queue.pop_front();
+ pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ }
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl;
+ return NULL;
+}
int Pipe::accept()
{
@@ -503,7 +555,11 @@ int Pipe::accept()
// make existing Connection reference us
existing->connection_state->reset_pipe(this);
-
+
+ // flush/queue any existing delayed messages
+ if (existing->delay_thread)
+ existing->delay_thread->flush();
+
// steal incoming queue
uint64_t replaced_conn_id = conn_id;
conn_id = existing->conn_id;
@@ -587,6 +643,9 @@ int Pipe::accept()
}
ldout(msgr->cct,20) << "accept done" << dendl;
pipe_lock.Unlock();
+
+ maybe_start_delay_thread();
+
return 0; // success.
fail_unlocked:
@@ -936,6 +995,7 @@ int Pipe::connect()
ldout(msgr->cct,20) << "connect starting reader" << dendl;
start_reader();
}
+ maybe_start_delay_thread();
delete authorizer;
return 0;
}
@@ -1020,7 +1080,6 @@ void Pipe::discard_out_queue()
out_q.clear();
}
-
void Pipe::fault(bool onread)
{
const md_config_t *conf = msgr->cct->_conf;
@@ -1057,6 +1116,8 @@ void Pipe::fault(bool onread)
msgr->lock.Unlock();
in_q->discard_queue(conn_id);
+ if (delay_thread)
+ delay_thread->discard();
discard_out_queue();
// disconnect from Connection, and mark it failed. future messages
@@ -1068,6 +1129,10 @@ void Pipe::fault(bool onread)
return;
}
+ // queue delayed items immediately
+ if (delay_thread)
+ delay_thread->flush();
+
// requeue sent items
requeue_sent();
@@ -1075,7 +1140,7 @@ void Pipe::fault(bool onread)
ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
return;
- }
+ }
if (state != STATE_CONNECTING) {
if (policy.server) {
@@ -1122,6 +1187,8 @@ void Pipe::was_session_reset()
ldout(msgr->cct,10) << "was_session_reset" << dendl;
in_q->discard_queue(conn_id);
+ if (delay_thread)
+ delay_thread->discard();
discard_out_queue();
msgr->dispatch_queue.queue_remote_reset(connection_state);
@@ -1243,7 +1310,18 @@ void Pipe::reader()
ldout(msgr->cct,10) << "reader got message "
<< m->get_seq() << " " << m << " " << *m
<< dendl;
- queue_received(m);
+
+ if (delay_thread) {
+ utime_t release;
+ if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+ release = m->get_recv_stamp();
+ release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+ lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
+ }
+ delay_thread->queue(release, m);
+ } else {
+ in_q->enqueue(m, m->get_priority(), conn_id);
+ }
}
else if (tag == CEPH_MSGR_TAG_CLOSE) {
diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h
index cc92b2f6f8b..1bcc8263f4a 100644
--- a/src/msg/Pipe.h
+++ b/src/msg/Pipe.h
@@ -60,6 +60,46 @@ class DispatchQueue;
} writer_thread;
friend class Writer;
+ /**
+ * The DelayedDelivery is for injecting delays into Message delivery off
+ * the socket. It is only enabled if delays are requested, and if they
+ * are then it pulls Messages off the DelayQueue and puts them into the
+ * in_q (SimpleMessenger::dispatch_queue).
+ * Please note that this probably has issues with Pipe shutdown and
+ * replacement semantics. I've tried, but no guarantees.
+ */
+ class DelayedDelivery: public Thread {
+ Pipe *pipe;
+ std::deque< pair<utime_t,Message*> > delay_queue;
+ Mutex delay_lock;
+ Cond delay_cond;
+ bool stop_delayed_delivery;
+
+ public:
+ DelayedDelivery(Pipe *p)
+ : pipe(p),
+ delay_lock("Pipe::DelayedDelivery::delay_lock"),
+ stop_delayed_delivery(false) { }
+ ~DelayedDelivery() {
+ discard();
+ }
+ void *entry();
+ void queue(utime_t release, Message *m) {
+ Mutex::Locker l(delay_lock);
+ delay_queue.push_back(make_pair(release, m));
+ delay_cond.Signal();
+ }
+ void discard();
+ void flush();
+ void stop() {
+ delay_lock.Lock();
+ stop_delayed_delivery = true;
+ delay_cond.Signal();
+ delay_lock.Unlock();
+ }
+ } *delay_thread;
+ friend class DelayedDelivery;
+
public:
Pipe(SimpleMessenger *r, int st, Connection *con);
~Pipe();
@@ -166,25 +206,13 @@ class DispatchQueue;
void start_reader();
void start_writer();
+ void maybe_start_delay_thread();
void join_reader();
// public constructors
static const Pipe& Server(int s);
static const Pipe& Client(const entity_addr_t& pi);
- //we have two queue_received's to allow local signal delivery
- // via Message * (that doesn't actually point to a Message)
- void queue_received(Message *m, int priority);
-
- void queue_received(Message *m) {
- // this is just to make sure that a changeset is working
- // properly; if you start using the refcounting more and have
- // multiple people hanging on to a message, ditch the assert!
- assert(m->nref.read() == 1);
-
- queue_received(m, m->get_priority());
- }
-
__u32 get_out_seq() { return out_seq; }
bool is_queued() { return !out_q.empty() || keepalive; }
@@ -208,6 +236,10 @@ class DispatchQueue;
writer_thread.join();
if (reader_thread.is_started())
reader_thread.join();
+ if (delay_thread) {
+ delay_thread->stop();
+ delay_thread->join();
+ }
}
void stop();