summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-06-11 16:44:05 -0700
committerSage Weil <sage@inktank.com>2013-06-13 10:52:18 -0700
commitde64bc50f27e463ebd3aec020d93f88174eedd28 (patch)
treef5569ab7ec09587b8159f3e9a464f7a3164e869c
parent26e16c008d86243e0f48687c5bede9f816f3f2b9 (diff)
downloadceph-de64bc50f27e463ebd3aec020d93f88174eedd28.tar.gz
msgr: queue reset exactly once on any connection
Use the atomic pipe link removal as a signal that we are the one failing the con and use that to queue the reset event. This fixes the case where we have an open, the session gets set up via the handle_accept callback, and then race with another connection and go into wait + close, or just close. In that case, fault() needs to queue a reset event to match the accept. Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/msg/Message.h4
-rw-r--r--src/msg/Pipe.cc7
2 files changed, 7 insertions, 4 deletions
diff --git a/src/msg/Message.h b/src/msg/Message.h
index 0089751c7ed..2e3d59b886d 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -231,13 +231,15 @@ public:
}
return !failed;
}
- void clear_pipe(RefCountedObject *old_p) {
+ bool clear_pipe(RefCountedObject *old_p) {
if (old_p == pipe) {
Mutex::Locker l(lock);
pipe->put();
pipe = NULL;
failed = true;
+ return true;
}
+ return false;
}
void reset_pipe(RefCountedObject *p) {
Mutex::Locker l(lock);
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index bc980369a64..4eb3d266937 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -1171,6 +1171,8 @@ void Pipe::fault(bool onread)
if (state == STATE_CLOSED ||
state == STATE_CLOSING) {
ldout(msgr->cct,10) << "fault already closed|closing" << dendl;
+ if (connection_state->clear_pipe(this))
+ msgr->dispatch_queue.queue_reset(connection_state.get());
return;
}
@@ -1206,9 +1208,8 @@ void Pipe::fault(bool onread)
// disconnect from Connection, and mark it failed. future messages
// will be dropped.
assert(connection_state);
- connection_state->clear_pipe(this);
-
- msgr->dispatch_queue.queue_reset(connection_state.get());
+ if (connection_state->clear_pipe(this))
+ msgr->dispatch_queue.queue_reset(connection_state.get());
return;
}