summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-03-19 11:10:40 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-03-19 11:10:40 +0000
commit8614b6e32381b6b499483d689afd4db6c9bc3856 (patch)
treec79a28c8b22a3ee719f4427e688a62788b53e75c /src
parent7c25a669cc01dd7c51941ef0b48b8a4b4a3cfd8b (diff)
downloadrabbitmq-server-git-8614b6e32381b6b499483d689afd4db6c9bc3856.tar.gz
limiter API revision, part 1/2 - channel-side API
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl19
-rw-r--r--src/rabbit_channel.erl68
-rw-r--r--src/rabbit_channel_sup.erl4
-rw-r--r--src/rabbit_limiter.erl207
5 files changed, 139 insertions, 161 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 82ac74fac5..bd5de23923 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -144,7 +144,7 @@
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
--spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
+-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:lstate()) ->
ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 18b641d4f7..0ddc9eba11 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -372,7 +372,7 @@ ch_record(ChPid) ->
consumer_count = 0,
blocked_consumers = queue:new(),
is_limit_active = false,
- limiter = rabbit_limiter:make_token(),
+ limiter = undefined,
unsent_message_count = 0},
put(Key, C),
C;
@@ -395,18 +395,17 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
erase_ch_record(#cr{ch_pid = ChPid,
limiter = Limiter,
monitor_ref = MonitorRef}) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
+ ok = rabbit_limiter:unregister(Limiter),
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
ok.
update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
- ok = rabbit_limiter:register(Limiter, self()),
+ ok = rabbit_limiter:register(Limiter),
update_ch_record(C#cr{consumer_count = 1});
update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
- update_ch_record(C#cr{consumer_count = 0,
- limiter = rabbit_limiter:make_token()});
+ ok = rabbit_limiter:unregister(Limiter),
+ update_ch_record(C#cr{consumer_count = 0, limiter = undefined});
update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
update_ch_record(C#cr{consumer_count = Count + Delta}).
@@ -444,7 +443,7 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
case is_ch_blocked(C) of
true -> block_consumer(C, E),
{false, State};
- false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
+ false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required) of
false -> block_consumer(C#cr{is_limit_active = true}, E),
{false, State};
@@ -1308,11 +1307,11 @@ handle_cast({limit, ChPid, Limiter}, State) ->
limiter = OldLimiter,
is_limit_active = OldLimited}) ->
case (ConsumerCount =/= 0 andalso
- not rabbit_limiter:is_enabled(OldLimiter)) of
- true -> ok = rabbit_limiter:register(Limiter, self());
+ not rabbit_limiter:is_active(OldLimiter)) of
+ true -> ok = rabbit_limiter:register(Limiter);
false -> ok
end,
- Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter),
+ Limited = OldLimited andalso rabbit_limiter:is_active(Limiter),
C#cr{limiter = Limiter, is_limit_active = Limited}
end));
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 792a06c908..cda5747ad4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -82,7 +82,7 @@
(channel_number(), pid(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
rabbit_framing:amqp_table(),
- pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()).
+ pid(), pid()) -> rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -180,7 +180,7 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
- Capabilities, CollectorPid, Limiter]) ->
+ Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
@@ -190,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
- limiter = Limiter,
+ limiter = rabbit_limiter:new(LimiterPid),
tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -804,18 +804,10 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
- State = #ch{limiter = Limiter}) ->
- Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of
- {false, 0} -> Limiter;
- {false, _} -> enable_limiter(State);
- {_, _} -> Limiter
- end,
- Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of
- ok -> Limiter1;
- {disabled, Limiter2} -> ok = limit_queues(Limiter2, State),
- Limiter2
- end,
- {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}};
+ State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
+ Limiter1 = rabbit_limiter:limit(Limiter, PrefetchCount, queue:len(UAMQ)),
+ {reply, #'basic.qos_ok'{},
+ maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
@@ -1078,25 +1070,23 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter = Limiter}) ->
- Limiter2 = case rabbit_limiter:unblock(Limiter) of
- ok -> Limiter;
- {disabled, Limiter1} -> ok = limit_queues(Limiter1, State),
- Limiter1
- end,
- {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}};
+ Limiter1 = rabbit_limiter:unblock(Limiter),
+ {reply, #'channel.flow_ok'{active = true},
+ maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'channel.flow'{active = false}, _,
State = #ch{consumer_mapping = Consumers,
limiter = Limiter}) ->
- Limiter1 = case rabbit_limiter:is_enabled(Limiter) of
- true -> Limiter;
- false -> enable_limiter(State)
- end,
- State1 = State#ch{limiter = Limiter1},
- ok = rabbit_limiter:block(Limiter1),
- QPids = consumer_queues(Consumers),
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})};
+ case rabbit_limiter:is_blocked(Limiter) of
+ true -> {noreply, maybe_send_flow_ok(State)};
+ false -> Limiter1 = rabbit_limiter:block(Limiter),
+ State1 = maybe_limit_queues(Limiter, Limiter1,
+ State#ch{limiter = Limiter1}),
+ QPids = consumer_queues(Consumers),
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, maybe_send_flow_ok(
+ State1#ch{blocking = sets:from_list(QPids)})}
+ end;
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -1332,14 +1322,14 @@ foreach_per_queue(F, UAL) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
-enable_limiter(State = #ch{unacked_message_q = UAMQ,
- limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)),
- ok = limit_queues(Limiter1, State),
- Limiter1.
-
-limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter).
+maybe_limit_queues(OldLimiter, NewLimiter, State) ->
+ case ((not rabbit_limiter:is_active(OldLimiter)) andalso
+ rabbit_limiter:is_active(NewLimiter)) of
+ true -> Queues = consumer_queues(State#ch.consumer_mapping),
+ rabbit_amqqueue:limit_all(Queues, self(), NewLimiter);
+ false -> ok
+ end,
+ State.
consumer_queues(Consumers) ->
lists:usort([QPid ||
@@ -1350,7 +1340,7 @@ consumer_queues(Consumers) ->
%% messages sent in a response to a basic.get (identified by their
%% 'none' consumer tag)
notify_limiter(Limiter, Acked) ->
- case rabbit_limiter:is_enabled(Limiter) of
+ case rabbit_limiter:is_limited(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 8ea44a8179..a0c7624b4f 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -58,7 +58,7 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, ConnName,
Protocol, User, VHost, Capabilities, Collector,
- rabbit_limiter:make_token(LimiterPid)]},
+ LimiterPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
@@ -72,7 +72,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
{channel, {rabbit_channel, start_link,
[Channel, ClientChannelPid, ClientChannelPid, ConnPid,
ConnName, Protocol, User, VHost, Capabilities, Collector,
- rabbit_limiter:make_token(LimiterPid)]},
+ LimiterPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 8a7d14fe0b..ae6563286e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -18,38 +18,43 @@
-behaviour(gen_server2).
+-export([start_link/0]).
+-export([new/1, limit/3, unlimit/1, block/1, unblock/1,
+ is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2]).
+-export([can_send/2, register/1, unregister/1]).
+
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
--export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
- disable/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1, is_blocked/1]).
%%----------------------------------------------------------------------------
--record(token, {pid, enabled}).
+-record(lstate, {pid, limited, blocked}).
-ifdef(use_specs).
--export_type([token/0]).
+-export_type([lstate/0]).
--opaque(token() :: #token{}).
+-opaque(lstate() :: #lstate {pid :: pid(),
+ limited :: boolean(),
+ blocked :: boolean()}).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(make_token/0 :: () -> token()).
--spec(make_token/1 :: ('undefined' | pid()) -> token()).
--spec(is_enabled/1 :: (token()) -> boolean()).
--spec(enable/2 :: (token(), non_neg_integer()) -> token()).
--spec(disable/1 :: (token()) -> token()).
--spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()).
--spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
--spec(register/2 :: (token(), pid()) -> 'ok').
--spec(unregister/2 :: (token(), pid()) -> 'ok').
--spec(get_limit/1 :: (token()) -> non_neg_integer()).
--spec(block/1 :: (token()) -> 'ok').
--spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
--spec(is_blocked/1 :: (token()) -> boolean()).
+-spec(new/1 :: (pid()) -> lstate()).
+
+-spec(limit/3 :: (lstate(), non_neg_integer(), non_neg_integer()) ->
+ lstate()).
+-spec(unlimit/1 :: (lstate()) -> lstate()).
+-spec(block/1 :: (lstate()) -> lstate()).
+-spec(unblock/1 :: (lstate()) -> lstate()).
+-spec(is_limited/1 :: (lstate()) -> boolean()).
+-spec(is_blocked/1 :: (lstate()) -> boolean()).
+-spec(is_active/1 :: (lstate()) -> boolean()).
+-spec(get_limit/1 :: (lstate()) -> non_neg_integer()).
+-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
+
+-spec(can_send/2 :: (lstate(), boolean()) -> boolean()).
+-spec(register/1 :: (lstate()) -> 'ok').
+-spec(unregister/1 :: (lstate()) -> 'ok').
-endif.
@@ -70,65 +75,95 @@
start_link() -> gen_server2:start_link(?MODULE, [], []).
-make_token() -> make_token(undefined).
-make_token(Pid) -> #token{pid = Pid, enabled = false}.
+new(Pid) ->
+ %% this a 'call' to ensure that it is invoked at most once.
+ ok = gen_server:call(Pid, {new, self()}),
+ #lstate{pid = Pid, limited = false, blocked = false}.
-is_enabled(#token{enabled = Enabled}) -> Enabled.
+limit(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
+ ok = gen_server:call(L#lstate.pid, {limit, PrefetchCount, UnackedCount}),
+ L#lstate{limited = true}.
-enable(#token{pid = Pid} = Token, Volume) ->
- gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity).
+unlimit(L) ->
+ ok = gen_server:call(L#lstate.pid, unlimit),
+ L#lstate{limited = false}.
-disable(#token{pid = Pid} = Token) ->
- gen_server2:call(Pid, {disable, Token}, infinity).
+block(L) ->
+ ok = gen_server:call(L#lstate.pid, block),
+ L#lstate{blocked = true}.
-limit(Limiter, PrefetchCount) ->
- maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok).
+unblock(L) ->
+ ok = gen_server:call(L#lstate.pid, unblock),
+ L#lstate{blocked = false}.
-%% Ask the limiter whether the queue can deliver a message without
-%% breaching a limit. Note that we don't use maybe_call here in order
-%% to avoid always going through with_exit_handler/2, even when the
-%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
- rabbit_misc:with_exit_handler(
- fun () -> true end,
- fun () ->
- gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
- end);
-can_send(_, _, _) ->
- true.
+is_limited(#lstate{limited = Limited}) -> Limited.
-%% Let the limiter know that the channel has received some acks from a
-%% consumer
-ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
+is_blocked(#lstate{blocked = Blocked}) -> Blocked.
-register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}).
+is_active(L) -> is_limited(L) orelse is_blocked(L).
-unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}).
+get_limit(#lstate{limited = false}) -> 0;
+get_limit(L) -> gen_server:call(L#lstate.pid, get_limit).
-get_limit(Limiter) ->
- rabbit_misc:with_exit_handler(
- fun () -> 0 end,
- fun () -> maybe_call(Limiter, get_limit, 0) end).
+ack(#lstate{limited = false}, _AckCount) -> ok;
+ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
-block(Limiter) ->
- maybe_call(Limiter, block, ok).
+%% Ask the limiter whether the queue can deliver a message without
+%% breaching a limit.
+can_send(L, AckRequired) ->
+ case is_active(L) of
+ false -> true;
+ true -> rabbit_misc:with_exit_handler(
+ fun () -> true end,
+ fun () -> Msg = {can_send, self(), AckRequired},
+ gen_server2:call(L#lstate.pid, Msg, infinity)
+ end)
+ end.
-unblock(Limiter) ->
- maybe_call(Limiter, {unblock, Limiter}, ok).
+register(L) ->
+ case is_active(L) of
+ false -> ok;
+ true -> gen_server:cast(L#lstate.pid, {register, self()})
+ end.
-is_blocked(Limiter) ->
- maybe_call(Limiter, is_blocked, false).
+unregister(L) ->
+ case is_active(L) of
+ false -> ok;
+ true -> gen_server:cast(L#lstate.pid, {unregister, self()})
+ end.
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, #lim{}}.
+init([]) -> {ok, #lim{}}.
prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
+handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) ->
+ {reply, ok, State#lim{ch_pid = ChPid}};
+
+handle_call({limit, PrefetchCount, UnackedCount}, _From, State) ->
+ %% assertion
+ true = State#lim.prefetch_count == 0 orelse
+ State#lim.volume == UnackedCount,
+ {reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount,
+ volume = UnackedCount})};
+
+handle_call(unlimit, _From, State) ->
+ {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0,
+ volume = 0})};
+
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State) ->
+ {reply, ok, maybe_notify(State, State#lim{blocked = false})};
+
+handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
+ {reply, PrefetchCount, State};
+
handle_call({can_send, QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
{reply, false, limit_queue(QPid, State)};
@@ -139,45 +174,13 @@ handle_call({can_send, QPid, AckRequired}, _From,
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
end}}
- end;
-
-handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- {reply, PrefetchCount, State};
-
-handle_call({limit, PrefetchCount, Token}, _From, State) ->
- case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
- {cont, State1} ->
- {reply, ok, State1};
- {stop, State1} ->
- {reply, {disabled, Token#token{enabled = false}}, State1}
- end;
-
-handle_call(block, _From, State) ->
- {reply, ok, State#lim{blocked = true}};
-
-handle_call({unblock, Token}, _From, State) ->
- case maybe_notify(State, State#lim{blocked = false}) of
- {cont, State1} ->
- {reply, ok, State1};
- {stop, State1} ->
- {reply, {disabled, Token#token{enabled = false}}, State1}
- end;
-
-handle_call(is_blocked, _From, State) ->
- {reply, blocked(State), State};
-
-handle_call({enable, Token, Channel, Volume}, _From, State) ->
- {reply, Token#token{enabled = true},
- State#lim{ch_pid = Channel, volume = Volume}};
-handle_call({disable, Token}, _From, State) ->
- {reply, Token#token{enabled = false}, State}.
+ end.
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
- {noreply, State1};
+ {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -201,24 +204,10 @@ code_change(_, State, _) ->
maybe_notify(OldState, NewState) ->
case (limit_reached(OldState) orelse blocked(OldState)) andalso
not (limit_reached(NewState) orelse blocked(NewState)) of
- true -> NewState1 = notify_queues(NewState),
- {case NewState1#lim.prefetch_count of
- 0 -> stop;
- _ -> cont
- end, NewState1};
- false -> {cont, NewState}
+ true -> notify_queues(NewState);
+ false -> NewState
end.
-maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) ->
- gen_server2:call(Pid, Call, infinity);
-maybe_call(_, _Call, Default) ->
- Default.
-
-maybe_cast(#token{pid = Pid, enabled = true}, Cast) ->
- gen_server2:cast(Pid, Cast);
-maybe_cast(_, _Call) ->
- ok.
-
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.