diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 11:56:43 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 11:56:43 +0000 |
| commit | a6979c1c49e0cd97c37df215c9c281510ca943c0 (patch) | |
| tree | 3c82cc4a3fa219f1c4816a3dd04e22942d8b78db /src | |
| parent | 7aa7642d3231e14341f759f64e578075085763b4 (diff) | |
| download | rabbitmq-server-git-a6979c1c49e0cd97c37df215c9c281510ca943c0.tar.gz | |
Fix various bugs that prevented this form working at all. Also change the "OtherArgs" we pass through to Args (since it's there to be reported, it makes no sense to hide args we have parsed out).
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 32 |
2 files changed, 30 insertions, 26 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7de9974396..d2fc884208 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -767,13 +767,12 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> - {CreditArgs, OtherArgs} = parse_credit_args(Args), {rabbit_amqqueue:basic_consume( Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, - CreditArgs, OtherArgs, + parse_credit_args(Args), Args, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -1261,16 +1260,17 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. -parse_credit_args(Arguments) -> - case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of - {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>), - rabbit_misc:table_lookup(T, <<"drain">>), - rabbit_misc:table_lookup(T, <<"prefetch">>)} of - {{long, C}, {bool, D}, _} -> {credit, C, D}; - {_, _, {long, P}} -> {prefetch, P}; - _ -> none - end, lists:keydelete(<<"x-credit">>, 1, Arguments)}; - undefined -> {none, Arguments} +parse_credit_args(Args) -> + case rabbit_misc:table_lookup(Args, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, C}, {bool, D}} -> {credit, C, D}; + _ -> none + end; + undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of + {_, P} when is_number(P) -> {prefetch, P}; + _ -> none + end end. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index afe63b91a6..d0dccd035e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -190,7 +190,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {credit = 0, drain = false, mode}). +-record(credit, {credit = 0, drain = false, mode = manual}). %%---------------------------------------------------------------------------- %% API @@ -281,7 +281,7 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) -> Limiter#qstate{credits = update_credit(CTag, 0, true, Credits)}; -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) -> +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, _IsEmpty, Drain) -> Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) -> @@ -290,15 +290,15 @@ set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) -> Limiter#qstate{credits = Credits1}. ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> - Limiter#qstate{ - credits = case gb_trees:lookup(CTag, Credits) of - {value, #credit{mode = auto, - credit = Current, - drain = Drain}} -> - update_credit(CTag, Current + Credit, Drain, Credits); - none -> - Credits - end}. + Credits1 = case gb_trees:lookup(CTag, Credits) of + {value, C = #credit{mode = auto, + credit = Credit0}} -> + gb_trees:enter(CTag, C#credit{credit = Credit0 + Credit}, + Credits); + _ -> + Credits + end, + Limiter#qstate{credits = Credits1}. drained(Limiter = #qstate{credits = Credits}) -> {CTagCredits, Credits2} = @@ -326,8 +326,8 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of - {value, #credit{credit = Credit, drain = Drain}} -> - update_credit(CTag, Credit - 1, Drain, Credits); + {value, C = #credit{credit = Credit}} -> + gb_trees:enter(CTag, C#credit{credit = Credit - 1}, Credits); none -> Credits end. @@ -335,7 +335,11 @@ decrement_credit(CTag, Credits) -> update_credit(CTag, Credit, Drain, Credits) -> %% Using up all credit implies no need to send a 'drained' event Drain1 = Drain andalso Credit > 0, - gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits). + C = case gb_trees:lookup(CTag, Credits) of + {value, C0} -> C0; + none -> #credit{} + end, + gb_trees:enter(CTag, C#credit{credit = Credit, drain = Drain1}, Credits). %%---------------------------------------------------------------------------- %% gen_server callbacks |
