diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-12 17:27:34 +0000 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-12 17:27:34 +0000 |
commit | 7e4a33708b1c3b05c4282136b4eb81113628e5a8 (patch) | |
tree | 1de5d191c2bc0c2f8d111944b3d89a1bcb250a69 | |
parent | 115b951b9cc718fd40aa560e49319f57c762f05d (diff) | |
download | rabbitmq-server-git-stream-coordinator-delete-stream-fix.tar.gz |
Stream coordinator: reset reply_to for delete_stream commandstream-coordinator-delete-stream-fix
So that a reply is sent to the caller immediately after the command has
been processed as intended. Previously it was possible if reply_to was
already set that a reply never was sent to the caller and the caller
times out. This should improve some flakyness in the rabbit_stream_queue suite
as well.
Strictly this is a change that introduces indeterminism in the coordinator
state machine as during an upgrade different members may run different code
for this command. But as this state only affects side effects (replies) and
the state for the streams affected will shortly be removed this is very
unlikely to cause any real issues.
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 11 |
1 files changed, 4 insertions, 7 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 10eccefd5f..54a0e7be09 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -922,7 +922,9 @@ update_stream0(#{system_time := _Ts} = _Meta, M#member{target = deleted} end, Members0), Stream0#stream{members = Members, - % reply_to = maps:get(from, Meta, undefined), + %% reset reply_to here to ensure a reply + %% is returned as the command has been accepted + reply_to = undefined, target = deleted}; update_stream0(#{system_time := _Ts} = _Meta, {add_replica, _StreamId, #{node := Node}}, @@ -1260,12 +1262,7 @@ evaluate_stream(#{index := Idx} = Meta, Action = {aux, {delete_member, StreamId, LeaderNode, make_writer_conf(Writer0, Stream0)}}, Writer = Writer0#member{current = {deleting, Idx}}, - Effs = case From of - undefined -> - [Action | Effs0]; - _ -> - wrap_reply(From, {ok, 0}) ++ [Action | Effs0] - end, + Effs = [Action | Effs0], Stream = Stream0#stream{reply_to = undefined}, eval_replicas(Meta, Writer, Replicas, Stream, Effs); {#member{state = {down, Epoch}, |