summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2018-03-14 11:56:12 +0000
committerMichael Klishin <michael@clojurewerkz.org>2018-03-15 21:55:09 +0300
commit97db801ae37b610b69c232e2333daa3f6ce75b61 (patch)
tree0599d139a3cbe4ae0c4256e19dea6c0a59518677
parent39f446e0ce40b93949214ae20e9b1a9fbbb2df25 (diff)
downloadrabbitmq-server-git-97db801ae37b610b69c232e2333daa3f6ce75b61.tar.gz
Force-delete queues, which have no live master or slave processes.
Fixes #1501 [#155801556] If a queue is configured to not be promoted (via ha-promote-on-shutdown: when-synced) queue.delete can hang. Make it check for process existence first and force-delete if no master or slave processes are running. Do not force-delete if if_empty is set, since there is no way to check that the queue is empty. (cherry picked from commit 3e7bd564bda36c1bbb9e3b59b61509d0982a88ec)
-rw-r--r--src/rabbit_amqqueue.erl49
-rw-r--r--test/dynamic_ha_SUITE.erl40
2 files changed, 87 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ce73460603..a0b49c4812 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -835,8 +835,53 @@ delete_immediately(QPids) ->
[gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
ok.
-delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) ->
- delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}).
+%% Deletes the queue Q on behalf of ActingUser.
+%%
+%% First waits (wait_for_promoted_or_stopped/1) until the queue either has a
+%% live master process or has no live master/slave process at all, then:
+%%   * live master: forwards {delete, IfUnused, IfEmpty, ActingUser} to the
+%%     master pid and returns whatever that call returns;
+%%   * no live process, IfEmpty = true: logs an error and returns
+%%     {error, not_empty};
+%%   * no live process, IfEmpty = false: logs a warning, removes the queue
+%%     via delete_crashed_internal/2 and returns {ok, 0};
+%%   * queue no longer found: returns {ok, 0}.
+delete(Q, IfUnused, IfEmpty, ActingUser) ->
+    case wait_for_promoted_or_stopped(Q) of
+        %% Only the pid of the freshly looked-up record is needed here, so
+        %% the record itself is not bound (avoids an unused-variable warning).
+        {promoted, #amqqueue{pid = QPid}} ->
+            delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]});
+        {stopped, Q1} ->
+            #resource{name = Name, virtual_host = Vhost} = Q1#amqqueue.name,
+            %% IfUnused is not consulted on this path; only IfEmpty decides
+            %% whether the force-delete is refused.
+            case IfEmpty of
+                true ->
+                    rabbit_log:error("Queue ~s on vhost ~s master node is down. "
+                                     "Unable to check if the queue is empty. "
+                                     "Delete failed",
+                                     [Name, Vhost]),
+                    {error, not_empty};
+                false ->
+                    rabbit_log:warning("Queue ~s on vhost ~s master node is down. "
+                                       "Force-deleting the queue",
+                                       [Name, Vhost]),
+                    delete_crashed_internal(Q1, ActingUser),
+                    {ok, 0}
+            end;
+        {error, not_found} ->
+            %% Assume the queue was deleted
+            {ok, 0}
+    end.
+
+%% Re-reads the queue record by name until its fate is settled.
+%%
+%% Returns {promoted, Q} as soon as the pid stored in the looked-up record is
+%% alive, {stopped, Q} when neither that pid nor any of the record's slave
+%% pids is alive, and {error, not_found} when lookup/1 does not find the
+%% queue. While the master pid is dead but some slave pid is still alive, it
+%% sleeps for 100 ms and checks again.
+-spec wait_for_promoted_or_stopped(#amqqueue{}) -> {promoted, #amqqueue{}} | {stopped, #amqqueue{}} | {error, not_found}.
+wait_for_promoted_or_stopped(#amqqueue{name = QueueName}) ->
+    case lookup(QueueName) of
+        {error, not_found} = NotFound ->
+            NotFound;
+        {ok, Queue = #amqqueue{pid = MasterPid, slave_pids = SlavePids}} ->
+            MasterAlive = rabbit_mnesia:is_process_alive(MasterPid),
+            %% andalso short-circuits: slave pids are only probed when the
+            %% master pid is dead.
+            SlaveAlive = (not MasterAlive) andalso
+                lists:any(fun rabbit_mnesia:is_process_alive/1, SlavePids),
+            if
+                MasterAlive ->
+                    {promoted, Queue};
+                SlaveAlive ->
+                    %% There is a live slave. May be promoted
+                    timer:sleep(100),
+                    wait_for_promoted_or_stopped(Queue);
+                true ->
+                    %% All slave pids are stopped.
+                    %% No process left for the queue
+                    {stopped, Queue}
+            end
+    end.
delete_crashed(Q) ->
delete_crashed(Q, ?INTERNAL_USER).
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index c70f23c066..7629d2264a 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -57,6 +57,7 @@ groups() ->
{clustered, [], [
{cluster_size_2, [], [
vhost_deletion,
+ force_delete_if_no_master,
promote_on_shutdown,
slave_recovers_after_vhost_failure,
slave_recovers_after_vhost_down_an_up,
@@ -247,6 +248,45 @@ vhost_deletion(Config) ->
ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>, <<"acting-user">>]),
ok.
+%% Declares two durable queues matching the "^ha.nopromote" policy on node A,
+%% publishes to them, restarts node B and stops node A. It then checks, via
+%% node B, that re-declaring either queue is rejected with a 404, that a plain
+%% queue.delete succeeds, and that queue.delete with if_empty = true is
+%% rejected with a 406.
+force_delete_if_no_master(Config) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>,
+                                           <<"all">>),
+    Queue1 = <<"ha.nopromote.test1">>,
+    Queue2 = <<"ha.nopromote.test2">>,
+    PublishCh = rabbit_ct_client_helpers:open_channel(Config, A),
+    lists:foreach(
+      fun(QName) ->
+              amqp_channel:call(PublishCh, #'queue.declare'{queue = QName,
+                                                            durable = true}),
+              rabbit_ct_client_helpers:publish(PublishCh, QName, 10)
+      end, [Queue1, Queue2]),
+    ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+    ok = rabbit_ct_broker_helpers:stop_node(Config, A),
+
+    %% A fresh channel is opened for every declare attempt; presumably the
+    %% server-initiated close leaves the previous one unusable.
+    lists:foreach(
+      fun(QName) ->
+              DeclareCh = rabbit_ct_client_helpers:open_channel(Config, B),
+              ?assertExit(
+                 {{shutdown, {server_initiated_close, 404, _}}, _},
+                 amqp_channel:call(
+                   DeclareCh, #'queue.declare'{queue = QName,
+                                               durable = true}))
+      end, [Queue1, Queue2]),
+
+    DeleteCh = rabbit_ct_client_helpers:open_channel(Config, B),
+    #'queue.delete_ok'{} =
+        amqp_channel:call(DeleteCh, #'queue.delete'{queue = Queue1}),
+    %% Delete with if_empty will fail, since we don't know if the queue is empty
+    ?assertExit(
+       {{shutdown, {server_initiated_close, 406, _}}, _},
+       amqp_channel:call(DeleteCh, #'queue.delete'{queue = Queue2,
+                                                   if_empty = true})),
+    FinalCh = rabbit_ct_client_helpers:open_channel(Config, B),
+    #'queue.delete_ok'{} =
+        amqp_channel:call(FinalCh, #'queue.delete'{queue = Queue2}),
+    ok.
+
promote_on_shutdown(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>,