diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 49 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 40 |
2 files changed, 87 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ce73460603..a0b49c4812 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -835,8 +835,53 @@ delete_immediately(QPids) -> [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], ok. -delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) -> - delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}). +delete(Q, IfUnused, IfEmpty, ActingUser) -> + case wait_for_promoted_or_stopped(Q) of + {promoted, Q1 = #amqqueue{pid = QPid}} -> + delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}); + {stopped, Q1} -> + #resource{name = Name, virtual_host = Vhost} = Q1#amqqueue.name, + case IfEmpty of + true -> + rabbit_log:error("Queue ~s on vhost ~s master node is down. " + "Unable to check if the queue is empty. " + "Delete failed", + [Name, Vhost]), + {error, not_empty}; + false -> + rabbit_log:warning("Queue ~s on vhost ~s master node is down. " + "Force-deleting the queue", + [Name, Vhost]), + delete_crashed_internal(Q1, ActingUser), + {ok, 0} + end; + {error, not_found} -> + %% Assume the queue was deleted + {ok, 0} + end. + +-spec wait_for_promoted_or_stopped(#amqqueue{}) -> {promoted, #amqqueue{}} | {stopped, #amqqueue{}} | {error, not_found}. +wait_for_promoted_or_stopped(#amqqueue{name = QName}) -> + case lookup(QName) of + {ok, Q = #amqqueue{pid = QPid, slave_pids = SPids}} -> + case rabbit_mnesia:is_process_alive(QPid) of + true -> {promoted, Q}; + false -> + case lists:any(fun(Pid) -> + rabbit_mnesia:is_process_alive(Pid) + end, SPids) of + %% There is a live slave. May be promoted + true -> + timer:sleep(100), + wait_for_promoted_or_stopped(Q); + %% All slave pids are stopped. + %% No process left for the queue + false -> {stopped, Q} + end + end; + {error, not_found} -> + {error, not_found} + end. delete_crashed(Q) -> delete_crashed(Q, ?INTERNAL_USER). diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index c70f23c066..7629d2264a 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -57,6 +57,7 @@ groups() -> {clustered, [], [ {cluster_size_2, [], [ vhost_deletion, + force_delete_if_no_master, promote_on_shutdown, slave_recovers_after_vhost_failure, slave_recovers_after_vhost_down_an_up, @@ -247,6 +248,45 @@ vhost_deletion(Config) -> ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>, <<"acting-user">>]), ok. +force_delete_if_no_master(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>, + <<"all">>), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + [begin + amqp_channel:call(ACh, #'queue.declare'{queue = Q, + durable = true}), + rabbit_ct_client_helpers:publish(ACh, Q, 10) + end || Q <- [<<"ha.nopromote.test1">>, <<"ha.nopromote.test2">>]], + ok = rabbit_ct_broker_helpers:restart_node(Config, B), + ok = rabbit_ct_broker_helpers:stop_node(Config, A), + + BCh = rabbit_ct_client_helpers:open_channel(Config, B), + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call( + BCh, #'queue.declare'{queue = <<"ha.nopromote.test1">>, + durable = true})), + + BCh1 = rabbit_ct_client_helpers:open_channel(Config, B), + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call( + BCh1, #'queue.declare'{queue = <<"ha.nopromote.test2">>, + durable = true})), + BCh2 = rabbit_ct_client_helpers:open_channel(Config, B), + #'queue.delete_ok'{} = + amqp_channel:call(BCh2, #'queue.delete'{queue = <<"ha.nopromote.test1">>}), + %% Delete with if_empty will fail, since we don't know if the queue is empty + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call(BCh2, #'queue.delete'{queue = <<"ha.nopromote.test2">>, + if_empty = true})), + BCh3 = rabbit_ct_client_helpers:open_channel(Config, B), + #'queue.delete_ok'{} = + amqp_channel:call(BCh3, #'queue.delete'{queue = <<"ha.nopromote.test2">>}), + ok. + promote_on_shutdown(Config) -> [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>, |
