summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-29 15:21:29 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-29 15:21:29 +0000
commit7cc7429362a997d4885da65f10c8f282f5656d12 (patch)
tree46772bdf500686378ec6453c7bf858eb3135d96d /src
parent14a229f670e17e99d85053c094f71eadcd71fa7e (diff)
downloadrabbitmq-server-git-7cc7429362a997d4885da65f10c8f282f5656d12.tar.gz
Unify the various checks before sending, and make sure we don't drop the new limiter on the floor.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl39
-rw-r--r--src/rabbit_limiter.erl43
2 files changed, 42 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index aaa4b5370e..5c1b68f41f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -437,7 +437,9 @@ deliver_msgs_to_consumers(DeliverFun, false,
deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
true ->
@@ -446,36 +448,30 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
false ->
#cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C,
#consumer{tag = CTag} = Consumer,
- case rabbit_limiter:can_cons_send(Limiter, CTag) of
- false ->
+ case rabbit_limiter:can_send(
+ Limiter, self(), Consumer#consumer.ack_required,
+ ChPid, CTag, BQ:len(BQS)) of
+ consumer_blocked ->
block_consumer(C#cr{blocked_ctags = [CTag | BCTags]}, E),
{false, State};
- true ->
- case rabbit_limiter:can_ch_send(
- Limiter, self(), Consumer#consumer.ack_required) of
- false ->
- block_consumer(C#cr{is_limit_active = true}, E),
- {false, State};
- true ->
- AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
- DeliverFun, Consumer, C,
- State#q{active_consumers = AC1})
- end
+ channel_blocked ->
+ block_consumer(C#cr{is_limit_active = true}, E),
+ {false, State};
+ Limiter2 ->
+ AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Limiter2, Consumer, C,
+ State#q{active_consumers = AC1})
end
end.
-deliver_msg_to_consumer(DeliverFun,
+deliver_msg_to_consumer(DeliverFun, NewLimiter,
#consumer{tag = ConsumerTag,
ack_required = AckRequired},
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
- limiter = Limiter,
unsent_message_count = Count},
- State = #q{q = #amqqueue{name = QName},
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- rabbit_limiter:record_cons_send(Limiter, ChPid, ConsumerTag, BQ:len(BQS)),
+ State = #q{q = #amqqueue{name = QName}}) ->
{{Message, IsDelivered, AckTag}, Stop, State1} =
DeliverFun(AckRequired, State),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
@@ -485,6 +481,7 @@ deliver_msg_to_consumer(DeliverFun,
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
+ limiter = NewLimiter,
unsent_message_count = Count + 1}),
{Stop, State1}.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d9019bfa25..5b4ce80269 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -24,8 +24,7 @@
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
--export([limit/2, can_ch_send/3, can_cons_send/2, record_cons_send/4,
- ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/6, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
-export([credit/7, forget_consumer/2, copy_queue_state/2]).
@@ -48,8 +47,9 @@
-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()).
--spec(can_cons_send/2 :: (token(), rabbit_types:ctag()) -> boolean()).
+-spec(can_send/6 :: (token(), pid(), boolean(), pid(), rabbit_types:ctag(),
+ non_neg_integer())
+ -> token() | 'consumer_blocked' | 'channel_blocked').
-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (token(), pid()) -> 'ok').
-spec(unregister/2 :: (token(), pid()) -> 'ok').
@@ -103,25 +103,30 @@ limit(Limiter, PrefetchCount) ->
%% breaching a limit. Note that we don't use maybe_call here in order
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
-can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
+can_send(Token, QPid, AckRequired, ChPid, CTag, Len) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () ->
- gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
- end);
-can_ch_send(_, _, _) ->
- true.
-
-can_cons_send(#token{q_state = Credits}, CTag) ->
- case dict:find(CTag, Credits) of
- {ok, #credit{credit = C}} when C > 0 -> true;
- {ok, #credit{}} -> false;
- error -> true
+ fun () -> can_send0(Token, QPid, AckRequired, ChPid, CTag, Len) end).
+
+can_send0(Token = #token{pid = Pid, enabled = Enabled, q_state = Credits},
+ QPid, AckRequired, ChPid, CTag, Len) ->
+ ConsAllows = case dict:find(CTag, Credits) of
+ {ok, #credit{credit = C}} when C > 0 -> true;
+ {ok, #credit{}} -> false;
+ error -> true
+ end,
+ case ConsAllows of
+ true -> case Enabled andalso
+ gen_server2:call(
+ Pid, {can_send, QPid, AckRequired}, infinity) of
+ true -> Credits2 = record_send_q(
+ CTag, Len, ChPid, Credits),
+ Token#token{q_state = Credits2};
+ false -> channel_blocked
+ end;
+ false -> consumer_blocked
end.
-record_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) ->
- Token#token{q_state = record_send_q(CTag, Len, ChPid, QState)}.
-
%% Let the limiter know that the channel has received some acks from a
%% consumer
ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).