diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-10-24 16:00:55 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-10-24 16:00:55 +0300 |
| commit | 2fd609388022681995b9263cbedf5fa58e57c302 (patch) | |
| tree | 4c3780ec6f6e64f3decda02f0d3f317d2ea04eeb /test | |
| parent | 1df9805b4ad1d6ce50c812485d85baa6c946cdf0 (diff) | |
| parent | 8a5ceea67f73af319516a7890828fa2c813d3163 (diff) | |
| download | rabbitmq-server-git-2fd609388022681995b9263cbedf5fa58e57c302.tar.gz | |
Merge pull request #2141 from rabbitmq/reserve-qq-file-handles
Reserve file handles for quorum queues
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 51 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 30 | ||||
| -rw-r--r-- | test/rabbit_fifo_int_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_inbroker_non_parallel_SUITE.erl | 148 |
4 files changed, 223 insertions, 8 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index bed7dedae0..aa617ce4bc 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -72,7 +72,9 @@ groups() -> metrics_cleanup_on_leader_crash, consume_in_minority, shrink_all, - rebalance + rebalance, + file_handle_reservations, + file_handle_reservations_above_limit ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, @@ -1369,6 +1371,53 @@ delete_member_not_a_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). +file_handle_reservations(Config) -> + Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server1}), + [Follower1, Follower2] = Servers -- [Leader], + ?assertEqual([{files_reserved, 5}], + rpc:call(Leader, file_handle_cache, info, [[files_reserved]])), + ?assertEqual([{files_reserved, 2}], + rpc:call(Follower1, file_handle_cache, info, [[files_reserved]])), + ?assertEqual([{files_reserved, 2}], + rpc:call(Follower2, file_handle_cache, info, [[files_reserved]])), + force_leader_change(Servers, QQ), + {ok, _, {_, Leader0}} = ra:members({RaName, Server1}), + [Follower01, Follower02] = Servers -- [Leader0], + ?assertEqual([{files_reserved, 5}], + rpc:call(Leader0, file_handle_cache, info, [[files_reserved]])), + ?assertEqual([{files_reserved, 2}], + rpc:call(Follower01, file_handle_cache, info, [[files_reserved]])), + ?assertEqual([{files_reserved, 2}], + rpc:call(Follower02, file_handle_cache, info, [[files_reserved]])). + +file_handle_reservations_above_limit(Config) -> + [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + QQ = ?config(queue_name, Config), + QQ2 = ?config(alt_queue_name, Config), + + Limit = rpc:call(S1, file_handle_cache, get_limit, []), + + ok = rpc:call(S1, file_handle_cache, set_limit, [3]), + ok = rpc:call(S2, file_handle_cache, set_limit, [3]), + ok = rpc:call(S3, file_handle_cache, set_limit, [3]), + + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', QQ2, 0, 0}, + declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rpc:call(S1, file_handle_cache, set_limit, [Limit]), + ok = rpc:call(S2, file_handle_cache, set_limit, [Limit]), + ok = rpc:call(S3, file_handle_cache, set_limit, [Limit]). + cleanup_data_dir(Config) -> %% This test is slow, but also checks that we handle properly errors when %% trying to delete a queue in minority. A case clause there had gone diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 0d9acfa1fa..dba47aa225 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -523,11 +523,27 @@ duplicate_delivery_test(_) -> ?assertEqual(1, maps:size(Messages)), ok. -state_enter_test(_) -> +state_enter_file_handle_leader_reservation_test(_) -> S0 = init(#{name => the_name, queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), become_leader_handler => {m, f, [a]}}), - [{mod_call, m, f, [a, the_name]}] = rabbit_fifo:state_enter(leader, S0), + + Resource = {resource, <<"/">>, queue, <<"test">>}, + Effects = rabbit_fifo:state_enter(leader, S0), + ?assertEqual([ + {mod_call, m, f, [a, the_name]}, + {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} + ], Effects), + ok. + +state_enter_file_handle_other_reservation_test(_) -> + S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), + Effects = rabbit_fifo:state_enter(other, S0), + ?assertEqual([ + {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []} + ], + Effects), ok. state_enter_monitors_and_notifications_test(_) -> @@ -948,8 +964,9 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), Effects = rabbit_fifo:state_enter(leader, State1), - % 2 effects for each consumer process (channel process), 1 effect for the node - ?assertEqual(2 * 3 + 1, length(Effects)). + %% 2 effects for each consumer process (channel process), 1 effect for the node, + %% 1 effect for file handle reservation + ?assertEqual(2 * 3 + 1 + 1, length(Effects)). single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), @@ -978,8 +995,9 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), Effects = rabbit_fifo:state_enter(eol, State1), - % 1 effect for each consumer process (channel process) - ?assertEqual(3, length(Effects)). + %% 1 effect for each consumer process (channel process), + %% 1 effect for file handle reservation + ?assertEqual(4, length(Effects)). query_consumers_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl index d4ae417a78..e611b5e3a2 100644 --- a/test/rabbit_fifo_int_SUITE.erl +++ b/test/rabbit_fifo_int_SUITE.erl @@ -55,6 +55,8 @@ end_per_group(_, Config) -> init_per_testcase(TestCase, Config) -> meck:new(rabbit_quorum_queue, [passthrough]), meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end), + meck:expect(rabbit_quorum_queue, file_handle_leader_reservation, fun (_) -> ok end), + meck:expect(rabbit_quorum_queue, file_handle_other_reservation, fun () -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), ra_server_sup_sup:remove_all(), diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index 425ccab183..69b0efc8fe 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -47,7 +47,12 @@ groups() -> exchange_count, queue_count, connection_count, - connection_lookup + connection_lookup, + file_handle_cache_reserve, + file_handle_cache_reserve_release, + file_handle_cache_reserve_above_limit, + file_handle_cache_reserve_monitor, + file_handle_cache_reserve_open_file_above_limit ]} ]. @@ -196,6 +201,147 @@ file_handle_cache1(_Config) -> ok = file_handle_cache:set_limit(Limit), passed. +file_handle_cache_reserve(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, file_handle_cache_reserve1, [Config]). + +file_handle_cache_reserve1(_Config) -> + Limit = file_handle_cache:get_limit(), + ok = file_handle_cache:set_limit(5), + %% Reserves are always accepted, even if above the limit + %% These are for special processes such as quorum queues + ok = file_handle_cache:set_reservation(7), + + Self = self(), + spawn(fun () -> ok = file_handle_cache:obtain(), + Self ! obtained + end), + + Props = file_handle_cache:info([files_reserved, sockets_used]), + ?assertEqual(7, proplists:get_value(files_reserved, Props)), + ?assertEqual(0, proplists:get_value(sockets_used, Props)), + + %% The obtain should still be blocked, as there are no file handles + %% available + receive + obtained -> + throw(error_file_obtained) + after 1000 -> + %% Let's release 5 file handles, that should leave + %% enough free for the `obtain` to go through + file_handle_cache:set_reservation(2), + Props0 = file_handle_cache:info([files_reserved, sockets_used]), + ?assertEqual(2, proplists:get_value(files_reserved, Props0)), + ?assertEqual(1, proplists:get_value(sockets_used, Props0)), + receive + obtained -> + ok = file_handle_cache:set_limit(Limit), + passed + after 5000 -> + throw(error_file_not_released) + end + end. + +file_handle_cache_reserve_release(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, file_handle_cache_reserve_release1, [Config]). + +file_handle_cache_reserve_release1(_Config) -> + ok = file_handle_cache:set_reservation(7), + ?assertEqual([{files_reserved, 7}], file_handle_cache:info([files_reserved])), + ok = file_handle_cache:set_reservation(3), + ?assertEqual([{files_reserved, 3}], file_handle_cache:info([files_reserved])), + ok = file_handle_cache:release_reservation(), + ?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])), + passed. + +file_handle_cache_reserve_above_limit(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, file_handle_cache_reserve_above_limit1, [Config]). + +file_handle_cache_reserve_above_limit1(_Config) -> + Limit = file_handle_cache:get_limit(), + ok = file_handle_cache:set_limit(5), + %% Reserves are always accepted, even if above the limit + %% These are for special processes such as quorum queues + ok = file_handle_cache:obtain(5), + ?assertEqual([{file_descriptor_limit, []}], rabbit_alarm:get_alarms()), + + ok = file_handle_cache:set_reservation(7), + + Props = file_handle_cache:info([files_reserved, sockets_used]), + ?assertEqual(7, proplists:get_value(files_reserved, Props)), + ?assertEqual(5, proplists:get_value(sockets_used, Props)), + + ok = file_handle_cache:set_limit(Limit), + passed. + +file_handle_cache_reserve_open_file_above_limit(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, file_handle_cache_reserve_open_file_above_limit1, [Config]). + +file_handle_cache_reserve_open_file_above_limit1(_Config) -> + Limit = file_handle_cache:get_limit(), + ok = file_handle_cache:set_limit(5), + %% Reserves are always accepted, even if above the limit + %% These are for special processes such as quorum queues + ok = file_handle_cache:set_reservation(7), + + Self = self(), + TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"), + spawn(fun () -> {ok, _} = file_handle_cache:open( + filename:join(TmpDir, "file_above_limit"), + [write], []), + Self ! opened + end), + + Props = file_handle_cache:info([files_reserved]), + ?assertEqual(7, proplists:get_value(files_reserved, Props)), + + %% The open should still be blocked, as there are no file handles + %% available + receive + opened -> + throw(error_file_opened) + after 1000 -> + %% Let's release 5 file handles, that should leave + %% enough free for the `open` to go through + file_handle_cache:set_reservation(2), + Props0 = file_handle_cache:info([files_reserved, total_used]), + ?assertEqual(2, proplists:get_value(files_reserved, Props0)), + receive + opened -> + ok = file_handle_cache:set_limit(Limit), + passed + after 5000 -> + throw(error_file_not_released) + end + end. + +file_handle_cache_reserve_monitor(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, file_handle_cache_reserve_monitor1, [Config]). + +file_handle_cache_reserve_monitor1(_Config) -> + %% Check that if the process that does the reserve dies, the file handlers are + %% released by the cache + Self = self(), + Pid = spawn(fun () -> + ok = file_handle_cache:set_reservation(2), + Self ! done, + receive + stop -> ok + end + end), + receive + done -> ok + end, + ?assertEqual([{files_reserved, 2}], file_handle_cache:info([files_reserved])), + Pid ! stop, + timer:sleep(500), + ?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])), + passed. + %% ------------------------------------------------------------------- %% Log management. %% ------------------------------------------------------------------- |
