diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 13:54:24 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 13:54:24 +0000 |
| commit | 969764b187eff634eab8aeba759b59b392262906 (patch) | |
| tree | ba70e1af1ac162af0f923a1ea8a2a68475362d95 /src | |
| parent | 1fe25e1a3b83eef969e1ce38c37742fd846465cf (diff) | |
| download | rabbitmq-server-git-969764b187eff634eab8aeba759b59b392262906.tar.gz | |
Unify rabbit_limiter:credit/5 and rabbit_limiter:set_consumer_prefetch/4.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_limiter.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 24 |
2 files changed, 23 insertions, 34 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 5a5c478c87..2b13590749 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -128,8 +128,7 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/5, - set_consumer_prefetch/4, ack_from_queue/3, + is_suspended/1, is_consumer_blocked/2, credit/6, ack_from_queue/3, drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -148,6 +147,8 @@ -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). +-type(credit_mode() :: 'manual' | 'auto'). + -spec(start_link/1 :: (rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). @@ -172,10 +173,8 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), - boolean()) -> {boolean(), qstate()}). --spec(set_consumer_prefetch/4 :: (qstate(), rabbit_types:ctag(), boolean(), - non_neg_integer()) -> qstate()). +-spec(credit/6 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), + credit_mode(), boolean()) -> {boolean(), qstate()}). -spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer()) -> {boolean(), qstate()}). -spec(drained/1 :: (qstate()) @@ -195,7 +194,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {credit = 0, drain = false, mode = manual}). +-record(credit, {credit = 0, drain = false, mode}). %%---------------------------------------------------------------------------- %% API @@ -284,22 +283,14 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> {value, #credit{}} -> true end. -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) -> - {Res, Cr} = case IsEmpty andalso Drain of - true -> {true, #credit{credit = 0, drain = false}}; - false -> {false, #credit{credit = Credit, drain = Drain}} - end, +credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Drain, Mode, IsEmpty) -> + {Res, Cr} = + case IsEmpty andalso Drain of + true -> {true, #credit{credit = 0, drain = false, mode = Mode}}; + false -> {false, #credit{credit = Crd, drain = Drain, mode = Mode}} + end, {Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}. -set_consumer_prefetch(Lim, _CTag, true, _Credit) -> - Lim; -set_consumer_prefetch(Lim, _CTag, _NoAck, 0) -> - Lim; -set_consumer_prefetch(Lim = #qstate{credits = Credits}, CTag, false, Credit) -> - Credits1 = gb_trees:enter( - CTag, #credit{credit = Credit, mode = auto}, Credits), - Lim#qstate{credits = Credits1}. - ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> {Credits1, Unblocked} = case gb_trees:lookup(CTag, Credits) of diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f864a5ba78..ca500d4800 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -131,14 +131,12 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( case parse_credit_args(Args) of - none -> - C1; - {credit, Credit, Drain} -> - credit_and_drain(C1, ConsumerTag, Credit, Drain, IsEmpty); - {prefetch, P} -> - Limiter2 = rabbit_limiter:set_consumer_prefetch( - Limiter1, ConsumerTag, NoAck, P), - C1#cr{limiter = Limiter2} + none -> C1; + {0, _Drain, auto} -> C1; + {Credit, _Drain, auto} when NoAck -> C1; + {Credit, Drain, Mode} -> credit_and_drain( + C1, ConsumerTag, Credit, + Drain, Mode, IsEmpty) end), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck, @@ -149,11 +147,11 @@ parse_credit_args(Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, C}, {bool, D}} -> {credit, C, D}; + {{long, C}, {bool, D}} -> {C, D, manual}; _ -> none end; undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of - {_, Prefetch} -> {prefetch, Prefetch}; + {_, Prefetch} -> {Prefetch, false, auto}; _ -> none end end. @@ -332,7 +330,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> unchanged; #cr{limiter = Limiter} = C -> C1 = #cr{limiter = Limiter1} = - credit_and_drain(C, CTag, Credit, Drain, IsEmpty), + credit_and_drain(C, CTag, Credit, Drain, manual, IsEmpty), case is_ch_blocked(C1) orelse (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of @@ -407,8 +405,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> end. credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, - CTag, Credit, Drain, IsEmpty) -> - case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of + CTag, Credit, Drain, Mode, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, Mode, IsEmpty) of {true, Limiter1} -> rabbit_channel:send_drained(ChPid, [{CTag, Credit}]), C#cr{limiter = Limiter1}; |
