summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Farnum <greg@inktank.com>2012-11-27 11:02:07 -0800
committerSage Weil <sage@inktank.com>2012-11-29 16:09:44 -0800
commit0e92f892047b907cb05ebfab11e121b21fa00813 (patch)
tree7d8a691bf3ade06367790a17ccec97be1a16121b
parent90f66980bfb1f2541dcb11be2c358a9832a291b1 (diff)
downloadceph-0e92f892047b907cb05ebfab11e121b21fa00813.tar.gz
msgr: move the delay queue initialization into start_reader
The Pipe doesn't know the peer type in the constructor. It doesn't always know in start_reader either, so this needs more work, but at least it knows more frequently than it did. Signed-off-by: Greg Farnum <greg@inktank.com>
-rw-r--r--src/msg/Pipe.cc22
1 files changed, 13 insertions, 9 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index 695be3b3e8e..104208c4379 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -87,15 +87,6 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
if (msgr->timeout == 0)
msgr->timeout = -1;
-
- if (msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type))
- != string::npos) {
- lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
- dispatch_thread = new DelayedDelivery(this);
- delay_queue = new std::deque< Message * >();
- delay_lock = new Mutex("delay_lock");
- delay_cond = new Cond();
- }
}
Pipe::~Pipe()
@@ -144,6 +135,19 @@ void Pipe::start_reader()
}
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
+ if (!dispatch_thread &&
+ msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type))
+ != string::npos) {
+ lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
+ dispatch_thread = new DelayedDelivery(this);
+ delay_queue = new std::deque< Message * >();
+ delay_lock = new Mutex("delay_lock");
+ delay_cond = new Cond();
+ } else
+ lsubdout(msgr->cct, ms, 1) << "Pipe " << this << " peer is " << ceph_entity_type_name(connection_state->peer_type)
+ << "; NOT injecting delays because it does not match "
+ << msgr->cct->_conf->ms_inject_delay_type << dendl;
+
if (dispatch_thread && stop_delayed_delivery) {
lsubdout(msgr->cct, ms, 1) << "running delayed dispatch thread on Pipe " << this << dendl;
delay_lock->Lock();