diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 51 |
1 files changed, 46 insertions, 5 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 7e9974b27a..972716a396 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -46,6 +46,9 @@ filter_quorum_critical/1, filter_quorum_critical/2, all_replica_states/0]). -export([is_policy_applicable/2]). +-export([repair_amqqueue_nodes/1, + repair_amqqueue_nodes/2 + ]). -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). @@ -376,7 +379,38 @@ repair_leader_record(QName, Self) -> end, ok. - +repair_amqqueue_nodes(VHost, QueueName) -> + QName = #resource{virtual_host = VHost, name = QueueName, kind = queue}, + repair_amqqueue_nodes(QName). + +-spec repair_amqqueue_nodes(rabbit_types:r('queue') | amqqueue:amqqueue()) -> + ok | repaired. +repair_amqqueue_nodes(QName = #resource{}) -> + {ok, Q0} = rabbit_amqqueue:lookup(QName), + repair_amqqueue_nodes(Q0); +repair_amqqueue_nodes(Q0) -> + QName = amqqueue:get_name(Q0), + Leader = amqqueue:get_pid(Q0), + {ok, Members, _} = ra:members(Leader), + RaNodes = [N || {_, N} <- Members], + #{nodes := Nodes} = amqqueue:get_type_state(Q0), + case lists:sort(RaNodes) =:= lists:sort(Nodes) of + true -> + %% up to date + ok; + false -> + %% update amqqueue record + Fun = fun (Q) -> + TS0 = amqqueue:get_type_state(Q), + TS = TS0#{nodes => RaNodes}, + amqqueue:set_type_state(Q, TS) + end, + rabbit_misc:execute_mnesia_transaction( + fun() -> + rabbit_amqqueue:update(QName, Fun) + end), + repaired + end. reductions(Name) -> try @@ -899,8 +933,8 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> %% deleting the last member is not allowed {error, last_node}; Members -> - case ra:leave_and_delete_server(Members, ServerId) of - ok -> + case ra:remove_member(Members, ServerId) of + {ok, _, _Leader} -> Fun = fun(Q1) -> update_type_state( Q1, @@ -910,8 +944,15 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> end, rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), - ok; - timeout -> + case ra:force_delete_server(ServerId) of + ok -> + ok; + {error, _} = Err -> + Err; + Err -> + {error, Err} + end; + {timeout, _} -> {error, timeout}; E -> E |
