diff options
| author | Michael Klishin <michael@novemberain.com> | 2018-11-17 05:31:33 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-11-17 05:31:33 +0300 |
| commit | 7c1b567ae4ccd7f39eff7c6ec16c3b83bc81889d (patch) | |
| tree | f1f3e934197a6c03cebef96fe739bdc1b65ae933 /src | |
| parent | a91a4fa9024be5cd015455417feefc622a04ace5 (diff) | |
| parent | 4faedab6b56072c75a363df02c930bad3b2c8263 (diff) | |
| download | rabbitmq-server-git-7c1b567ae4ccd7f39eff7c6ec16c3b83bc81889d.tar.gz | |
Merge pull request #1761 from rabbitmq/quorum-data-cleanup
Data cleanup for quorum queues
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 32 |
2 files changed, 39 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7c4386ba57..40b74c04ac 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,6 +30,7 @@ emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). -export([list_down/1, count/1, list_names/0, list_local_names/0]). +-export([list_by_type/1]). -export([notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]). @@ -124,6 +125,7 @@ -spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. -spec list_names() -> [rabbit_amqqueue:name()]. -spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. +-spec list_by_type(atom()) -> [rabbit_types:amqqueue()]. -spec info_keys() -> rabbit_types:info_keys(). -spec info(rabbit_types:amqqueue()) -> rabbit_types:infos(). -spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> @@ -741,6 +743,15 @@ list_local_names() -> [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), State =/= crashed, is_local_to_node(QPid, node())]. +list_by_type(Type) -> + {atomic, Qs} = + mnesia:sync_transaction( + fun () -> + mnesia:match_object(rabbit_durable_queue, + #amqqueue{_ = '_', type = Type}, read) + end), + Qs. + list_local_followers() -> [ Q#amqqueue.name || #amqqueue{state = State, type = quorum, pid = {_, Leader}, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 795465855b..311894dee6 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -34,6 +34,7 @@ -export([add_member/3]). -export([delete_member/3]). -export([requeue/3]). +-export([cleanup_data_dir/0]). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -240,7 +241,7 @@ recover(Queues) -> case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of ok -> ok; Err -> - rabbit_log:warning("recover: Quorum queue ~w could not" + rabbit_log:warning("recover: quorum queue ~w could not" " be started ~w", [Name, Err]), ok end; @@ -251,7 +252,7 @@ recover(Queues) -> ok; Err -> %% catch all clause to avoid causing the vhost not to start - rabbit_log:warning("recover: Quorum queue ~w could not be " + rabbit_log:warning("recover: quorum queue ~w could not be " "restarted ~w", [Name, Err]), ok end, @@ -281,7 +282,7 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q end, rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]), {ok, Msgs}; - {error, {no_more_nodes_to_try, Errs}} = Err -> + {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; (_) -> false end, Errs) of @@ -291,7 +292,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q rabbit_core_metrics:queue_deleted(QName), {ok, Msgs}; false -> - Err + rabbit_misc:protocol_error( + internal_error, + "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p", + [rabbit_misc:rs(QName), Errs]) end end. @@ -386,6 +390,26 @@ purge(Node) -> requeue(ConsumerTag, MsgIds, FState) -> rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState). +cleanup_data_dir() -> + Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes} + <- rabbit_amqqueue:list_by_type(quorum), + lists:member(node(), Nodes)], + Registered = ra_directory:list_registered(), + [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, + not lists:member(Name, Names)], + ok. + +maybe_delete_data_dir(UId) -> + Dir = ra_env:server_data_dir(UId), + {ok, Config} = ra_log:read_config(Dir), + case maps:get(machine, Config) of + {module, rabbit_fifo, _} -> + ra_lib:recursive_delete(Dir), + ra_directory:unregister_name(UId); + _ -> + ok + end. + cluster_state(Name) -> case whereis(Name) of undefined -> down; |
