summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-10-24 16:00:55 +0300
committerGitHub <noreply@github.com>2019-10-24 16:00:55 +0300
commit2fd609388022681995b9263cbedf5fa58e57c302 (patch)
tree4c3780ec6f6e64f3decda02f0d3f317d2ea04eeb /src
parent1df9805b4ad1d6ce50c812485d85baa6c946cdf0 (diff)
parent8a5ceea67f73af319516a7890828fa2c813d3163 (diff)
downloadrabbitmq-server-git-2fd609388022681995b9263cbedf5fa58e57c302.tar.gz
Merge pull request #2141 from rabbitmq/reserve-qq-file-handles
Reserve file handles for quorum queues
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl12
-rw-r--r--src/rabbit_quorum_queue.erl12
2 files changed, 21 insertions, 3 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 891a6827dc..12083b105c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -548,6 +548,7 @@ state_enter(leader, #?MODULE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
+ resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {[], []}
}) ->
@@ -558,7 +559,8 @@ state_enter(leader, #?MODULE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
- Effects = Mons ++ Nots ++ NodeMons,
+ FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
+ Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
case BLH of
undefined ->
Effects;
@@ -577,8 +579,12 @@ state_enter(eol, #?MODULE{enqueuers = Enqs,
#{}, WaitingConsumers0),
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
- || P <- maps:keys(maps:merge(Enqs, AllConsumers))];
-state_enter(_, _) ->
+ || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
+ [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
+state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
+ FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
+ [FHReservation];
+ state_enter(_, _) ->
%% catch all as not handling all states
[].
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index b52678605b..7e013fd725 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -40,6 +40,8 @@
-export([shrink_all/1,
grow/4]).
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
+-export([file_handle_leader_reservation/1, file_handle_other_reservation/0]).
+-export([file_handle_release_reservation/0]).
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
@@ -886,6 +888,16 @@ matches_strategy(even, Members) ->
is_match(Subj, E) ->
nomatch /= re:run(Subj, E).
+file_handle_leader_reservation(QName) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ ClusterSize = length(get_nodes(Q)),
+ file_handle_cache:set_reservation(2 + ClusterSize).
+
+file_handle_other_reservation() ->
+ file_handle_cache:set_reservation(2).
+
+file_handle_release_reservation() ->
+ file_handle_cache:release_reservation().
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->