diff options
author | Sage Weil <sage@inktank.com> | 2013-06-11 16:44:05 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-06-13 10:52:18 -0700 |
commit | de64bc50f27e463ebd3aec020d93f88174eedd28 (patch) | |
tree | f5569ab7ec09587b8159f3e9a464f7a3164e869c | |
parent | 26e16c008d86243e0f48687c5bede9f816f3f2b9 (diff) | |
download | ceph-de64bc50f27e463ebd3aec020d93f88174eedd28.tar.gz |
msgr: queue reset exactly once on any connection
Use the atomic pipe link removal as a signal that we are the one failing
the con and use that to queue the reset event.
This fixes the case where we have an open, the session gets set up via the
handle_accept callback, and then race with another connection and go into
wait + close, or just close. In that case, fault() needs to queue a reset
event to match the accept.
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/msg/Message.h | 4 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 7 |
2 files changed, 7 insertions, 4 deletions
diff --git a/src/msg/Message.h b/src/msg/Message.h index 0089751c7ed..2e3d59b886d 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -231,13 +231,15 @@ public: } return !failed; } - void clear_pipe(RefCountedObject *old_p) { + bool clear_pipe(RefCountedObject *old_p) { if (old_p == pipe) { Mutex::Locker l(lock); pipe->put(); pipe = NULL; failed = true; + return true; } + return false; } void reset_pipe(RefCountedObject *p) { Mutex::Locker l(lock); diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index bc980369a64..4eb3d266937 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -1171,6 +1171,8 @@ void Pipe::fault(bool onread) if (state == STATE_CLOSED || state == STATE_CLOSING) { ldout(msgr->cct,10) << "fault already closed|closing" << dendl; + if (connection_state->clear_pipe(this)) + msgr->dispatch_queue.queue_reset(connection_state.get()); return; } @@ -1206,9 +1208,8 @@ void Pipe::fault(bool onread) // disconnect from Connection, and mark it failed. future messages // will be dropped. assert(connection_state); - connection_state->clear_pipe(this); - - msgr->dispatch_queue.queue_reset(connection_state.get()); + if (connection_state->clear_pipe(this)) + msgr->dispatch_queue.queue_reset(connection_state.get()); return; } |