summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-11 18:01:16 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-11 18:01:16 +0000
commit55920f3ff2eefe6965ca87b66445eac14522b0dc (patch)
tree6781d659765fcc65f396ef395769cfbef92a53fb
parenta82c107ce5fd652688811b8af057243e8086159e (diff)
downloadrabbitmq-server-git-55920f3ff2eefe6965ca87b66445eac14522b0dc.tar.gz
Right. There were a lot of bugs. This fixes most of them. One major confusion was that both the adapter and the credit impl were trying to normalise against remote count, now it's just the adapter. Also when we get credit we need to unblock. Also there were various things that assumed our local credit could not go negative - well it can and we just need to wait for it to be positive again.
-rw-r--r--src/rabbit_amqqueue_process.erl31
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_limiter.erl67
3 files changed, 55 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2ec54c7b2e..f48005ef95 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -439,16 +439,13 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
false -> block_consumer(C#cr{is_limit_active = true,
limiter = Lim2}, E),
{false, State};
- true -> update_ch_record(C#cr{limiter = Lim2}), %%[0]
- AC1 = queue:in(E, State#q.active_consumers),
+ true -> AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C,
+ DeliverFun, Consumer, C#cr{limiter = Lim2},
State#q{active_consumers = AC1})
end
end.
-%% [0] TODO is this a hotspot in the case where the limiter has not changed?
-
deliver_msg_to_consumer(DeliverFun,
#consumer{tag = ConsumerTag,
ack_required = AckRequired},
@@ -1317,12 +1314,26 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS1});
handle_cast({inform_limiter, ChPid, Msg},
- State = #q{backing_queue = BQ,
+ State = #q{active_consumers = AC,
+ backing_queue = BQ,
backing_queue_state = BQS}) ->
- C = #cr{limiter = Limiter} = ch_record(ChPid),
- Limiter2 = rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg),
- update_ch_record(C#cr{limiter = Limiter2}),
- noreply(State);
+ C = #cr{limiter = Limiter,
+ blocked_consumers = Blocked} = ch_record(ChPid),
+ {Unblock, Limiter2} =
+ rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg),
+ NewBlocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) ->
+ not lists:member(CTag, Unblock)
+ end, Blocked),
+ NewUnblocked = queue:filter(fun({_ChPid, #consumer{tag = CTag}}) ->
+ lists:member(CTag, Unblock)
+ end, Blocked),
+ %% TODO can this whole thing be replaced by possibly_unblock?
+ %% TODO that is_limit_active = false thing is wrong - but we do
+ %% not allow for per-consumer blocking!
+ update_ch_record(C#cr{limiter = Limiter2, blocked_consumers = NewBlocked,
+ is_limit_active = false}),
+ AC1 = queue:join(NewUnblocked, AC),
+ noreply(run_message_queue(State#q{active_consumers = AC1}));
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6d00fdb251..c3a5b16df9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1082,9 +1082,8 @@ handle_method(#'channel.flow'{active = false}, _,
handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
count = Count,
- drain = Drain} = M, _,
+ drain = Drain}, _,
State = #ch{consumer_mapping = Consumers}) ->
- %%io:format(" ~p~n", [M]),
case dict:find(CTag, Consumers) of
{ok, Q} -> ok = rabbit_amqqueue:inform_limiter(
self(), Q#amqqueue.pid,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 6e3a228bab..f031db517f 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -60,7 +60,7 @@
%% -spec(set_credit/5 :: (token(), rabbit_types:ctag(),
%% non_neg_integer(),
%% non_neg_integer(), boolean()) -> 'ok').
--spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()).
+%%-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()).
-endif.
%%----------------------------------------------------------------------------
@@ -135,8 +135,9 @@ is_blocked(Limiter) ->
inform(Limiter = #token{q_state = Credits},
ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) ->
- Credits2 = reset_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits),
- Limiter#token{q_state = Credits2}.
+ {Unblock, Credits2} =
+ update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits),
+ {Unblock, Limiter#token{q_state = Credits2}}.
%%----------------------------------------------------------------------------
%% Queue-local code
@@ -150,26 +151,26 @@ inform(Limiter = #token{q_state = Credits},
can_send_q(CTag, Len, ChPid, Credits) ->
case dict:find(CTag, Credits) of
- {ok, #credit{credit = 0}} -> exit(bang), {false, Credits};
- {ok, Cred} -> Credits2 =
- decr_credit(
- CTag, Len, ChPid, Cred, Credits),
- {true, Credits2};
- _ -> {true, Credits}
+ {ok, #credit{credit = C} = Cred} ->
+ if C > 0 -> Credits2 = decr_credit(CTag, Len, ChPid, Cred, Credits),
+ {true, Credits2};
+ true -> {false, Credits}
+ end;
+ _ ->
+ {true, Credits}
end.
decr_credit(CTag, Len, ChPid, Cred, Credits) ->
#credit{credit = Credit, count = Count, drain = Drain} = Cred,
{NewCredit, NewCount} =
- case {Credit, Len, Drain} of
- {1, _, _} -> {0, serial_add(Count, 1)};
- {_, 1, true} -> %% Drain, so advance til credit = 0
- NewCount0 = serial_add(Count, (Credit - 1)),
- send_drained(ChPid, CTag, NewCount0),
- {0, NewCount0}; %% Magic reduction to 0
- {_, _, _} -> {Credit - 1, serial_add(Count, 1)}
+ case {Len, Drain} of
+ {1, true} -> %% Drain, so advance til credit = 0
+ NewCount0 = serial_add(Count, (Credit - 1)),
+ send_drained(ChPid, CTag, NewCount0),
+ {0, NewCount0}; %% Magic reduction to 0
+ {_, _} -> {Credit - 1, serial_add(Count, 1)}
end,
- update_credit(CTag, NewCredit, NewCount, Drain, Credits).
+ write_credit(CTag, NewCredit, NewCount, Drain, Credits).
send_drained(ChPid, CTag, Count) ->
rabbit_channel:send_command(ChPid,
@@ -179,29 +180,27 @@ send_drained(ChPid, CTag, Count) ->
available = 0,
drain = true}).
-%% Assert the credit state. The count may not match ours, in which
-%% case we must rebase the credit.
+%% Update the credit state.
%% TODO Edge case: if the queue has nothing in it, and drain is set,
%% we want to send a basic.credit back.
-reset_credit(CTag, Len, ChPid, Credit0, Count0, Drain, Credits) ->
+update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
Count =
case dict:find(CTag, Credits) of
- {ok, #credit{ count = LocalCount }} ->
- LocalCount;
- _ -> Count0
+ %% Use our count if we can, more accurate
+ {ok, #credit{ count = LocalCount }} -> LocalCount;
+ %% But if this is new, take it from the adapter
+ _ -> Count0
end,
- %% Our credit may have been reduced while messages are in flight,
- %% so we bottom out at 0.
- Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)),
- rabbit_channel:send_command(ChPid,
- #'basic.credit_ok'{available = Len}),
- update_credit(CTag, Credit, Count, Drain, Credits).
-
-%% Store the credit
-update_credit(CTag, -1, _Count, _Drain, Credits) ->
- dict:erase(CTag, Credits);
+ rabbit_channel:send_command(ChPid, #'basic.credit_ok'{available = Len}),
+ NewCredits = write_credit(CTag, Credit, Count, Drain, Credits),
+ case Credit > 0 of
+ true -> {[CTag], NewCredits};
+ false -> {[], NewCredits}
+ end.
-update_credit(CTag, Credit, Count, Drain, Credits) ->
+%% TODO currently we leak when a single session creates and destroys
+%% lot of links.
+write_credit(CTag, Credit, Count, Drain, Credits) ->
dict:store(CTag, #credit{credit = Credit,
count = Count,
drain = Drain}, Credits).