diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 48 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 35 |
2 files changed, 65 insertions, 18 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9b6c87d8cb..38a6a8447f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -542,23 +542,15 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid, - unacked_message_q = UAMQ }) -> + _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> - undefined; - {undefined, _} -> - LPid = rabbit_limiter:start_link(self(), - queue:len(UAMQ)), - ok = limit_queues(LPid, State), - LPid; - {_, _} -> - LimiterPid + {undefined, 0} -> undefined; + {undefined, _} -> start_limiter(State); + {_, _} -> LimiterPid end, LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of ok -> LimiterPid1; - stopped -> ok = limit_queues(undefined, State), - undefined + stopped -> unlimit_queues(State) end, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; @@ -793,9 +785,24 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; -handle_method(#'channel.flow'{active = _}, _, State) -> - %% FIXME: implement - {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow'{active = true}, _, + State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of + ok -> LimiterPid; + stopped -> unlimit_queues(State) + end, + {reply, #'channel.flow_ok'{active = true}, + State#ch{limiter_pid = LimiterPid1}}; + +handle_method(#'channel.flow'{active = false}, _, + State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case LimiterPid =:= undefined of + true -> start_limiter(State); + false -> LimiterPid + end, + ok = rabbit_limiter:block(LimiterPid1), + {reply, #'channel.flow_ok'{active = false}, + State#ch{limiter_pid = LimiterPid1}}; handle_method(#'channel.flow_ok'{active = _}, _, State) -> %% TODO: We may want to correlate this to channel.flow messages we @@ -942,9 +949,18 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). +start_limiter(State = #ch{unacked_message_q = UAMQ}) -> + LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)), + ok = limit_queues(LPid, State), + LPid. + notify_queues(#ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). +unlimit_queues(State) -> + ok = limit_queues(undefined, State), + undefined. + limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d998499d76..43f31511c9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,7 @@ handle_info/2]). -export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1]). +-export([get_limit/1, block/1, unblock/1]). %%---------------------------------------------------------------------------- @@ -53,6 +53,8 @@ -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). +-spec(block/1 :: (maybe_pid()) -> 'ok'). +-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). -endif. @@ -60,6 +62,7 @@ -record(lim, {prefetch_count = 0, ch_pid, + blocked = false, queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -112,6 +115,16 @@ get_limit(Pid) -> fun () -> 0 end, fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). +block(undefined) -> + ok; +block(LimiterPid) -> + gen_server2:call(LimiterPid, block, infinity). + +unblock(undefined) -> + ok; +unblock(LimiterPid) -> + gen_server2:call(LimiterPid, unblock, infinity). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -119,6 +132,9 @@ get_limit(Pid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +handle_call({can_send, _QPid, _AckRequired}, _From, + State = #lim{blocked = true}) -> + {reply, false, State}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of @@ -131,11 +147,23 @@ handle_call({can_send, QPid, AckRequired}, _From, handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; +handle_call({limit, PrefetchCount}, _From, State = #lim{blocked = true}) -> + {reply, ok, State#lim{prefetch_count = PrefetchCount}}; handle_call({limit, PrefetchCount}, _From, State) -> State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}), case PrefetchCount == 0 of true -> {stop, normal, stopped, State1}; false -> {reply, ok, State1} + end; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State = #lim{prefetch_count = PrefetchCount}) -> + State1 = maybe_notify(State, State#lim{blocked = false}), + case PrefetchCount == 0 of + true -> {stop, normal, stopped, State1}; + false -> {reply, ok, State1} end. handle_cast(shutdown, State) -> @@ -167,7 +195,8 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case limit_reached(OldState) andalso not(limit_reached(NewState)) of + case (limit_reached(OldState) andalso not limit_reached(NewState)) orelse + (is_blocked(OldState) andalso not is_blocked(NewState)) of true -> notify_queues(NewState); false -> NewState end. @@ -175,6 +204,8 @@ maybe_notify(OldState, NewState) -> limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. +is_blocked(#lim{blocked = Blocked}) -> Blocked. + remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), |
