summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_limiter.erl32
2 files changed, 22 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b74b9034b9..9f497f3d21 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -335,7 +335,9 @@ ch_record_state_transition(OldCR, NewCR) ->
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
+ blocked_consumers = BlockedConsumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -345,7 +347,8 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
acktags = ChAckTags} = ch_record(ChPid),
IsMsgReady = PredFun(FunAcc, State),
case (IsMsgReady andalso
- rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
+ rabbit_limiter:can_send( LimiterPid, self(), AckRequired,
+ BQ:len(BQS) )) of
true ->
{{Message, IsDelivered, AckTag}, FunAcc1, State1} =
DeliverFun(AckRequired, FunAcc, State),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index bf9cf583e4..cd3ac9c5ce 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -21,7 +21,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
-export([start_link/2]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/4, ack/2, register/2, unregister/2]).
-export([get_limit/1, set_credit/3, is_blocked/1]).
%%----------------------------------------------------------------------------
@@ -33,7 +33,7 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) ->
rabbit_types:ok_pid_or_error()).
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
--spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
+-spec(can_send/4 :: (maybe_pid(), pid(), boolean(), non_neg_integer()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -70,12 +70,12 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid, _AckRequired) ->
+can_send(undefined, _QPid, _AckRequired, _Len) ->
true;
-can_send(LimiterPid, QPid, AckRequired) ->
+can_send(LimiterPid, QPid, AckRequired, Len) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired, Len},
infinity) end).
%% Let the limiter know that the channel has received some acks from a
@@ -118,20 +118,22 @@ init([ChPid, UnackedMsgCount]) ->
prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
-handle_call({can_send, _QPid, _AckRequired}, _From,
+handle_call({can_send, _QPid, _AckRequired, _Len}, _From,
State = #lim{credit = 0}) ->
{reply, false, State};
-handle_call({can_send, QPid, AckRequired}, _From,
- State = #lim{volume = Volume, credit = Credit}) ->
+handle_call({can_send, QPid, AckRequired, Len}, _From,
+ State = #lim{volume = Volume, credit = Credit, drain = Drain}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end,
- credit = case Credit of
- unlimited -> unlimited;
- _ -> Credit - 1
- end}}
+ false -> {reply, true,
+ State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end,
+ credit = case {Credit, Len, Drain} of
+ {unlimited, _, _} -> unlimited;
+ {_, 1, true} -> 0;
+ {_, _, _} -> Credit - 1
+ end}}
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->