diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 37 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 28 |
3 files changed, 48 insertions, 20 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 4fe4d954b9..a83f755e7e 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -609,7 +609,8 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) when PrefixMsgCounts =/= {0, 0} -> %% TODO: remove assertion? exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); -state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) -> +state_enter(eol, #state{enqueuers = Enqs, + consumers = Custs0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))]; state_enter(_, _) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 429be39067..42ddaa0457 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -296,14 +296,15 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, %% TODO Quorum queue needs to support consumer tracking for IfUnused Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, Msgs = quorum_messages(Name), - _ = rabbit_amqqueue:internal_delete(QName, ActingUser), - case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of + Servers = [{Name, Node} || Node <- QNodes], + case ra:delete_cluster(Servers, Timeout) of {ok, {_, LeaderNode} = Leader} -> MRef = erlang:monitor(process, Leader), receive {'DOWN', MRef, process, _, _} -> ok end, + ok = delete_queue_data(QName, ActingUser), rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], ?TICK_TIME), {ok, Msgs}; @@ -314,16 +315,36 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, true -> %% If all ra nodes were already down, the delete %% has succeed - rabbit_core_metrics:queue_deleted(QName), + delete_queue_data(QName, ActingUser), {ok, Msgs}; false -> - rabbit_misc:protocol_error( - internal_error, - "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p", - [rabbit_misc:rs(QName), Errs]) + %% attempt forced deletion of all servers + rabbit_log:info( + "Could not delete quorum queue '~s', not enough nodes " + " online to reach a quorum: ~255p." + " Attempting force delete.", + [rabbit_misc:rs(QName), Errs]), + [begin + case catch(ra:delete_server(S)) of + ok -> ok; + Err -> + rabbit_log:warning( + "Force delete of ~w failed with: ~w" + "This may require manual data clean up~n", + [S, Err]), + ok + end + end || S <- Servers], + delete_queue_data(QName, ActingUser), + {ok, Msgs} end end. +delete_queue_data(QName, ActingUser) -> + _ = rabbit_amqqueue:internal_delete(QName, ActingUser), + ok. + + delete_immediately(Resource, {_Name, _} = QPid) -> _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER), {ok, _} = ra:delete_cluster([QPid]), @@ -450,7 +471,7 @@ requeue(ConsumerTag, MsgIds, QState) -> cleanup_data_dir() -> Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes} - <- rabbit_amqqueue:list_by_type(quorum), + <- rabbit_amqqueue:list_by_type(quorum), lists:member(node(), Nodes)], Registered = ra_directory:list_registered(), _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 0beecf6c79..dcba910a6a 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -1773,22 +1773,28 @@ cleanup_data_dir(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), timer:sleep(100), - [{_, UId}] = rpc:call(Server1, ra_directory, list_registered, []), - DataDir = rpc:call(Server1, ra_env, server_data_dir, [UId]), - ?assert(filelib:is_dir(DataDir)), + [{_, UId1}] = rpc:call(Server1, ra_directory, list_registered, []), + [{_, UId2}] = rpc:call(Server2, ra_directory, list_registered, []), + DataDir1 = rpc:call(Server1, ra_env, server_data_dir, [UId1]), + DataDir2 = rpc:call(Server2, ra_env, server_data_dir, [UId2]), + ?assert(filelib:is_dir(DataDir1)), + ?assert(filelib:is_dir(DataDir2)), ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), - ?assertExit({{shutdown, - {connection_closing, {server_initiated_close, 541, _}}}, _}, - amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), - catch amqp_channel:call(Ch, #'queue.delete'{queue = QQ}), - ?assert(filelib:is_dir(DataDir)), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + %% data dir 1 should be force deleted at this point + ?assert(not filelib:is_dir(DataDir1)), + ?assert(filelib:is_dir(DataDir2)), + ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + timer:sleep(2000), ?assertEqual(ok, - rpc:call(Server1, rabbit_quorum_queue, cleanup_data_dir, - [])), - ?assert(not filelib:is_dir(DataDir)). + rpc:call(Server2, rabbit_quorum_queue, cleanup_data_dir, [])), + ?assert(not filelib:is_dir(DataDir2)), + ok. reconnect_consumer_and_publish(Config) -> [Server | _] = Servers = |
