diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 32 |
2 files changed, 30 insertions, 26 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7de9974396..d2fc884208 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -767,13 +767,12 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> - {CreditArgs, OtherArgs} = parse_credit_args(Args), {rabbit_amqqueue:basic_consume( Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, - CreditArgs, OtherArgs, + parse_credit_args(Args), Args, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -1261,16 +1260,17 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. -parse_credit_args(Arguments) -> - case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of - {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>), - rabbit_misc:table_lookup(T, <<"drain">>), - rabbit_misc:table_lookup(T, <<"prefetch">>)} of - {{long, C}, {bool, D}, _} -> {credit, C, D}; - {_, _, {long, P}} -> {prefetch, P}; - _ -> none - end, lists:keydelete(<<"x-credit">>, 1, Arguments)}; - undefined -> {none, Arguments} +parse_credit_args(Args) -> + case rabbit_misc:table_lookup(Args, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, C}, {bool, D}} -> {credit, C, D}; + _ -> none + end; + undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of + {_, P} when is_number(P) -> {prefetch, P}; + _ -> none + end end. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index afe63b91a6..d0dccd035e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -190,7 +190,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {credit = 0, drain = false, mode}). +-record(credit, {credit = 0, drain = false, mode = manual}). %%---------------------------------------------------------------------------- %% API @@ -281,7 +281,7 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) -> Limiter#qstate{credits = update_credit(CTag, 0, true, Credits)}; -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) -> +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, _IsEmpty, Drain) -> Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) -> @@ -290,15 +290,15 @@ set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) -> Limiter#qstate{credits = Credits1}. ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> - Limiter#qstate{ - credits = case gb_trees:lookup(CTag, Credits) of - {value, #credit{mode = auto, - credit = Current, - drain = Drain}} -> - update_credit(CTag, Current + Credit, Drain, Credits); - none -> - Credits - end}. + Credits1 = case gb_trees:lookup(CTag, Credits) of + {value, C = #credit{mode = auto, + credit = Credit0}} -> + gb_trees:enter(CTag, C#credit{credit = Credit0 + Credit}, + Credits); + _ -> + Credits + end, + Limiter#qstate{credits = Credits1}. drained(Limiter = #qstate{credits = Credits}) -> {CTagCredits, Credits2} = @@ -326,8 +326,8 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of - {value, #credit{credit = Credit, drain = Drain}} -> - update_credit(CTag, Credit - 1, Drain, Credits); + {value, C = #credit{credit = Credit}} -> + gb_trees:enter(CTag, C#credit{credit = Credit - 1}, Credits); none -> Credits end. @@ -335,7 +335,11 @@ decrement_credit(CTag, Credits) -> update_credit(CTag, Credit, Drain, Credits) -> %% Using up all credit implies no need to send a 'drained' event Drain1 = Drain andalso Credit > 0, - gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits). + C = case gb_trees:lookup(CTag, Credits) of + {value, C0} -> C0; + none -> #credit{} + end, + gb_trees:enter(CTag, C#credit{credit = Credit, drain = Drain1}, Credits). %%---------------------------------------------------------------------------- %% gen_server callbacks |
