summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl22
-rw-r--r--src/rabbit_quorum_queue.erl77
2 files changed, 66 insertions, 33 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d9566ea8f7..cfb1ac7fda 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -64,7 +64,9 @@
make_credit/4,
make_purge/0,
make_purge_nodes/1,
- make_update_config/1
+ make_update_config/1,
+
+ from_log/2
]).
%% command records representing all the protocol actions that are supported
@@ -754,6 +756,24 @@ usage(Name) when is_atom(Name) ->
[{_, Use}] -> Use
end.
+from_log(Log, State0) ->
+ lists:foldl(
+ fun ({Idx, Term, {'$usr', Meta0, Cmd, _}}, {S0, Effs}) ->
+ Meta = Meta0#{index => Idx,
+ term => Term},
+ case apply(Meta, Cmd, S0) of
+ {S, _, E} when is_list(E) ->
+ {S, Effs ++ E};
+ {S, _, E} ->
+ {S, Effs ++ [E]};
+ {S, _} ->
+ {S, Effs}
+ end;
+ (_, Acc) ->
+ Acc
+ end, {State0, []}, Log).
+
+
%%% Internal
messages_ready(#?MODULE{messages = M,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 8c8f6dd4be..eb34b9db9f 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -120,24 +120,9 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes),
case rabbit_amqqueue:internal_declare(NewQ1, false) of
{created, NewQ} ->
- RaMachine = ra_machine(NewQ),
- ServerIds = [{RaName, Node} || Node <- Nodes],
- ClusterName = RaName,
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
- RaConfs = [begin
- UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
- FName = rabbit_misc:rs(QName),
- #{cluster_name => ClusterName,
- id => ServerId,
- uid => UId,
- friendly_name => FName,
- metrics_key => QName,
- initial_members => ServerIds,
- log_init_args => #{uid => UId},
- tick_timeout => TickTimeout,
- machine => RaMachine}
- end || ServerId <- ServerIds],
-
+ RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout)
+ || ServerId <- members(NewQ)],
case ra:start_cluster(RaConfs) of
{ok, _, _} ->
rabbit_event:notify(queue_created,
@@ -325,7 +310,6 @@ reductions(Name) ->
recover(Queues) ->
[begin
{Name, _} = amqqueue:get_pid(Q0),
- Nodes = amqqueue:get_quorum_nodes(Q0),
case ra:restart_server({Name, node()}) of
ok ->
% queue was restarted, good
@@ -335,10 +319,12 @@ recover(Queues) ->
Err1 == name_not_registered ->
% queue was never started on this node
% so needs to be started from scratch.
- Machine = ra_machine(Q0),
- RaNodes = [{Name, Node} || Node <- Nodes],
- case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
- ok -> ok;
+ TickTimeout = application:get_env(rabbit, quorum_tick_interval,
+ ?TICK_TIMEOUT),
+ Conf = make_ra_conf(Q0, {Name, node()}, TickTimeout),
+ case ra:start_server(Conf) of
+ ok ->
+ ok;
Err2 ->
rabbit_log:warning("recover: quorum queue ~w could not"
" be started ~w", [Name, Err2]),
@@ -724,15 +710,17 @@ add_member(VHost, Name, Node, Timeout) ->
end.
add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
- {RaName, _} = ServerRef = amqqueue:get_pid(Q),
+ {RaName, _} = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
- QNodes = amqqueue:get_quorum_nodes(Q),
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
ServerId = {RaName, Node},
- case ra:start_server(RaName, ServerId, ra_machine(Q),
- [{RaName, N} || N <- QNodes]) of
+ Members = members(Q),
+ TickTimeout = application:get_env(rabbit, quorum_tick_interval,
+ ?TICK_TIMEOUT),
+ Conf = make_ra_conf(Q, ServerId, TickTimeout),
+ case ra:start_server(Conf) of
ok ->
- case ra:add_member(ServerRef, ServerId, Timeout) of
+ case ra:add_member(Members, ServerId, Timeout) of
{ok, _, Leader} ->
Fun = fun(Q1) ->
Q2 = amqqueue:set_quorum_nodes(
@@ -745,7 +733,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
ok;
{timeout, _} ->
_ = ra:force_delete_server(ServerId),
- _ = ra:remove_member(ServerRef, ServerId),
+ _ = ra:remove_member(Members, ServerId),
{error, timeout};
E ->
_ = ra:force_delete_server(ServerId),
@@ -773,16 +761,18 @@ delete_member(VHost, Name, Node) ->
E
end.
+
delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
QName = amqqueue:get_name(Q),
{RaName, _} = amqqueue:get_pid(Q),
ServerId = {RaName, Node},
- case amqqueue:get_quorum_nodes(Q) of
- [Node] ->
+ case members(Q) of
+ [{_, Node}] ->
+
%% deleting the last member is not allowed
{error, last_node};
- _ ->
- case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of
+ Members ->
+ case ra:leave_and_delete_server(Members, ServerId) of
ok ->
Fun = fun(Q1) ->
amqqueue:set_quorum_nodes(
@@ -1146,3 +1136,26 @@ select_quorum_nodes(0, _, Selected) ->
select_quorum_nodes(Size, Rest, Selected) ->
S = lists:nth(rand:uniform(length(Rest)), Rest),
select_quorum_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]).
+
+%% member with the current leader first
+members(Q) when ?amqqueue_is_quorum(Q) ->
+ {RaName, LeaderNode} = amqqueue:get_pid(Q),
+ Nodes = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)),
+ [{RaName, N} || N <- [LeaderNode | Nodes]].
+
+make_ra_conf(Q, ServerId, TickTimeout) ->
+ QName = amqqueue:get_name(Q),
+ RaMachine = ra_machine(Q),
+ [{ClusterName, _} | _] = Members = members(Q),
+ UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
+ FName = rabbit_misc:rs(QName),
+ #{cluster_name => ClusterName,
+ id => ServerId,
+ uid => UId,
+ friendly_name => FName,
+ metrics_key => QName,
+ initial_members => Members,
+ log_init_args => #{uid => UId},
+ tick_timeout => TickTimeout,
+ machine => RaMachine}.
+