summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Farnum <greg@inktank.com>2012-11-21 10:54:06 -0800
committerSage Weil <sage@inktank.com>2012-11-29 16:09:44 -0800
commit90f66980bfb1f2541dcb11be2c358a9832a291b1 (patch)
tree82b7372bf5a5c1e354e7d7f73aa3861609aca912
parent77711ddee37de154c0d5d452c4f84dae36eb3e3a (diff)
downloadceph-90f66980bfb1f2541dcb11be2c358a9832a291b1.tar.gz
messenger: add the shell of a system to delay incoming Message delivery
When ms_inject_delay_type contains the peer type of the incoming Connection, the Pipe sets up a delay queue and shuttles all incoming Messages through it. This lets us exercise the cleanup and some notification code, but it doesn't actually generate any delays yet. Signed-off-by: Greg Farnum <greg@inktank.com>
-rw-r--r--src/common/config_opts.h3
-rw-r--r--src/msg/Pipe.cc56
-rw-r--r--src/msg/Pipe.h32
3 files changed, 90 insertions, 1 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 8699d789164..a067934c2f1 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -106,6 +106,9 @@ OPTION(ms_bind_port_max, OPT_INT, 7100)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
OPTION(ms_inject_socket_failures, OPT_U64, 0)
+OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed; a Pipe whose peer type name occurs anywhere in this string gets a delay queue
+OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds; NOTE(review): presumably the longest delay to inject -- no code in this change reads it yet
+OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]; NOTE(review): presumably the chance a given Message is delayed -- no code in this change reads it yet
OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id")
OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index e273ad49f2b..695be3b3e8e 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -54,6 +54,8 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
: reader_thread(this), writer_thread(this),
+ dispatch_thread(NULL), delay_queue(NULL),
+ delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true),
msgr(r),
conn_id(r->dispatch_queue.get_id()),
sd(-1), port(0),
@@ -85,6 +87,15 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
if (msgr->timeout == 0)
msgr->timeout = -1;
+
+ if (msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type))
+ != string::npos) {
+ lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
+ dispatch_thread = new DelayedDelivery(this);
+ delay_queue = new std::deque< Message * >();
+ delay_lock = new Mutex("delay_lock");
+ delay_cond = new Cond();
+ }
}
Pipe::~Pipe()
@@ -94,6 +105,14 @@ Pipe::~Pipe()
if (connection_state)
connection_state->put();
delete session_security;
+ if (dispatch_thread) {
+ delete dispatch_thread;
+ assert(delay_queue->empty());
+ delete delay_queue;
+ assert(!delay_lock->is_locked());
+ delete delay_lock;
+ delete delay_cond;
+ }
}
void Pipe::handle_ack(uint64_t seq)
@@ -125,6 +144,13 @@ void Pipe::start_reader()
}
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
+ if (dispatch_thread && stop_delayed_delivery) {
+ lsubdout(msgr->cct, ms, 1) << "running delayed dispatch thread on Pipe " << this << dendl;
+ delay_lock->Lock();
+ stop_delayed_delivery = false;
+ dispatch_thread->create();
+ delay_lock->Unlock();
+ }
}
void Pipe::start_writer()
@@ -150,10 +176,32 @@ void Pipe::join_reader()
// Hand a received Message to in_q, or to delay_queue when this Pipe has one. The caller must hold pipe_lock.
void Pipe::queue_received(Message *m, int priority)
{
assert(pipe_lock.is_locked());
+ if (delay_queue) { // non-NULL only when the constructor set up delay injection for this Pipe
+ lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl;
+ Mutex::Locker locker(*delay_lock); // delay_queue is shared with the delayed_delivery() thread
+ delay_queue->push_back(m);
+ delay_cond->Signal(); // wake delayed_delivery() if it is waiting on delay_cond
+ return; // NOTE(review): `priority` is dropped on this path; delayed_delivery() enqueues with m->get_priority() instead
+ }
in_q->enqueue(m, priority, conn_id);
}
-
+/**
+ * Body of the DelayedDelivery thread: move Messages from delay_queue into
+ * in_q, in FIFO order, until stop_delayed_delivery is set.
+ *
+ * delay_lock is taken on entry and handed to delay_cond->Wait() whenever
+ * there is nothing to deliver.  Both the stop flag and the queue are
+ * re-checked after every wakeup and before the first wait, so:
+ *  - Messages queued (or a stop requested) before this thread first runs
+ *    are noticed immediately instead of waiting for another Signal();
+ *  - a wakeup that finds the queue still empty never reaches front() on
+ *    an empty deque.
+ * Messages still queued when the stop flag is seen are left in delay_queue.
+ */
+void Pipe::delayed_delivery() {
+  Mutex::Locker locker(*delay_lock);
+  while (!stop_delayed_delivery) {
+    if (delay_queue->empty()) {
+      lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl;
+      delay_cond->Wait(*delay_lock);
+      // The wakeup may be a stop request or carry no message: re-test both.
+      continue;
+    }
+    Message *m = delay_queue->front();
+    lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delayed delivery" << dendl;
+    delay_queue->pop_front();
+    in_q->enqueue(m, m->get_priority(), conn_id);
+  }
+}
int Pipe::accept()
{
@@ -1141,6 +1189,12 @@ void Pipe::stop()
state = STATE_CLOSED;
cond.Signal();
shutdown_socket();
+ if (dispatch_thread) {
+ lsubdout(msgr->cct, ms, 1) << "signalling to stop delayed dispatch thread and clear out messages" << dendl;
+ Mutex::Locker locker(*delay_lock);
+ stop_delayed_delivery = true;
+ delay_cond->Signal();
+ }
}
diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h
index cc92b2f6f8b..648e2e87c9f 100644
--- a/src/msg/Pipe.h
+++ b/src/msg/Pipe.h
@@ -60,6 +60,29 @@ class DispatchQueue;
} writer_thread;
friend class Writer;
+  /**
+   * DelayedDelivery is the thread used to inject delays into Message
+   * delivery off the socket.  One is created only when delays are
+   * requested for this Pipe.  Its entry point runs
+   * Pipe::delayed_delivery(), which pulls Messages off delay_queue and
+   * puts them into the in_q (SimpleMessenger::dispatch_queue).
+   *
+   * Caveat from the original author: this probably has issues with Pipe
+   * shutdown and replacement semantics; it was attempted, but there are
+   * no guarantees.
+   */
+  class DelayedDelivery: public Thread {
+  private:
+    Pipe *pipe;  // the Pipe whose delayed_delivery() this thread runs
+
+  public:
+    DelayedDelivery(Pipe *p)
+      : pipe(p)
+    {}
+
+    // Thread entry point: runs the Pipe's delivery loop until it returns.
+    void *entry()
+    {
+      pipe->delayed_delivery();
+      return NULL;
+    }
+  };
+ friend class DelayedDelivery;
+
+ DelayedDelivery *dispatch_thread;
+ // TODO: clean up the delay_queue better on shutdown
+ std::deque< Message * > *delay_queue;
+ Mutex *delay_lock;
+ Cond *delay_cond;
+ bool stop_delayed_delivery;
+
public:
Pipe(SimpleMessenger *r, int st, Connection *con);
~Pipe();
@@ -185,6 +208,8 @@ class DispatchQueue;
queue_received(m, m->get_priority());
}
+ void delayed_delivery();
+
__u32 get_out_seq() { return out_seq; }
bool is_queued() { return !out_q.empty() || keepalive; }
@@ -208,6 +233,13 @@ class DispatchQueue;
writer_thread.join();
if (reader_thread.is_started())
reader_thread.join();
+ if (dispatch_thread && dispatch_thread->is_started()) {
+ delay_lock->Lock();
+ stop_delayed_delivery = true;
+ delay_cond->Signal();
+ delay_lock->Unlock();
+ dispatch_thread->join();
+ }
}
void stop();