diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-11-21 16:44:00 +0000 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-11-21 16:44:00 +0000 |
| commit | eaac22a45dc5fa2c64ed54c80e3fc6e5eaade901 (patch) | |
| tree | 442099cf5bfb796aa496f0275e8d8070a408751d /src | |
| parent | 6c6d8477623f543cb461a045f008a9fcbc320629 (diff) | |
| download | rabbitmq-server-git-eaac22a45dc5fa2c64ed54c80e3fc6e5eaade901.tar.gz | |
Made set_prefetch_count into a proper gen_server call
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 34 |
2 files changed, 21 insertions, 15 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3306d6f60a..c6108489e4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -410,7 +410,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter}) -> - Limiter ! {prefetch_count, PrefetchCount}, + rabbit_limiter:set_prefetch_count(Limiter, PrefetchCount), {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.recover'{requeue = true}, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index abca7ce1b1..fbce5ea4bf 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -10,7 +10,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1]). --export([can_send/2, decrement_capacity/2]). +-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). -record(lim, {prefetch_count = 0, ch_pid, @@ -25,6 +25,9 @@ start_link(ChPid) -> {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), Pid. +set_prefetch_count(LimiterPid, PrefetchCount) -> + gen_server:call(LimiterPid, {prefetch_count, PrefetchCount}). + % Queries the limiter to ask whether the queue can deliver a message % without breaching a limit can_send(LimiterPid, QPid) -> @@ -48,7 +51,20 @@ handle_call({can_send, QPid}, _From, State) -> case limit_reached(State) of true -> {reply, false, State}; false -> {reply, true, update_in_use_capacity(QPid, State)} - end. + end; + +% When the new limit is larger than the existing limit, +% notify all queues and forget about queues with an in-use +% capcity of zero +handle_call({prefetch_count, PrefetchCount}, _From, + State = #lim{prefetch_count = CurrentLimit}) + when PrefetchCount > CurrentLimit -> + % TODO implement this requirement + {reply, ok, State#lim{prefetch_count = PrefetchCount}}; + +% Default setter of the prefetch count +handle_call({prefetch_count, PrefetchCount}, _From, State) -> + {reply, ok, State#lim{prefetch_count = PrefetchCount}}. % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue @@ -62,18 +78,8 @@ handle_cast({decrement_capacity, QPid}, State) -> end, {noreply, NewState}. -% When the new limit is larger than the existing limit, -% notify all queues and forget about queues with an in-use -% capcity of zero -handle_info({prefetch_count, PrefetchCount}, - State = #lim{prefetch_count = CurrentLimit}) - when PrefetchCount > CurrentLimit -> - % TODO implement this requirement - {noreply, State#lim{prefetch_count = PrefetchCount}}; - -% Default setter of the prefetch count -handle_info({prefetch_count, PrefetchCount}, State) -> - {noreply, State#lim{prefetch_count = PrefetchCount}}. +handle_info(_, State) -> + {noreply, State}. terminate(_, _) -> ok. |
