summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2021-04-22 12:34:45 +0100
committerkjnilsson <knilsson@pivotal.io>2021-04-27 09:38:39 +0100
commita827275a4325b1de6dbb02d5e76ad3b761d2fc41 (patch)
tree6653bd2015234655b62e73d0e73340b00541eaeb
parenta17dde45bc309509046615a30f371fd775f66c06 (diff)
downloadrabbitmq-server-git-stream-add-replica-check.tar.gz
Streams: safer replica additionstream-add-replica-check
Disallow replica additions if any of the existing replicas are more than 10 seconds out of date.
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl65
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl4
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl68
3 files changed, 117 insertions, 20 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 9d234d7811..bcf19d28a5 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -55,6 +55,7 @@
-include("rabbit_stream_coordinator.hrl").
-include("amqqueue.hrl").
+-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s
-type state() :: #?MODULE{}.
-type args() :: #{index := ra:index(),
@@ -142,8 +143,38 @@ delete_stream(Q, ActingUser)
Err
end.
-add_replica(StreamId, Node) ->
- process_command({add_replica, StreamId, #{node => Node}}).
+-spec add_replica(amqqueue:amqqueue(), node()) ->
+ ok | {error, term()}.
+add_replica(Q, Node) when ?is_amqqueue(Q) ->
+ %% performing safety check
+ %% if any replica is stale then we should not allow
+ %% further replicas to be added
+ Pid = amqqueue:get_pid(Q),
+ try
+ ReplState0 = osiris_writer:query_replication_state(Pid),
+ {{_, InitTs}, ReplState} = maps:take(node(Pid), ReplState0),
+ {MaxTs, MinTs} = maps:fold(fun (_, {_, Ts}, {Max, Min}) ->
+ {max(Ts, Max), min(Ts, Min)}
+ end, {InitTs, InitTs}, ReplState),
+ case (MaxTs - MinTs) > ?REPLICA_FRESHNESS_LIMIT_MS of
+ true ->
+ {error, {disallowed, out_of_sync_replica}};
+ false ->
+ Name = rabbit_misc:rs(amqqueue:get_name(Q)),
+ rabbit_log:info("~s : adding replica ~s to ~s Replication State: ~w",
+ [?MODULE, Node, Name, ReplState0]),
+ StreamId = maps:get(name, amqqueue:get_type_state(Q)),
+ case process_command({add_replica, StreamId, #{node => Node}}) of
+ {ok, Result, _} ->
+ Result;
+ Err ->
+ Err
+ end
+ end
+ catch
+ _:Error ->
+ {error, Error}
+ end.
delete_replica(StreamId, Node) ->
process_command({delete_replica, StreamId, #{node => Node}}).
@@ -416,8 +447,8 @@ apply(Meta, {nodeup, Node} = Cmd,
return(Meta, State#?MODULE{monitors = Monitors,
streams = Streams}, ok, Effects);
apply(Meta, UnkCmd, State) ->
- rabbit_log:debug("rabbit_stream_coordinator: unknown command ~W",
- [UnkCmd, 10]),
+ rabbit_log:debug("~s: unknown command ~W",
+ [?MODULE, UnkCmd, 10]),
return(Meta, State, {error, unknown_command}, []).
return(#{index := Idx}, State, Reply, Effects) ->
@@ -544,36 +575,38 @@ handle_aux(leader, _, {start_writer, StreamId,
handle_aux(leader, _, {start_replica, StreamId,
#{epoch := Epoch, node := Node} = Args, Conf},
Aux, LogState, _) ->
- rabbit_log:debug("rabbit_stream_coordinator: running action: 'start_replica'"
- " for ~s on node ~w in epoch ~b", [StreamId, Node, Epoch]),
+ rabbit_log:debug("~s: running action: 'start_replica'"
+ " for ~s on node ~w in epoch ~b",
+ [?MODULE, StreamId, Node, Epoch]),
ActionFun = fun () -> phase_start_replica(StreamId, Args, Conf) end,
run_action(starting, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {stop, StreamId, #{node := Node,
epoch := Epoch} = Args, Conf},
Aux, LogState, _) ->
- rabbit_log:debug("rabbit_stream_coordinator: running action: 'stop'"
- " for ~s on node ~w in epoch ~b", [StreamId, Node, Epoch]),
+ rabbit_log:debug("~s: running action: 'stop'"
+ " for ~s on node ~w in epoch ~b",
+ [?MODULE, StreamId, Node, Epoch]),
ActionFun = fun () -> phase_stop_member(StreamId, Args, Conf) end,
run_action(stopping, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {update_mnesia, StreamId, Args, Conf},
#aux{actions = _Monitors} = Aux, LogState,
#?MODULE{streams = _Streams}) ->
- rabbit_log:debug("rabbit_stream_coordinator: running action: 'update_mnesia'"
- " for ~s", [StreamId]),
+ rabbit_log:debug("~s: running action: 'update_mnesia'"
+ " for ~s", [?MODULE, StreamId]),
ActionFun = fun () -> phase_update_mnesia(StreamId, Args, Conf) end,
run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {update_retention, StreamId, Args, _Conf},
#aux{actions = _Monitors} = Aux, LogState,
#?MODULE{streams = _Streams}) ->
- rabbit_log:debug("rabbit_stream_coordinator: running action: 'update_retention'"
- " for ~s", [StreamId]),
+ rabbit_log:debug("~s: running action: 'update_retention'"
+ " for ~s", [?MODULE, StreamId]),
ActionFun = fun () -> phase_update_retention(StreamId, Args) end,
run_action(update_retention, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {delete_member, StreamId, #{node := Node} = Args, Conf},
#aux{actions = _Monitors} = Aux, LogState,
#?MODULE{streams = _Streams}) ->
- rabbit_log:debug("rabbit_stream_coordinator: running action: 'delete_member'"
- " for ~s ~s", [StreamId, Node]),
+ rabbit_log:debug("~s: running action: 'delete_member'"
+ " for ~s ~s", [?MODULE, StreamId, Node]),
ActionFun = fun () -> phase_delete_member(StreamId, Args, Conf) end,
run_action(delete_member, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, fail_active_actions,
@@ -594,9 +627,9 @@ handle_aux(leader, _, {down, Pid, Reason},
%% An action has failed - report back to the state machine
case maps:get(Pid, Monitors0, undefined) of
{StreamId, Action, #{node := Node, epoch := Epoch} = Args} ->
- rabbit_log:warning("Error while executing action for stream queue ~s, "
+ rabbit_log:warning("~s: error while executing action for stream queue ~s, "
" node ~s, epoch ~b Err: ~w",
- [StreamId, Node, Epoch, Reason]),
+ [?MODULE, StreamId, Node, Epoch, Reason]),
Monitors = maps:remove(Pid, Monitors0),
Cmd = {action_failed, StreamId, Args#{action => Action}},
send_self_command(Cmd),
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 05977c3a58..57f3330691 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -642,9 +642,7 @@ add_replica(VHost, Name, Node) ->
false ->
{error, node_not_running};
true ->
- #{name := StreamId} = amqqueue:get_type_state(Q),
- {ok, Reply, _} = rabbit_stream_coordinator:add_replica(StreamId, Node),
- Reply
+ rabbit_stream_coordinator:add_replica(Q, Node)
end;
E ->
E
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 78e6070311..fe49ab333a 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -50,7 +50,8 @@ groups() ->
delete_down_replica,
replica_recovery,
leader_failover,
- leader_failover_dedupe]},
+ leader_failover_dedupe,
+ add_replicas]},
{cluster_size_3_parallel, [parallel], [delete_replica,
delete_classic_replica,
delete_quorum_replica,
@@ -294,6 +295,71 @@ delete_queue(Config) ->
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
+add_replicas(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-initial-cluster-size">>, long, 1}])),
+
+ %% TODO: add lots of data so that replica is still out of sync when
+ %% second request comes in
+ NumMsgs = 1000,
+ Data = crypto:strong_rand_bytes(1000),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, Data) || _ <- lists:seq(1, NumMsgs)],
+ %% should be sufficient for the next message to fall in the next
+ %% chunk
+ timer:sleep(100),
+ publish(Ch, Q, <<"last">>),
+ amqp_channel:wait_for_confirms(Ch, 30),
+ timer:sleep(1000),
+ ?assertEqual(ok,
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server1])),
+
+ timer:sleep(1000),
+
+ %% it is almost impossible to reliably catch this situation.
+ %% increasing number of messages published and the data size could help
+ % ?assertMatch({error, {disallowed, out_of_sync_replica}} ,
+ ?assertMatch(ok ,
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server2])),
+ timer:sleep(1000),
+ %% validate we can read the last entry
+ qos(Ch, 10, false),
+ amqp_channel:subscribe(
+ Ch, #'basic.consume'{queue = Q,
+ no_ack = false,
+ consumer_tag = <<"ctag">>,
+ arguments = [{<<"x-stream-offset">>, longstr, <<"last">>}]},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = <<"last">>}} ->
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ after 60000 ->
+ flush(),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+ exit(deliver_timeout)
+ end,
+ % ?assertMatch({error, {disallowed, out_of_sync_replica}} ,
+ % rpc:call(Server0, rabbit_stream_queue, add_replica,
+ % [<<"/">>, Q, Server2])),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+ ok.
+
add_replica(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),