diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-14 15:42:00 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-14 15:42:00 +0000 |
| commit | 245789cfe151110e27db35ffcfd2e20cd2f59f25 (patch) | |
| tree | 839fc95c2f58090e7f0849c839f137c84cadd061 | |
| parent | 55920f3ff2eefe6965ca87b66445eac14522b0dc (diff) | |
| download | rabbitmq-server-git-245789cfe151110e27db35ffcfd2e20cd2f59f25.tar.gz | |
Improve workingness further. We now should be handling multiple blocks for different reasons correctly.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 121 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 22 |
2 files changed, 76 insertions, 67 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f48005ef95..8878de9c27 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -64,9 +64,18 @@ monitor_ref, acktags, consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason blocked_consumers, + %% List of consumer tags which have individually been + %% blocked by the limiter. + blocked_ctags, + %% The limiter itself limiter, + %% Has the limiter imposed a channel-wide block, either + %% because of qos or channel flow? is_limit_active, + %% Internal flow control for queue -> writer unsent_message_count}). %%---------------------------------------------------------------------------- @@ -355,6 +364,7 @@ ch_record(ChPid) -> acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), + blocked_ctags = [], is_limit_active = false, limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, @@ -402,13 +412,6 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. -ch_record_state_transition(OldCR, NewCR) -> - case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of - {true, false} -> unblock; - {false, true} -> block; - {_, _} -> ok - end. - deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, @@ -423,27 +426,38 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, +deliver_msg_to_consumer(DeliverFun, + E = {ChPid, + Consumer = #consumer{tag = CTag, + ack_required = AckReq}}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> C = ch_record(ChPid), case is_ch_blocked(C) of - true -> block_consumer(C, E), - {false, State}; - false -> #cr{limiter = Limiter, ch_pid = ChPid} = C, - {CanSend, Lim2} = - rabbit_limiter:can_send( - Limiter, ChPid, self(), Consumer#consumer.ack_required, - Consumer#consumer.tag, BQ:len(BQS)), - case CanSend of - false -> block_consumer(C#cr{is_limit_active = true, - limiter = Lim2}, E), - {false, State}; - true -> AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( - DeliverFun, Consumer, C#cr{limiter = Lim2}, - State#q{active_consumers = AC1}) - end + true -> + block_consumer(C, E), + {false, State}; + false -> + #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C, + case rabbit_limiter:can_cons_send( + Limiter, ChPid, CTag, BQ:len(BQS)) of + {false, Lim2} -> + %% TODO unify with first case? + block_consumer(C#cr{limiter = Lim2, + blocked_ctags = [CTag | BCTags]}, E), + {false, State}; + {true, Lim2} -> + case rabbit_limiter:can_ch_send(Limiter, self(), AckReq) of + false -> + block_consumer(C#cr{is_limit_active = true}, E), + {false, State}; + true -> + AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C#cr{limiter = Lim2}, + State#q{active_consumers = AC1}) + end + end end. deliver_msg_to_consumer(DeliverFun, @@ -601,16 +615,20 @@ possibly_unblock(State, ChPid, Update) -> not_found -> State; C -> - C1 = Update(C), - case ch_record_state_transition(C, C1) of - ok -> update_ch_record(C1), - State; - unblock -> #cr{blocked_consumers = Consumers} = C1, - update_ch_record( - C1#cr{blocked_consumers = queue:new()}), - AC1 = queue:join(State#q.active_consumers, - Consumers), - run_message_queue(State#q{active_consumers = AC1}) + C1 = #cr{blocked_ctags = BCTags1} = Update(C), + {Blocked, Unblocked} = + lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + is_ch_blocked(C1) orelse lists:member(CTag, BCTags1) + end, queue:to_list(C1#cr.blocked_consumers)), + case Unblocked of + [] -> update_ch_record(C1), + State; + _ -> update_ch_record( + C1#cr{blocked_consumers = queue:from_list(Blocked)}), + AC1 = queue:join(State#q.active_consumers, + queue:from_list(Unblocked)), + run_message_queue(State#q{active_consumers = AC1}) end end. @@ -662,8 +680,6 @@ check_exclusive_access(none, true, State) -> consumer_count() -> consumer_count(fun (_) -> false end). -active_consumer_count() -> consumer_count(fun is_ch_blocked/1). - consumer_count(Exclude) -> lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(), not Exclude(C)]). @@ -909,8 +925,8 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); -i(active_consumers, _) -> - active_consumer_count(); +i(active_consumers, #q{active_consumers = ActiveConsumers}) -> + queue:len(ActiveConsumers); i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -1124,9 +1140,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, end; handle_call(stat, _From, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + State1 = #q{active_consumers = AC, + backing_queue = BQ, backing_queue_state = BQS} = drop_expired_msgs(ensure_expiry_timer(State)), - reply({ok, BQ:len(BQS), active_consumer_count()}, State1); + reply({ok, BQ:len(BQS), queue:len(AC)}, State1); handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -1314,26 +1331,16 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, backing_queue_state = BQS1}); handle_cast({inform_limiter, ChPid, Msg}, - State = #q{active_consumers = AC, - backing_queue = BQ, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - C = #cr{limiter = Limiter, - blocked_consumers = Blocked} = ch_record(ChPid), + #cr{limiter = Limiter, + blocked_ctags = BCTags} = ch_record(ChPid), {Unblock, Limiter2} = rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg), - NewBlocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) -> - not lists:member(CTag, Unblock) - end, Blocked), - NewUnblocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) -> - lists:member(CTag, Unblock) - end, Blocked), - %% TODO can this whole thing be replaced by possibly_unblock? - %% TODO that is_limit_active = false thing is wrong - but we do - %% not allow for per-consumer blocking! - update_ch_record(C#cr{limiter = Limiter2, blocked_consumers = NewBlocked, - is_limit_active = false}), - AC1 = queue:join(NewUnblocked, AC), - noreply(run_message_queue(State#q{active_consumers = AC1})); + noreply(possibly_unblock( + State, ChPid, + fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock, + limiter = Limiter2} end)); handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index f031db517f..9da1bc6f7c 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -24,7 +24,8 @@ -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_send/6, ack/2, register/2, unregister/2]). +-export([limit/2, can_ch_send/3, can_cons_send/4, + ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). -export([inform/4]). @@ -47,6 +48,7 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). +-spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()). %% TODO %% -spec(can_send/5 :: (token(), pid(), boolean(), %% rabbit_types:ctag(), non_neg_integer()) -> boolean()). @@ -98,18 +100,18 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(#token{pid = Pid, enabled = true, q_state = QState} = Token, - ChPid, QPid, AckRequired, CTag, Len) -> +can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> rabbit_misc:with_exit_handler( - fun () -> {true, Token} end, + fun () -> true end, fun () -> - CanLim = gen_server2:call(Pid, {can_send, QPid, AckRequired}, - infinity), - {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState), - {CanLim andalso CanQ, Token#token{q_state = NewQState}} + gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) end); -can_send(Token, _, _, _, _, _) -> - {true, Token}. +can_ch_send(_, _, _) -> + true. + +can_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) -> + {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState), + {CanQ, Token#token{q_state = NewQState}}. %% Let the limiter know that the channel has received some acks from a %% consumer |
