summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl31
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_limiter.erl67
3 files changed, 55 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2ec54c7b2e..f48005ef95 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -439,16 +439,13 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
false -> block_consumer(C#cr{is_limit_active = true,
limiter = Lim2}, E),
{false, State};
- true -> update_ch_record(C#cr{limiter = Lim2}), %%[0]
- AC1 = queue:in(E, State#q.active_consumers),
+ true -> AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C,
+ DeliverFun, Consumer, C#cr{limiter = Lim2},
State#q{active_consumers = AC1})
end
end.
-%% [0] TODO is this a hotspot in the case where the limiter has not changed?
-
deliver_msg_to_consumer(DeliverFun,
#consumer{tag = ConsumerTag,
ack_required = AckRequired},
@@ -1317,12 +1314,26 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS1});
handle_cast({inform_limiter, ChPid, Msg},
- State = #q{backing_queue = BQ,
+ State = #q{active_consumers = AC,
+ backing_queue = BQ,
backing_queue_state = BQS}) ->
- C = #cr{limiter = Limiter} = ch_record(ChPid),
- Limiter2 = rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg),
- update_ch_record(C#cr{limiter = Limiter2}),
- noreply(State);
+ C = #cr{limiter = Limiter,
+ blocked_consumers = Blocked} = ch_record(ChPid),
+ {Unblock, Limiter2} =
+ rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg),
+ NewBlocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) ->
+ not lists:member(CTag, Unblock)
+ end, Blocked),
+ NewUnblocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) ->
+ lists:member(CTag, Unblock)
+ end, Blocked),
+ %% TODO can this whole thing be replaced by possibly_unblock?
+ %% TODO that is_limit_active = false thing is wrong - but we do
+ %% not allow for per-consumer blocking!
+ update_ch_record(C#cr{limiter = Limiter2, blocked_consumers = NewBlocked,
+ is_limit_active = false}),
+ AC1 = queue:join(NewUnblocked, AC),
+ noreply(run_message_queue(State#q{active_consumers = AC1}));
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6d00fdb251..c3a5b16df9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1082,9 +1082,8 @@ handle_method(#'channel.flow'{active = false}, _,
handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
count = Count,
- drain = Drain} = M, _,
+ drain = Drain}, _,
State = #ch{consumer_mapping = Consumers}) ->
- %%io:format(" ~p~n", [M]),
case dict:find(CTag, Consumers) of
{ok, Q} -> ok = rabbit_amqqueue:inform_limiter(
self(), Q#amqqueue.pid,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 6e3a228bab..f031db517f 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -60,7 +60,7 @@
%% -spec(set_credit/5 :: (token(), rabbit_types:ctag(),
%% non_neg_integer(),
%% non_neg_integer(), boolean()) -> 'ok').
--spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()).
+%%-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()).
-endif.
%%----------------------------------------------------------------------------
@@ -135,8 +135,9 @@ is_blocked(Limiter) ->
inform(Limiter = #token{q_state = Credits},
ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) ->
- Credits2 = reset_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits),
- Limiter#token{q_state = Credits2}.
+ {Unblock, Credits2} =
+ update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits),
+ {Unblock, Limiter#token{q_state = Credits2}}.
%%----------------------------------------------------------------------------
%% Queue-local code
@@ -150,26 +151,26 @@ inform(Limiter = #token{q_state = Credits},
can_send_q(CTag, Len, ChPid, Credits) ->
case dict:find(CTag, Credits) of
- {ok, #credit{credit = 0}} -> exit(bang), {false, Credits};
- {ok, Cred} -> Credits2 =
- decr_credit(
- CTag, Len, ChPid, Cred, Credits),
- {true, Credits2};
- _ -> {true, Credits}
+ {ok, #credit{credit = C} = Cred} ->
+ if C > 0 -> Credits2 = decr_credit(CTag, Len, ChPid, Cred, Credits),
+ {true, Credits2};
+ true -> {false, Credits}
+ end;
+ _ ->
+ {true, Credits}
end.
decr_credit(CTag, Len, ChPid, Cred, Credits) ->
#credit{credit = Credit, count = Count, drain = Drain} = Cred,
{NewCredit, NewCount} =
- case {Credit, Len, Drain} of
- {1, _, _} -> {0, serial_add(Count, 1)};
- {_, 1, true} -> %% Drain, so advance til credit = 0
- NewCount0 = serial_add(Count, (Credit - 1)),
- send_drained(ChPid, CTag, NewCount0),
- {0, NewCount0}; %% Magic reduction to 0
- {_, _, _} -> {Credit - 1, serial_add(Count, 1)}
+ case {Len, Drain} of
+ {1, true} -> %% Drain, so advance til credit = 0
+ NewCount0 = serial_add(Count, (Credit - 1)),
+ send_drained(ChPid, CTag, NewCount0),
+ {0, NewCount0}; %% Magic reduction to 0
+ {_, _} -> {Credit - 1, serial_add(Count, 1)}
end,
- update_credit(CTag, NewCredit, NewCount, Drain, Credits).
+ write_credit(CTag, NewCredit, NewCount, Drain, Credits).
send_drained(ChPid, CTag, Count) ->
rabbit_channel:send_command(ChPid,
@@ -179,29 +180,27 @@ send_drained(ChPid, CTag, Count) ->
available = 0,
drain = true}).
-%% Assert the credit state. The count may not match ours, in which
-%% case we must rebase the credit.
+%% Update the credit state.
%% TODO Edge case: if the queue has nothing in it, and drain is set,
%% we want to send a basic.credit back.
-reset_credit(CTag, Len, ChPid, Credit0, Count0, Drain, Credits) ->
+update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
Count =
case dict:find(CTag, Credits) of
- {ok, #credit{ count = LocalCount }} ->
- LocalCount;
- _ -> Count0
+ %% Use our count if we can, more accurate
+ {ok, #credit{ count = LocalCount }} -> LocalCount;
+ %% But if this is new, take it from the adapter
+ _ -> Count0
end,
- %% Our credit may have been reduced while messages are in flight,
- %% so we bottom out at 0.
- Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)),
- rabbit_channel:send_command(ChPid,
- #'basic.credit_ok'{available = Len}),
- update_credit(CTag, Credit, Count, Drain, Credits).
-
-%% Store the credit
-update_credit(CTag, -1, _Count, _Drain, Credits) ->
- dict:erase(CTag, Credits);
+ rabbit_channel:send_command(ChPid, #'basic.credit_ok'{available = Len}),
+ NewCredits = write_credit(CTag, Credit, Count, Drain, Credits),
+ case Credit > 0 of
+ true -> {[CTag], NewCredits};
+ false -> {[], NewCredits}
+ end.
-update_credit(CTag, Credit, Count, Drain, Credits) ->
+%% TODO currently we leak when a single session creates and destroys
+%% lot of links.
+write_credit(CTag, Credit, Count, Drain, Credits) ->
dict:store(CTag, #credit{credit = Credit,
count = Count,
drain = Drain}, Credits).