summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author kjnilsson <knilsson@pivotal.io> 2019-01-10 11:55:01 +0000
committer kjnilsson <knilsson@pivotal.io> 2019-01-15 15:26:26 +0000
commit 014319f84004186e080894b75c9357594d7a18d8 (patch)
tree 74c76ea29704fdb2940ed197ffa8affd7b8aa103 /src
parent 45632289309e80a38348ed936961330a4ae0763b (diff)
download rabbitmq-server-git-014319f84004186e080894b75c9357594d7a18d8.tar.gz
Improve quorum queue deletion
Only delete the amqqueue record after successful Ra cluster deletion. Force delete Ra servers when not able to achieve consensus.
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_fifo.erl 3
-rw-r--r-- src/rabbit_quorum_queue.erl 37
2 files changed, 31 insertions, 9 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 4fe4d954b9..a83f755e7e 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -609,7 +609,8 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
when PrefixMsgCounts =/= {0, 0} ->
%% TODO: remove assertion?
exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
-state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) ->
+state_enter(eol, #state{enqueuers = Enqs,
+ consumers = Custs0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
[{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
state_enter(_, _) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 429be39067..42ddaa0457 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -296,14 +296,15 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
Msgs = quorum_messages(Name),
- _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of
+ Servers = [{Name, Node} || Node <- QNodes],
+ case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->
ok
end,
+ ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?TICK_TIME),
{ok, Msgs};
@@ -314,16 +315,36 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
true ->
%% If all ra nodes were already down, the delete
%% has succeed
- rabbit_core_metrics:queue_deleted(QName),
+ delete_queue_data(QName, ActingUser),
{ok, Msgs};
false ->
- rabbit_misc:protocol_error(
- internal_error,
- "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p",
- [rabbit_misc:rs(QName), Errs])
+ %% attempt forced deletion of all servers
+ rabbit_log:info(
+ "Could not delete quorum queue '~s', not enough nodes "
+ " online to reach a quorum: ~255p."
+ " Attempting force delete.",
+ [rabbit_misc:rs(QName), Errs]),
+ [begin
+ case catch(ra:delete_server(S)) of
+ ok -> ok;
+ Err ->
+ rabbit_log:warning(
+ "Force delete of ~w failed with: ~w"
+ "This may require manual data clean up~n",
+ [S, Err]),
+ ok
+ end
+ end || S <- Servers],
+ delete_queue_data(QName, ActingUser),
+ {ok, Msgs}
end
end.
+delete_queue_data(QName, ActingUser) ->
+ _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
+ ok.
+
+
delete_immediately(Resource, {_Name, _} = QPid) ->
_ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
{ok, _} = ra:delete_cluster([QPid]),
@@ -450,7 +471,7 @@ requeue(ConsumerTag, MsgIds, QState) ->
cleanup_data_dir() ->
Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}
- <- rabbit_amqqueue:list_by_type(quorum),
+ <- rabbit_amqqueue:list_by_type(quorum),
lists:member(node(), Nodes)],
Registered = ra_directory:list_registered(),
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,