diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 57 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 142 |
3 files changed, 127 insertions, 74 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9f497f3d21..9fda12cdc3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -348,7 +348,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso rabbit_limiter:can_send( LimiterPid, self(), AckRequired, - BQ:len(BQS) )) of + ConsumerTag, BQ:len(BQS) )) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index eb634cca13..bac106f993 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1004,23 +1004,34 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'channel.flow'{active = true}, Content, State) -> - {noreply, State1 = #ch{writer_pid = WriterPid}} = - handle_method(#'channel.credit'{credit = -1, drain = true}, - Content, State), - ok = rabbit_writer:send_command(WriterPid, - #'channel.flow_ok'{active = true}), - {noreply, State1}; +handle_method(#'channel.flow'{active = true}, _, + State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of + ok -> LimiterPid; + stopped -> unlimit_queues(State) + end, + {reply, #'channel.flow_ok'{active = true}, + State#ch{limiter_pid = LimiterPid1}}; -handle_method(#'channel.flow'{active = false}, Content, State) -> - {noreply, State1 = #ch{writer_pid = WriterPid}} = - handle_method(#'channel.credit'{credit = 0, drain = true}, - Content, State), - ok = rabbit_writer:send_command(WriterPid, - #'channel.flow_ok'{active = false}), - {noreply, State1}; +handle_method(#'channel.flow'{active = false}, _, + State = #ch{limiter_pid = LimiterPid, + consumer_mapping = Consumers}) -> + LimiterPid1 = case LimiterPid of + undefined -> start_limiter(State); + Other -> Other + end, + State1 = State#ch{limiter_pid = LimiterPid1}, + ok = rabbit_limiter:block(LimiterPid1), + case consumer_queues(Consumers) of + [] -> {reply, #'channel.flow_ok'{active = false}, State1}; + QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || + QPid <- QPids], + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, State1#ch{blocking = dict:from_list(Queues)}} + end; -handle_method(#'channel.credit'{credit = Credit, drain = Drain}, _, +handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, + drain = Drain}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> LimiterPid1 = case LimiterPid of @@ -1028,7 +1039,7 @@ handle_method(#'channel.credit'{credit = Credit, drain = Drain}, _, Other -> Other end, LimiterPid2 = - case rabbit_limiter:set_credit(LimiterPid1, Credit, Drain) of + case rabbit_limiter:set_credit(LimiterPid1, CTag, Credit, Drain) of ok -> limit_queues(LimiterPid1, State), LimiterPid1; stopped -> unlimit_queues(State) @@ -1036,7 +1047,7 @@ handle_method(#'channel.credit'{credit = Credit, drain = Drain}, _, State1 = State#ch{limiter_pid = LimiterPid2}, {noreply, State1}; - %% TODO port this bit + %% TODO port this bit ? %% case consumer_queues(Consumers) of %% [] -> {reply, #'channel.flow_ok'{active = false}, State1}; %% QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || @@ -1237,12 +1248,12 @@ consumer_queues(Consumers) -> notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> - case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of - 0 -> ok; - Count -> rabbit_limiter:ack(LimiterPid, Count) - end. + %% TODO this could be faster, group the acks + rabbit_misc:queue_fold( + fun ({_, none, _}, Acc) -> Acc; + ({_, CTag, _}, Acc) -> rabbit_limiter:ack(LimiterPid, CTag), + Acc + end, ok, Acked). is_message_persistent(Content) -> case rabbit_basic:is_message_persistent(Content) of diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index cd3ac9c5ce..efe6023b9e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -21,8 +21,8 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). -export([start_link/2]). --export([limit/2, can_send/4, ack/2, register/2, unregister/2]). --export([get_limit/1, set_credit/3, is_blocked/1]). +-export([limit/2, can_send/5, ack/2, register/2, unregister/2]). +-export([get_limit/1, block/1, unblock/1, set_credit/4, is_blocked/1]). %%---------------------------------------------------------------------------- @@ -33,13 +33,13 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok_pid_or_error()). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). --spec(can_send/4 :: (maybe_pid(), pid(), boolean(), non_neg_integer()) -> boolean()). --spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(can_send/5 :: (maybe_pid(), pid(), boolean(), binary(), non_neg_integer()) -> boolean()). +-spec(ack/2 :: (maybe_pid(), binary()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). -%% -spec(block/1 :: (maybe_pid()) -> 'ok'). -%% -spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). +-spec(block/1 :: (maybe_pid()) -> 'ok'). +-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). -spec(is_blocked/1 :: (maybe_pid()) -> boolean()). -endif. @@ -48,10 +48,13 @@ -record(lim, {prefetch_count = 0, ch_pid, - credit = unlimited, - drain = false, + blocked = false, + credits = dict:new(), queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). + +-record(credit, {credit = 0, drain = false}). + %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. @@ -70,18 +73,19 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid, _AckRequired, _Len) -> +can_send(undefined, _QPid, _AckRequired, _CTag, _Len) -> true; -can_send(LimiterPid, QPid, AckRequired, Len) -> +can_send(LimiterPid, QPid, AckRequired, CTag, Len) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired, Len}, + fun () -> gen_server2:call(LimiterPid, + {can_send, QPid, AckRequired, CTag, Len}, infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer -ack(undefined, _Count) -> ok; -ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). +ack(undefined, _CTag) -> ok; +ack(LimiterPid, CTag) -> gen_server2:cast(LimiterPid, {ack, CTag}). register(undefined, _QPid) -> ok; register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). @@ -96,12 +100,20 @@ get_limit(Pid) -> fun () -> 0 end, fun () -> gen_server2:call(Pid, get_limit, infinity) end). -set_credit(undefined, _, _) -> +block(undefined) -> + ok; +block(LimiterPid) -> + gen_server2:call(LimiterPid, block, infinity). + +unblock(undefined) -> ok; -set_credit(LimiterPid, -1, Drain) -> - gen_server2:call(LimiterPid, {set_credit, unlimited, Drain}, infinity); -set_credit(LimiterPid, Credit, Drain) -> - gen_server2:call(LimiterPid, {set_credit, Credit, Drain}, infinity). +unblock(LimiterPid) -> + gen_server2:call(LimiterPid, unblock, infinity). + +set_credit(undefined, _, _, _) -> + ok; +set_credit(LimiterPid, CTag, Credit, Drain) -> + gen_server2:call(LimiterPid, {set_credit, CTag, Credit, Drain}, infinity). is_blocked(undefined) -> false; @@ -118,47 +130,47 @@ init([ChPid, UnackedMsgCount]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, _QPid, _AckRequired, _Len}, _From, - State = #lim{credit = 0}) -> +handle_call({can_send, _QPid, _AckRequired, _CTag, _Len}, _From, + State = #lim{blocked = true}) -> {reply, false, State}; -handle_call({can_send, QPid, AckRequired, Len}, _From, - State = #lim{volume = Volume, credit = Credit, drain = Drain}) -> - case limit_reached(State) of +handle_call({can_send, QPid, AckRequired, CTag, Len}, _From, + State = #lim{volume = Volume}) -> + case limit_reached(CTag, State) of true -> {reply, false, limit_queue(QPid, State)}; false -> {reply, true, - State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end, - credit = case {Credit, Len, Drain} of - {unlimited, _, _} -> unlimited; - {_, 1, true} -> 0; - {_, _, _} -> Credit - 1 - end}} + decr_credit(CTag, Len, + State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end})} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; handle_call({limit, PrefetchCount}, _From, State) -> - case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of + case maybe_notify(irrelevant, + State, State#lim{prefetch_count = PrefetchCount}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> {stop, normal, stopped, State1} end; -handle_call({set_credit, Credit, Drain}, _From, State) -> - case maybe_notify(State, State#lim{credit = Credit, drain = Drain}) of - {cont, State1} -> {reply, ok, State1}; - {stop, State1} -> {stop, normal, stopped, State1} - end; +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State) -> + maybe_notify_reply(irrelevant, State, State#lim{blocked = false}); + +handle_call({set_credit, CTag, Credit, Drain}, _From, State) -> + maybe_notify_reply(CTag, State, update_credit(CTag, Credit, Drain, State)); handle_call(is_blocked, _From, State) -> {reply, blocked(State), State}. -handle_cast({ack, Count}, State = #lim{volume = Volume}) -> +handle_cast({ack, CTag}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; - true -> Volume - Count + true -> Volume - 1 end, - {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}), {noreply, State1}; handle_cast({register, QPid}, State) -> @@ -180,22 +192,52 @@ code_change(_, State, _) -> %% Internal plumbing %%---------------------------------------------------------------------------- -maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse blocked(OldState)) andalso - not (limit_reached(NewState) orelse blocked(NewState)) of +maybe_notify_reply(CTag, OldState, NewState) -> + case maybe_notify(CTag, OldState, NewState) of + {cont, State} -> {reply, ok, State}; + {stop, State} -> {stop, normal, stopped, State} + end. + +maybe_notify(CTag, OldState, NewState) -> + case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso + not (limit_reached(CTag, NewState) orelse blocked(NewState)) of true -> NewState1 = notify_queues(NewState), - {case {NewState1#lim.prefetch_count, NewState1#lim.credit} of - {0, unlimited} -> stop; - _ -> cont + {case {NewState1#lim.prefetch_count, + dict:size(NewState1#lim.credits)} of + {0, 0} -> stop; + _ -> cont end, NewState1}; false -> {cont, NewState} end. -limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> - Limit =/= 0 andalso Volume >= Limit. +limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume, + credits = Credits}) -> + case dict:find(CTag, Credits) of + {ok, #credit{ credit = 0 }} -> true; + _ -> false + end orelse (Limit =/= 0 andalso Volume >= Limit). + +decr_credit(CTag, Len, State = #lim{ credits = Credits } ) -> + case dict:find(CTag, Credits) of + {ok, #credit{ credit = Credit, drain = Drain }} -> + NewCredit = case {Len, Drain} of + {1, true} -> 0; + {_, _} -> Credit - 1 + end, + update_credit(CTag, NewCredit, Drain, State); + error -> + State + end. + +update_credit(CTag, -1, _Drain, State = #lim{credits = Credits}) -> + State#lim{credits = dict:erase(CTag, Credits)}; + +update_credit(CTag, Credit, Drain, State = #lim{credits = Credits}) -> + State#lim{credits = dict:store(CTag, + #credit{credit = Credit, drain = Drain}, + Credits)}. -blocked(#lim{credit = 0}) -> true; -blocked(_) -> false. +blocked(#lim{blocked = Blocked}) -> Blocked. remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of |
