diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-29 15:21:29 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-29 15:21:29 +0000 |
| commit | 7cc7429362a997d4885da65f10c8f282f5656d12 (patch) | |
| tree | 46772bdf500686378ec6453c7bf858eb3135d96d /src | |
| parent | 14a229f670e17e99d85053c094f71eadcd71fa7e (diff) | |
| download | rabbitmq-server-git-7cc7429362a997d4885da65f10c8f282f5656d12.tar.gz | |
Unify the various checks before sending, and make sure we don't drop the new limiter on the floor.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 43 |
2 files changed, 42 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index aaa4b5370e..5c1b68f41f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -437,7 +437,9 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> C = ch_record(ChPid), case is_ch_blocked(C) of true -> @@ -446,36 +448,30 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> false -> #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C, #consumer{tag = CTag} = Consumer, - case rabbit_limiter:can_cons_send(Limiter, CTag) of - false -> + case rabbit_limiter:can_send( + Limiter, self(), Consumer#consumer.ack_required, + ChPid, CTag, BQ:len(BQS)) of + consumer_blocked -> block_consumer(C#cr{blocked_ctags = [CTag | BCTags]}, E), {false, State}; - true -> - case rabbit_limiter:can_ch_send( - Limiter, self(), Consumer#consumer.ack_required) of - false -> - block_consumer(C#cr{is_limit_active = true}, E), - {false, State}; - true -> - AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( - DeliverFun, Consumer, C, - State#q{active_consumers = AC1}) - end + channel_blocked -> + block_consumer(C#cr{is_limit_active = true}, E), + {false, State}; + Limiter2 -> + AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Limiter2, Consumer, C, + State#q{active_consumers = AC1}) end end. -deliver_msg_to_consumer(DeliverFun, +deliver_msg_to_consumer(DeliverFun, NewLimiter, #consumer{tag = ConsumerTag, ack_required = AckRequired}, C = #cr{ch_pid = ChPid, acktags = ChAckTags, - limiter = Limiter, unsent_message_count = Count}, - State = #q{q = #amqqueue{name = QName}, - backing_queue = BQ, - backing_queue_state = BQS}) -> - rabbit_limiter:record_cons_send(Limiter, ChPid, ConsumerTag, BQ:len(BQS)), + State = #q{q = #amqqueue{name = QName}}) -> {{Message, IsDelivered, AckTag}, Stop, State1} = DeliverFun(AckRequired, State), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, @@ -485,6 +481,7 @@ deliver_msg_to_consumer(DeliverFun, false -> ChAckTags end, update_ch_record(C#cr{acktags = ChAckTags1, + limiter = NewLimiter, unsent_message_count = Count + 1}), {Stop, State1}. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d9019bfa25..5b4ce80269 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -24,8 +24,7 @@ -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_ch_send/3, can_cons_send/2, record_cons_send/4, - ack/2, register/2, unregister/2]). +-export([limit/2, can_send/6, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). -export([credit/7, forget_consumer/2, copy_queue_state/2]). @@ -48,8 +47,9 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()). --spec(can_cons_send/2 :: (token(), rabbit_types:ctag()) -> boolean()). +-spec(can_send/6 :: (token(), pid(), boolean(), pid(), rabbit_types:ctag(), + non_neg_integer()) + -> token() | 'consumer_blocked' | 'channel_blocked'). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). -spec(unregister/2 :: (token(), pid()) -> 'ok'). @@ -103,25 +103,30 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> +can_send(Token, QPid, AckRequired, ChPid, CTag, Len) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) - end); -can_ch_send(_, _, _) -> - true. - -can_cons_send(#token{q_state = Credits}, CTag) -> - case dict:find(CTag, Credits) of - {ok, #credit{credit = C}} when C > 0 -> true; - {ok, #credit{}} -> false; - error -> true + fun () -> can_send0(Token, QPid, AckRequired, ChPid, CTag, Len) end). + +can_send0(Token = #token{pid = Pid, enabled = Enabled, q_state = Credits}, + QPid, AckRequired, ChPid, CTag, Len) -> + ConsAllows = case dict:find(CTag, Credits) of + {ok, #credit{credit = C}} when C > 0 -> true; + {ok, #credit{}} -> false; + error -> true + end, + case ConsAllows of + true -> case Enabled andalso + gen_server2:call( + Pid, {can_send, QPid, AckRequired}, infinity) of + true -> Credits2 = record_send_q( + CTag, Len, ChPid, Credits), + Token#token{q_state = Credits2}; + false -> channel_blocked + end; + false -> consumer_blocked end. -record_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) -> - Token#token{q_state = record_send_q(CTag, Len, ChPid, QState)}. - %% Let the limiter know that the channel has received some acks from a %% consumer ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). |
