summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-09 11:56:43 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-09 11:56:43 +0000
commita6979c1c49e0cd97c37df215c9c281510ca943c0 (patch)
tree3c82cc4a3fa219f1c4816a3dd04e22942d8b78db /src
parent7aa7642d3231e14341f759f64e578075085763b4 (diff)
downloadrabbitmq-server-git-a6979c1c49e0cd97c37df215c9c281510ca943c0.tar.gz
Fix various bugs that prevented this form working at all. Also change the "OtherArgs" we pass through to Args (since it's there to be reported, it makes no sense to hide args we have parsed out).
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl24
-rw-r--r--src/rabbit_limiter.erl32
2 files changed, 30 insertions, 26 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7de9974396..d2fc884208 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -767,13 +767,12 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
- {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
- CreditArgs, OtherArgs,
+ parse_credit_args(Args), Args,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -1261,16 +1260,17 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-parse_credit_args(Arguments) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>),
- rabbit_misc:table_lookup(T, <<"prefetch">>)} of
- {{long, C}, {bool, D}, _} -> {credit, C, D};
- {_, _, {long, P}} -> {prefetch, P};
- _ -> none
- end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
- undefined -> {none, Arguments}
+parse_credit_args(Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, C}, {bool, D}} -> {credit, C, D};
+ _ -> none
+ end;
+ undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
+ {_, P} when is_number(P) -> {prefetch, P};
+ _ -> none
+ end
end.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index afe63b91a6..d0dccd035e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -190,7 +190,7 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
--record(credit, {credit = 0, drain = false, mode}).
+-record(credit, {credit = 0, drain = false, mode = manual}).
%%----------------------------------------------------------------------------
%% API
@@ -281,7 +281,7 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) ->
Limiter#qstate{credits = update_credit(CTag, 0, true, Credits)};
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) ->
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, _IsEmpty, Drain) ->
Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
@@ -290,15 +290,15 @@ set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
Limiter#qstate{credits = Credits1}.
ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
- Limiter#qstate{
- credits = case gb_trees:lookup(CTag, Credits) of
- {value, #credit{mode = auto,
- credit = Current,
- drain = Drain}} ->
- update_credit(CTag, Current + Credit, Drain, Credits);
- none ->
- Credits
- end}.
+ Credits1 = case gb_trees:lookup(CTag, Credits) of
+ {value, C = #credit{mode = auto,
+ credit = Credit0}} ->
+ gb_trees:enter(CTag, C#credit{credit = Credit0 + Credit},
+ Credits);
+ _ ->
+ Credits
+ end,
+ Limiter#qstate{credits = Credits1}.
drained(Limiter = #qstate{credits = Credits}) ->
{CTagCredits, Credits2} =
@@ -326,8 +326,8 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
decrement_credit(CTag, Credits) ->
case gb_trees:lookup(CTag, Credits) of
- {value, #credit{credit = Credit, drain = Drain}} ->
- update_credit(CTag, Credit - 1, Drain, Credits);
+ {value, C = #credit{credit = Credit}} ->
+ gb_trees:enter(CTag, C#credit{credit = Credit - 1}, Credits);
none ->
Credits
end.
@@ -335,7 +335,11 @@ decrement_credit(CTag, Credits) ->
update_credit(CTag, Credit, Drain, Credits) ->
%% Using up all credit implies no need to send a 'drained' event
Drain1 = Drain andalso Credit > 0,
- gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits).
+ C = case gb_trees:lookup(CTag, Credits) of
+ {value, C0} -> C0;
+ none -> #credit{}
+ end,
+ gb_trees:enter(CTag, C#credit{credit = Credit, drain = Drain1}, Credits).
%%----------------------------------------------------------------------------
%% gen_server callbacks