summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Bakken <lbakken@pivotal.io>2017-08-09 09:14:15 -0700
committerLuke Bakken <lbakken@pivotal.io>2017-08-09 09:14:15 -0700
commitaf397dd1b42eaf44f7213f847fed6c0135812e20 (patch)
tree347c86ac2f8170e2750698530cf6c0b4a51673b4
parent23884dd83be137c356d4ae3ec19c7bc686118573 (diff)
downloadrabbitmq-server-git-af397dd1b42eaf44f7213f847fed6c0135812e20.tar.gz
Add test for cleaning up dead exclusive queues
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--test/partitions_SUITE.erl12
-rw-r--r--test/simple_ha_SUITE.erl18
3 files changed, 23 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8b1159e8db..467cd998bc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([on_node_up/1, on_node_down/1]).
-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
--export([is_mirrored/1, is_exclusive/1]). % Note: exported due to use in qlc expression.
+-export([is_mirrored/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression.
-export([pid_of/1, pid_of/2]).
@@ -920,10 +920,10 @@ cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mi
is_mirrored(Q) ->
rabbit_mirror_queue_misc:is_mirrored(Q).
-is_exclusive(#amqqueue{exclusive_owner = none}) ->
+is_dead_exclusive(#amqqueue{exclusive_owner = none}) ->
false;
-is_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) ->
- true.
+is_dead_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) ->
+ not rabbit_mnesia:is_process_alive(Pid).
on_node_up(Node) ->
ok = rabbit_misc:execute_mnesia_transaction(
@@ -974,7 +974,7 @@ on_node_down(Node) ->
node(Pid) == Node andalso
not rabbit_mnesia:is_process_alive(Pid) andalso
(not rabbit_amqqueue:is_mirrored(Q) orelse
- rabbit_amqqueue:is_exclusive(Q))])),
+ rabbit_amqqueue:is_dead_exclusive(Q))])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl
index b533d8f7b6..b09d05b550 100644
--- a/test/partitions_SUITE.erl
+++ b/test/partitions_SUITE.erl
@@ -43,8 +43,7 @@ groups() ->
{net_ticktime_1, [], [
{cluster_size_2, [], [
ctl_ticktime_sync,
- prompt_disconnect_detection,
- clean_up_exclusive_queues
+ prompt_disconnect_detection
]},
{cluster_size_3, [], [
autoheal,
@@ -283,15 +282,6 @@ prompt_disconnect_detection(Config) ->
rabbit_ct_client_helpers:close_channel(ChB),
ok.
-clean_up_exclusive_queues(Config) ->
- rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<".*">>, <<"all">>),
- [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
- ChA = rabbit_ct_client_helpers:open_channel(Config, A),
- amqp_channel:call(ChA, #'queue.declare'{queue = <<"excl">>,
- exclusive = true}),
- block_unblock([{A, B}]),
- ok.
-
ctl_ticktime_sync(Config) ->
%% Server has 1s net_ticktime, make sure ctl doesn't get disconnected
Cmd = ["eval", "timer:sleep(5000)."],
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index b655c1c8cf..25d8a304e6 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -31,7 +31,8 @@ groups() ->
[
{cluster_size_2, [], [
rapid_redeclare,
- declare_synchrony
+ declare_synchrony,
+ clean_up_exclusive_queues
]},
{cluster_size_3, [], [
consume_survives_stop,
@@ -125,6 +126,21 @@ declare_synchrony(Config) ->
declare(Ch, Name) ->
amqp_channel:call(Ch, #'queue.declare'{durable = true, queue = Name}).
+%% Ensure that exclusive queues are cleaned up when part of ha cluster
+%% and node is killed abruptly then restarted
+clean_up_exclusive_queues(Config) ->
+ QName = <<"excl">>,
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<".*">>, <<"all">>),
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ ChA = rabbit_ct_client_helpers:open_channel(Config, A),
+ amqp_channel:call(ChA, #'queue.declare'{queue = QName,
+ exclusive = true}),
+ ok = rabbit_ct_broker_helpers:kill_node(Config, A),
+ [] = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_amqqueue, list, []),
+ ok = rabbit_ct_broker_helpers:start_node(Config, A),
+ [[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []),
+ ok.
+
consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).