summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2020-02-25 19:06:34 +0300
committerGitHub <noreply@github.com>2020-02-25 19:06:34 +0300
commit5ce141c9f5e50aaa61880ee54aff7f07287d2707 (patch)
treec03ad96a8372b353244f18c76e49ca3b7c05f6b8 /src
parent1cc662cf0d02d34f6f7b71b046f3859b47464e2a (diff)
parent5e99e875363060f759e1391b8f8fa102fd7edcb5 (diff)
downloadrabbitmq-server-git-5ce141c9f5e50aaa61880ee54aff7f07287d2707.tar.gz
Merge pull request #2255 from rabbitmq/qq-remove-member-fix
Split QQ remove member into two operations
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_quorum_queue.erl51
1 files changed, 46 insertions, 5 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 7e9974b27a..972716a396 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -46,6 +46,9 @@
filter_quorum_critical/1, filter_quorum_critical/2,
all_replica_states/0]).
-export([is_policy_applicable/2]).
+-export([repair_amqqueue_nodes/1,
+ repair_amqqueue_nodes/2
+ ]).
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
@@ -376,7 +379,38 @@ repair_leader_record(QName, Self) ->
end,
ok.
-
+repair_amqqueue_nodes(VHost, QueueName) ->
+ QName = #resource{virtual_host = VHost, name = QueueName, kind = queue},
+ repair_amqqueue_nodes(QName).
+
+-spec repair_amqqueue_nodes(rabbit_types:r('queue') | amqqueue:amqqueue()) ->
+ ok | repaired.
+repair_amqqueue_nodes(QName = #resource{}) ->
+ {ok, Q0} = rabbit_amqqueue:lookup(QName),
+ repair_amqqueue_nodes(Q0);
+repair_amqqueue_nodes(Q0) ->
+ QName = amqqueue:get_name(Q0),
+ Leader = amqqueue:get_pid(Q0),
+ {ok, Members, _} = ra:members(Leader),
+ RaNodes = [N || {_, N} <- Members],
+ #{nodes := Nodes} = amqqueue:get_type_state(Q0),
+ case lists:sort(RaNodes) =:= lists:sort(Nodes) of
+ true ->
+ %% up to date
+ ok;
+ false ->
+ %% update amqqueue record
+ Fun = fun (Q) ->
+ TS0 = amqqueue:get_type_state(Q),
+ TS = TS0#{nodes => RaNodes},
+ amqqueue:set_type_state(Q, TS)
+ end,
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ rabbit_amqqueue:update(QName, Fun)
+ end),
+ repaired
+ end.
reductions(Name) ->
try
@@ -899,8 +933,8 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
%% deleting the last member is not allowed
{error, last_node};
Members ->
- case ra:leave_and_delete_server(Members, ServerId) of
- ok ->
+ case ra:remove_member(Members, ServerId) of
+ {ok, _, _Leader} ->
Fun = fun(Q1) ->
update_type_state(
Q1,
@@ -910,8 +944,15 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
end,
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
- ok;
- timeout ->
+ case ra:force_delete_server(ServerId) of
+ ok ->
+ ok;
+ {error, _} = Err ->
+ Err;
+ Err ->
+ {error, Err}
+ end;
+ {timeout, _} ->
{error, timeout};
E ->
E