diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 67 |
3 files changed, 55 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2ec54c7b2e..f48005ef95 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -439,16 +439,13 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, false -> block_consumer(C#cr{is_limit_active = true, limiter = Lim2}, E), {false, State}; - true -> update_ch_record(C#cr{limiter = Lim2}), %%[0] - AC1 = queue:in(E, State#q.active_consumers), + true -> AC1 = queue:in(E, State#q.active_consumers), deliver_msg_to_consumer( - DeliverFun, Consumer, C, + DeliverFun, Consumer, C#cr{limiter = Lim2}, State#q{active_consumers = AC1}) end end. -%% [0] TODO is this a hotspot in the case where the limiter has not changed? - deliver_msg_to_consumer(DeliverFun, #consumer{tag = ConsumerTag, ack_required = AckRequired}, @@ -1317,12 +1314,26 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, backing_queue_state = BQS1}); handle_cast({inform_limiter, ChPid, Msg}, - State = #q{backing_queue = BQ, + State = #q{active_consumers = AC, + backing_queue = BQ, backing_queue_state = BQS}) -> - C = #cr{limiter = Limiter} = ch_record(ChPid), - Limiter2 = rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg), - update_ch_record(C#cr{limiter = Limiter2}), - noreply(State); + C = #cr{limiter = Limiter, + blocked_consumers = Blocked} = ch_record(ChPid), + {Unblock, Limiter2} = + rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg), + NewBlocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) -> + not lists:member(CTag, Unblock) + end, Blocked), + NewUnblocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) -> + lists:member(CTag, Unblock) + end, Blocked), + %% TODO can this whole thing be replaced by possibly_unblock? + %% TODO that is_limit_active = false thing is wrong - but we do + %% not allow for per-consumer blocking! + update_ch_record(C#cr{limiter = Limiter2, blocked_consumers = NewBlocked, + is_limit_active = false}), + AC1 = queue:join(NewUnblocked, AC), + noreply(run_message_queue(State#q{active_consumers = AC1})); handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6d00fdb251..c3a5b16df9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1082,9 +1082,8 @@ handle_method(#'channel.flow'{active = false}, _, handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, count = Count, - drain = Drain} = M, _, + drain = Drain}, _, State = #ch{consumer_mapping = Consumers}) -> - %%io:format(" ~p~n", [M]), case dict:find(CTag, Consumers) of {ok, Q} -> ok = rabbit_amqqueue:inform_limiter( self(), Q#amqqueue.pid, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 6e3a228bab..f031db517f 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -60,7 +60,7 @@ %% -spec(set_credit/5 :: (token(), rabbit_types:ctag(), %% non_neg_integer(), %% non_neg_integer(), boolean()) -> 'ok'). --spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()). +%%-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()). -endif. %%---------------------------------------------------------------------------- @@ -135,8 +135,9 @@ is_blocked(Limiter) -> inform(Limiter = #token{q_state = Credits}, ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) -> - Credits2 = reset_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits), - Limiter#token{q_state = Credits2}. + {Unblock, Credits2} = + update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits), + {Unblock, Limiter#token{q_state = Credits2}}. %%---------------------------------------------------------------------------- %% Queue-local code @@ -150,26 +151,26 @@ inform(Limiter = #token{q_state = Credits}, can_send_q(CTag, Len, ChPid, Credits) -> case dict:find(CTag, Credits) of - {ok, #credit{credit = 0}} -> exit(bang), {false, Credits}; - {ok, Cred} -> Credits2 = - decr_credit( - CTag, Len, ChPid, Cred, Credits), - {true, Credits2}; - _ -> {true, Credits} + {ok, #credit{credit = C} = Cred} -> + if C > 0 -> Credits2 = decr_credit(CTag, Len, ChPid, Cred, Credits), + {true, Credits2}; + true -> {false, Credits} + end; + _ -> + {true, Credits} end. decr_credit(CTag, Len, ChPid, Cred, Credits) -> #credit{credit = Credit, count = Count, drain = Drain} = Cred, {NewCredit, NewCount} = - case {Credit, Len, Drain} of - {1, _, _} -> {0, serial_add(Count, 1)}; - {_, 1, true} -> %% Drain, so advance til credit = 0 - NewCount0 = serial_add(Count, (Credit - 1)), - send_drained(ChPid, CTag, NewCount0), - {0, NewCount0}; %% Magic reduction to 0 - {_, _, _} -> {Credit - 1, serial_add(Count, 1)} + case {Len, Drain} of + {1, true} -> %% Drain, so advance til credit = 0 + NewCount0 = serial_add(Count, (Credit - 1)), + send_drained(ChPid, CTag, NewCount0), + {0, NewCount0}; %% Magic reduction to 0 + {_, _} -> {Credit - 1, serial_add(Count, 1)} end, - update_credit(CTag, NewCredit, NewCount, Drain, Credits). + write_credit(CTag, NewCredit, NewCount, Drain, Credits). send_drained(ChPid, CTag, Count) -> rabbit_channel:send_command(ChPid, @@ -179,29 +180,27 @@ send_drained(ChPid, CTag, Count) -> available = 0, drain = true}). -%% Assert the credit state. The count may not match ours, in which -%% case we must rebase the credit. +%% Update the credit state. %% TODO Edge case: if the queue has nothing in it, and drain is set, %% we want to send a basic.credit back. -reset_credit(CTag, Len, ChPid, Credit0, Count0, Drain, Credits) -> +update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> Count = case dict:find(CTag, Credits) of - {ok, #credit{ count = LocalCount }} -> - LocalCount; - _ -> Count0 + %% Use our count if we can, more accurate + {ok, #credit{ count = LocalCount }} -> LocalCount; + %% But if this is new, take it from the adapter + _ -> Count0 end, - %% Our credit may have been reduced while messages are in flight, - %% so we bottom out at 0. - Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), - rabbit_channel:send_command(ChPid, - #'basic.credit_ok'{available = Len}), - update_credit(CTag, Credit, Count, Drain, Credits). - -%% Store the credit -update_credit(CTag, -1, _Count, _Drain, Credits) -> - dict:erase(CTag, Credits); + rabbit_channel:send_command(ChPid, #'basic.credit_ok'{available = Len}), + NewCredits = write_credit(CTag, Credit, Count, Drain, Credits), + case Credit > 0 of + true -> {[CTag], NewCredits}; + false -> {[], NewCredits} + end. -update_credit(CTag, Credit, Count, Drain, Credits) -> +%% TODO currently we leak when a single session creates and destroys +%% lot of links. +write_credit(CTag, Credit, Count, Drain, Credits) -> dict:store(CTag, #credit{credit = Credit, count = Count, drain = Drain}, Credits). |
