diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-11 18:01:16 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-11 18:01:16 +0000 |
| commit | 55920f3ff2eefe6965ca87b66445eac14522b0dc (patch) | |
| tree | 6781d659765fcc65f396ef395769cfbef92a53fb | |
| parent | a82c107ce5fd652688811b8af057243e8086159e (diff) | |
| download | rabbitmq-server-git-55920f3ff2eefe6965ca87b66445eac14522b0dc.tar.gz | |
Right. There were a lot of bugs. This fixes most of them. One major confusion was that both the adapter and the credit impl were trying to normalise against remote count, now it's just the adapter. Also when we get credit we need to unblock. Also there were various things that assumed our local credit could not go negative - well it can and we just need to wait for it to be positive again.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 67 |
3 files changed, 55 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2ec54c7b2e..f48005ef95 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -439,16 +439,13 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, false -> block_consumer(C#cr{is_limit_active = true, limiter = Lim2}, E), {false, State}; - true -> update_ch_record(C#cr{limiter = Lim2}), %%[0] - AC1 = queue:in(E, State#q.active_consumers), + true -> AC1 = queue:in(E, State#q.active_consumers), deliver_msg_to_consumer( - DeliverFun, Consumer, C, + DeliverFun, Consumer, C#cr{limiter = Lim2}, State#q{active_consumers = AC1}) end end. -%% [0] TODO is this a hotspot in the case where the limiter has not changed? - deliver_msg_to_consumer(DeliverFun, #consumer{tag = ConsumerTag, ack_required = AckRequired}, @@ -1317,12 +1314,26 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, backing_queue_state = BQS1}); handle_cast({inform_limiter, ChPid, Msg}, - State = #q{backing_queue = BQ, + State = #q{active_consumers = AC, + backing_queue = BQ, backing_queue_state = BQS}) -> - C = #cr{limiter = Limiter} = ch_record(ChPid), - Limiter2 = rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg), - update_ch_record(C#cr{limiter = Limiter2}), - noreply(State); + C = #cr{limiter = Limiter, + blocked_consumers = Blocked} = ch_record(ChPid), + {Unblock, Limiter2} = + rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg), + NewBlocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) -> + not lists:member(CTag, Unblock) + end, Blocked), + NewUnblocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) -> + lists:member(CTag, Unblock) + end, Blocked), + %% TODO can this whole thing be replaced by possibly_unblock? + %% TODO that is_limit_active = false thing is wrong - but we do + %% not allow for per-consumer blocking! + update_ch_record(C#cr{limiter = Limiter2, blocked_consumers = NewBlocked, + is_limit_active = false}), + AC1 = queue:join(NewUnblocked, AC), + noreply(run_message_queue(State#q{active_consumers = AC1})); handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6d00fdb251..c3a5b16df9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1082,9 +1082,8 @@ handle_method(#'channel.flow'{active = false}, _, handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, count = Count, - drain = Drain} = M, _, + drain = Drain}, _, State = #ch{consumer_mapping = Consumers}) -> - %%io:format(" ~p~n", [M]), case dict:find(CTag, Consumers) of {ok, Q} -> ok = rabbit_amqqueue:inform_limiter( self(), Q#amqqueue.pid, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 6e3a228bab..f031db517f 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -60,7 +60,7 @@ %% -spec(set_credit/5 :: (token(), rabbit_types:ctag(), %% non_neg_integer(), %% non_neg_integer(), boolean()) -> 'ok'). --spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()). +%%-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()). -endif. %%---------------------------------------------------------------------------- @@ -135,8 +135,9 @@ is_blocked(Limiter) -> inform(Limiter = #token{q_state = Credits}, ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) -> - Credits2 = reset_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits), - Limiter#token{q_state = Credits2}. + {Unblock, Credits2} = + update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits), + {Unblock, Limiter#token{q_state = Credits2}}. %%---------------------------------------------------------------------------- %% Queue-local code @@ -150,26 +151,26 @@ inform(Limiter = #token{q_state = Credits}, can_send_q(CTag, Len, ChPid, Credits) -> case dict:find(CTag, Credits) of - {ok, #credit{credit = 0}} -> exit(bang), {false, Credits}; - {ok, Cred} -> Credits2 = - decr_credit( - CTag, Len, ChPid, Cred, Credits), - {true, Credits2}; - _ -> {true, Credits} + {ok, #credit{credit = C} = Cred} -> + if C > 0 -> Credits2 = decr_credit(CTag, Len, ChPid, Cred, Credits), + {true, Credits2}; + true -> {false, Credits} + end; + _ -> + {true, Credits} end. decr_credit(CTag, Len, ChPid, Cred, Credits) -> #credit{credit = Credit, count = Count, drain = Drain} = Cred, {NewCredit, NewCount} = - case {Credit, Len, Drain} of - {1, _, _} -> {0, serial_add(Count, 1)}; - {_, 1, true} -> %% Drain, so advance til credit = 0 - NewCount0 = serial_add(Count, (Credit - 1)), - send_drained(ChPid, CTag, NewCount0), - {0, NewCount0}; %% Magic reduction to 0 - {_, _, _} -> {Credit - 1, serial_add(Count, 1)} + case {Len, Drain} of + {1, true} -> %% Drain, so advance til credit = 0 + NewCount0 = serial_add(Count, (Credit - 1)), + send_drained(ChPid, CTag, NewCount0), + {0, NewCount0}; %% Magic reduction to 0 + {_, _} -> {Credit - 1, serial_add(Count, 1)} end, - update_credit(CTag, NewCredit, NewCount, Drain, Credits). + write_credit(CTag, NewCredit, NewCount, Drain, Credits). send_drained(ChPid, CTag, Count) -> rabbit_channel:send_command(ChPid, @@ -179,29 +180,27 @@ send_drained(ChPid, CTag, Count) -> available = 0, drain = true}). -%% Assert the credit state. The count may not match ours, in which -%% case we must rebase the credit. +%% Update the credit state. %% TODO Edge case: if the queue has nothing in it, and drain is set, %% we want to send a basic.credit back. -reset_credit(CTag, Len, ChPid, Credit0, Count0, Drain, Credits) -> +update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> Count = case dict:find(CTag, Credits) of - {ok, #credit{ count = LocalCount }} -> - LocalCount; - _ -> Count0 + %% Use our count if we can, more accurate + {ok, #credit{ count = LocalCount }} -> LocalCount; + %% But if this is new, take it from the adapter + _ -> Count0 end, - %% Our credit may have been reduced while messages are in flight, - %% so we bottom out at 0. - Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), - rabbit_channel:send_command(ChPid, - #'basic.credit_ok'{available = Len}), - update_credit(CTag, Credit, Count, Drain, Credits). - -%% Store the credit -update_credit(CTag, -1, _Count, _Drain, Credits) -> - dict:erase(CTag, Credits); + rabbit_channel:send_command(ChPid, #'basic.credit_ok'{available = Len}), + NewCredits = write_credit(CTag, Credit, Count, Drain, Credits), + case Credit > 0 of + true -> {[CTag], NewCredits}; + false -> {[], NewCredits} + end. -update_credit(CTag, Credit, Count, Drain, Credits) -> +%% TODO currently we leak when a single session creates and destroys +%% lot of links. +write_credit(CTag, Credit, Count, Drain, Credits) -> dict:store(CTag, #credit{credit = Credit, count = Count, drain = Drain}, Credits). |
