summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-10-24 16:00:55 +0300
committerGitHub <noreply@github.com>2019-10-24 16:00:55 +0300
commit2fd609388022681995b9263cbedf5fa58e57c302 (patch)
tree4c3780ec6f6e64f3decda02f0d3f317d2ea04eeb /test
parent1df9805b4ad1d6ce50c812485d85baa6c946cdf0 (diff)
parent8a5ceea67f73af319516a7890828fa2c813d3163 (diff)
downloadrabbitmq-server-git-2fd609388022681995b9263cbedf5fa58e57c302.tar.gz
Merge pull request #2141 from rabbitmq/reserve-qq-file-handles
Reserve file handles for quorum queues
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl51
-rw-r--r--test/rabbit_fifo_SUITE.erl30
-rw-r--r--test/rabbit_fifo_int_SUITE.erl2
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl148
4 files changed, 223 insertions, 8 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index bed7dedae0..aa617ce4bc 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -72,7 +72,9 @@ groups() ->
metrics_cleanup_on_leader_crash,
consume_in_minority,
shrink_all,
- rebalance
+ rebalance,
+ file_handle_reservations,
+ file_handle_reservations_above_limit
]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
@@ -1369,6 +1371,53 @@ delete_member_not_a_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
+file_handle_reservations(Config) ->
+ Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server1}),
+ [Follower1, Follower2] = Servers -- [Leader],
+ ?assertEqual([{files_reserved, 5}],
+ rpc:call(Leader, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower1, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower2, file_handle_cache, info, [[files_reserved]])),
+ force_leader_change(Servers, QQ),
+ {ok, _, {_, Leader0}} = ra:members({RaName, Server1}),
+ [Follower01, Follower02] = Servers -- [Leader0],
+ ?assertEqual([{files_reserved, 5}],
+ rpc:call(Leader0, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower01, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower02, file_handle_cache, info, [[files_reserved]])).
+
+file_handle_reservations_above_limit(Config) ->
+ [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
+ QQ = ?config(queue_name, Config),
+ QQ2 = ?config(alt_queue_name, Config),
+
+ Limit = rpc:call(S1, file_handle_cache, get_limit, []),
+
+ ok = rpc:call(S1, file_handle_cache, set_limit, [3]),
+ ok = rpc:call(S2, file_handle_cache, set_limit, [3]),
+ ok = rpc:call(S3, file_handle_cache, set_limit, [3]),
+
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', QQ2, 0, 0},
+ declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rpc:call(S1, file_handle_cache, set_limit, [Limit]),
+ ok = rpc:call(S2, file_handle_cache, set_limit, [Limit]),
+ ok = rpc:call(S3, file_handle_cache, set_limit, [Limit]).
+
cleanup_data_dir(Config) ->
%% This test is slow, but also checks that we handle properly errors when
%% trying to delete a queue in minority. A case clause there had gone
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 0d9acfa1fa..dba47aa225 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -523,11 +523,27 @@ duplicate_delivery_test(_) ->
?assertEqual(1, maps:size(Messages)),
ok.
-state_enter_test(_) ->
+state_enter_file_handle_leader_reservation_test(_) ->
S0 = init(#{name => the_name,
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
become_leader_handler => {m, f, [a]}}),
- [{mod_call, m, f, [a, the_name]}] = rabbit_fifo:state_enter(leader, S0),
+
+ Resource = {resource, <<"/">>, queue, <<"test">>},
+ Effects = rabbit_fifo:state_enter(leader, S0),
+ ?assertEqual([
+ {mod_call, m, f, [a, the_name]},
+ {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
+ ], Effects),
+ ok.
+
+state_enter_file_handle_other_reservation_test(_) ->
+ S0 = init(#{name => the_name,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ Effects = rabbit_fifo:state_enter(other, S0),
+ ?assertEqual([
+ {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}
+ ],
+ Effects),
ok.
state_enter_monitors_and_notifications_test(_) ->
@@ -948,8 +964,9 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
Effects = rabbit_fifo:state_enter(leader, State1),
- % 2 effects for each consumer process (channel process), 1 effect for the node
- ?assertEqual(2 * 3 + 1, length(Effects)).
+ %% 2 effects for each consumer process (channel process), 1 effect for the node,
+ %% 1 effect for file handle reservation
+ ?assertEqual(2 * 3 + 1 + 1, length(Effects)).
single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
@@ -978,8 +995,9 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
{<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
Effects = rabbit_fifo:state_enter(eol, State1),
- % 1 effect for each consumer process (channel process)
- ?assertEqual(3, length(Effects)).
+ %% 1 effect for each consumer process (channel process),
+ %% 1 effect for file handle reservation
+ ?assertEqual(4, length(Effects)).
query_consumers_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl
index d4ae417a78..e611b5e3a2 100644
--- a/test/rabbit_fifo_int_SUITE.erl
+++ b/test/rabbit_fifo_int_SUITE.erl
@@ -55,6 +55,8 @@ end_per_group(_, Config) ->
init_per_testcase(TestCase, Config) ->
meck:new(rabbit_quorum_queue, [passthrough]),
meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
+ meck:expect(rabbit_quorum_queue, file_handle_leader_reservation, fun (_) -> ok end),
+ meck:expect(rabbit_quorum_queue, file_handle_other_reservation, fun () -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
ra_server_sup_sup:remove_all(),
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index 425ccab183..69b0efc8fe 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -47,7 +47,12 @@ groups() ->
exchange_count,
queue_count,
connection_count,
- connection_lookup
+ connection_lookup,
+ file_handle_cache_reserve,
+ file_handle_cache_reserve_release,
+ file_handle_cache_reserve_above_limit,
+ file_handle_cache_reserve_monitor,
+ file_handle_cache_reserve_open_file_above_limit
]}
].
@@ -196,6 +201,147 @@ file_handle_cache1(_Config) ->
ok = file_handle_cache:set_limit(Limit),
passed.
+file_handle_cache_reserve(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve1, [Config]).
+
+file_handle_cache_reserve1(_Config) ->
+ Limit = file_handle_cache:get_limit(),
+ ok = file_handle_cache:set_limit(5),
+ %% Reserves are always accepted, even if above the limit
+ %% These are for special processes such as quorum queues
+ ok = file_handle_cache:set_reservation(7),
+
+ Self = self(),
+ spawn(fun () -> ok = file_handle_cache:obtain(),
+ Self ! obtained
+ end),
+
+ Props = file_handle_cache:info([files_reserved, sockets_used]),
+ ?assertEqual(7, proplists:get_value(files_reserved, Props)),
+ ?assertEqual(0, proplists:get_value(sockets_used, Props)),
+
+ %% The obtain should still be blocked, as there are no file handles
+ %% available
+ receive
+ obtained ->
+ throw(error_file_obtained)
+ after 1000 ->
+ %% Let's release 5 file handles, that should leave
+ %% enough free for the `obtain` to go through
+ file_handle_cache:set_reservation(2),
+ Props0 = file_handle_cache:info([files_reserved, sockets_used]),
+ ?assertEqual(2, proplists:get_value(files_reserved, Props0)),
+ ?assertEqual(1, proplists:get_value(sockets_used, Props0)),
+ receive
+ obtained ->
+ ok = file_handle_cache:set_limit(Limit),
+ passed
+ after 5000 ->
+ throw(error_file_not_released)
+ end
+ end.
+
+file_handle_cache_reserve_release(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve_release1, [Config]).
+
+file_handle_cache_reserve_release1(_Config) ->
+ ok = file_handle_cache:set_reservation(7),
+ ?assertEqual([{files_reserved, 7}], file_handle_cache:info([files_reserved])),
+ ok = file_handle_cache:set_reservation(3),
+ ?assertEqual([{files_reserved, 3}], file_handle_cache:info([files_reserved])),
+ ok = file_handle_cache:release_reservation(),
+ ?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])),
+ passed.
+
+file_handle_cache_reserve_above_limit(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve_above_limit1, [Config]).
+
+file_handle_cache_reserve_above_limit1(_Config) ->
+ Limit = file_handle_cache:get_limit(),
+ ok = file_handle_cache:set_limit(5),
+ %% Reserves are always accepted, even if above the limit
+ %% These are for special processes such as quorum queues
+ ok = file_handle_cache:obtain(5),
+ ?assertEqual([{file_descriptor_limit, []}], rabbit_alarm:get_alarms()),
+
+ ok = file_handle_cache:set_reservation(7),
+
+ Props = file_handle_cache:info([files_reserved, sockets_used]),
+ ?assertEqual(7, proplists:get_value(files_reserved, Props)),
+ ?assertEqual(5, proplists:get_value(sockets_used, Props)),
+
+ ok = file_handle_cache:set_limit(Limit),
+ passed.
+
+file_handle_cache_reserve_open_file_above_limit(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve_open_file_above_limit1, [Config]).
+
+file_handle_cache_reserve_open_file_above_limit1(_Config) ->
+ Limit = file_handle_cache:get_limit(),
+ ok = file_handle_cache:set_limit(5),
+ %% Reserves are always accepted, even if above the limit
+ %% These are for special processes such as quorum queues
+ ok = file_handle_cache:set_reservation(7),
+
+ Self = self(),
+ TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"),
+ spawn(fun () -> {ok, _} = file_handle_cache:open(
+ filename:join(TmpDir, "file_above_limit"),
+ [write], []),
+ Self ! opened
+ end),
+
+ Props = file_handle_cache:info([files_reserved]),
+ ?assertEqual(7, proplists:get_value(files_reserved, Props)),
+
+ %% The open should still be blocked, as there are no file handles
+ %% available
+ receive
+ opened ->
+ throw(error_file_opened)
+ after 1000 ->
+ %% Let's release 5 file handles, that should leave
+ %% enough free for the `open` to go through
+ file_handle_cache:set_reservation(2),
+ Props0 = file_handle_cache:info([files_reserved, total_used]),
+ ?assertEqual(2, proplists:get_value(files_reserved, Props0)),
+ receive
+ opened ->
+ ok = file_handle_cache:set_limit(Limit),
+ passed
+ after 5000 ->
+ throw(error_file_not_released)
+ end
+ end.
+
+file_handle_cache_reserve_monitor(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve_monitor1, [Config]).
+
+file_handle_cache_reserve_monitor1(_Config) ->
+ %% Check that if the process that does the reserve dies, the file handlers are
+ %% released by the cache
+ Self = self(),
+ Pid = spawn(fun () ->
+ ok = file_handle_cache:set_reservation(2),
+ Self ! done,
+ receive
+ stop -> ok
+ end
+ end),
+ receive
+ done -> ok
+ end,
+ ?assertEqual([{files_reserved, 2}], file_handle_cache:info([files_reserved])),
+ Pid ! stop,
+ timer:sleep(500),
+ ?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])),
+ passed.
+
%% -------------------------------------------------------------------
%% Log management.
%% -------------------------------------------------------------------