summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2018-03-15 16:23:12 +0300
committerGitHub <noreply@github.com>2018-03-15 16:23:12 +0300
commite7d173c6c9d3ed94dfa0047f439864f111459936 (patch)
tree696c61a5bf3ed0ab926f4fe2ac92d51d0c27ea25
parent6c03f0b2b206a7b8749470e7c1c430ae86a787dd (diff)
parentb30ae2f90c27c52c4e9db5f3845d37e441c6e371 (diff)
downloadrabbitmq-server-git-e7d173c6c9d3ed94dfa0047f439864f111459936.tar.gz
Merge pull request #1549 from rabbitmq/delete-dead-queue
Force-delete queues that have no promotable master (or candidates)
-rw-r--r--src/rabbit_amqqueue.erl51
-rw-r--r--test/dynamic_ha_SUITE.erl40
2 files changed, 89 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ce73460603..b9f1ba3631 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -835,8 +835,55 @@ delete_immediately(QPids) ->
[gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
ok.
-delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) ->
- delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}).
+delete(Q, IfUnused, IfEmpty, ActingUser) ->
+ case wait_for_promoted_or_stopped(Q) of
+ {promoted, #amqqueue{pid = QPid}} ->
+ delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]});
+ {stopped, Q1} ->
+ #resource{name = Name, virtual_host = Vhost} = Q1#amqqueue.name,
+ case IfEmpty of
+ true ->
+ rabbit_log:error("Queue ~s in vhost ~s has its master node down and "
+ "no mirrors available or eligible for promotion. "
+ "The queue may be non-empty. "
+ "Refusing to force-delete.",
+ [Name, Vhost]),
+ {error, not_empty};
+ false ->
+ rabbit_log:warning("Queue ~s in vhost ~s has its master node is down and "
+ "no mirrors available or eligible for promotion. "
+ "Forcing queue deletion.",
+ [Name, Vhost]),
+ delete_crashed_internal(Q1, ActingUser),
+ {ok, 0}
+ end;
+ {error, not_found} ->
+ %% Assume the queue was deleted
+ {ok, 0}
+ end.
+
+-spec wait_for_promoted_or_stopped(#amqqueue{}) -> {promoted, #amqqueue{}} | {stopped, #amqqueue{}} | {error, not_found}.
+wait_for_promoted_or_stopped(#amqqueue{name = QName}) ->
+ case lookup(QName) of
+ {ok, Q = #amqqueue{pid = QPid, slave_pids = SPids}} ->
+ case rabbit_mnesia:is_process_alive(QPid) of
+ true -> {promoted, Q};
+ false ->
+ case lists:any(fun(Pid) ->
+ rabbit_mnesia:is_process_alive(Pid)
+ end, SPids) of
+ %% There is a live slave. May be promoted
+ true ->
+ timer:sleep(100),
+ wait_for_promoted_or_stopped(Q);
+ %% All slave pids are stopped.
+ %% No process left for the queue
+ false -> {stopped, Q}
+ end
+ end;
+ {error, not_found} ->
+ {error, not_found}
+ end.
delete_crashed(Q) ->
delete_crashed(Q, ?INTERNAL_USER).
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index c70f23c066..7629d2264a 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -57,6 +57,7 @@ groups() ->
{clustered, [], [
{cluster_size_2, [], [
vhost_deletion,
+ force_delete_if_no_master,
promote_on_shutdown,
slave_recovers_after_vhost_failure,
slave_recovers_after_vhost_down_an_up,
@@ -247,6 +248,45 @@ vhost_deletion(Config) ->
ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>, <<"acting-user">>]),
ok.
+force_delete_if_no_master(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>,
+ <<"all">>),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ [begin
+ amqp_channel:call(ACh, #'queue.declare'{queue = Q,
+ durable = true}),
+ rabbit_ct_client_helpers:publish(ACh, Q, 10)
+ end || Q <- [<<"ha.nopromote.test1">>, <<"ha.nopromote.test2">>]],
+ ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, A),
+
+ BCh = rabbit_ct_client_helpers:open_channel(Config, B),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 404, _}}, _},
+ amqp_channel:call(
+ BCh, #'queue.declare'{queue = <<"ha.nopromote.test1">>,
+ durable = true})),
+
+ BCh1 = rabbit_ct_client_helpers:open_channel(Config, B),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 404, _}}, _},
+ amqp_channel:call(
+ BCh1, #'queue.declare'{queue = <<"ha.nopromote.test2">>,
+ durable = true})),
+ BCh2 = rabbit_ct_client_helpers:open_channel(Config, B),
+ #'queue.delete_ok'{} =
+ amqp_channel:call(BCh2, #'queue.delete'{queue = <<"ha.nopromote.test1">>}),
+ %% Delete with if_empty will fail, since we don't know if the queue is empty
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(BCh2, #'queue.delete'{queue = <<"ha.nopromote.test2">>,
+ if_empty = true})),
+ BCh3 = rabbit_ct_client_helpers:open_channel(Config, B),
+ #'queue.delete_ok'{} =
+ amqp_channel:call(BCh3, #'queue.delete'{queue = <<"ha.nopromote.test2">>}),
+ ok.
+
promote_on_shutdown(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>,