summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-06-27 13:56:19 +0100
committerkjnilsson <knilsson@pivotal.io>2019-06-27 14:01:00 +0100
commit2170396194b3c0cbab6146d508f7bd5e669bdc39 (patch)
tree93819f8c48bb51f1e72ebb08afdd2f909e377f23
parent4e0211db31e04330960b19903b1b23c2507f52b5 (diff)
downloadrabbitmq-server-git-2170396194b3c0cbab6146d508f7bd5e669bdc39.tar.gz
Pass all members to synchronous Ra operations
As this will make these operations work more reliably when leadership is in flux. Also ensure that servers started and added after the initial declare use the same configuration.
-rw-r--r--src/rabbit_fifo.erl22
-rw-r--r--src/rabbit_quorum_queue.erl75
2 files changed, 65 insertions, 32 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d9566ea8f7..cfb1ac7fda 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -64,7 +64,9 @@
make_credit/4,
make_purge/0,
make_purge_nodes/1,
- make_update_config/1
+ make_update_config/1,
+
+ from_log/2
]).
%% command records representing all the protocol actions that are supported
@@ -754,6 +756,24 @@ usage(Name) when is_atom(Name) ->
[{_, Use}] -> Use
end.
+%% Replays a list of log entries on top of State0.
+%%
+%% Entries shaped {Idx, Term, {'$usr', Meta0, Cmd, _}} are applied through
+%% apply/3, with Idx and Term merged into the command's meta map under the
+%% `index' and `term' keys. Every other entry is ignored.
+%%
+%% Returns {State, Effects}, where Effects holds the effects returned by each
+%% applied command, in log order. A non-list effect is collected as a single
+%% element.
+%%
+%% NOTE(review): apply/3 is expected to resolve to this module's own apply/3
+%% rather than the erlang:apply/3 BIF - confirm the auto-import is disabled.
+from_log(Log, State0) ->
+    {State, RevEffects} =
+        lists:foldl(
+          fun ({Idx, Term, {'$usr', Meta0, Cmd, _}}, {S0, RevEffs}) ->
+                  Meta = Meta0#{index => Idx,
+                                term => Term},
+                  case apply(Meta, Cmd, S0) of
+                      {S, _, E} when is_list(E) ->
+                          %% prepend in reverse instead of `Effs ++ E' so the
+                          %% accumulator is not copied for every log entry
+                          {S, lists:reverse(E, RevEffs)};
+                      {S, _, E} ->
+                          {S, [E | RevEffs]};
+                      {S, _} ->
+                          {S, RevEffs}
+                  end;
+              (_, Unchanged) ->
+                  Unchanged
+          end, {State0, []}, Log),
+    %% restore log order once, at the end
+    {State, lists:reverse(RevEffects)}.
+
+
%%% Internal
messages_ready(#?MODULE{messages = M,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 6fc5fcd83f..95c8a56ad4 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -119,24 +119,9 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes),
case rabbit_amqqueue:internal_declare(NewQ1, false) of
{created, NewQ} ->
- RaMachine = ra_machine(NewQ),
- ServerIds = [{RaName, Node} || Node <- Nodes],
- ClusterName = RaName,
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
- RaConfs = [begin
- UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
- FName = rabbit_misc:rs(QName),
- #{cluster_name => ClusterName,
- id => ServerId,
- uid => UId,
- friendly_name => FName,
- metrics_key => QName,
- initial_members => ServerIds,
- log_init_args => #{uid => UId},
- tick_timeout => TickTimeout,
- machine => RaMachine}
- end || ServerId <- ServerIds],
-
+ RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout)
+ || ServerId <- members(NewQ)],
case ra:start_cluster(RaConfs) of
{ok, _, _} ->
rabbit_event:notify(queue_created,
@@ -324,7 +309,6 @@ reductions(Name) ->
recover(Queues) ->
[begin
{Name, _} = amqqueue:get_pid(Q0),
- Nodes = amqqueue:get_quorum_nodes(Q0),
case ra:restart_server({Name, node()}) of
ok ->
% queue was restarted, good
@@ -334,10 +318,12 @@ recover(Queues) ->
Err1 == name_not_registered ->
% queue was never started on this node
% so needs to be started from scratch.
- Machine = ra_machine(Q0),
- RaNodes = [{Name, Node} || Node <- Nodes],
- case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
- ok -> ok;
+ TickTimeout = application:get_env(rabbit, quorum_tick_interval,
+ ?TICK_TIMEOUT),
+ Conf = make_ra_conf(Q0, {Name, node()}, TickTimeout),
+ case ra:start_server(Conf) of
+ ok ->
+ ok;
Err2 ->
rabbit_log:warning("recover: quorum queue ~w could not"
" be started ~w", [Name, Err2]),
@@ -723,15 +709,17 @@ add_member(VHost, Name, Node) ->
end.
add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
- {RaName, _} = ServerRef = amqqueue:get_pid(Q),
+ {RaName, _} = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
- QNodes = amqqueue:get_quorum_nodes(Q),
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
ServerId = {RaName, Node},
- case ra:start_server(RaName, ServerId, ra_machine(Q),
- [{RaName, N} || N <- QNodes]) of
+ Members = members(Q),
+ TickTimeout = application:get_env(rabbit, quorum_tick_interval,
+ ?TICK_TIMEOUT),
+ Conf = make_ra_conf(Q, ServerId, TickTimeout),
+ case ra:start_server(Conf) of
ok ->
- case ra:add_member(ServerRef, ServerId) of
+ case ra:add_member(Members, ServerId) of
{ok, _, Leader} ->
Fun = fun(Q1) ->
Q2 = amqqueue:set_quorum_nodes(
@@ -770,16 +758,18 @@ delete_member(VHost, Name, Node) ->
E
end.
+
delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
QName = amqqueue:get_name(Q),
{RaName, _} = amqqueue:get_pid(Q),
ServerId = {RaName, Node},
- case amqqueue:get_quorum_nodes(Q) of
- [Node] ->
+ case members(Q) of
+ [{_, Node}] ->
+
%% deleting the last member is not allowed
{error, last_node};
- _ ->
- case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of
+ Members ->
+ case ra:leave_and_delete_server(Members, ServerId) of
ok ->
Fun = fun(Q1) ->
amqqueue:set_quorum_nodes(
@@ -1143,3 +1133,26 @@ select_quorum_nodes(0, _, Selected) ->
select_quorum_nodes(Size, Rest, Selected) ->
S = lists:nth(rand:uniform(length(Rest)), Rest),
select_quorum_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]).
+
+%% Returns the Ra server ids ({RaName, Node}) of the queue's members, with
+%% the member on the current leader node (the node held in the queue pid)
+%% placed first.
+members(Q) when ?amqqueue_is_quorum(Q) ->
+    {RaName, LeaderNode} = amqqueue:get_pid(Q),
+    Followers = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)),
+    [{RaName, LeaderNode} | [{RaName, Node} || Node <- Followers]].
+
+%% Builds the Ra server configuration map for ServerId as a member of the
+%% quorum queue Q. The cluster name is taken from the first entry returned by
+%% members/1, and a new uid is generated on every call.
+make_ra_conf(Q, ServerId, TickTimeout) ->
+    QueueName = amqqueue:get_name(Q),
+    Machine = ra_machine(Q),
+    Members = members(Q),
+    [{ClusterName, _} | _] = Members,
+    UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
+    #{cluster_name => ClusterName,
+      id => ServerId,
+      uid => UId,
+      friendly_name => rabbit_misc:rs(QueueName),
+      metrics_key => QueueName,
+      initial_members => Members,
+      log_init_args => #{uid => UId},
+      tick_timeout => TickTimeout,
+      machine => Machine}.
+