summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl24
-rw-r--r--src/rabbit_limiter.erl32
2 files changed, 30 insertions, 26 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7de9974396..d2fc884208 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -767,13 +767,12 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
- {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
- CreditArgs, OtherArgs,
+ parse_credit_args(Args), Args,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -1261,16 +1260,17 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-parse_credit_args(Arguments) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>),
- rabbit_misc:table_lookup(T, <<"prefetch">>)} of
- {{long, C}, {bool, D}, _} -> {credit, C, D};
- {_, _, {long, P}} -> {prefetch, P};
- _ -> none
- end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
- undefined -> {none, Arguments}
+parse_credit_args(Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, C}, {bool, D}} -> {credit, C, D};
+ _ -> none
+ end;
+ undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
+ {_, P} when is_number(P) -> {prefetch, P};
+ _ -> none
+ end
end.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index afe63b91a6..d0dccd035e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -190,7 +190,7 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
--record(credit, {credit = 0, drain = false, mode}).
+-record(credit, {credit = 0, drain = false, mode = manual}).
%%----------------------------------------------------------------------------
%% API
@@ -281,7 +281,7 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) ->
Limiter#qstate{credits = update_credit(CTag, 0, true, Credits)};
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) ->
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, _IsEmpty, Drain) ->
Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
@@ -290,15 +290,15 @@ set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
Limiter#qstate{credits = Credits1}.
ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
- Limiter#qstate{
- credits = case gb_trees:lookup(CTag, Credits) of
- {value, #credit{mode = auto,
- credit = Current,
- drain = Drain}} ->
- update_credit(CTag, Current + Credit, Drain, Credits);
- none ->
- Credits
- end}.
+ Credits1 = case gb_trees:lookup(CTag, Credits) of
+ {value, C = #credit{mode = auto,
+ credit = Credit0}} ->
+ gb_trees:enter(CTag, C#credit{credit = Credit0 + Credit},
+ Credits);
+ _ ->
+ Credits
+ end,
+ Limiter#qstate{credits = Credits1}.
drained(Limiter = #qstate{credits = Credits}) ->
{CTagCredits, Credits2} =
@@ -326,8 +326,8 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
decrement_credit(CTag, Credits) ->
case gb_trees:lookup(CTag, Credits) of
- {value, #credit{credit = Credit, drain = Drain}} ->
- update_credit(CTag, Credit - 1, Drain, Credits);
+ {value, C = #credit{credit = Credit}} ->
+ gb_trees:enter(CTag, C#credit{credit = Credit - 1}, Credits);
none ->
Credits
end.
@@ -335,7 +335,11 @@ decrement_credit(CTag, Credits) ->
update_credit(CTag, Credit, Drain, Credits) ->
%% Using up all credit implies no need to send a 'drained' event
Drain1 = Drain andalso Credit > 0,
- gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits).
+ C = case gb_trees:lookup(CTag, Credits) of
+ {value, C0} -> C0;
+ none -> #credit{}
+ end,
+ gb_trees:enter(CTag, C#credit{credit = Credit, drain = Drain1}, Credits).
%%----------------------------------------------------------------------------
%% gen_server callbacks