summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-03-18 08:06:59 +0000
committerkjnilsson <knilsson@pivotal.io>2019-03-27 16:44:42 +0000
commit9dfa931d3275513da2d63e2913eef94d2c936311 (patch)
treebf4da8fd710f55f9e50eb1568df09d2db378f029 /test
parentc722c96ee6a9afe6d2fcf972e32f9ed1f6251291 (diff)
downloadrabbitmq-server-git-9dfa931d3275513da2d63e2913eef94d2c936311.tar.gz
Purge stale node state from quorum queues
When a node is disconnected then removed from the RabbitMQ cluster it is possibly that quorum queues retain some state for this node. This change purges any enqueuer or consumer state for pids relating to nodes that are not in the RabbitMQ cluster. [#164214265]
Diffstat (limited to 'test')
-rw-r--r--test/rabbit_fifo_SUITE.erl64
-rw-r--r--test/rabbit_fifo_int_SUITE.erl2
2 files changed, 59 insertions, 7 deletions
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 6582104708..310553dc56 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -480,9 +480,12 @@ tick_test(_) ->
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
{S4, _, _} = apply(meta(5), rabbit_fifo:make_return(Cid, [MsgId]), S3),
- [{mod_call, _, _,
+ [{mod_call, rabbit_quorum_queue, handle_tick,
[#resource{},
- {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = rabbit_fifo:tick(1, S4),
+ {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3},
+ [_Node]
+ ]},
+ {aux, emit}] = rabbit_fifo:tick(1, S4),
ok.
@@ -921,10 +924,11 @@ single_active_consumer_all_disconnected_test(_) ->
single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
+ queue_resource =>
+ rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
Pid1 = spawn(DummyFunction),
@@ -1203,6 +1207,54 @@ single_active_with_credited_test(_) ->
State3#rabbit_fifo.waiting_consumers),
ok.
+purge_nodes_test(_) ->
+ Node = purged@node,
+ ThisNode = node(),
+ EnqPid = test_util:fake_pid(Node),
+ EnqPid2 = test_util:fake_pid(node()),
+ ConPid = test_util:fake_pid(Node),
+ Cid = {<<"tag">>, ConPid},
+ % WaitingPid = test_util:fake_pid(Node),
+
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ single_active_consumer_on => false}),
+ {State1, _, _} = apply(meta(1),
+ rabbit_fifo:make_enqueue(EnqPid, 1, msg1),
+ State0),
+ {State2, _, _} = apply(meta(2),
+ rabbit_fifo:make_enqueue(EnqPid2, 1, msg2),
+ State1),
+ {State3, _} = check(Cid, 3, 1000, State2),
+ {State4, _, _} = apply(meta(4),
+ {down, EnqPid, noconnection},
+ State3),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode, Node]
+ ]},
+ {aux, emit}] , rabbit_fifo:tick(1, State4)),
+ %% assert there are both enqueuers and consumers
+ {State, _, _} = apply(meta(5),
+ rabbit_fifo:make_purge_nodes([Node]),
+ State4),
+
+ %% assert there are no enqueuers nor consumers
+ ?assertMatch(#rabbit_fifo{enqueuers = Enqs} when map_size(Enqs) == 1,
+ State),
+
+ ?assertMatch(#rabbit_fifo{consumers = Cons} when map_size(Cons) == 0,
+ State),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode]
+ ]},
+ {aux, emit}] , rabbit_fifo:tick(1, State)),
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl
index f281d15795..d4ae417a78 100644
--- a/test/rabbit_fifo_int_SUITE.erl
+++ b/test/rabbit_fifo_int_SUITE.erl
@@ -54,7 +54,7 @@ end_per_group(_, Config) ->
init_per_testcase(TestCase, Config) ->
meck:new(rabbit_quorum_queue, [passthrough]),
- meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end),
+ meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
ra_server_sup_sup:remove_all(),