diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 32 |
2 files changed, 22 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b74b9034b9..9f497f3d21 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -335,7 +335,9 @@ ch_record_state_transition(OldCR, NewCR) -> deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> + blocked_consumers = BlockedConsumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -345,7 +347,8 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso - rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of + rabbit_limiter:can_send( LimiterPid, self(), AckRequired, + BQ:len(BQS) )) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index bf9cf583e4..cd3ac9c5ce 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -21,7 +21,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). -export([start_link/2]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/4, ack/2, register/2, unregister/2]). -export([get_limit/1, set_credit/3, is_blocked/1]). %%---------------------------------------------------------------------------- @@ -33,7 +33,7 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok_pid_or_error()). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). --spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). +-spec(can_send/4 :: (maybe_pid(), pid(), boolean(), non_neg_integer()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -70,12 +70,12 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid, _AckRequired) -> +can_send(undefined, _QPid, _AckRequired, _Len) -> true; -can_send(LimiterPid, QPid, AckRequired) -> +can_send(LimiterPid, QPid, AckRequired, Len) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, + fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired, Len}, infinity) end). %% Let the limiter know that the channel has received some acks from a @@ -118,20 +118,22 @@ init([ChPid, UnackedMsgCount]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, _QPid, _AckRequired}, _From, +handle_call({can_send, _QPid, _AckRequired, _Len}, _From, State = #lim{credit = 0}) -> {reply, false, State}; -handle_call({can_send, QPid, AckRequired}, _From, - State = #lim{volume = Volume, credit = Credit}) -> +handle_call({can_send, QPid, AckRequired, Len}, _From, + State = #lim{volume = Volume, credit = Credit, drain = Drain}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end, - credit = case Credit of - unlimited -> unlimited; - _ -> Credit - 1 - end}} + false -> {reply, true, + State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end, + credit = case {Credit, Len, Drain} of + {unlimited, _, _} -> unlimited; + {_, 1, true} -> 0; + {_, _, _} -> Credit - 1 + end}} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> |
