summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl11
-rw-r--r--src/rabbit_quorum_queue.erl12
-rw-r--r--test/quorum_queue_SUITE.erl51
3 files changed, 69 insertions, 5 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 062fb7eee1..6982022d61 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -541,6 +541,7 @@ state_enter(leader, #?MODULE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
+ resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {[], []}
}) ->
@@ -551,7 +552,8 @@ state_enter(leader, #?MODULE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
- Effects = Mons ++ Nots ++ NodeMons,
+ FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
+ Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
case BLH of
undefined ->
Effects;
@@ -570,10 +572,11 @@ state_enter(eol, #?MODULE{enqueuers = Enqs,
#{}, WaitingConsumers0),
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
- || P <- maps:keys(maps:merge(Enqs, AllConsumers))];
-state_enter(_, _) ->
+ || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
+ [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
+state_enter(_, #?MODULE{cfg = #cfg{resource = Resource} }) ->
%% catch all as not handling all states
- [].
+ [{mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}].
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 4f8c129291..e602cd9f04 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -40,6 +40,8 @@
-export([shrink_all/1,
grow/4]).
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
+-export([file_handle_leader_reservation/1, file_handle_other_reservation/0]).
+-export([file_handle_release_reservation/0]).
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
@@ -883,6 +885,16 @@ matches_strategy(even, Members) ->
is_match(Subj, E) ->
nomatch /= re:run(Subj, E).
+file_handle_leader_reservation(QName) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ ClusterSize = length(get_nodes(Q)),
+ file_handle_cache:set_reservation(2 + ClusterSize).
+
+file_handle_other_reservation() ->
+ file_handle_cache:set_reservation(2).
+
+file_handle_release_reservation() ->
+ file_handle_cache:release_reserve().
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index bed7dedae0..aa617ce4bc 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -72,7 +72,9 @@ groups() ->
metrics_cleanup_on_leader_crash,
consume_in_minority,
shrink_all,
- rebalance
+ rebalance,
+ file_handle_reservations,
+ file_handle_reservations_above_limit
]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
@@ -1369,6 +1371,53 @@ delete_member_not_a_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
+file_handle_reservations(Config) ->
+ Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server1}),
+ [Follower1, Follower2] = Servers -- [Leader],
+ ?assertEqual([{files_reserved, 5}],
+ rpc:call(Leader, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower1, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower2, file_handle_cache, info, [[files_reserved]])),
+ force_leader_change(Servers, QQ),
+ {ok, _, {_, Leader0}} = ra:members({RaName, Server1}),
+ [Follower01, Follower02] = Servers -- [Leader0],
+ ?assertEqual([{files_reserved, 5}],
+ rpc:call(Leader0, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower01, file_handle_cache, info, [[files_reserved]])),
+ ?assertEqual([{files_reserved, 2}],
+ rpc:call(Follower02, file_handle_cache, info, [[files_reserved]])).
+
+file_handle_reservations_above_limit(Config) ->
+ [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
+ QQ = ?config(queue_name, Config),
+ QQ2 = ?config(alt_queue_name, Config),
+
+ Limit = rpc:call(S1, file_handle_cache, get_limit, []),
+
+ ok = rpc:call(S1, file_handle_cache, set_limit, [3]),
+ ok = rpc:call(S2, file_handle_cache, set_limit, [3]),
+ ok = rpc:call(S3, file_handle_cache, set_limit, [3]),
+
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', QQ2, 0, 0},
+ declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rpc:call(S1, file_handle_cache, set_limit, [Limit]),
+ ok = rpc:call(S2, file_handle_cache, set_limit, [Limit]),
+ ok = rpc:call(S3, file_handle_cache, set_limit, [Limit]).
+
cleanup_data_dir(Config) ->
%% This test is slow, but also checks that we handle properly errors when
%% trying to delete a queue in minority. A case clause there had gone