summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2019-01-16 03:37:05 +0300
committerGitHub <noreply@github.com>2019-01-16 03:37:05 +0300
commit488d55c9885087f706ea5860e4c332403e62f7c6 (patch)
tree3cf0d78a222809dfb3296560b03745d27b90775b /src
parent46d995d1d9a13f3ba9705bbdaa6dc068814dd03c (diff)
parentbddfe15cb4ddbcb2c9d2d05ce99bfd5ad6fc03fc (diff)
downloadrabbitmq-server-git-488d55c9885087f706ea5860e4c332403e62f7c6.tar.gz
Merge pull request #1823 from rabbitmq/delete_quorum_queue_fixes
Improve quorum queue deletion
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl3
-rw-r--r--src/rabbit_quorum_queue.erl47
2 files changed, 40 insertions, 10 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 4fe4d954b9..a83f755e7e 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -609,7 +609,8 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
when PrefixMsgCounts =/= {0, 0} ->
%% TODO: remove assertion?
exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
-state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) ->
+state_enter(eol, #state{enqueuers = Enqs,
+ consumers = Custs0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
[{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
state_enter(_, _) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 429be39067..deeff4194a 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -80,6 +80,7 @@
]).
-define(TICK_TIME, 1000). %% the ra server tick time
+-define(DELETE_TIMEOUT, 5000). %% the ra server tick time
%%----------------------------------------------------------------------------
@@ -294,16 +295,19 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
name = QName, quorum_nodes = QNodes},
_IfUnused, _IfEmpty, ActingUser) ->
%% TODO Quorum queue needs to support consumer tracking for IfUnused
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
+ Timeout = ?DELETE_TIMEOUT,
Msgs = quorum_messages(Name),
- _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of
+ Servers = [{Name, Node} || Node <- QNodes],
+ case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->
ok
+ after Timeout ->
+ ok = force_delete_queue(Servers)
end,
+ ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?TICK_TIME),
{ok, Msgs};
@@ -314,16 +318,41 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
true ->
%% If all ra nodes were already down, the delete
%% has succeed
- rabbit_core_metrics:queue_deleted(QName),
+ delete_queue_data(QName, ActingUser),
{ok, Msgs};
false ->
- rabbit_misc:protocol_error(
- internal_error,
- "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p",
- [rabbit_misc:rs(QName), Errs])
+ %% attempt forced deletion of all servers
+ rabbit_log:warning(
+ "Could not delete quorum queue '~s', not enough nodes "
+ " online to reach a quorum: ~255p."
+ " Attempting force delete.",
+ [rabbit_misc:rs(QName), Errs]),
+ ok = force_delete_queue(Servers),
+ delete_queue_data(QName, ActingUser),
+ {ok, Msgs}
end
end.
+
+force_delete_queue(Servers) ->
+ [begin
+ case catch(ra:delete_server(S)) of
+ ok -> ok;
+ Err ->
+ rabbit_log:warning(
+ "Force delete of ~w failed with: ~w"
+ "This may require manual data clean up~n",
+ [S, Err]),
+ ok
+ end
+ end || S <- Servers],
+ ok.
+
+delete_queue_data(QName, ActingUser) ->
+ _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
+ ok.
+
+
delete_immediately(Resource, {_Name, _} = QPid) ->
_ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
{ok, _} = ra:delete_cluster([QPid]),
@@ -450,7 +479,7 @@ requeue(ConsumerTag, MsgIds, QState) ->
cleanup_data_dir() ->
Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}
- <- rabbit_amqqueue:list_by_type(quorum),
+ <- rabbit_amqqueue:list_by_type(quorum),
lists:member(node(), Nodes)],
Registered = ra_directory:list_registered(),
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,