summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-11-21 16:44:00 +0000
committerBen Hood <0x6e6562@gmail.com>2008-11-21 16:44:00 +0000
commiteaac22a45dc5fa2c64ed54c80e3fc6e5eaade901 (patch)
tree442099cf5bfb796aa496f0275e8d8070a408751d /src
parent6c6d8477623f543cb461a045f008a9fcbc320629 (diff)
downloadrabbitmq-server-git-eaac22a45dc5fa2c64ed54c80e3fc6e5eaade901.tar.gz
Made set_prefetch_count into a proper gen_server call
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_limiter.erl34
2 files changed, 21 insertions, 15 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3306d6f60a..c6108489e4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -410,7 +410,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
_, State = #ch{limiter = Limiter}) ->
- Limiter ! {prefetch_count, PrefetchCount},
+ rabbit_limiter:set_prefetch_count(Limiter, PrefetchCount),
{reply, #'basic.qos_ok'{}, State};
handle_method(#'basic.recover'{requeue = true},
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index abca7ce1b1..fbce5ea4bf 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -10,7 +10,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1]).
--export([can_send/2, decrement_capacity/2]).
+-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]).
-record(lim, {prefetch_count = 0,
ch_pid,
@@ -25,6 +25,9 @@ start_link(ChPid) ->
{ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
Pid.
+set_prefetch_count(LimiterPid, PrefetchCount) ->
+ gen_server:call(LimiterPid, {prefetch_count, PrefetchCount}).
+
% Queries the limiter to ask whether the queue can deliver a message
% without breaching a limit
can_send(LimiterPid, QPid) ->
@@ -48,7 +51,20 @@ handle_call({can_send, QPid}, _From, State) ->
case limit_reached(State) of
true -> {reply, false, State};
false -> {reply, true, update_in_use_capacity(QPid, State)}
- end.
+ end;
+
+% When the new limit is larger than the existing limit,
+% notify all queues and forget about queues with an in-use
+% capcity of zero
+handle_call({prefetch_count, PrefetchCount}, _From,
+ State = #lim{prefetch_count = CurrentLimit})
+ when PrefetchCount > CurrentLimit ->
+ % TODO implement this requirement
+ {reply, ok, State#lim{prefetch_count = PrefetchCount}};
+
+% Default setter of the prefetch count
+handle_call({prefetch_count, PrefetchCount}, _From, State) ->
+ {reply, ok, State#lim{prefetch_count = PrefetchCount}}.
% This is an asynchronous ack from a queue that it has received an ack from
% a queue. This allows the limiter to update the the in-use-by-that queue
@@ -62,18 +78,8 @@ handle_cast({decrement_capacity, QPid}, State) ->
end,
{noreply, NewState}.
-% When the new limit is larger than the existing limit,
-% notify all queues and forget about queues with an in-use
-% capcity of zero
-handle_info({prefetch_count, PrefetchCount},
- State = #lim{prefetch_count = CurrentLimit})
- when PrefetchCount > CurrentLimit ->
- % TODO implement this requirement
- {noreply, State#lim{prefetch_count = PrefetchCount}};
-
-% Default setter of the prefetch count
-handle_info({prefetch_count, PrefetchCount}, State) ->
- {noreply, State#lim{prefetch_count = PrefetchCount}}.
+handle_info(_, State) ->
+ {noreply, State}.
terminate(_, _) ->
ok.