diff options
| author | Michael Klishin <michael@novemberain.com> | 2018-03-15 16:23:12 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-03-15 16:23:12 +0300 |
| commit | e7d173c6c9d3ed94dfa0047f439864f111459936 (patch) | |
| tree | 696c61a5bf3ed0ab926f4fe2ac92d51d0c27ea25 | |
| parent | 6c03f0b2b206a7b8749470e7c1c430ae86a787dd (diff) | |
| parent | b30ae2f90c27c52c4e9db5f3845d37e441c6e371 (diff) | |
| download | rabbitmq-server-git-e7d173c6c9d3ed94dfa0047f439864f111459936.tar.gz | |
Merge pull request #1549 from rabbitmq/delete-dead-queue
Force-delete queues that have no promotable master (or candidates)
| -rw-r--r-- | src/rabbit_amqqueue.erl | 51 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 40 |
2 files changed, 89 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ce73460603..b9f1ba3631 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -835,8 +835,55 @@ delete_immediately(QPids) -> [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], ok. -delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) -> - delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}). +delete(Q, IfUnused, IfEmpty, ActingUser) -> + case wait_for_promoted_or_stopped(Q) of + {promoted, #amqqueue{pid = QPid}} -> + delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}); + {stopped, Q1} -> + #resource{name = Name, virtual_host = Vhost} = Q1#amqqueue.name, + case IfEmpty of + true -> + rabbit_log:error("Queue ~s in vhost ~s has its master node down and " + "no mirrors available or eligible for promotion. " + "The queue may be non-empty. " + "Refusing to force-delete.", + [Name, Vhost]), + {error, not_empty}; + false -> + rabbit_log:warning("Queue ~s in vhost ~s has its master node is down and " + "no mirrors available or eligible for promotion. " + "Forcing queue deletion.", + [Name, Vhost]), + delete_crashed_internal(Q1, ActingUser), + {ok, 0} + end; + {error, not_found} -> + %% Assume the queue was deleted + {ok, 0} + end. + +-spec wait_for_promoted_or_stopped(#amqqueue{}) -> {promoted, #amqqueue{}} | {stopped, #amqqueue{}} | {error, not_found}. +wait_for_promoted_or_stopped(#amqqueue{name = QName}) -> + case lookup(QName) of + {ok, Q = #amqqueue{pid = QPid, slave_pids = SPids}} -> + case rabbit_mnesia:is_process_alive(QPid) of + true -> {promoted, Q}; + false -> + case lists:any(fun(Pid) -> + rabbit_mnesia:is_process_alive(Pid) + end, SPids) of + %% There is a live slave. May be promoted + true -> + timer:sleep(100), + wait_for_promoted_or_stopped(Q); + %% All slave pids are stopped. + %% No process left for the queue + false -> {stopped, Q} + end + end; + {error, not_found} -> + {error, not_found} + end. delete_crashed(Q) -> delete_crashed(Q, ?INTERNAL_USER). diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index c70f23c066..7629d2264a 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -57,6 +57,7 @@ groups() -> {clustered, [], [ {cluster_size_2, [], [ vhost_deletion, + force_delete_if_no_master, promote_on_shutdown, slave_recovers_after_vhost_failure, slave_recovers_after_vhost_down_an_up, @@ -247,6 +248,45 @@ vhost_deletion(Config) -> ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>, <<"acting-user">>]), ok. +force_delete_if_no_master(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>, + <<"all">>), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + [begin + amqp_channel:call(ACh, #'queue.declare'{queue = Q, + durable = true}), + rabbit_ct_client_helpers:publish(ACh, Q, 10) + end || Q <- [<<"ha.nopromote.test1">>, <<"ha.nopromote.test2">>]], + ok = rabbit_ct_broker_helpers:restart_node(Config, B), + ok = rabbit_ct_broker_helpers:stop_node(Config, A), + + BCh = rabbit_ct_client_helpers:open_channel(Config, B), + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call( + BCh, #'queue.declare'{queue = <<"ha.nopromote.test1">>, + durable = true})), + + BCh1 = rabbit_ct_client_helpers:open_channel(Config, B), + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call( + BCh1, #'queue.declare'{queue = <<"ha.nopromote.test2">>, + durable = true})), + BCh2 = rabbit_ct_client_helpers:open_channel(Config, B), + #'queue.delete_ok'{} = + amqp_channel:call(BCh2, #'queue.delete'{queue = <<"ha.nopromote.test1">>}), + %% Delete with if_empty will fail, since we don't know if the queue is empty + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call(BCh2, #'queue.delete'{queue = <<"ha.nopromote.test2">>, + if_empty = true})), + BCh3 = rabbit_ct_client_helpers:open_channel(Config, B), + #'queue.delete_ok'{} = + amqp_channel:call(BCh3, #'queue.delete'{queue = <<"ha.nopromote.test2">>}), + ok. + promote_on_shutdown(Config) -> [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>, |
