diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
| -rw-r--r-- | test/simple_ha_SUITE.erl | 18 |
2 files changed, 30 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b4195866ea..467cd998bc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,7 +34,8 @@ -export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]). -export([on_node_up/1, on_node_down/1]). -export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). --export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]). +-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). +-export([is_mirrored/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression. -export([pid_of/1, pid_of/2]). @@ -919,6 +920,11 @@ cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mi is_mirrored(Q) -> rabbit_mirror_queue_misc:is_mirrored(Q). +is_dead_exclusive(#amqqueue{exclusive_owner = none}) -> + false; +is_dead_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) -> + not rabbit_mnesia:is_process_alive(Pid). + on_node_up(Node) -> ok = rabbit_misc:execute_mnesia_transaction( fun () -> @@ -963,11 +969,12 @@ on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = qlc:e(qlc:q([{QName, delete_queue(QName)} || - #amqqueue{name = QName, pid = Pid} = Q - <- mnesia:table(rabbit_queue), - not rabbit_amqqueue:is_mirrored(Q) andalso - node(Pid) == Node andalso - not rabbit_mnesia:is_process_alive(Pid)])), + #amqqueue{name = QName, pid = Pid} = + Q <- mnesia:table(rabbit_queue), + node(Pid) == Node andalso + not rabbit_mnesia:is_process_alive(Pid) andalso + (not rabbit_amqqueue:is_mirrored(Q) orelse + rabbit_amqqueue:is_dead_exclusive(Q))])), {Qs, Dels} = lists:unzip(QsDels), T = rabbit_binding:process_deletions( lists:foldl(fun rabbit_binding:combine_deletions/2, diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index b655c1c8cf..25d8a304e6 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -31,7 +31,8 @@ groups() -> [ {cluster_size_2, [], [ rapid_redeclare, - declare_synchrony + declare_synchrony, + clean_up_exclusive_queues ]}, {cluster_size_3, [], [ consume_survives_stop, @@ -125,6 +126,21 @@ declare_synchrony(Config) -> declare(Ch, Name) -> amqp_channel:call(Ch, #'queue.declare'{durable = true, queue = Name}). +%% Ensure that exclusive queues are cleaned up when part of ha cluster +%% and node is killed abruptly then restarted +clean_up_exclusive_queues(Config) -> + QName = <<"excl">>, + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<".*">>, <<"all">>), + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ChA = rabbit_ct_client_helpers:open_channel(Config, A), + amqp_channel:call(ChA, #'queue.declare'{queue = QName, + exclusive = true}), + ok = rabbit_ct_broker_helpers:kill_node(Config, A), + [] = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_amqqueue, list, []), + ok = rabbit_ct_broker_helpers:start_node(Config, A), + [[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []), + ok. + consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true). consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true). consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true). |
