diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 12 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 51 |
3 files changed, 69 insertions, 5 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 062fb7eee1..6982022d61 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -541,6 +541,7 @@ state_enter(leader, #?MODULE{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, cfg = #cfg{name = Name, + resource = Resource, become_leader_handler = BLH}, prefix_msgs = {[], []} }) -> @@ -551,7 +552,8 @@ state_enter(leader, #?MODULE{consumers = Cons, Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), - Effects = Mons ++ Nots ++ NodeMons, + FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}], + Effects = Mons ++ Nots ++ NodeMons ++ FHReservation, case BLH of undefined -> Effects; @@ -570,10 +572,11 @@ state_enter(eol, #?MODULE{enqueuers = Enqs, #{}, WaitingConsumers0), AllConsumers = maps:merge(Custs, WaitingConsumers1), [{send_msg, P, eol, ra_event} - || P <- maps:keys(maps:merge(Enqs, AllConsumers))]; -state_enter(_, _) -> + || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++ + [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}]; +state_enter(_, #?MODULE{cfg = #cfg{resource = Resource} }) -> %% catch all as not handling all states - []. + [{mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}]. -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 4f8c129291..e602cd9f04 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -40,6 +40,8 @@ -export([shrink_all/1, grow/4]). -export([transfer_leadership/2, get_replicas/1, queue_length/1]). +-export([file_handle_leader_reservation/1, file_handle_other_reservation/0]). +-export([file_handle_release_reservation/0]). %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). @@ -883,6 +885,16 @@ matches_strategy(even, Members) -> is_match(Subj, E) -> nomatch /= re:run(Subj, E). +file_handle_leader_reservation(QName) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + ClusterSize = length(get_nodes(Q)), + file_handle_cache:set_reservation(2 + ClusterSize). + +file_handle_other_reservation() -> + file_handle_cache:set_reservation(2). + +file_handle_release_reservation() -> + file_handle_cache:release_reserve(). %%---------------------------------------------------------------------------- dlx_mfa(Q) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index bed7dedae0..aa617ce4bc 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -72,7 +72,9 @@ groups() -> metrics_cleanup_on_leader_crash, consume_in_minority, shrink_all, - rebalance + rebalance, + file_handle_reservations, + file_handle_reservations_above_limit ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, @@ -1369,6 +1371,53 @@ delete_member_not_a_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). +file_handle_reservations(Config) -> + Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server1}), + [Follower1, Follower2] = Servers -- [Leader], + ?assertEqual([{files_reserved, 5}], + rpc:call(Leader, file_handle_cache, info, [[files_reserved]])), + ?assertEqual([{files_reserved, 2}], + rpc:call(Follower1, file_handle_cache, info, [[files_reserved]])), + ?assertEqual([{files_reserved, 2}], + rpc:call(Follower2, file_handle_cache, info, [[files_reserved]])), + force_leader_change(Servers, QQ), + {ok, _, {_, Leader0}} = ra:members({RaName, Server1}), + [Follower01, Follower02] = Servers -- [Leader0], + ?assertEqual([{files_reserved, 5}], + rpc:call(Leader0, file_handle_cache, info, [[files_reserved]])), + ?assertEqual([{files_reserved, 2}], + rpc:call(Follower01, file_handle_cache, info, [[files_reserved]])), + ?assertEqual([{files_reserved, 2}], + rpc:call(Follower02, file_handle_cache, info, [[files_reserved]])). + +file_handle_reservations_above_limit(Config) -> + [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + QQ = ?config(queue_name, Config), + QQ2 = ?config(alt_queue_name, Config), + + Limit = rpc:call(S1, file_handle_cache, get_limit, []), + + ok = rpc:call(S1, file_handle_cache, set_limit, [3]), + ok = rpc:call(S2, file_handle_cache, set_limit, [3]), + ok = rpc:call(S3, file_handle_cache, set_limit, [3]), + + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', QQ2, 0, 0}, + declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rpc:call(S1, file_handle_cache, set_limit, [Limit]), + ok = rpc:call(S2, file_handle_cache, set_limit, [Limit]), + ok = rpc:call(S3, file_handle_cache, set_limit, [Limit]). + cleanup_data_dir(Config) -> %% This test is slow, but also checks that we handle properly errors when %% trying to delete a queue in minority. A case clause there had gone |
