summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl61
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl25
2 files changed, 67 insertions, 19 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index b4ce940313..5aad5542a5 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -341,26 +341,31 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
monitors = Monitors0} = State0) ->
Stream0 = maps:get(StreamId, Streams0, undefined),
Meta = maps:without([term, machine_version], Meta0),
- Stream1 = update_stream(Meta, Cmd, Stream0),
- Reply = case Stream1 of
- #stream{reply_to = undefined} ->
- ok;
+ case filter_command(Meta, Cmd, Stream0) of
+ ok ->
+ Stream1 = update_stream(Meta, Cmd, Stream0),
+ Reply = case Stream1 of
+ #stream{reply_to = undefined} ->
+ ok;
+ _ ->
+ %% reply_to is set so we'll reply later
+ '$ra_no_reply'
+ end,
+ case Stream1 of
+ undefined ->
+ return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
+ Reply, []);
_ ->
- %% reply_to is set so we'll reply later
- '$ra_no_reply'
- end,
- case Stream1 of
- undefined ->
- return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
- Reply, []);
- _ ->
- {Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
- {Stream3, Effects1} = eval_listeners(Stream2, Effects0),
- {Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
- {Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
- return(Meta,
- State0#?MODULE{streams = Streams0#{StreamId => Stream},
- monitors = Monitors}, Reply, Effects)
+ {Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
+ {Stream3, Effects1} = eval_listeners(Stream2, Effects0),
+ {Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
+ {Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
+ return(Meta,
+ State0#?MODULE{streams = Streams0#{StreamId => Stream},
+ monitors = Monitors}, Reply, Effects)
+ end;
+ Reply ->
+ return(Meta, State0, Reply, [])
end;
apply(Meta, {down, Pid, Reason} = Cmd,
#?MODULE{streams = Streams0,
@@ -874,6 +879,23 @@ make_ra_conf(Node, Nodes) ->
machine => {module, ?MODULE, #{}},
ra_event_formatter => Formatter}.
+filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{members = Members0}) ->
+ Members = maps:filter(fun(_, #member{target = S}) when S =/= deleted ->
+ true;
+ (_, _) ->
+ false
+ end, Members0),
+ case maps:size(Members) =< 1 of
+ true ->
+ rabbit_log:warning(
+ "~s failed to delete ~p replica, last cluster member",
+ [?MODULE, Node]),
+ {error, last_stream_member};
+ false ->
+ ok
+ end;
+filter_command(_, _, _) ->
+ ok.
update_stream(Meta, Cmd, Stream) ->
try
@@ -1536,6 +1558,7 @@ set_running_to_stopped(Members) ->
(_, M) ->
M
end, Members).
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index fe49ab333a..e87d460b77 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -53,6 +53,7 @@ groups() ->
leader_failover_dedupe,
add_replicas]},
{cluster_size_3_parallel, [parallel], [delete_replica,
+ delete_last_replica,
delete_classic_replica,
delete_quorum_replica,
consume_from_replica,
@@ -444,6 +445,30 @@ delete_replica(Config) ->
check_leader_and_replicas(Config, [Server0]),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+delete_last_replica(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ check_leader_and_replicas(Config, [Server0, Server1, Server2]),
+ ?assertEqual(ok,
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])),
+ ?assertEqual(ok,
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server2])),
+ %% check they're gone
+ check_leader_and_replicas(Config, [Server0]),
+ %% delete the last one
+ ?assertEqual({error, last_stream_member},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server0])),
+ %% It's still here
+ check_leader_and_replicas(Config, [Server0]),
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+
grow_coordinator_cluster(Config) ->
[Server0, Server1, _Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),