diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-06-27 13:56:19 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-06-27 14:01:00 +0100 |
| commit | 2170396194b3c0cbab6146d508f7bd5e669bdc39 (patch) | |
| tree | 93819f8c48bb51f1e72ebb08afdd2f909e377f23 | |
| parent | 4e0211db31e04330960b19903b1b23c2507f52b5 (diff) | |
| download | rabbitmq-server-git-2170396194b3c0cbab6146d508f7bd5e669bdc39.tar.gz | |
Pass all members to synchronous Ra operations
As this will make these work more reliably when leadership is in flux.
Also ensure that servers started and added after the initial declare
use the same configuration.
| -rw-r--r-- | src/rabbit_fifo.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 75 |
2 files changed, 65 insertions, 32 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d9566ea8f7..cfb1ac7fda 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -64,7 +64,9 @@ make_credit/4, make_purge/0, make_purge_nodes/1, - make_update_config/1 + make_update_config/1, + + from_log/2 ]). %% command records representing all the protocol actions that are supported @@ -754,6 +756,24 @@ usage(Name) when is_atom(Name) -> [{_, Use}] -> Use end. +from_log(Log, State0) -> + lists:foldl( + fun ({Idx, Term, {'$usr', Meta0, Cmd, _}}, {S0, Effs}) -> + Meta = Meta0#{index => Idx, + term => Term}, + case apply(Meta, Cmd, S0) of + {S, _, E} when is_list(E) -> + {S, Effs ++ E}; + {S, _, E} -> + {S, Effs ++ [E]}; + {S, _} -> + {S, Effs} + end; + (_, Acc) -> + Acc + end, {State0, []}, Log). + + %%% Internal messages_ready(#?MODULE{messages = M, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 6fc5fcd83f..95c8a56ad4 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -119,24 +119,9 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes), case rabbit_amqqueue:internal_declare(NewQ1, false) of {created, NewQ} -> - RaMachine = ra_machine(NewQ), - ServerIds = [{RaName, Node} || Node <- Nodes], - ClusterName = RaName, TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), - RaConfs = [begin - UId = ra:new_uid(ra_lib:to_binary(ClusterName)), - FName = rabbit_misc:rs(QName), - #{cluster_name => ClusterName, - id => ServerId, - uid => UId, - friendly_name => FName, - metrics_key => QName, - initial_members => ServerIds, - log_init_args => #{uid => UId}, - tick_timeout => TickTimeout, - machine => RaMachine} - end || ServerId <- ServerIds], - + RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout) + || ServerId <- members(NewQ)], case ra:start_cluster(RaConfs) of {ok, _, _} -> rabbit_event:notify(queue_created, @@ -324,7 +309,6 @@ reductions(Name) -> recover(Queues) -> [begin {Name, _} = 
amqqueue:get_pid(Q0), - Nodes = amqqueue:get_quorum_nodes(Q0), case ra:restart_server({Name, node()}) of ok -> % queue was restarted, good @@ -334,10 +318,12 @@ recover(Queues) -> Err1 == name_not_registered -> % queue was never started on this node % so needs to be started from scratch. - Machine = ra_machine(Q0), - RaNodes = [{Name, Node} || Node <- Nodes], - case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of - ok -> ok; + TickTimeout = application:get_env(rabbit, quorum_tick_interval, + ?TICK_TIMEOUT), + Conf = make_ra_conf(Q0, {Name, node()}, TickTimeout), + case ra:start_server(Conf) of + ok -> + ok; Err2 -> rabbit_log:warning("recover: quorum queue ~w could not" " be started ~w", [Name, Err2]), @@ -723,15 +709,17 @@ add_member(VHost, Name, Node) -> end. add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> - {RaName, _} = ServerRef = amqqueue:get_pid(Q), + {RaName, _} = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), - QNodes = amqqueue:get_quorum_nodes(Q), %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, - case ra:start_server(RaName, ServerId, ra_machine(Q), - [{RaName, N} || N <- QNodes]) of + Members = members(Q), + TickTimeout = application:get_env(rabbit, quorum_tick_interval, + ?TICK_TIMEOUT), + Conf = make_ra_conf(Q, ServerId, TickTimeout), + case ra:start_server(Conf) of ok -> - case ra:add_member(ServerRef, ServerId) of + case ra:add_member(Members, ServerId) of {ok, _, Leader} -> Fun = fun(Q1) -> Q2 = amqqueue:set_quorum_nodes( @@ -770,16 +758,18 @@ delete_member(VHost, Name, Node) -> E end. 
+ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), ServerId = {RaName, Node}, - case amqqueue:get_quorum_nodes(Q) of - [Node] -> + case members(Q) of + [{_, Node}] -> + %% deleting the last member is not allowed {error, last_node}; - _ -> - case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of + Members -> + case ra:leave_and_delete_server(Members, ServerId) of ok -> Fun = fun(Q1) -> amqqueue:set_quorum_nodes( @@ -1143,3 +1133,26 @@ select_quorum_nodes(0, _, Selected) -> select_quorum_nodes(Size, Rest, Selected) -> S = lists:nth(rand:uniform(length(Rest)), Rest), select_quorum_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]). + +%% member with the current leader first +members(Q) when ?amqqueue_is_quorum(Q) -> + {RaName, LeaderNode} = amqqueue:get_pid(Q), + Nodes = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)), + [{RaName, N} || N <- [LeaderNode | Nodes]]. + +make_ra_conf(Q, ServerId, TickTimeout) -> + QName = amqqueue:get_name(Q), + RaMachine = ra_machine(Q), + [{ClusterName, _} | _] = Members = members(Q), + UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + FName = rabbit_misc:rs(QName), + #{cluster_name => ClusterName, + id => ServerId, + uid => UId, + friendly_name => FName, + metrics_key => QName, + initial_members => Members, + log_init_args => #{uid => UId}, + tick_timeout => TickTimeout, + machine => RaMachine}. + |
