diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 46 |
3 files changed, 60 insertions, 43 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3418c663f4..b74b9034b9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1075,7 +1075,8 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> true -> ok end, - NewLimited = Limited andalso LimiterPid =/= undefined, + NewLimited = Limited andalso LimiterPid =/= undefined + andalso rabbit_limiter:is_blocked(LimiterPid), C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} end)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a82e5eff3e..eb634cca13 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -462,6 +462,7 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +%% TODO port this queue_blocked(QPid, State = #ch{blocking = Blocking}) -> case dict:find(QPid, Blocking) of error -> State; @@ -1003,31 +1004,46 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'channel.flow'{active = true}, _, - State = #ch{limiter_pid = LimiterPid}) -> - LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of - ok -> LimiterPid; - stopped -> unlimit_queues(State) - end, - {reply, #'channel.flow_ok'{active = true}, - State#ch{limiter_pid = LimiterPid1}}; +handle_method(#'channel.flow'{active = true}, Content, State) -> + {noreply, State1 = #ch{writer_pid = WriterPid}} = + handle_method(#'channel.credit'{credit = -1, drain = true}, + Content, State), + ok = rabbit_writer:send_command(WriterPid, + #'channel.flow_ok'{active = true}), + {noreply, State1}; -handle_method(#'channel.flow'{active = false}, _, - State = #ch{limiter_pid = LimiterPid, +handle_method(#'channel.flow'{active = false}, Content, State) -> + {noreply, State1 = #ch{writer_pid = WriterPid}} = + handle_method(#'channel.credit'{credit = 0, drain = true}, + Content, State), + ok = rabbit_writer:send_command(WriterPid, + #'channel.flow_ok'{active = false}), + {noreply, State1}; + +handle_method(#'channel.credit'{credit = Credit, drain = Drain}, _, + State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> LimiterPid1 = case LimiterPid of undefined -> start_limiter(State); Other -> Other end, - State1 = State#ch{limiter_pid = LimiterPid1}, - ok = rabbit_limiter:block(LimiterPid1), - case consumer_queues(Consumers) of - [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || - QPid <- QPids], - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State1#ch{blocking = dict:from_list(Queues)}} - end; + LimiterPid2 = + case rabbit_limiter:set_credit(LimiterPid1, Credit, Drain) of + ok -> limit_queues(LimiterPid1, State), + LimiterPid1; + stopped -> unlimit_queues(State) + end, + State1 = State#ch{limiter_pid = LimiterPid2}, + {noreply, State1}; + + %% TODO port this bit + %% case consumer_queues(Consumers) of + %% [] -> {reply, #'channel.flow_ok'{active = false}, State1}; + %% QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || + %% QPid <- QPids], + %% ok = rabbit_amqqueue:flush_all(QPids, self()), + %% {noreply, State1#ch{blocking = dict:from_list(Queues)}} + %% end; handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 86ea7282d9..bf9cf583e4 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -22,7 +22,7 @@ handle_info/2, prioritise_call/3]). -export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1, is_blocked/1]). +-export([get_limit/1, set_credit/3, is_blocked/1]). %%---------------------------------------------------------------------------- @@ -38,8 +38,8 @@ -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). --spec(block/1 :: (maybe_pid()) -> 'ok'). --spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). +%% -spec(block/1 :: (maybe_pid()) -> 'ok'). +%% -spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). -spec(is_blocked/1 :: (maybe_pid()) -> boolean()). -endif. @@ -48,7 +48,8 @@ -record(lim, {prefetch_count = 0, ch_pid, - blocked = false, + credit = unlimited, + drain = false, queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -95,15 +96,12 @@ get_limit(Pid) -> fun () -> 0 end, fun () -> gen_server2:call(Pid, get_limit, infinity) end). -block(undefined) -> +set_credit(undefined, _, _) -> ok; -block(LimiterPid) -> - gen_server2:call(LimiterPid, block, infinity). - -unblock(undefined) -> - ok; -unblock(LimiterPid) -> - gen_server2:call(LimiterPid, unblock, infinity). +set_credit(LimiterPid, -1, Drain) -> + gen_server2:call(LimiterPid, {set_credit, unlimited, Drain}, infinity); +set_credit(LimiterPid, Credit, Drain) -> + gen_server2:call(LimiterPid, {set_credit, Credit, Drain}, infinity). is_blocked(undefined) -> false; @@ -121,14 +119,18 @@ prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. handle_call({can_send, _QPid, _AckRequired}, _From, - State = #lim{blocked = true}) -> + State = #lim{credit = 0}) -> {reply, false, State}; handle_call({can_send, QPid, AckRequired}, _From, - State = #lim{volume = Volume}) -> + State = #lim{volume = Volume, credit = Credit}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume + end, + credit = case Credit of + unlimited -> unlimited; + _ -> Credit - 1 end}} end; @@ -141,11 +143,8 @@ handle_call({limit, PrefetchCount}, _From, State) -> {stop, State1} -> {stop, normal, stopped, State1} end; -handle_call(block, _From, State) -> - {reply, ok, State#lim{blocked = true}}; - -handle_call(unblock, _From, State) -> - case maybe_notify(State, State#lim{blocked = false}) of +handle_call({set_credit, Credit, Drain}, _From, State) -> + case maybe_notify(State, State#lim{credit = Credit, drain = Drain}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> {stop, normal, stopped, State1} end; @@ -183,9 +182,9 @@ maybe_notify(OldState, NewState) -> case (limit_reached(OldState) orelse blocked(OldState)) andalso not (limit_reached(NewState) orelse blocked(NewState)) of true -> NewState1 = notify_queues(NewState), - {case NewState1#lim.prefetch_count of - 0 -> stop; - _ -> cont + {case {NewState1#lim.prefetch_count, NewState1#lim.credit} of + {0, unlimited} -> stop; + _ -> cont end, NewState1}; false -> {cont, NewState} end. @@ -193,7 +192,8 @@ maybe_notify(OldState, NewState) -> limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. -blocked(#lim{blocked = Blocked}) -> Blocked. +blocked(#lim{credit = 0}) -> true; +blocked(_) -> false. remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of |
