diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 77 |
2 files changed, 66 insertions, 33 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d9566ea8f7..cfb1ac7fda 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -64,7 +64,9 @@ make_credit/4, make_purge/0, make_purge_nodes/1, - make_update_config/1 + make_update_config/1, + + from_log/2 ]). %% command records representing all the protocol actions that are supported @@ -754,6 +756,24 @@ usage(Name) when is_atom(Name) -> [{_, Use}] -> Use end. +from_log(Log, State0) -> + lists:foldl( + fun ({Idx, Term, {'$usr', Meta0, Cmd, _}}, {S0, Effs}) -> + Meta = Meta0#{index => Idx, + term => Term}, + case apply(Meta, Cmd, S0) of + {S, _, E} when is_list(E) -> + {S, Effs ++ E}; + {S, _, E} -> + {S, Effs ++ [E]}; + {S, _} -> + {S, Effs} + end; + (_, Acc) -> + Acc + end, {State0, []}, Log). + + %%% Internal messages_ready(#?MODULE{messages = M, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 8c8f6dd4be..eb34b9db9f 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -120,24 +120,9 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes), case rabbit_amqqueue:internal_declare(NewQ1, false) of {created, NewQ} -> - RaMachine = ra_machine(NewQ), - ServerIds = [{RaName, Node} || Node <- Nodes], - ClusterName = RaName, TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), - RaConfs = [begin - UId = ra:new_uid(ra_lib:to_binary(ClusterName)), - FName = rabbit_misc:rs(QName), - #{cluster_name => ClusterName, - id => ServerId, - uid => UId, - friendly_name => FName, - metrics_key => QName, - initial_members => ServerIds, - log_init_args => #{uid => UId}, - tick_timeout => TickTimeout, - machine => RaMachine} - end || ServerId <- ServerIds], - + RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout) + || ServerId <- members(NewQ)], case ra:start_cluster(RaConfs) of {ok, _, _} -> rabbit_event:notify(queue_created, @@ -325,7 +310,6 @@ reductions(Name) -> recover(Queues) -> [begin {Name, _} = amqqueue:get_pid(Q0), - Nodes = amqqueue:get_quorum_nodes(Q0), case ra:restart_server({Name, node()}) of ok -> % queue was restarted, good @@ -335,10 +319,12 @@ recover(Queues) -> Err1 == name_not_registered -> % queue was never started on this node % so needs to be started from scratch. - Machine = ra_machine(Q0), - RaNodes = [{Name, Node} || Node <- Nodes], - case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of - ok -> ok; + TickTimeout = application:get_env(rabbit, quorum_tick_interval, + ?TICK_TIMEOUT), + Conf = make_ra_conf(Q0, {Name, node()}, TickTimeout), + case ra:start_server(Conf) of + ok -> + ok; Err2 -> rabbit_log:warning("recover: quorum queue ~w could not" " be started ~w", [Name, Err2]), @@ -724,15 +710,17 @@ add_member(VHost, Name, Node, Timeout) -> end. add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> - {RaName, _} = ServerRef = amqqueue:get_pid(Q), + {RaName, _} = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), - QNodes = amqqueue:get_quorum_nodes(Q), %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, - case ra:start_server(RaName, ServerId, ra_machine(Q), - [{RaName, N} || N <- QNodes]) of + Members = members(Q), + TickTimeout = application:get_env(rabbit, quorum_tick_interval, + ?TICK_TIMEOUT), + Conf = make_ra_conf(Q, ServerId, TickTimeout), + case ra:start_server(Conf) of ok -> - case ra:add_member(ServerRef, ServerId, Timeout) of + case ra:add_member(Members, ServerId, Timeout) of {ok, _, Leader} -> Fun = fun(Q1) -> Q2 = amqqueue:set_quorum_nodes( @@ -745,7 +733,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> ok; {timeout, _} -> _ = ra:force_delete_server(ServerId), - _ = ra:remove_member(ServerRef, ServerId), + _ = ra:remove_member(Members, ServerId), {error, timeout}; E -> _ = ra:force_delete_server(ServerId), @@ -773,16 +761,18 @@ delete_member(VHost, Name, Node) -> E end. + delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), ServerId = {RaName, Node}, - case amqqueue:get_quorum_nodes(Q) of - [Node] -> + case members(Q) of + [{_, Node}] -> + %% deleting the last member is not allowed {error, last_node}; - _ -> - case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of + Members -> + case ra:leave_and_delete_server(Members, ServerId) of ok -> Fun = fun(Q1) -> amqqueue:set_quorum_nodes( @@ -1146,3 +1136,26 @@ select_quorum_nodes(0, _, Selected) -> select_quorum_nodes(Size, Rest, Selected) -> S = lists:nth(rand:uniform(length(Rest)), Rest), select_quorum_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]). + +%% member with the current leader first +members(Q) when ?amqqueue_is_quorum(Q) -> + {RaName, LeaderNode} = amqqueue:get_pid(Q), + Nodes = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)), + [{RaName, N} || N <- [LeaderNode | Nodes]]. + +make_ra_conf(Q, ServerId, TickTimeout) -> + QName = amqqueue:get_name(Q), + RaMachine = ra_machine(Q), + [{ClusterName, _} | _] = Members = members(Q), + UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + FName = rabbit_misc:rs(QName), + #{cluster_name => ClusterName, + id => ServerId, + uid => UId, + friendly_name => FName, + metrics_key => QName, + initial_members => Members, + log_init_args => #{uid => UId}, + tick_timeout => TickTimeout, + machine => RaMachine}. + |
