summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_limiter.erl31
2 files changed, 29 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f5648ecaab..d932d1b670 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -68,9 +68,6 @@
%% Queue of {ChPid, #consumer{}} for consumers which have
%% been blocked for any reason
blocked_consumers,
- %% List of consumer tags which have individually been
- %% blocked by the limiter.
- blocked_ctags,
%% The limiter itself
limiter,
%% Has the limiter imposed a channel-wide block, either
@@ -375,7 +372,6 @@ ch_record(ChPid) ->
acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
- blocked_ctags = [],
is_limit_active = false,
limiter = rabbit_limiter:make_token(),
unsent_message_count = 0},
@@ -459,12 +455,12 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
block_consumer(C, E),
{false, State};
false ->
- #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C,
+ #cr{limiter = Limiter, ch_pid = ChPid} = C,
#consumer{tag = CTag} = Consumer,
case rabbit_limiter:can_send(
Limiter, self(), Consumer#consumer.ack_required, CTag) of
- consumer_blocked ->
- block_consumer(C#cr{blocked_ctags = [CTag | BCTags]}, E),
+ {consumer_blocked, Limiter2} ->
+ block_consumer(C#cr{limiter = Limiter2}, E),
{false, State};
channel_blocked ->
block_consumer(C#cr{is_limit_active = true}, E),
@@ -634,12 +630,12 @@ possibly_unblock(State, ChPid, Update) ->
not_found ->
State;
C ->
- C1 = #cr{blocked_ctags = BCTags} = Update(C),
- IsBlocked = is_ch_blocked(C1),
+ C1 = #cr{limiter = Limiter} = Update(C),
{Blocked, Unblocked} =
lists:partition(
fun({_ChPid, #consumer{tag = CTag}}) ->
- IsBlocked orelse lists:member(CTag, BCTags)
+ is_ch_blocked(C1) orelse
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
end, queue:to_list(C1#cr.blocked_consumers)),
case Unblocked of
[] -> update_ch_record(C1),
@@ -1351,13 +1347,11 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- #cr{limiter = Lim,
- blocked_ctags = BCTags} = ch_record(ChPid),
- {Unblock, Lim2} = rabbit_limiter:credit(Lim, CTag, Credit, Drain),
+ #cr{limiter = Lim} = ch_record(ChPid),
+ Lim2 = rabbit_limiter:credit(Lim, CTag, Credit, Drain),
rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)),
State1 = possibly_unblock(
- State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock,
- limiter = Lim2} end),
+ State, ChPid, fun(C) -> C#cr{limiter = Lim2} end),
maybe_send_drained(State1),
noreply(State1);
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 0d836ca682..5774aee063 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -23,7 +23,7 @@
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
-export([limit/2, can_send/4, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1, is_blocked/1]).
+-export([get_limit/1, block/1, unblock/1, is_consumer_blocked/2, is_blocked/1]).
-export([initial_credit/4, credit/4, drained/1, forget_consumer/2,
copy_queue_state/2]).
@@ -31,7 +31,7 @@
%%----------------------------------------------------------------------------
--record(token, {pid, enabled, credits}).
+-record(token, {pid, enabled, credits, blocked_ctags}).
-ifdef(use_specs).
@@ -55,10 +55,11 @@
-spec(block/1 :: (token()) -> 'ok').
-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
-spec(is_blocked/1 :: (token()) -> boolean()).
+-spec(is_consumer_blocked/2 :: (token(), rabbit_types:ctag()) -> boolean()).
-spec(initial_credit/4 :: (token(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> token()).
-spec(credit/4 :: (token(), rabbit_types:ctag(), non_neg_integer(), boolean())
- -> {[rabbit_types:ctag()], token()}).
+ -> token()).
-spec(drained/1 :: (token())
-> {[{rabbit_types:ctag(), non_neg_integer()}], token()}).
-spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()).
@@ -86,8 +87,10 @@
start_link() -> gen_server2:start_link(?MODULE, [], []).
make_token() -> make_token(undefined).
-make_token(Pid) -> #token{pid = Pid, enabled = false,
- credits = dict:new()}.
+make_token(Pid) -> #token{pid = Pid,
+ enabled = false,
+ credits = dict:new(),
+ blocked_ctags = []}.
is_enabled(#token{enabled = Enabled}) -> Enabled.
@@ -104,7 +107,8 @@ limit(Limiter, PrefetchCount) ->
%% breaching a limit. Note that we don't use maybe_call here in order
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
-can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits},
+can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits,
+ blocked_ctags = BCTags},
QPid, AckReq, CTag) ->
ConsAllows = case dict:find(CTag, Credits) of
{ok, #credit{credit = C}} when C > 0 -> true;
@@ -117,7 +121,7 @@ can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits},
Token#token{credits = Credits2};
false -> channel_blocked
end;
- false -> consumer_blocked
+ false -> {consumer_blocked, Token#token{blocked_ctags = [CTag|BCTags]}}
end.
call_can_send(Pid, QPid, AckRequired) ->
@@ -146,6 +150,9 @@ block(Limiter) ->
unblock(Limiter) ->
maybe_call(Limiter, {unblock, Limiter}, ok).
+is_consumer_blocked(#token{blocked_ctags = BCTags}, CTag) ->
+ lists:member(CTag, BCTags).
+
is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
@@ -153,9 +160,11 @@ initial_credit(Limiter = #token{credits = Credits}, CTag, Credit, Drain) ->
{[], Credits2} = update_credit(CTag, Credit, Drain, Credits),
Limiter#token{credits = Credits2}.
-credit(Limiter = #token{credits = Credits}, CTag, Credit, Drain) ->
+credit(Limiter = #token{credits = Credits, blocked_ctags = BCTags},
+ CTag, Credit, Drain) ->
{Unblock, Credits2} = update_credit(CTag, Credit, Drain, Credits),
- {Unblock, Limiter#token{credits = Credits2}}.
+ Limiter#token{credits = Credits2,
+ blocked_ctags = BCTags -- Unblock}.
drained(Limiter = #token{credits = Credits}) ->
{CTagCredits, Credits2} =
@@ -170,8 +179,8 @@ drained(Limiter = #token{credits = Credits}) ->
forget_consumer(Limiter = #token{credits = Credits}, CTag) ->
Limiter#token{credits = dict:erase(CTag, Credits)}.
-copy_queue_state(#token{credits = Credits}, Token) ->
- Token#token{credits = Credits}.
+copy_queue_state(#token{credits = Credits, blocked_ctags = BCTags}, Token) ->
+ Token#token{credits = Credits, blocked_ctags = BCTags}.
%%----------------------------------------------------------------------------
%% Queue-local code