diff options
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 147 |
1 files changed, 79 insertions, 68 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 9d234d7811..0b0df0d3b3 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -842,9 +842,20 @@ make_ra_conf(Node, Nodes) -> ra_event_formatter => Formatter}. -update_stream(#{system_time := _} = Meta, - {new_stream, StreamId, #{leader_node := LeaderNode, - queue := Q}}, undefined) -> +update_stream(Meta, Cmd, Stream) -> + try + update_stream0(Meta, Cmd, Stream) + catch + _:E:Stacktrace -> + rabbit_log:warning( + "~s failed to update stream:~n~p~n~p", + [?MODULE, E, Stacktrace]), + Stream + end. + +update_stream0(#{system_time := _} = Meta, + {new_stream, StreamId, #{leader_node := LeaderNode, + queue := Q}}, undefined) -> #{nodes := Nodes} = Conf = amqqueue:get_type_state(Q), %% this jumps straight to the state where all members %% have been stopped and a new writer has been chosen @@ -867,10 +878,10 @@ update_stream(#{system_time := _} = Meta, conf = Conf, members = Members, reply_to = maps:get(from, Meta, undefined)}; -update_stream(#{system_time := _Ts} = _Meta, - {delete_stream, _StreamId, #{}}, - #stream{members = Members0, - target = _} = Stream0) -> +update_stream0(#{system_time := _Ts} = _Meta, + {delete_stream, _StreamId, #{}}, + #stream{members = Members0, + target = _} = Stream0) -> Members = maps:map( fun (_, M) -> M#member{target = deleted} @@ -878,12 +889,12 @@ update_stream(#{system_time := _Ts} = _Meta, Stream0#stream{members = Members, % reply_to = maps:get(from, Meta, undefined), target = deleted}; -update_stream(#{system_time := _Ts} = _Meta, - {add_replica, _StreamId, #{node := Node}}, - #stream{members = Members0, - epoch = Epoch, - nodes = Nodes, - target = _} = Stream0) -> +update_stream0(#{system_time := _Ts} = _Meta, + {add_replica, _StreamId, #{node := Node}}, + #stream{members = Members0, + epoch = Epoch, + nodes = Nodes, + target = _} = Stream0) -> case maps:is_key(Node, Members0) of true -> Stream0; @@ -895,12 +906,12 @@ update_stream(#{system_time := _Ts} = _Meta, Stream0#stream{members = Members, nodes = lists:sort([Node | Nodes])} end; -update_stream(#{system_time := _Ts} = _Meta, - {delete_replica, _StreamId, #{node := Node}}, - #stream{members = Members0, - epoch = _Epoch, - nodes = Nodes, - target = _} = Stream0) -> +update_stream0(#{system_time := _Ts} = _Meta, + {delete_replica, _StreamId, #{node := Node}}, + #stream{members = Members0, + epoch = _Epoch, + nodes = Nodes, + target = _} = Stream0) -> case maps:is_key(Node, Members0) of true -> %% TODO: check of duplicate @@ -917,12 +928,12 @@ update_stream(#{system_time := _Ts} = _Meta, false -> Stream0 end; -update_stream(#{system_time := _Ts}, - {member_started, _StreamId, - #{epoch := E, - index := Idx, - pid := Pid} = Args}, #stream{epoch = E, - members = Members} = Stream0) -> +update_stream0(#{system_time := _Ts}, + {member_started, _StreamId, + #{epoch := E, + index := Idx, + pid := Pid} = Args}, #stream{epoch = E, + members = Members} = Stream0) -> Node = node(Pid), case maps:get(Node, Members, undefined) of #member{role = {_, E}, @@ -942,10 +953,10 @@ update_stream(#{system_time := _Ts}, [?MODULE, Args, Member]), Stream0 end; -update_stream(#{system_time := _Ts}, - {member_deleted, _StreamId, #{node := Node}}, - #stream{nodes = Nodes, - members = Members0} = Stream0) -> +update_stream0(#{system_time := _Ts}, + {member_deleted, _StreamId, #{node := Node}}, + #stream{nodes = Nodes, + members = Members0} = Stream0) -> case maps:take(Node, Members0) of {_, Members} when map_size(Members) == 0 -> undefined; @@ -959,15 +970,15 @@ update_stream(#{system_time := _Ts}, %% epochs? Stream0 end; -update_stream(#{system_time := _Ts}, - {member_stopped, _StreamId, - #{node := Node, - index := Idx, - epoch := StoppedEpoch, - tail := Tail}}, #stream{epoch = Epoch, - target = Target, - nodes = Nodes, - members = Members0} = Stream0) -> +update_stream0(#{system_time := _Ts}, + {member_stopped, _StreamId, + #{node := Node, + index := Idx, + epoch := StoppedEpoch, + tail := Tail}}, #stream{epoch = Epoch, + target = Target, + nodes = Nodes, + members = Members0} = Stream0) -> IsLeaderInCurrent = case find_leader(Members0) of {#member{role = {writer, Epoch}, target = running, @@ -1046,9 +1057,9 @@ update_stream(#{system_time := _Ts}, _Member -> Stream0 end; -update_stream(#{system_time := _Ts}, - {mnesia_updated, _StreamId, #{epoch := E}}, - Stream0) -> +update_stream0(#{system_time := _Ts}, + {mnesia_updated, _StreamId, #{epoch := E}}, + Stream0) -> %% reset mnesia state case Stream0 of undefined -> @@ -1056,25 +1067,25 @@ update_stream(#{system_time := _Ts}, _ -> Stream0#stream{mnesia = {updated, E}} end; -update_stream(#{system_time := _Ts}, - {retention_updated, _StreamId, #{node := Node}}, - #stream{members = Members0, - conf = Conf} = Stream0) -> +update_stream0(#{system_time := _Ts}, + {retention_updated, _StreamId, #{node := Node}}, + #stream{members = Members0, + conf = Conf} = Stream0) -> Members = maps:update_with(Node, fun (M) -> M#member{current = undefined, conf = Conf} end, Members0), Stream0#stream{members = Members}; -update_stream(#{system_time := _Ts}, - {action_failed, _StreamId, #{action := updating_mnesia}}, - #stream{mnesia = {_, E}} = Stream0) -> +update_stream0(#{system_time := _Ts}, + {action_failed, _StreamId, #{action := updating_mnesia}}, + #stream{mnesia = {_, E}} = Stream0) -> Stream0#stream{mnesia = {updated, E}}; -update_stream(#{system_time := _Ts}, - {action_failed, _StreamId, - #{node := Node, - index := Idx, - action := Action, - epoch := _Epoch}}, #stream{members = Members0} = Stream0) -> +update_stream0(#{system_time := _Ts}, + {action_failed, _StreamId, + #{node := Node, + index := Idx, + action := Action, + epoch := _Epoch}}, #stream{members = Members0} = Stream0) -> Members1 = maps:update_with(Node, fun (#member{current = {C, I}} = M) when C == Action andalso I == Idx -> @@ -1094,10 +1105,10 @@ update_stream(#{system_time := _Ts}, _ -> Stream0#stream{members = Members1} end; -update_stream(#{system_time := _Ts}, - {down, Pid, Reason}, - #stream{epoch = E, - members = Members0} = Stream0) -> +update_stream0(#{system_time := _Ts}, + {down, Pid, Reason}, + #stream{epoch = E, + members = Members0} = Stream0) -> DownNode = node(Pid), case Members0 of #{DownNode := #member{role = {writer, E}, @@ -1128,12 +1139,12 @@ update_stream(#{system_time := _Ts}, _ -> Stream0 end; -update_stream(#{system_time := _Ts}, - {down, _Pid, _Reason}, undefined) -> +update_stream0(#{system_time := _Ts}, + {down, _Pid, _Reason}, undefined) -> undefined; -update_stream(#{system_time := _Ts} = _Meta, - {nodeup, Node}, - #stream{members = Members0} = Stream0) -> +update_stream0(#{system_time := _Ts} = _Meta, + {nodeup, Node}, + #stream{members = Members0} = Stream0) -> Members = maps:map( fun (_, #member{node = N, current = {sleeping, nodeup}} = M) @@ -1143,13 +1154,13 @@ update_stream(#{system_time := _Ts} = _Meta, M end, Members0), Stream0#stream{members = Members}; -update_stream(#{system_time := _Ts}, - {policy_changed, _StreamId, #{queue := Q}}, - #stream{conf = Conf0, - members = _Members0} = Stream0) -> +update_stream0(#{system_time := _Ts}, + {policy_changed, _StreamId, #{queue := Q}}, + #stream{conf = Conf0, + members = _Members0} = Stream0) -> Conf = rabbit_stream_queue:update_stream_conf(Q, Conf0), Stream0#stream{conf = Conf}; -update_stream(_Meta, _Cmd, undefined) -> +update_stream0(_Meta, _Cmd, undefined) -> undefined. eval_listeners(#stream{listeners = Listeners0, |