summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-14 15:42:00 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-14 15:42:00 +0000
commit245789cfe151110e27db35ffcfd2e20cd2f59f25 (patch)
tree839fc95c2f58090e7f0849c839f137c84cadd061
parent55920f3ff2eefe6965ca87b66445eac14522b0dc (diff)
downloadrabbitmq-server-git-245789cfe151110e27db35ffcfd2e20cd2f59f25.tar.gz
Improve workingness further. We now should be handling multiple blocks for different reasons correctly.
-rw-r--r--src/rabbit_amqqueue_process.erl121
-rw-r--r--src/rabbit_limiter.erl22
2 files changed, 76 insertions, 67 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f48005ef95..8878de9c27 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -64,9 +64,18 @@
monitor_ref,
acktags,
consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
blocked_consumers,
+ %% List of consumer tags which have individually been
+ %% blocked by the limiter.
+ blocked_ctags,
+ %% The limiter itself
limiter,
+ %% Has the limiter imposed a channel-wide block, either
+ %% because of qos or channel flow?
is_limit_active,
+ %% Internal flow control for queue -> writer
unsent_message_count}).
%%----------------------------------------------------------------------------
@@ -355,6 +364,7 @@ ch_record(ChPid) ->
acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
+ blocked_ctags = [],
is_limit_active = false,
limiter = rabbit_limiter:make_token(),
unsent_message_count = 0},
@@ -402,13 +412,6 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
-ch_record_state_transition(OldCR, NewCR) ->
- case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
- {true, false} -> unblock;
- {false, true} -> block;
- {_, _} -> ok
- end.
-
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
@@ -423,27 +426,38 @@ deliver_msgs_to_consumers(DeliverFun, false,
deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
+deliver_msg_to_consumer(DeliverFun,
+ E = {ChPid,
+ Consumer = #consumer{tag = CTag,
+ ack_required = AckReq}},
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
- true -> block_consumer(C, E),
- {false, State};
- false -> #cr{limiter = Limiter, ch_pid = ChPid} = C,
- {CanSend, Lim2} =
- rabbit_limiter:can_send(
- Limiter, ChPid, self(), Consumer#consumer.ack_required,
- Consumer#consumer.tag, BQ:len(BQS)),
- case CanSend of
- false -> block_consumer(C#cr{is_limit_active = true,
- limiter = Lim2}, E),
- {false, State};
- true -> AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
- DeliverFun, Consumer, C#cr{limiter = Lim2},
- State#q{active_consumers = AC1})
- end
+ true ->
+ block_consumer(C, E),
+ {false, State};
+ false ->
+ #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C,
+ case rabbit_limiter:can_cons_send(
+ Limiter, ChPid, CTag, BQ:len(BQS)) of
+ {false, Lim2} ->
+ %% TODO unify with first case?
+ block_consumer(C#cr{limiter = Lim2,
+ blocked_ctags = [CTag | BCTags]}, E),
+ {false, State};
+ {true, Lim2} ->
+ case rabbit_limiter:can_ch_send(Limiter, self(), AckReq) of
+ false ->
+ block_consumer(C#cr{is_limit_active = true}, E),
+ {false, State};
+ true ->
+ AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C#cr{limiter = Lim2},
+ State#q{active_consumers = AC1})
+ end
+ end
end.
deliver_msg_to_consumer(DeliverFun,
@@ -601,16 +615,20 @@ possibly_unblock(State, ChPid, Update) ->
not_found ->
State;
C ->
- C1 = Update(C),
- case ch_record_state_transition(C, C1) of
- ok -> update_ch_record(C1),
- State;
- unblock -> #cr{blocked_consumers = Consumers} = C1,
- update_ch_record(
- C1#cr{blocked_consumers = queue:new()}),
- AC1 = queue:join(State#q.active_consumers,
- Consumers),
- run_message_queue(State#q{active_consumers = AC1})
+ C1 = #cr{blocked_ctags = BCTags1} = Update(C),
+ {Blocked, Unblocked} =
+ lists:partition(
+ fun({_ChPid, #consumer{tag = CTag}}) ->
+ is_ch_blocked(C1) orelse lists:member(CTag, BCTags1)
+ end, queue:to_list(C1#cr.blocked_consumers)),
+ case Unblocked of
+ [] -> update_ch_record(C1),
+ State;
+ _ -> update_ch_record(
+ C1#cr{blocked_consumers = queue:from_list(Blocked)}),
+ AC1 = queue:join(State#q.active_consumers,
+ queue:from_list(Unblocked)),
+ run_message_queue(State#q{active_consumers = AC1})
end
end.
@@ -662,8 +680,6 @@ check_exclusive_access(none, true, State) ->
consumer_count() -> consumer_count(fun (_) -> false end).
-active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
-
consumer_count(Exclude) ->
lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
not Exclude(C)]).
@@ -909,8 +925,8 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
-i(active_consumers, _) ->
- active_consumer_count();
+i(active_consumers, #q{active_consumers = ActiveConsumers}) ->
+ queue:len(ActiveConsumers);
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1124,9 +1140,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end;
handle_call(stat, _From, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ State1 = #q{active_consumers = AC,
+ backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_msgs(ensure_expiry_timer(State)),
- reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
+ reply({ok, BQ:len(BQS), queue:len(AC)}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -1314,26 +1331,16 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS1});
handle_cast({inform_limiter, ChPid, Msg},
- State = #q{active_consumers = AC,
- backing_queue = BQ,
+ State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- C = #cr{limiter = Limiter,
- blocked_consumers = Blocked} = ch_record(ChPid),
+ #cr{limiter = Limiter,
+ blocked_ctags = BCTags} = ch_record(ChPid),
{Unblock, Limiter2} =
rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg),
- NewBlocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) ->
- not lists:member(CTag, Unblock)
- end, Blocked),
- NewUnblocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) ->
- lists:member(CTag, Unblock)
- end, Blocked),
- %% TODO can this whole thing be replaced by possibly_unblock?
- %% TODO that is_limit_active = false thing is wrong - but we do
- %% not allow for per-consumer blocking!
- update_ch_record(C#cr{limiter = Limiter2, blocked_consumers = NewBlocked,
- is_limit_active = false}),
- AC1 = queue:join(NewUnblocked, AC),
- noreply(run_message_queue(State#q{active_consumers = AC1}));
+ noreply(possibly_unblock(
+ State, ChPid,
+ fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock,
+ limiter = Limiter2} end));
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index f031db517f..9da1bc6f7c 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -24,7 +24,8 @@
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
--export([limit/2, can_send/6, ack/2, register/2, unregister/2]).
+-export([limit/2, can_ch_send/3, can_cons_send/4,
+ ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
-export([inform/4]).
@@ -47,6 +48,7 @@
-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
+-spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()).
%% TODO
%% -spec(can_send/5 :: (token(), pid(), boolean(),
%% rabbit_types:ctag(), non_neg_integer()) -> boolean()).
@@ -98,18 +100,18 @@ limit(Limiter, PrefetchCount) ->
%% breaching a limit. Note that we don't use maybe_call here in order
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true, q_state = QState} = Token,
- ChPid, QPid, AckRequired, CTag, Len) ->
+can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
- fun () -> {true, Token} end,
+ fun () -> true end,
fun () ->
- CanLim = gen_server2:call(Pid, {can_send, QPid, AckRequired},
- infinity),
- {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState),
- {CanLim andalso CanQ, Token#token{q_state = NewQState}}
+ gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
end);
-can_send(Token, _, _, _, _, _) ->
- {true, Token}.
+can_ch_send(_, _, _) ->
+ true.
+
+can_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) ->
+ {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState),
+ {CanQ, Token#token{q_state = NewQState}}.
%% Let the limiter know that the channel has received some acks from a
%% consumer