diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-02-12 14:55:13 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-02-12 14:55:13 +0000 |
| commit | ae37cc66edccd148e8112be56a719f8d5b51b722 (patch) | |
| tree | 52edbb801212fe5990dfd3036c7450e0494fbf53 /src | |
| parent | bee039366958ff6643e5d7fc8701581103749854 (diff) | |
| download | rabbitmq-server-git-ae37cc66edccd148e8112be56a719f8d5b51b722.tar.gz | |
Move blocked_ctags into the limiter.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 31 |
2 files changed, 29 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f5648ecaab..d932d1b670 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -68,9 +68,6 @@ %% Queue of {ChPid, #consumer{}} for consumers which have %% been blocked for any reason blocked_consumers, - %% List of consumer tags which have individually been - %% blocked by the limiter. - blocked_ctags, %% The limiter itself limiter, %% Has the limiter imposed a channel-wide block, either @@ -375,7 +372,6 @@ ch_record(ChPid) -> acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), - blocked_ctags = [], is_limit_active = false, limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, @@ -459,12 +455,12 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> block_consumer(C, E), {false, State}; false -> - #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C, + #cr{limiter = Limiter, ch_pid = ChPid} = C, #consumer{tag = CTag} = Consumer, case rabbit_limiter:can_send( Limiter, self(), Consumer#consumer.ack_required, CTag) of - consumer_blocked -> - block_consumer(C#cr{blocked_ctags = [CTag | BCTags]}, E), + {consumer_blocked, Limiter2} -> + block_consumer(C#cr{limiter = Limiter2}, E), {false, State}; channel_blocked -> block_consumer(C#cr{is_limit_active = true}, E), @@ -634,12 +630,12 @@ possibly_unblock(State, ChPid, Update) -> not_found -> State; C -> - C1 = #cr{blocked_ctags = BCTags} = Update(C), - IsBlocked = is_ch_blocked(C1), + C1 = #cr{limiter = Limiter} = Update(C), {Blocked, Unblocked} = lists:partition( fun({_ChPid, #consumer{tag = CTag}}) -> - IsBlocked orelse lists:member(CTag, BCTags) + is_ch_blocked(C1) orelse + rabbit_limiter:is_consumer_blocked(Limiter, CTag) end, queue:to_list(C1#cr.blocked_consumers)), case Unblocked of [] -> update_ch_record(C1), @@ -1351,13 +1347,11 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - #cr{limiter = Lim, - blocked_ctags = BCTags} = ch_record(ChPid), - {Unblock, Lim2} = rabbit_limiter:credit(Lim, CTag, Credit, Drain), + #cr{limiter = Lim} = ch_record(ChPid), + Lim2 = rabbit_limiter:credit(Lim, CTag, Credit, Drain), rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)), State1 = possibly_unblock( - State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock, - limiter = Lim2} end), + State, ChPid, fun(C) -> C#cr{limiter = Lim2} end), maybe_send_drained(State1), noreply(State1); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 0d836ca682..5774aee063 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -23,7 +23,7 @@ -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). -export([limit/2, can_send/4, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1, is_blocked/1]). +-export([get_limit/1, block/1, unblock/1, is_consumer_blocked/2, is_blocked/1]). -export([initial_credit/4, credit/4, drained/1, forget_consumer/2, copy_queue_state/2]). @@ -31,7 +31,7 @@ %%---------------------------------------------------------------------------- --record(token, {pid, enabled, credits}). +-record(token, {pid, enabled, credits, blocked_ctags}). -ifdef(use_specs). @@ -55,10 +55,11 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). +-spec(is_consumer_blocked/2 :: (token(), rabbit_types:ctag()) -> boolean()). -spec(initial_credit/4 :: (token(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> token()). -spec(credit/4 :: (token(), rabbit_types:ctag(), non_neg_integer(), boolean()) - -> {[rabbit_types:ctag()], token()}). + -> token()). -spec(drained/1 :: (token()) -> {[{rabbit_types:ctag(), non_neg_integer()}], token()}). -spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()). @@ -86,8 +87,10 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false, - credits = dict:new()}. +make_token(Pid) -> #token{pid = Pid, + enabled = false, + credits = dict:new(), + blocked_ctags = []}. is_enabled(#token{enabled = Enabled}) -> Enabled. @@ -104,7 +107,8 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits}, +can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits, + blocked_ctags = BCTags}, QPid, AckReq, CTag) -> ConsAllows = case dict:find(CTag, Credits) of {ok, #credit{credit = C}} when C > 0 -> true; @@ -117,7 +121,7 @@ can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits}, Token#token{credits = Credits2}; false -> channel_blocked end; - false -> consumer_blocked + false -> {consumer_blocked, Token#token{blocked_ctags = [CTag|BCTags]}} end. call_can_send(Pid, QPid, AckRequired) -> @@ -146,6 +150,9 @@ block(Limiter) -> unblock(Limiter) -> maybe_call(Limiter, {unblock, Limiter}, ok). +is_consumer_blocked(#token{blocked_ctags = BCTags}, CTag) -> + lists:member(CTag, BCTags). + is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). @@ -153,9 +160,11 @@ initial_credit(Limiter = #token{credits = Credits}, CTag, Credit, Drain) -> {[], Credits2} = update_credit(CTag, Credit, Drain, Credits), Limiter#token{credits = Credits2}. -credit(Limiter = #token{credits = Credits}, CTag, Credit, Drain) -> +credit(Limiter = #token{credits = Credits, blocked_ctags = BCTags}, + CTag, Credit, Drain) -> {Unblock, Credits2} = update_credit(CTag, Credit, Drain, Credits), - {Unblock, Limiter#token{credits = Credits2}}. + Limiter#token{credits = Credits2, + blocked_ctags = BCTags -- Unblock}. drained(Limiter = #token{credits = Credits}) -> {CTagCredits, Credits2} = @@ -170,8 +179,8 @@ drained(Limiter = #token{credits = Credits}) -> forget_consumer(Limiter = #token{credits = Credits}, CTag) -> Limiter#token{credits = dict:erase(CTag, Credits)}. -copy_queue_state(#token{credits = Credits}, Token) -> - Token#token{credits = Credits}. +copy_queue_state(#token{credits = Credits, blocked_ctags = BCTags}, Token) -> + Token#token{credits = Credits, blocked_ctags = BCTags}. %%---------------------------------------------------------------------------- %% Queue-local code |
