summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-15 13:54:24 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-15 13:54:24 +0000
commit969764b187eff634eab8aeba759b59b392262906 (patch)
treeba70e1af1ac162af0f923a1ea8a2a68475362d95 /src
parent1fe25e1a3b83eef969e1ce38c37742fd846465cf (diff)
downloadrabbitmq-server-git-969764b187eff634eab8aeba759b59b392262906.tar.gz
Unify rabbit_limiter:credit/5 and rabbit_limiter:set_consumer_prefetch/4.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_limiter.erl33
-rw-r--r--src/rabbit_queue_consumers.erl24
2 files changed, 23 insertions, 34 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 5a5c478c87..2b13590749 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -128,8 +128,7 @@
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/5,
- set_consumer_prefetch/4, ack_from_queue/3,
+ is_suspended/1, is_consumer_blocked/2, credit/6, ack_from_queue/3,
drained/1, forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -148,6 +147,8 @@
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
+-type(credit_mode() :: 'manual' | 'auto').
+
-spec(start_link/1 :: (rabbit_types:proc_name()) ->
rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
@@ -172,10 +173,8 @@
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
--spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
- boolean()) -> {boolean(), qstate()}).
--spec(set_consumer_prefetch/4 :: (qstate(), rabbit_types:ctag(), boolean(),
- non_neg_integer()) -> qstate()).
+-spec(credit/6 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
+ credit_mode(), boolean()) -> {boolean(), qstate()}).
-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer())
-> {boolean(), qstate()}).
-spec(drained/1 :: (qstate())
@@ -195,7 +194,7 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
--record(credit, {credit = 0, drain = false, mode = manual}).
+-record(credit, {credit = 0, drain = false, mode}).
%%----------------------------------------------------------------------------
%% API
@@ -284,22 +283,14 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
{value, #credit{}} -> true
end.
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) ->
- {Res, Cr} = case IsEmpty andalso Drain of
- true -> {true, #credit{credit = 0, drain = false}};
- false -> {false, #credit{credit = Credit, drain = Drain}}
- end,
+credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Drain, Mode, IsEmpty) ->
+ {Res, Cr} =
+ case IsEmpty andalso Drain of
+ true -> {true, #credit{credit = 0, drain = false, mode = Mode}};
+ false -> {false, #credit{credit = Crd, drain = Drain, mode = Mode}}
+ end,
{Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}.
-set_consumer_prefetch(Lim, _CTag, true, _Credit) ->
- Lim;
-set_consumer_prefetch(Lim, _CTag, _NoAck, 0) ->
- Lim;
-set_consumer_prefetch(Lim = #qstate{credits = Credits}, CTag, false, Credit) ->
- Credits1 = gb_trees:enter(
- CTag, #credit{credit = Credit, mode = auto}, Credits),
- Lim#qstate{credits = Credits1}.
-
ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
{Credits1, Unblocked} =
case gb_trees:lookup(CTag, Credits) of
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index f864a5ba78..ca500d4800 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -131,14 +131,12 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
case parse_credit_args(Args) of
- none ->
- C1;
- {credit, Credit, Drain} ->
- credit_and_drain(C1, ConsumerTag, Credit, Drain, IsEmpty);
- {prefetch, P} ->
- Limiter2 = rabbit_limiter:set_consumer_prefetch(
- Limiter1, ConsumerTag, NoAck, P),
- C1#cr{limiter = Limiter2}
+ none -> C1;
+ {0, _Drain, auto} -> C1;
+ {Credit, _Drain, auto} when NoAck -> C1;
+ {Credit, Drain, Mode} -> credit_and_drain(
+ C1, ConsumerTag, Credit,
+ Drain, Mode, IsEmpty)
end),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck,
@@ -149,11 +147,11 @@ parse_credit_args(Args) ->
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, C}, {bool, D}} -> {credit, C, D};
+ {{long, C}, {bool, D}} -> {C, D, manual};
_ -> none
end;
undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
- {_, Prefetch} -> {prefetch, Prefetch};
+ {_, Prefetch} -> {Prefetch, false, auto};
_ -> none
end
end.
@@ -332,7 +330,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
unchanged;
#cr{limiter = Limiter} = C ->
C1 = #cr{limiter = Limiter1} =
- credit_and_drain(C, CTag, Credit, Drain, IsEmpty),
+ credit_and_drain(C, CTag, Credit, Drain, manual, IsEmpty),
case is_ch_blocked(C1) orelse
(not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
@@ -407,8 +405,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
end.
credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
- CTag, Credit, Drain, IsEmpty) ->
- case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of
+ CTag, Credit, Drain, Mode, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, Mode, IsEmpty) of
{true, Limiter1} -> rabbit_channel:send_drained(ChPid,
[{CTag, Credit}]),
C#cr{limiter = Limiter1};