summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl12
-rw-r--r--src/rabbit_quorum_queue.erl12
-rw-r--r--test/quorum_queue_SUITE.erl51
-rw-r--r--test/rabbit_fifo_SUITE.erl30
-rw-r--r--test/rabbit_fifo_int_SUITE.erl2
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl148
6 files changed, 244 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 891a6827dc..12083b105c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -548,6 +548,7 @@ state_enter(leader, #?MODULE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
+ resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {[], []}
}) ->
@@ -558,7 +559,8 @@ state_enter(leader, #?MODULE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
- Effects = Mons ++ Nots ++ NodeMons,
+ FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
+ Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
case BLH of
undefined ->
Effects;
@@ -577,8 +579,12 @@ state_enter(eol, #?MODULE{enqueuers = Enqs,
#{}, WaitingConsumers0),
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
- || P <- maps:keys(maps:merge(Enqs, AllConsumers))];
-state_enter(_, _) ->
+ || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
+ [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
+state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
+ FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
+ [FHReservation];
+ state_enter(_, _) ->
%% catch all as not handling all states
[].
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index b52678605b..7e013fd725 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -40,6 +40,8 @@
-export([shrink_all/1,
grow/4]).
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
+-export([file_handle_leader_reservation/1, file_handle_other_reservation/0]).
+-export([file_handle_release_reservation/0]).
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
@@ -886,6 +888,16 @@ matches_strategy(even, Members) ->
is_match(Subj, E) ->
nomatch /= re:run(Subj, E).
+file_handle_leader_reservation(QName) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ ClusterSize = length(get_nodes(Q)),
+ file_handle_cache:set_reservation(2 + ClusterSize).
+
+file_handle_other_reservation() ->
+ file_handle_cache:set_reservation(2).
+
+file_handle_release_reservation() ->
+ file_handle_cache:release_reservation().
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index bed7dedae0..aa617ce4bc 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -72,7 +72,9 @@ groups() ->
metrics_cleanup_on_leader_crash,
consume_in_minority,
shrink_all,
- rebalance
+ rebalance,
+ file_handle_reservations,
+ file_handle_reservations_above_limit
]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
@@ -1369,6 +1371,53 @@ delete_member_not_a_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
+file_handle_reservations(Config) ->
+ Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server1}),
+ [Follower1, Follower2] = Servers -- [Leader],
+ ?assertEqual([{files_reserved, 5}],
+ rpc:call(Leader, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower1, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower2, file_handle_cache, info, [[files_reserved]])),
+ force_leader_change(Servers, QQ),
+ {ok, _, {_, Leader0}} = ra:members({RaName, Server1}),
+ [Follower01, Follower02] = Servers -- [Leader0],
+ ?assertEqual([{files_reserved, 5}],
+ rpc:call(Leader0, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower01, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower02, file_handle_cache, info, [[files_reserved]])).
+
+file_handle_reservations_above_limit(Config) ->
+ [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
+ QQ = ?config(queue_name, Config),
+ QQ2 = ?config(alt_queue_name, Config),
+
+ Limit = rpc:call(S1, file_handle_cache, get_limit, []),
+
+ ok = rpc:call(S1, file_handle_cache, set_limit, [3]),
+ ok = rpc:call(S2, file_handle_cache, set_limit, [3]),
+ ok = rpc:call(S3, file_handle_cache, set_limit, [3]),
+
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', QQ2, 0, 0},
+ declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rpc:call(S1, file_handle_cache, set_limit, [Limit]),
+ ok = rpc:call(S2, file_handle_cache, set_limit, [Limit]),
+ ok = rpc:call(S3, file_handle_cache, set_limit, [Limit]).
+
cleanup_data_dir(Config) ->
%% This test is slow, but also checks that we handle properly errors when
%% trying to delete a queue in minority. A case clause there had gone
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 0d9acfa1fa..dba47aa225 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -523,11 +523,27 @@ duplicate_delivery_test(_) ->
?assertEqual(1, maps:size(Messages)),
ok.
-state_enter_test(_) ->
+state_enter_file_handle_leader_reservation_test(_) ->
S0 = init(#{name => the_name,
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
become_leader_handler => {m, f, [a]}}),
- [{mod_call, m, f, [a, the_name]}] = rabbit_fifo:state_enter(leader, S0),
+
+ Resource = {resource, <<"/">>, queue, <<"test">>},
+ Effects = rabbit_fifo:state_enter(leader, S0),
+ ?assertEqual([
+ {mod_call, m, f, [a, the_name]},
+ {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
+ ], Effects),
+ ok.
+
+state_enter_file_handle_other_reservation_test(_) ->
+ S0 = init(#{name => the_name,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ Effects = rabbit_fifo:state_enter(other, S0),
+ ?assertEqual([
+ {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}
+ ],
+ Effects),
ok.
state_enter_monitors_and_notifications_test(_) ->
@@ -948,8 +964,9 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
Effects = rabbit_fifo:state_enter(leader, State1),
- % 2 effects for each consumer process (channel process), 1 effect for the node
- ?assertEqual(2 * 3 + 1, length(Effects)).
+ %% 2 effects for each consumer process (channel process), 1 effect for the node,
+ %% 1 effect for file handle reservation
+ ?assertEqual(2 * 3 + 1 + 1, length(Effects)).
single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
@@ -978,8 +995,9 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
{<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
Effects = rabbit_fifo:state_enter(eol, State1),
- % 1 effect for each consumer process (channel process)
- ?assertEqual(3, length(Effects)).
+ %% 1 effect for each consumer process (channel process),
+ %% 1 effect for file handle reservation
+ ?assertEqual(4, length(Effects)).
query_consumers_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl
index d4ae417a78..e611b5e3a2 100644
--- a/test/rabbit_fifo_int_SUITE.erl
+++ b/test/rabbit_fifo_int_SUITE.erl
@@ -55,6 +55,8 @@ end_per_group(_, Config) ->
init_per_testcase(TestCase, Config) ->
meck:new(rabbit_quorum_queue, [passthrough]),
meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
+ meck:expect(rabbit_quorum_queue, file_handle_leader_reservation, fun (_) -> ok end),
+ meck:expect(rabbit_quorum_queue, file_handle_other_reservation, fun () -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
ra_server_sup_sup:remove_all(),
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index 425ccab183..69b0efc8fe 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -47,7 +47,12 @@ groups() ->
exchange_count,
queue_count,
connection_count,
- connection_lookup
+ connection_lookup,
+ file_handle_cache_reserve,
+ file_handle_cache_reserve_release,
+ file_handle_cache_reserve_above_limit,
+ file_handle_cache_reserve_monitor,
+ file_handle_cache_reserve_open_file_above_limit
]}
].
@@ -196,6 +201,147 @@ file_handle_cache1(_Config) ->
ok = file_handle_cache:set_limit(Limit),
passed.
+file_handle_cache_reserve(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve1, [Config]).
+
+file_handle_cache_reserve1(_Config) ->
+ Limit = file_handle_cache:get_limit(),
+ ok = file_handle_cache:set_limit(5),
+ %% Reserves are always accepted, even if above the limit
+ %% These are for special processes such as quorum queues
+ ok = file_handle_cache:set_reservation(7),
+
+ Self = self(),
+ spawn(fun () -> ok = file_handle_cache:obtain(),
+ Self ! obtained
+ end),
+
+ Props = file_handle_cache:info([files_reserved, sockets_used]),
+ ?assertEqual(7, proplists:get_value(files_reserved, Props)),
+ ?assertEqual(0, proplists:get_value(sockets_used, Props)),
+
+ %% The obtain should still be blocked, as there are no file handles
+ %% available
+ receive
+ obtained ->
+ throw(error_file_obtained)
+ after 1000 ->
+ %% Let's release 5 file handles, that should leave
+ %% enough free for the `obtain` to go through
+ file_handle_cache:set_reservation(2),
+ Props0 = file_handle_cache:info([files_reserved, sockets_used]),
+ ?assertEqual(2, proplists:get_value(files_reserved, Props0)),
+ ?assertEqual(1, proplists:get_value(sockets_used, Props0)),
+ receive
+ obtained ->
+ ok = file_handle_cache:set_limit(Limit),
+ passed
+ after 5000 ->
+ throw(error_file_not_released)
+ end
+ end.
+
+file_handle_cache_reserve_release(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve_release1, [Config]).
+
+file_handle_cache_reserve_release1(_Config) ->
+ ok = file_handle_cache:set_reservation(7),
+ ?assertEqual([{files_reserved, 7}], file_handle_cache:info([files_reserved])),
+ ok = file_handle_cache:set_reservation(3),
+ ?assertEqual([{files_reserved, 3}], file_handle_cache:info([files_reserved])),
+ ok = file_handle_cache:release_reservation(),
+ ?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])),
+ passed.
+
+file_handle_cache_reserve_above_limit(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve_above_limit1, [Config]).
+
+file_handle_cache_reserve_above_limit1(_Config) ->
+ Limit = file_handle_cache:get_limit(),
+ ok = file_handle_cache:set_limit(5),
+ %% Reserves are always accepted, even if above the limit
+ %% These are for special processes such as quorum queues
+ ok = file_handle_cache:obtain(5),
+ ?assertEqual([{file_descriptor_limit, []}], rabbit_alarm:get_alarms()),
+
+ ok = file_handle_cache:set_reservation(7),
+
+ Props = file_handle_cache:info([files_reserved, sockets_used]),
+ ?assertEqual(7, proplists:get_value(files_reserved, Props)),
+ ?assertEqual(5, proplists:get_value(sockets_used, Props)),
+
+ ok = file_handle_cache:set_limit(Limit),
+ passed.
+
+file_handle_cache_reserve_open_file_above_limit(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve_open_file_above_limit1, [Config]).
+
+file_handle_cache_reserve_open_file_above_limit1(_Config) ->
+ Limit = file_handle_cache:get_limit(),
+ ok = file_handle_cache:set_limit(5),
+ %% Reserves are always accepted, even if above the limit
+ %% These are for special processes such as quorum queues
+ ok = file_handle_cache:set_reservation(7),
+
+ Self = self(),
+ TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"),
+ spawn(fun () -> {ok, _} = file_handle_cache:open(
+ filename:join(TmpDir, "file_above_limit"),
+ [write], []),
+ Self ! opened
+ end),
+
+ Props = file_handle_cache:info([files_reserved]),
+ ?assertEqual(7, proplists:get_value(files_reserved, Props)),
+
+ %% The open should still be blocked, as there are no file handles
+ %% available
+ receive
+ opened ->
+ throw(error_file_opened)
+ after 1000 ->
+ %% Let's release 5 file handles, that should leave
+ %% enough free for the `open` to go through
+ file_handle_cache:set_reservation(2),
+ Props0 = file_handle_cache:info([files_reserved, total_used]),
+ ?assertEqual(2, proplists:get_value(files_reserved, Props0)),
+ receive
+ opened ->
+ ok = file_handle_cache:set_limit(Limit),
+ passed
+ after 5000 ->
+ throw(error_file_not_released)
+ end
+ end.
+
+file_handle_cache_reserve_monitor(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, file_handle_cache_reserve_monitor1, [Config]).
+
+file_handle_cache_reserve_monitor1(_Config) ->
+ %% Check that if the process that does the reserve dies, the file handlers are
+ %% released by the cache
+ Self = self(),
+ Pid = spawn(fun () ->
+ ok = file_handle_cache:set_reservation(2),
+ Self ! done,
+ receive
+ stop -> ok
+ end
+ end),
+ receive
+ done -> ok
+ end,
+ ?assertEqual([{files_reserved, 2}], file_handle_cache:info([files_reserved])),
+ Pid ! stop,
+ timer:sleep(500),
+ ?assertEqual([{files_reserved, 0}], file_handle_cache:info([files_reserved])),
+ passed.
+
%% -------------------------------------------------------------------
%% Log management.
%% -------------------------------------------------------------------