summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-02-02 12:04:07 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-02-02 12:04:07 +0000
commit6178c2747d7b2a0a747f1d4e58a917dd7afeb12f (patch)
treecfa12635b6110403a66dcc9091d963ff2647e4a2
parent7c9bf8899d82442eaf759116005f4cab2e214815 (diff)
downloadrabbitmq-server-git-6178c2747d7b2a0a747f1d4e58a917dd7afeb12f.tar.gz
Credit needs to be per ctag.
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl57
-rw-r--r--src/rabbit_limiter.erl142
3 files changed, 127 insertions, 74 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9f497f3d21..9fda12cdc3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -348,7 +348,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
IsMsgReady = PredFun(FunAcc, State),
case (IsMsgReady andalso
rabbit_limiter:can_send( LimiterPid, self(), AckRequired,
- BQ:len(BQS) )) of
+ ConsumerTag, BQ:len(BQS) )) of
true ->
{{Message, IsDelivered, AckTag}, FunAcc1, State1} =
DeliverFun(AckRequired, FunAcc, State),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index eb634cca13..bac106f993 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1004,23 +1004,34 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'channel.flow'{active = true}, Content, State) ->
- {noreply, State1 = #ch{writer_pid = WriterPid}} =
- handle_method(#'channel.credit'{credit = -1, drain = true},
- Content, State),
- ok = rabbit_writer:send_command(WriterPid,
- #'channel.flow_ok'{active = true}),
- {noreply, State1};
+handle_method(#'channel.flow'{active = true}, _,
+ State = #ch{limiter_pid = LimiterPid}) ->
+ LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
+ ok -> LimiterPid;
+ stopped -> unlimit_queues(State)
+ end,
+ {reply, #'channel.flow_ok'{active = true},
+ State#ch{limiter_pid = LimiterPid1}};
-handle_method(#'channel.flow'{active = false}, Content, State) ->
- {noreply, State1 = #ch{writer_pid = WriterPid}} =
- handle_method(#'channel.credit'{credit = 0, drain = true},
- Content, State),
- ok = rabbit_writer:send_command(WriterPid,
- #'channel.flow_ok'{active = false}),
- {noreply, State1};
+handle_method(#'channel.flow'{active = false}, _,
+ State = #ch{limiter_pid = LimiterPid,
+ consumer_mapping = Consumers}) ->
+ LimiterPid1 = case LimiterPid of
+ undefined -> start_limiter(State);
+ Other -> Other
+ end,
+ State1 = State#ch{limiter_pid = LimiterPid1},
+ ok = rabbit_limiter:block(LimiterPid1),
+ case consumer_queues(Consumers) of
+ [] -> {reply, #'channel.flow_ok'{active = false}, State1};
+ QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
+ QPid <- QPids],
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, State1#ch{blocking = dict:from_list(Queues)}}
+ end;
-handle_method(#'channel.credit'{credit = Credit, drain = Drain}, _,
+handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit,
+ drain = Drain}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
LimiterPid1 = case LimiterPid of
@@ -1028,7 +1039,7 @@ handle_method(#'channel.credit'{credit = Credit, drain = Drain}, _,
Other -> Other
end,
LimiterPid2 =
- case rabbit_limiter:set_credit(LimiterPid1, Credit, Drain) of
+ case rabbit_limiter:set_credit(LimiterPid1, CTag, Credit, Drain) of
ok -> limit_queues(LimiterPid1, State),
LimiterPid1;
stopped -> unlimit_queues(State)
@@ -1036,7 +1047,7 @@ handle_method(#'channel.credit'{credit = Credit, drain = Drain}, _,
State1 = State#ch{limiter_pid = LimiterPid2},
{noreply, State1};
- %% TODO port this bit
+ %% TODO port this bit ?
%% case consumer_queues(Consumers) of
%% [] -> {reply, #'channel.flow_ok'{active = false}, State1};
%% QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
@@ -1237,12 +1248,12 @@ consumer_queues(Consumers) ->
notify_limiter(undefined, _Acked) ->
ok;
notify_limiter(LimiterPid, Acked) ->
- case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, Acked) of
- 0 -> ok;
- Count -> rabbit_limiter:ack(LimiterPid, Count)
- end.
+ %% TODO this could be faster, group the acks
+ rabbit_misc:queue_fold(
+ fun ({_, none, _}, Acc) -> Acc;
+ ({_, CTag, _}, Acc) -> rabbit_limiter:ack(LimiterPid, CTag),
+ Acc
+ end, ok, Acked).
is_message_persistent(Content) ->
case rabbit_basic:is_message_persistent(Content) of
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index cd3ac9c5ce..efe6023b9e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -21,8 +21,8 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
-export([start_link/2]).
--export([limit/2, can_send/4, ack/2, register/2, unregister/2]).
--export([get_limit/1, set_credit/3, is_blocked/1]).
+-export([limit/2, can_send/5, ack/2, register/2, unregister/2]).
+-export([get_limit/1, block/1, unblock/1, set_credit/4, is_blocked/1]).
%%----------------------------------------------------------------------------
@@ -33,13 +33,13 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) ->
rabbit_types:ok_pid_or_error()).
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
--spec(can_send/4 :: (maybe_pid(), pid(), boolean(), non_neg_integer()) -> boolean()).
--spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(can_send/5 :: (maybe_pid(), pid(), boolean(), binary(), non_neg_integer()) -> boolean()).
+-spec(ack/2 :: (maybe_pid(), binary()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
-%% -spec(block/1 :: (maybe_pid()) -> 'ok').
-%% -spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
+-spec(block/1 :: (maybe_pid()) -> 'ok').
+-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
-spec(is_blocked/1 :: (maybe_pid()) -> boolean()).
-endif.
@@ -48,10 +48,13 @@
-record(lim, {prefetch_count = 0,
ch_pid,
- credit = unlimited,
- drain = false,
+ blocked = false,
+ credits = dict:new(),
queues = dict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
+
+-record(credit, {credit = 0, drain = false}).
+
%% 'Notify' is a boolean that indicates whether a queue should be
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
@@ -70,18 +73,19 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid, _AckRequired, _Len) ->
+can_send(undefined, _QPid, _AckRequired, _CTag, _Len) ->
true;
-can_send(LimiterPid, QPid, AckRequired, Len) ->
+can_send(LimiterPid, QPid, AckRequired, CTag, Len) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired, Len},
+ fun () -> gen_server2:call(LimiterPid,
+ {can_send, QPid, AckRequired, CTag, Len},
infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
-ack(undefined, _Count) -> ok;
-ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}).
+ack(undefined, _CTag) -> ok;
+ack(LimiterPid, CTag) -> gen_server2:cast(LimiterPid, {ack, CTag}).
register(undefined, _QPid) -> ok;
register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
@@ -96,12 +100,20 @@ get_limit(Pid) ->
fun () -> 0 end,
fun () -> gen_server2:call(Pid, get_limit, infinity) end).
-set_credit(undefined, _, _) ->
+block(undefined) ->
+ ok;
+block(LimiterPid) ->
+ gen_server2:call(LimiterPid, block, infinity).
+
+unblock(undefined) ->
ok;
-set_credit(LimiterPid, -1, Drain) ->
- gen_server2:call(LimiterPid, {set_credit, unlimited, Drain}, infinity);
-set_credit(LimiterPid, Credit, Drain) ->
- gen_server2:call(LimiterPid, {set_credit, Credit, Drain}, infinity).
+unblock(LimiterPid) ->
+ gen_server2:call(LimiterPid, unblock, infinity).
+
+set_credit(undefined, _, _, _) ->
+ ok;
+set_credit(LimiterPid, CTag, Credit, Drain) ->
+ gen_server2:call(LimiterPid, {set_credit, CTag, Credit, Drain}, infinity).
is_blocked(undefined) ->
false;
@@ -118,47 +130,47 @@ init([ChPid, UnackedMsgCount]) ->
prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
-handle_call({can_send, _QPid, _AckRequired, _Len}, _From,
- State = #lim{credit = 0}) ->
+handle_call({can_send, _QPid, _AckRequired, _CTag, _Len}, _From,
+ State = #lim{blocked = true}) ->
{reply, false, State};
-handle_call({can_send, QPid, AckRequired, Len}, _From,
- State = #lim{volume = Volume, credit = Credit, drain = Drain}) ->
- case limit_reached(State) of
+handle_call({can_send, QPid, AckRequired, CTag, Len}, _From,
+ State = #lim{volume = Volume}) ->
+ case limit_reached(CTag, State) of
true -> {reply, false, limit_queue(QPid, State)};
false -> {reply, true,
- State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end,
- credit = case {Credit, Len, Drain} of
- {unlimited, _, _} -> unlimited;
- {_, 1, true} -> 0;
- {_, _, _} -> Credit - 1
- end}}
+ decr_credit(CTag, Len,
+ State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end})}
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
handle_call({limit, PrefetchCount}, _From, State) ->
- case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
+ case maybe_notify(irrelevant,
+ State, State#lim{prefetch_count = PrefetchCount}) of
{cont, State1} -> {reply, ok, State1};
{stop, State1} -> {stop, normal, stopped, State1}
end;
-handle_call({set_credit, Credit, Drain}, _From, State) ->
- case maybe_notify(State, State#lim{credit = Credit, drain = Drain}) of
- {cont, State1} -> {reply, ok, State1};
- {stop, State1} -> {stop, normal, stopped, State1}
- end;
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State) ->
+ maybe_notify_reply(irrelevant, State, State#lim{blocked = false});
+
+handle_call({set_credit, CTag, Credit, Drain}, _From, State) ->
+ maybe_notify_reply(CTag, State, update_credit(CTag, Credit, Drain, State));
handle_call(is_blocked, _From, State) ->
{reply, blocked(State), State}.
-handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
+handle_cast({ack, CTag}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
- true -> Volume - Count
+ true -> Volume - 1
end,
- {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
+ {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}),
{noreply, State1};
handle_cast({register, QPid}, State) ->
@@ -180,22 +192,52 @@ code_change(_, State, _) ->
%% Internal plumbing
%%----------------------------------------------------------------------------
-maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) orelse blocked(OldState)) andalso
- not (limit_reached(NewState) orelse blocked(NewState)) of
+maybe_notify_reply(CTag, OldState, NewState) ->
+ case maybe_notify(CTag, OldState, NewState) of
+ {cont, State} -> {reply, ok, State};
+ {stop, State} -> {stop, normal, stopped, State}
+ end.
+
+maybe_notify(CTag, OldState, NewState) ->
+ case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso
+ not (limit_reached(CTag, NewState) orelse blocked(NewState)) of
true -> NewState1 = notify_queues(NewState),
- {case {NewState1#lim.prefetch_count, NewState1#lim.credit} of
- {0, unlimited} -> stop;
- _ -> cont
+ {case {NewState1#lim.prefetch_count,
+ dict:size(NewState1#lim.credits)} of
+ {0, 0} -> stop;
+ _ -> cont
end, NewState1};
false -> {cont, NewState}
end.
-limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
- Limit =/= 0 andalso Volume >= Limit.
+limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume,
+ credits = Credits}) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ credit = 0 }} -> true;
+ _ -> false
+ end orelse (Limit =/= 0 andalso Volume >= Limit).
+
+decr_credit(CTag, Len, State = #lim{ credits = Credits } ) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ credit = Credit, drain = Drain }} ->
+ NewCredit = case {Len, Drain} of
+ {1, true} -> 0;
+ {_, _} -> Credit - 1
+ end,
+ update_credit(CTag, NewCredit, Drain, State);
+ error ->
+ State
+ end.
+
+update_credit(CTag, -1, _Drain, State = #lim{credits = Credits}) ->
+ State#lim{credits = dict:erase(CTag, Credits)};
+
+update_credit(CTag, Credit, Drain, State = #lim{credits = Credits}) ->
+ State#lim{credits = dict:store(CTag,
+ #credit{credit = Credit, drain = Drain},
+ Credits)}.
-blocked(#lim{credit = 0}) -> true;
-blocked(_) -> false.
+blocked(#lim{blocked = Blocked}) -> Blocked.
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of