summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl3
-rw-r--r--src/rabbit_quorum_queue.erl37
-rw-r--r--test/quorum_queue_SUITE.erl28
3 files changed, 48 insertions, 20 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 4fe4d954b9..a83f755e7e 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -609,7 +609,8 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
when PrefixMsgCounts =/= {0, 0} ->
%% TODO: remove assertion?
exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
-state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) ->
+state_enter(eol, #state{enqueuers = Enqs,
+ consumers = Custs0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
[{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
state_enter(_, _) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 429be39067..42ddaa0457 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -296,14 +296,15 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
Msgs = quorum_messages(Name),
- _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of
+ Servers = [{Name, Node} || Node <- QNodes],
+ case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->
ok
end,
+ ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?TICK_TIME),
{ok, Msgs};
@@ -314,16 +315,36 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
true ->
%% If all ra nodes were already down, the delete
%% has succeed
- rabbit_core_metrics:queue_deleted(QName),
+ delete_queue_data(QName, ActingUser),
{ok, Msgs};
false ->
- rabbit_misc:protocol_error(
- internal_error,
- "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p",
- [rabbit_misc:rs(QName), Errs])
+ %% attempt forced deletion of all servers
+ rabbit_log:info(
+ "Could not delete quorum queue '~s', not enough nodes "
+ " online to reach a quorum: ~255p."
+ " Attempting force delete.",
+ [rabbit_misc:rs(QName), Errs]),
+ [begin
+ case catch(ra:delete_server(S)) of
+ ok -> ok;
+ Err ->
+ rabbit_log:warning(
+ "Force delete of ~w failed with: ~w"
+ "This may require manual data clean up~n",
+ [S, Err]),
+ ok
+ end
+ end || S <- Servers],
+ delete_queue_data(QName, ActingUser),
+ {ok, Msgs}
end
end.
+delete_queue_data(QName, ActingUser) ->
+ _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
+ ok.
+
+
delete_immediately(Resource, {_Name, _} = QPid) ->
_ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
{ok, _} = ra:delete_cluster([QPid]),
@@ -450,7 +471,7 @@ requeue(ConsumerTag, MsgIds, QState) ->
cleanup_data_dir() ->
Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}
- <- rabbit_amqqueue:list_by_type(quorum),
+ <- rabbit_amqqueue:list_by_type(quorum),
lists:member(node(), Nodes)],
Registered = ra_directory:list_registered(),
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 0beecf6c79..dcba910a6a 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -1773,22 +1773,28 @@ cleanup_data_dir(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(100),
- [{_, UId}] = rpc:call(Server1, ra_directory, list_registered, []),
- DataDir = rpc:call(Server1, ra_env, server_data_dir, [UId]),
- ?assert(filelib:is_dir(DataDir)),
+ [{_, UId1}] = rpc:call(Server1, ra_directory, list_registered, []),
+ [{_, UId2}] = rpc:call(Server2, ra_directory, list_registered, []),
+ DataDir1 = rpc:call(Server1, ra_env, server_data_dir, [UId1]),
+ DataDir2 = rpc:call(Server2, ra_env, server_data_dir, [UId2]),
+ ?assert(filelib:is_dir(DataDir1)),
+ ?assert(filelib:is_dir(DataDir2)),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
- ?assertExit({{shutdown,
- {connection_closing, {server_initiated_close, 541, _}}}, _},
- amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
- catch amqp_channel:call(Ch, #'queue.delete'{queue = QQ}),
- ?assert(filelib:is_dir(DataDir)),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+ %% data dir 1 should be force deleted at this point
+ ?assert(not filelib:is_dir(DataDir1)),
+ ?assert(filelib:is_dir(DataDir2)),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ timer:sleep(2000),
?assertEqual(ok,
- rpc:call(Server1, rabbit_quorum_queue, cleanup_data_dir,
- [])),
- ?assert(not filelib:is_dir(DataDir)).
+ rpc:call(Server2, rabbit_quorum_queue, cleanup_data_dir, [])),
+ ?assert(not filelib:is_dir(DataDir2)),
+ ok.
reconnect_consumer_and_publish(Config) ->
[Server | _] = Servers =