diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-02-24 17:01:17 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-02-24 17:01:17 +0000 |
| commit | cc01c0025278e209b980dd3eb8a5971932a2498e (patch) | |
| tree | dd196186ede0e8e5cc4ed1abe9f44c6b763d7da9 /src | |
| parent | 227e69480c22fdea58d5c388ec98ecc8ecd4e8ec (diff) | |
| download | rabbitmq-server-git-cc01c0025278e209b980dd3eb8a5971932a2498e.tar.gz | |
Split QQ remove member into two operations
This ensures the amqqueue record is still updated when the Raft cluster
membership change succeeds but the server data deletion step fails. Also add a
function to repair the quorum queue type state nodes in the amqqueue record
should they diverge from what the Ra cluster reports.
[#171434221]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 51 |
1 files changed, 46 insertions, 5 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 7e9974b27a..972716a396 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -46,6 +46,9 @@ filter_quorum_critical/1, filter_quorum_critical/2, all_replica_states/0]). -export([is_policy_applicable/2]). +-export([repair_amqqueue_nodes/1, + repair_amqqueue_nodes/2 + ]). -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). @@ -376,7 +379,38 @@ repair_leader_record(QName, Self) -> end, ok. - +repair_amqqueue_nodes(VHost, QueueName) -> + QName = #resource{virtual_host = VHost, name = QueueName, kind = queue}, + repair_amqqueue_nodes(QName). + +-spec repair_amqqueue_nodes(rabbit_types:r('queue') | amqqueue:amqqueue()) -> + ok | repaired. +repair_amqqueue_nodes(QName = #resource{}) -> + {ok, Q0} = rabbit_amqqueue:lookup(QName), + repair_amqqueue_nodes(Q0); +repair_amqqueue_nodes(Q0) -> + QName = amqqueue:get_name(Q0), + Leader = amqqueue:get_pid(Q0), + {ok, Members, _} = ra:members(Leader), + RaNodes = [N || {_, N} <- Members], + #{nodes := Nodes} = amqqueue:get_type_state(Q0), + case lists:sort(RaNodes) =:= lists:sort(Nodes) of + true -> + %% up to date + ok; + false -> + %% update amqqueue record + Fun = fun (Q) -> + TS0 = amqqueue:get_type_state(Q), + TS = TS0#{nodes => RaNodes}, + amqqueue:set_type_state(Q, TS) + end, + rabbit_misc:execute_mnesia_transaction( + fun() -> + rabbit_amqqueue:update(QName, Fun) + end), + repaired + end. 
reductions(Name) -> try @@ -899,8 +933,8 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> %% deleting the last member is not allowed {error, last_node}; Members -> - case ra:leave_and_delete_server(Members, ServerId) of - ok -> + case ra:remove_member(Members, ServerId) of + {ok, _, _Leader} -> Fun = fun(Q1) -> update_type_state( Q1, @@ -910,8 +944,15 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> end, rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), - ok; - timeout -> + case ra:force_delete_server(ServerId) of + ok -> + ok; + {error, _} = Err -> + Err; + Err -> + {error, Err} + end; + {timeout, _} -> {error, timeout}; E -> E |
