diff options
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 61 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 25 |
2 files changed, 67 insertions, 19 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index b4ce940313..5aad5542a5 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -341,26 +341,31 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd, monitors = Monitors0} = State0) -> Stream0 = maps:get(StreamId, Streams0, undefined), Meta = maps:without([term, machine_version], Meta0), - Stream1 = update_stream(Meta, Cmd, Stream0), - Reply = case Stream1 of - #stream{reply_to = undefined} -> - ok; + case filter_command(Meta, Cmd, Stream0) of + ok -> + Stream1 = update_stream(Meta, Cmd, Stream0), + Reply = case Stream1 of + #stream{reply_to = undefined} -> + ok; + _ -> + %% reply_to is set so we'll reply later + '$ra_no_reply' + end, + case Stream1 of + undefined -> + return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)}, + Reply, []); _ -> - %% reply_to is set so we'll reply later - '$ra_no_reply' - end, - case Stream1 of - undefined -> - return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)}, - Reply, []); - _ -> - {Stream2, Effects0} = evaluate_stream(Meta, Stream1, []), - {Stream3, Effects1} = eval_listeners(Stream2, Effects0), - {Stream, Effects2} = eval_retention(Meta, Stream3, Effects1), - {Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2), - return(Meta, - State0#?MODULE{streams = Streams0#{StreamId => Stream}, - monitors = Monitors}, Reply, Effects) + {Stream2, Effects0} = evaluate_stream(Meta, Stream1, []), + {Stream3, Effects1} = eval_listeners(Stream2, Effects0), + {Stream, Effects2} = eval_retention(Meta, Stream3, Effects1), + {Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2), + return(Meta, + State0#?MODULE{streams = Streams0#{StreamId => Stream}, + monitors = Monitors}, Reply, Effects) + end; + Reply -> + return(Meta, State0, Reply, []) end; apply(Meta, {down, Pid, Reason} = Cmd, #?MODULE{streams = Streams0, @@ -874,6 +879,23 @@ make_ra_conf(Node, Nodes) -> machine => {module, ?MODULE, #{}}, ra_event_formatter => Formatter}. +filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{members = Members0}) -> + Members = maps:filter(fun(_, #member{target = S}) when S =/= deleted -> + true; + (_, _) -> + false + end, Members0), + case maps:size(Members) =< 1 of + true -> + rabbit_log:warning( + "~s failed to delete ~p replica, last cluster member", + [?MODULE, Node]), + {error, last_stream_member}; + false -> + ok + end; +filter_command(_, _, _) -> + ok. update_stream(Meta, Cmd, Stream) -> try @@ -1536,6 +1558,7 @@ set_running_to_stopped(Members) -> (_, M) -> M end, Members). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index fe49ab333a..e87d460b77 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -53,6 +53,7 @@ groups() -> leader_failover_dedupe, add_replicas]}, {cluster_size_3_parallel, [parallel], [delete_replica, + delete_last_replica, delete_classic_replica, delete_quorum_replica, consume_from_replica, @@ -444,6 +445,30 @@ delete_replica(Config) -> check_leader_and_replicas(Config, [Server0]), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +delete_last_replica(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + check_leader_and_replicas(Config, [Server0, Server1, Server2]), + ?assertEqual(ok, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])), + ?assertEqual(ok, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server2])), + %% check they're gone + check_leader_and_replicas(Config, [Server0]), + %% delete the last one + ?assertEqual({error, last_stream_member}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server0])), + %% It's still here + check_leader_and_replicas(Config, [Server0]), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + grow_coordinator_cluster(Config) -> [Server0, Server1, _Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |