diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 47 |
2 files changed, 40 insertions, 10 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 4fe4d954b9..a83f755e7e 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -609,7 +609,8 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) when PrefixMsgCounts =/= {0, 0} -> %% TODO: remove assertion? exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); -state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) -> +state_enter(eol, #state{enqueuers = Enqs, + consumers = Custs0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))]; state_enter(_, _) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 429be39067..deeff4194a 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -80,6 +80,7 @@ ]). -define(TICK_TIME, 1000). %% the ra server tick time +-define(DELETE_TIMEOUT, 5000). %% the ra server tick time %%---------------------------------------------------------------------------- @@ -294,16 +295,19 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes}, _IfUnused, _IfEmpty, ActingUser) -> %% TODO Quorum queue needs to support consumer tracking for IfUnused - Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, + Timeout = ?DELETE_TIMEOUT, Msgs = quorum_messages(Name), - _ = rabbit_amqqueue:internal_delete(QName, ActingUser), - case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of + Servers = [{Name, Node} || Node <- QNodes], + case ra:delete_cluster(Servers, Timeout) of {ok, {_, LeaderNode} = Leader} -> MRef = erlang:monitor(process, Leader), receive {'DOWN', MRef, process, _, _} -> ok + after Timeout -> + ok = force_delete_queue(Servers) end, + ok = delete_queue_data(QName, ActingUser), rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], ?TICK_TIME), {ok, Msgs}; @@ -314,16 +318,41 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, true -> %% If all ra nodes were already down, the delete %% has succeed - rabbit_core_metrics:queue_deleted(QName), + delete_queue_data(QName, ActingUser), {ok, Msgs}; false -> - rabbit_misc:protocol_error( - internal_error, - "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p", - [rabbit_misc:rs(QName), Errs]) + %% attempt forced deletion of all servers + rabbit_log:warning( + "Could not delete quorum queue '~s', not enough nodes " + " online to reach a quorum: ~255p." + " Attempting force delete.", + [rabbit_misc:rs(QName), Errs]), + ok = force_delete_queue(Servers), + delete_queue_data(QName, ActingUser), + {ok, Msgs} end end. + +force_delete_queue(Servers) -> + [begin + case catch(ra:delete_server(S)) of + ok -> ok; + Err -> + rabbit_log:warning( + "Force delete of ~w failed with: ~w" + "This may require manual data clean up~n", + [S, Err]), + ok + end + end || S <- Servers], + ok. + +delete_queue_data(QName, ActingUser) -> + _ = rabbit_amqqueue:internal_delete(QName, ActingUser), + ok. + + delete_immediately(Resource, {_Name, _} = QPid) -> _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER), {ok, _} = ra:delete_cluster([QPid]), @@ -450,7 +479,7 @@ requeue(ConsumerTag, MsgIds, QState) -> cleanup_data_dir() -> Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes} - <- rabbit_amqqueue:list_by_type(quorum), + <- rabbit_amqqueue:list_by_type(quorum), lists:member(node(), Nodes)], Registered = ra_directory:list_registered(), _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, |
