summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl147
1 files changed, 79 insertions, 68 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 9d234d7811..0b0df0d3b3 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -842,9 +842,20 @@ make_ra_conf(Node, Nodes) ->
ra_event_formatter => Formatter}.
-update_stream(#{system_time := _} = Meta,
- {new_stream, StreamId, #{leader_node := LeaderNode,
- queue := Q}}, undefined) ->
+update_stream(Meta, Cmd, Stream) ->
+ try
+ update_stream0(Meta, Cmd, Stream)
+ catch
+ _:E:Stacktrace ->
+ rabbit_log:warning(
+ "~s failed to update stream:~n~p~n~p",
+ [?MODULE, E, Stacktrace]),
+ Stream
+ end.
+
+update_stream0(#{system_time := _} = Meta,
+ {new_stream, StreamId, #{leader_node := LeaderNode,
+ queue := Q}}, undefined) ->
#{nodes := Nodes} = Conf = amqqueue:get_type_state(Q),
%% this jumps straight to the state where all members
%% have been stopped and a new writer has been chosen
@@ -867,10 +878,10 @@ update_stream(#{system_time := _} = Meta,
conf = Conf,
members = Members,
reply_to = maps:get(from, Meta, undefined)};
-update_stream(#{system_time := _Ts} = _Meta,
- {delete_stream, _StreamId, #{}},
- #stream{members = Members0,
- target = _} = Stream0) ->
+update_stream0(#{system_time := _Ts} = _Meta,
+ {delete_stream, _StreamId, #{}},
+ #stream{members = Members0,
+ target = _} = Stream0) ->
Members = maps:map(
fun (_, M) ->
M#member{target = deleted}
@@ -878,12 +889,12 @@ update_stream(#{system_time := _Ts} = _Meta,
Stream0#stream{members = Members,
% reply_to = maps:get(from, Meta, undefined),
target = deleted};
-update_stream(#{system_time := _Ts} = _Meta,
- {add_replica, _StreamId, #{node := Node}},
- #stream{members = Members0,
- epoch = Epoch,
- nodes = Nodes,
- target = _} = Stream0) ->
+update_stream0(#{system_time := _Ts} = _Meta,
+ {add_replica, _StreamId, #{node := Node}},
+ #stream{members = Members0,
+ epoch = Epoch,
+ nodes = Nodes,
+ target = _} = Stream0) ->
case maps:is_key(Node, Members0) of
true ->
Stream0;
@@ -895,12 +906,12 @@ update_stream(#{system_time := _Ts} = _Meta,
Stream0#stream{members = Members,
nodes = lists:sort([Node | Nodes])}
end;
-update_stream(#{system_time := _Ts} = _Meta,
- {delete_replica, _StreamId, #{node := Node}},
- #stream{members = Members0,
- epoch = _Epoch,
- nodes = Nodes,
- target = _} = Stream0) ->
+update_stream0(#{system_time := _Ts} = _Meta,
+ {delete_replica, _StreamId, #{node := Node}},
+ #stream{members = Members0,
+ epoch = _Epoch,
+ nodes = Nodes,
+ target = _} = Stream0) ->
case maps:is_key(Node, Members0) of
true ->
%% TODO: check of duplicate
@@ -917,12 +928,12 @@ update_stream(#{system_time := _Ts} = _Meta,
false ->
Stream0
end;
-update_stream(#{system_time := _Ts},
- {member_started, _StreamId,
- #{epoch := E,
- index := Idx,
- pid := Pid} = Args}, #stream{epoch = E,
- members = Members} = Stream0) ->
+update_stream0(#{system_time := _Ts},
+ {member_started, _StreamId,
+ #{epoch := E,
+ index := Idx,
+ pid := Pid} = Args}, #stream{epoch = E,
+ members = Members} = Stream0) ->
Node = node(Pid),
case maps:get(Node, Members, undefined) of
#member{role = {_, E},
@@ -942,10 +953,10 @@ update_stream(#{system_time := _Ts},
[?MODULE, Args, Member]),
Stream0
end;
-update_stream(#{system_time := _Ts},
- {member_deleted, _StreamId, #{node := Node}},
- #stream{nodes = Nodes,
- members = Members0} = Stream0) ->
+update_stream0(#{system_time := _Ts},
+ {member_deleted, _StreamId, #{node := Node}},
+ #stream{nodes = Nodes,
+ members = Members0} = Stream0) ->
case maps:take(Node, Members0) of
{_, Members} when map_size(Members) == 0 ->
undefined;
@@ -959,15 +970,15 @@ update_stream(#{system_time := _Ts},
%% epochs?
Stream0
end;
-update_stream(#{system_time := _Ts},
- {member_stopped, _StreamId,
- #{node := Node,
- index := Idx,
- epoch := StoppedEpoch,
- tail := Tail}}, #stream{epoch = Epoch,
- target = Target,
- nodes = Nodes,
- members = Members0} = Stream0) ->
+update_stream0(#{system_time := _Ts},
+ {member_stopped, _StreamId,
+ #{node := Node,
+ index := Idx,
+ epoch := StoppedEpoch,
+ tail := Tail}}, #stream{epoch = Epoch,
+ target = Target,
+ nodes = Nodes,
+ members = Members0} = Stream0) ->
IsLeaderInCurrent = case find_leader(Members0) of
{#member{role = {writer, Epoch},
target = running,
@@ -1046,9 +1057,9 @@ update_stream(#{system_time := _Ts},
_Member ->
Stream0
end;
-update_stream(#{system_time := _Ts},
- {mnesia_updated, _StreamId, #{epoch := E}},
- Stream0) ->
+update_stream0(#{system_time := _Ts},
+ {mnesia_updated, _StreamId, #{epoch := E}},
+ Stream0) ->
%% reset mnesia state
case Stream0 of
undefined ->
@@ -1056,25 +1067,25 @@ update_stream(#{system_time := _Ts},
_ ->
Stream0#stream{mnesia = {updated, E}}
end;
-update_stream(#{system_time := _Ts},
- {retention_updated, _StreamId, #{node := Node}},
- #stream{members = Members0,
- conf = Conf} = Stream0) ->
+update_stream0(#{system_time := _Ts},
+ {retention_updated, _StreamId, #{node := Node}},
+ #stream{members = Members0,
+ conf = Conf} = Stream0) ->
Members = maps:update_with(Node, fun (M) ->
M#member{current = undefined,
conf = Conf}
end, Members0),
Stream0#stream{members = Members};
-update_stream(#{system_time := _Ts},
- {action_failed, _StreamId, #{action := updating_mnesia}},
- #stream{mnesia = {_, E}} = Stream0) ->
+update_stream0(#{system_time := _Ts},
+ {action_failed, _StreamId, #{action := updating_mnesia}},
+ #stream{mnesia = {_, E}} = Stream0) ->
Stream0#stream{mnesia = {updated, E}};
-update_stream(#{system_time := _Ts},
- {action_failed, _StreamId,
- #{node := Node,
- index := Idx,
- action := Action,
- epoch := _Epoch}}, #stream{members = Members0} = Stream0) ->
+update_stream0(#{system_time := _Ts},
+ {action_failed, _StreamId,
+ #{node := Node,
+ index := Idx,
+ action := Action,
+ epoch := _Epoch}}, #stream{members = Members0} = Stream0) ->
Members1 = maps:update_with(Node,
fun (#member{current = {C, I}} = M)
when C == Action andalso I == Idx ->
@@ -1094,10 +1105,10 @@ update_stream(#{system_time := _Ts},
_ ->
Stream0#stream{members = Members1}
end;
-update_stream(#{system_time := _Ts},
- {down, Pid, Reason},
- #stream{epoch = E,
- members = Members0} = Stream0) ->
+update_stream0(#{system_time := _Ts},
+ {down, Pid, Reason},
+ #stream{epoch = E,
+ members = Members0} = Stream0) ->
DownNode = node(Pid),
case Members0 of
#{DownNode := #member{role = {writer, E},
@@ -1128,12 +1139,12 @@ update_stream(#{system_time := _Ts},
_ ->
Stream0
end;
-update_stream(#{system_time := _Ts},
- {down, _Pid, _Reason}, undefined) ->
+update_stream0(#{system_time := _Ts},
+ {down, _Pid, _Reason}, undefined) ->
undefined;
-update_stream(#{system_time := _Ts} = _Meta,
- {nodeup, Node},
- #stream{members = Members0} = Stream0) ->
+update_stream0(#{system_time := _Ts} = _Meta,
+ {nodeup, Node},
+ #stream{members = Members0} = Stream0) ->
Members = maps:map(
fun (_, #member{node = N,
current = {sleeping, nodeup}} = M)
@@ -1143,13 +1154,13 @@ update_stream(#{system_time := _Ts} = _Meta,
M
end, Members0),
Stream0#stream{members = Members};
-update_stream(#{system_time := _Ts},
- {policy_changed, _StreamId, #{queue := Q}},
- #stream{conf = Conf0,
- members = _Members0} = Stream0) ->
+update_stream0(#{system_time := _Ts},
+ {policy_changed, _StreamId, #{queue := Q}},
+ #stream{conf = Conf0,
+ members = _Members0} = Stream0) ->
Conf = rabbit_stream_queue:update_stream_conf(Q, Conf0),
Stream0#stream{conf = Conf};
-update_stream(_Meta, _Cmd, undefined) ->
+update_stream0(_Meta, _Cmd, undefined) ->
undefined.
eval_listeners(#stream{listeners = Listeners0,