diff options
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 65 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 4 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 68 |
3 files changed, 117 insertions, 20 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 9d234d7811..bcf19d28a5 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -55,6 +55,7 @@ -include("rabbit_stream_coordinator.hrl"). -include("amqqueue.hrl"). +-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s -type state() :: #?MODULE{}. -type args() :: #{index := ra:index(), @@ -142,8 +143,38 @@ delete_stream(Q, ActingUser) Err end. -add_replica(StreamId, Node) -> - process_command({add_replica, StreamId, #{node => Node}}). +-spec add_replica(amqqueue:amqqueue(), node()) -> + ok | {error, term()}. +add_replica(Q, Node) when ?is_amqqueue(Q) -> + %% performing safety check + %% if any replica is stale then we should not allow + %% further replicas to be added + Pid = amqqueue:get_pid(Q), + try + ReplState0 = osiris_writer:query_replication_state(Pid), + {{_, InitTs}, ReplState} = maps:take(node(Pid), ReplState0), + {MaxTs, MinTs} = maps:fold(fun (_, {_, Ts}, {Max, Min}) -> + {max(Ts, Max), min(Ts, Min)} + end, {InitTs, InitTs}, ReplState), + case (MaxTs - MinTs) > ?REPLICA_FRESHNESS_LIMIT_MS of + true -> + {error, {disallowed, out_of_sync_replica}}; + false -> + Name = rabbit_misc:rs(amqqueue:get_name(Q)), + rabbit_log:info("~s : adding replica ~s to ~s Replication State: ~w", + [?MODULE, Node, Name, ReplState0]), + StreamId = maps:get(name, amqqueue:get_type_state(Q)), + case process_command({add_replica, StreamId, #{node => Node}}) of + {ok, Result, _} -> + Result; + Err -> + Err + end + end + catch + _:Error -> + {error, Error} + end. delete_replica(StreamId, Node) -> process_command({delete_replica, StreamId, #{node => Node}}). @@ -416,8 +447,8 @@ apply(Meta, {nodeup, Node} = Cmd, return(Meta, State#?MODULE{monitors = Monitors, streams = Streams}, ok, Effects); apply(Meta, UnkCmd, State) -> - rabbit_log:debug("rabbit_stream_coordinator: unknown command ~W", - [UnkCmd, 10]), + rabbit_log:debug("~s: unknown command ~W", + [?MODULE, UnkCmd, 10]), return(Meta, State, {error, unknown_command}, []). return(#{index := Idx}, State, Reply, Effects) -> @@ -544,36 +575,38 @@ handle_aux(leader, _, {start_writer, StreamId, handle_aux(leader, _, {start_replica, StreamId, #{epoch := Epoch, node := Node} = Args, Conf}, Aux, LogState, _) -> - rabbit_log:debug("rabbit_stream_coordinator: running action: 'start_replica'" - " for ~s on node ~w in epoch ~b", [StreamId, Node, Epoch]), + rabbit_log:debug("~s: running action: 'start_replica'" + " for ~s on node ~w in epoch ~b", + [?MODULE, StreamId, Node, Epoch]), ActionFun = fun () -> phase_start_replica(StreamId, Args, Conf) end, run_action(starting, StreamId, Args, ActionFun, Aux, LogState); handle_aux(leader, _, {stop, StreamId, #{node := Node, epoch := Epoch} = Args, Conf}, Aux, LogState, _) -> - rabbit_log:debug("rabbit_stream_coordinator: running action: 'stop'" - " for ~s on node ~w in epoch ~b", [StreamId, Node, Epoch]), + rabbit_log:debug("~s: running action: 'stop'" + " for ~s on node ~w in epoch ~b", + [?MODULE, StreamId, Node, Epoch]), ActionFun = fun () -> phase_stop_member(StreamId, Args, Conf) end, run_action(stopping, StreamId, Args, ActionFun, Aux, LogState); handle_aux(leader, _, {update_mnesia, StreamId, Args, Conf}, #aux{actions = _Monitors} = Aux, LogState, #?MODULE{streams = _Streams}) -> - rabbit_log:debug("rabbit_stream_coordinator: running action: 'update_mnesia'" - " for ~s", [StreamId]), + rabbit_log:debug("~s: running action: 'update_mnesia'" + " for ~s", [?MODULE, StreamId]), ActionFun = fun () -> phase_update_mnesia(StreamId, Args, Conf) end, run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, LogState); handle_aux(leader, _, {update_retention, StreamId, Args, _Conf}, #aux{actions = _Monitors} = Aux, LogState, #?MODULE{streams = _Streams}) -> - rabbit_log:debug("rabbit_stream_coordinator: running action: 'update_retention'" - " for ~s", [StreamId]), + rabbit_log:debug("~s: running action: 'update_retention'" + " for ~s", [?MODULE, StreamId]), ActionFun = fun () -> phase_update_retention(StreamId, Args) end, run_action(update_retention, StreamId, Args, ActionFun, Aux, LogState); handle_aux(leader, _, {delete_member, StreamId, #{node := Node} = Args, Conf}, #aux{actions = _Monitors} = Aux, LogState, #?MODULE{streams = _Streams}) -> - rabbit_log:debug("rabbit_stream_coordinator: running action: 'delete_member'" - " for ~s ~s", [StreamId, Node]), + rabbit_log:debug("~s: running action: 'delete_member'" + " for ~s ~s", [?MODULE, StreamId, Node]), ActionFun = fun () -> phase_delete_member(StreamId, Args, Conf) end, run_action(delete_member, StreamId, Args, ActionFun, Aux, LogState); handle_aux(leader, _, fail_active_actions, @@ -594,9 +627,9 @@ handle_aux(leader, _, {down, Pid, Reason}, %% An action has failed - report back to the state machine case maps:get(Pid, Monitors0, undefined) of {StreamId, Action, #{node := Node, epoch := Epoch} = Args} -> - rabbit_log:warning("Error while executing action for stream queue ~s, " + rabbit_log:warning("~s: error while executing action for stream queue ~s, " " node ~s, epoch ~b Err: ~w", - [StreamId, Node, Epoch, Reason]), + [?MODULE, StreamId, Node, Epoch, Reason]), Monitors = maps:remove(Pid, Monitors0), Cmd = {action_failed, StreamId, Args#{action => Action}}, send_self_command(Cmd), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 05977c3a58..57f3330691 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -642,9 +642,7 @@ add_replica(VHost, Name, Node) -> false -> {error, node_not_running}; true -> - #{name := StreamId} = amqqueue:get_type_state(Q), - {ok, Reply, _} = rabbit_stream_coordinator:add_replica(StreamId, Node), - Reply + rabbit_stream_coordinator:add_replica(Q, Node) end; E -> E diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 78e6070311..fe49ab333a 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -50,7 +50,8 @@ groups() -> delete_down_replica, replica_recovery, leader_failover, - leader_failover_dedupe]}, + leader_failover_dedupe, + add_replicas]}, {cluster_size_3_parallel, [parallel], [delete_replica, delete_classic_replica, delete_quorum_replica, @@ -294,6 +295,71 @@ delete_queue(Config) -> ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = Q})). +add_replicas(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 1}])), + + %% TODO: add lots of data so that replica is still out of sync when + %% second request comes in + NumMsgs = 1000, + Data = crypto:strong_rand_bytes(1000), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, Data) || _ <- lists:seq(1, NumMsgs)], + %% should be sufficient for the next message to fall in the next + %% chunk + timer:sleep(100), + publish(Ch, Q, <<"last">>), + amqp_channel:wait_for_confirms(Ch, 30), + timer:sleep(1000), + ?assertEqual(ok, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server1])), + + timer:sleep(1000), + + %% it is almost impossible to reliably catch this situation. + %% increasing number of messages published and the data size could help + % ?assertMatch({error, {disallowed, out_of_sync_replica}} , + ?assertMatch(ok , + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server2])), + timer:sleep(1000), + %% validate we can read the last entry + qos(Ch, 10, false), + amqp_channel:subscribe( + Ch, #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = [{<<"x-stream-offset">>, longstr, <<"last">>}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = <<"last">>}} -> + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + after 60000 -> + flush(), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + exit(deliver_timeout) + end, + % ?assertMatch({error, {disallowed, out_of_sync_replica}} , + % rpc:call(Server0, rabbit_stream_queue, add_replica, + % [<<"/">>, Q, Server2])), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + ok. + add_replica(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |